Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aaron/APPEALS-24678 #19001

Merged
merged 8 commits into from
Jul 14, 2023
31 changes: 31 additions & 0 deletions app/jobs/batch_processes/batch_process_rescue_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# This job will search for and reprocess unfinished Batch Processes nightly.
# Search Criteria is for Batch Processes that are in an unfinished state ('PRE_PROCESSING', 'PROCESSING') &
# have a created_at date/time that is greater than the ERROR_DELAY defined within batch_process.rb
class BatchProcessRescueJob < CaseflowJob
queue_with_priority :low_priority

before_perform do |job|
JOB_ATTR = job
end

def perform
batches = BatchProcess.needs_reprocessing
if batches.any?
batches.each do |batch|
begin
batch.process_batch!
rescue StandardError => error
Rails.logger.error("Error: #{error.inspect}, Job ID: #{JOB_ATTR&.job_id}, Job Time: #{Time.zone.now}")
capture_exception(error: error,
extra: { job_id: JOB_ATTR&.job_id.to_s,
job_time: Time.zone.now.to_s })
next
end
end
else
Rails.logger.info("No Unfinished Batches Could Be Identified. Time: #{Time.zone.now}.")
end
end
end
7 changes: 3 additions & 4 deletions app/models/batch_processes/batch_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ class BatchProcess < CaseflowRecord
BATCH_LIMIT = ENV["BATCH_LIMIT"].to_i

scope :completed_batch_process_ids, -> { where(state: Constants.BATCH_PROCESS.completed).select(:batch_id) }
scope :needs_reprocessing, lambda {
where("created_at <= ? AND state <> ?", BatchProcess::ERROR_DELAY.hours.ago, Constants.BATCH_PROCESS.completed)
}

enum state: {
Constants.BATCH_PROCESS.pre_processing.to_sym => Constants.BATCH_PROCESS.pre_processing,
Expand All @@ -20,7 +23,6 @@ class BatchProcess < CaseflowRecord
}

class << self

# A method for overriding, for the purpose of finding the records that
# need to be batched. This method should return the records found.
def find_records
Expand All @@ -41,7 +43,6 @@ def process_batch!
# no-op, can be overwritten
end


private

# Instance var methods
Expand All @@ -58,7 +59,6 @@ def increment_failed
@failed_count += 1
end


# State update Methods
def batch_processing!
update!(state: Constants.BATCH_PROCESS.processing, started_at: Time.zone.now)
Expand All @@ -71,7 +71,6 @@ def batch_complete!
ended_at: Time.zone.now)
end


# When a record and error is sent to this method, it updates the record and checks to see
# if the record should be declared stuck. If the records should be stuck, it calls the
# declare_record_stuck method (Found in priority_end_product_sync_queue.rb).
Expand Down
2 changes: 2 additions & 0 deletions config/initializers/scheduled_jobs.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
require "./app/jobs/batch_processes/batch_process_priority_ep_sync_job.rb"
require "./app/jobs/batch_processes/batch_process_rescue_job.rb"

SCHEDULED_JOBS = {
"amo_metrics_report" => AMOMetricsReportJob,
"annual_metrics" => AnnualMetricsReportJob,
"batch_process_priority_ep_sync" => BatchProcessPriorityEpSyncJob,
"batch_process_rescue_job" => BatchProcessRescueJob,
"calculate_dispatch_stats" => CalculateDispatchStatsJob,
"create_establish_claim" => CreateEstablishClaimTasksJob,
"data_integrity_checks" => DataIntegrityChecksJob,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class AddCreatedAtAndUpdatedAtColumnsToBatchProcesses < Caseflow::Migration
def change
add_column :batch_processes, :created_at, :datetime, null: false, comment: "Date and Time that batch was created."
add_column :batch_processes, :updated_at, :datetime, null: false, comment: "Date and Time that batch was last updated."
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddUpdatedAtColumnToPriorityEndProductSyncQueue < Caseflow::Migration
def change
add_column :priority_end_product_sync_queue, :updated_at, :datetime, null: false, comment: "Date and Time the record was last updated."
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class AddIndexOnLastBatchedAtAndStatusToPriorityEndProductSyncQueue < Caseflow::Migration
def change
add_safe_index :priority_end_product_sync_queue, [:last_batched_at], name: "index_priority_ep_sync_queue_on_last_batched_at", unique: false
add_safe_index :priority_end_product_sync_queue, [:status], name: "index_priority_ep_sync_queue_on_status", unique: false
end
end
7 changes: 6 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2023_06_30_134611) do
ActiveRecord::Schema.define(version: 2023_07_11_153654) do

# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
Expand Down Expand Up @@ -222,12 +222,14 @@

create_table "batch_processes", primary_key: "batch_id", id: :uuid, default: -> { "uuid_generate_v4()" }, comment: "A generalized table for batching and processing records within caseflow", force: :cascade do |t|
t.string "batch_type", null: false, comment: "Indicates what type of record is being batched"
t.datetime "created_at", null: false, comment: "Date and Time that batch was created."
t.datetime "ended_at", comment: "The date/time that the batch finsished processing"
t.integer "records_attempted", default: 0, comment: "The number of records in the batch attempting to be processed"
t.integer "records_completed", default: 0, comment: "The number of records in the batch that completed processing successfully"
t.integer "records_failed", default: 0, comment: "The number of records in the batch that failed processing"
t.datetime "started_at", comment: "The date/time that the batch began processing"
t.string "state", default: "PRE_PROCESSING", null: false, comment: "The state that the batch is currently in. PRE_PROCESSING, PROCESSING, PROCESSED"
t.datetime "updated_at", null: false, comment: "Date and Time that batch was last updated."
t.index ["batch_type"], name: "index_batch_processes_on_batch_type"
t.index ["records_failed"], name: "index_batch_processes_on_records_failed"
t.index ["state"], name: "index_batch_processes_on_state"
Expand Down Expand Up @@ -1370,8 +1372,11 @@
t.string "error_messages", default: [], comment: "Array of Error Message(s) containing Batch ID and specific error if a failure occurs", array: true
t.datetime "last_batched_at", comment: "Date and Time the record was last batched"
t.string "status", default: "NOT_PROCESSED", null: false, comment: "A status to indicate what state the record is in such as PROCESSING and PROCESSED"
t.datetime "updated_at", null: false, comment: "Date and Time the record was last updated."
t.index ["batch_id"], name: "index_priority_end_product_sync_queue_on_batch_id"
t.index ["end_product_establishment_id"], name: "index_priority_end_product_sync_queue_on_epe_id", unique: true
t.index ["last_batched_at"], name: "index_priority_ep_sync_queue_on_last_batched_at"
t.index ["status"], name: "index_priority_ep_sync_queue_on_status"
end

create_table "ramp_closed_appeals", id: :serial, comment: "Keeps track of legacy appeals that are closed or partially closed in VACOLS due to being transitioned to a RAMP election. This data can be used to rollback the RAMP Election if needed.", force: :cascade do |t|
Expand Down
136 changes: 136 additions & 0 deletions spec/jobs/batch_processes/batch_process_rescue_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# frozen_string_literal: true

require "./app/jobs/batch_processes/batch_process_rescue_job.rb"

describe BatchProcessRescueJob, type: :job do
before do
Timecop.freeze(Time.utc(2022, 1, 1, 12, 0, 0))
end

let!(:end_product_establishments_one) do
create_list(:end_product_establishment, 2, :active_hlr_with_cleared_vbms_ext_claim)
end

let!(:pepsq_records_one) do
PopulateEndProductSyncQueueJob.perform_now
end

let!(:first_batch_process) do
BatchProcessPriorityEpSyncJob.perform_now
end

let!(:end_product_establishments_two) do
create_list(:end_product_establishment, 2, :active_hlr_with_cleared_vbms_ext_claim)
end

let!(:pepsq_records_two) do
PopulateEndProductSyncQueueJob.perform_now
end

let!(:second_batch_process) do
BatchProcessPriorityEpSyncJob.perform_now
end

let!(:batch_process_one) do
BatchProcess.first
end

let!(:batch_process_two) do
BatchProcess.second
end

subject { BatchProcessRescueJob.perform_now }

describe "#perform" do
context "when all batch processes are 'COMPLETED'" do
before do
subject
end
it "all batch processes remain unchanged and do NOT reprocess" do
expect(batch_process_one).to eq(batch_process_one.reload)
expect(batch_process_two).to eq(batch_process_two.reload)
end
end

context "when all batch processes are 'COMPLETED' but one has a created_at time more than 12 hours ago" do
before do
batch_process_one.update!(created_at: Time.zone.now - 16.hours)
subject
end
it "all batch processes remain unchanged and do NOT reprocess" do
expect(batch_process_one).to eq(batch_process_one.reload)
expect(batch_process_two).to eq(batch_process_two.reload)
end
end

context "when a batch process has a state of 'PRE_PROCESSING' & a created_at less than 12 hours ago" do
before do
batch_process_one.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 2.hours)
subject
end
it "the batch process will remain unchanged and will NOT reprocess" do
expect(batch_process_one).to eq(batch_process_one.reload)
end
end

context "when a batch process has a state of 'PRE_PROCESSING' & a created_at more than 12 hours ago" do
before do
batch_process_one.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 16.hours)
subject
end
it "the batch process will reprocess" do
expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.pre_processing)
expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed)
end
end

context "when a batch process has a state of 'PROCESSING' & a created_at less than 12 hours ago" do
before do
batch_process_one.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 2.hours)
subject
end
it "the batch process will remain unchanged and will NOT reprocess" do
expect(batch_process_one).to eq(batch_process_one.reload)
end
end

context "when a batch process has a state of 'PROCESSING' & a created_at more than 12 hours ago" do
before do
batch_process_one.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 16.hours)
subject
end
it "the batch process will reprocess" do
expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.processing)
expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed)
end
end

context "when two batch processes have a state of 'PRE_PROCESSING' & a created_at more than 12 hours ago" do
before do
batch_process_one.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 16.hours)
batch_process_two.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 16.hours)
subject
end
it "both batch processes will reprocess" do
expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.pre_processing)
expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed)
expect(batch_process_two.state).to eq(Constants.BATCH_PROCESS.pre_processing)
expect(batch_process_two.reload.state).to eq(Constants.BATCH_PROCESS.completed)
end
end

context "when two batch processes have a state of 'PROCESSING' & a created_at more than 12 hours ago" do
before do
batch_process_one.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 16.hours)
batch_process_two.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 16.hours)
subject
end
it "both batch processes will reprocess" do
expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.processing)
expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed)
expect(batch_process_two.state).to eq(Constants.BATCH_PROCESS.processing)
expect(batch_process_two.reload.state).to eq(Constants.BATCH_PROCESS.completed)
end
end
end
end
Loading