From 0d09616742fb2ac4e1640c49fbd955ceb44a4c6b Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Thu, 13 Jul 2023 22:04:12 -0400 Subject: [PATCH 1/8] APPEALS-24678 Added created_at & updated_at columns to Batch Processes table. --- ..._created_at_and_updated_at_columns_to_batch_processes.rb | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 db/migrate/20230711153345_add_created_at_and_updated_at_columns_to_batch_processes.rb diff --git a/db/migrate/20230711153345_add_created_at_and_updated_at_columns_to_batch_processes.rb b/db/migrate/20230711153345_add_created_at_and_updated_at_columns_to_batch_processes.rb new file mode 100644 index 00000000000..64cd52f534c --- /dev/null +++ b/db/migrate/20230711153345_add_created_at_and_updated_at_columns_to_batch_processes.rb @@ -0,0 +1,6 @@ +class AddCreatedAtAndUpdatedAtColumnsToBatchProcesses < Caseflow::Migration + def change + add_column :batch_processes, :created_at, :datetime, null: false, comment: "Date and Time that batch was created." + add_column :batch_processes, :updated_at, :datetime, null: false, comment: "Date and Time that batch was last updated." + end +end From 02b42b856f120fb60f7791d766d74c53e3bb51d5 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Thu, 13 Jul 2023 22:05:01 -0400 Subject: [PATCH 2/8] APPEALS-24678 Added updated_at column to Priority End Product Sync Queue table. --- ...d_updated_at_column_to_priority_end_product_sync_queue.rb | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 db/migrate/20230711153536_add_updated_at_column_to_priority_end_product_sync_queue.rb diff --git a/db/migrate/20230711153536_add_updated_at_column_to_priority_end_product_sync_queue.rb b/db/migrate/20230711153536_add_updated_at_column_to_priority_end_product_sync_queue.rb new file mode 100644 index 00000000000..8adff954fb7 --- /dev/null +++ b/db/migrate/20230711153536_add_updated_at_column_to_priority_end_product_sync_queue.rb @@ -0,0 +1,5 @@ +class AddUpdatedAtColumnToPriorityEndProductSyncQueue < Caseflow::Migration + def change + add_column :priority_end_product_sync_queue, :updated_at, :datetime, null: false, comment: "Date and Time the record was last updated." + end +end From 8829a779eb30a972d6cb686e0b1937c713a9730a Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Thu, 13 Jul 2023 22:06:24 -0400 Subject: [PATCH 3/8] APPEALS-24678 Added indexing on last_batched_at & status columns in Priority End Product Sync Queue table. --- ...ched_at_and_status_to_priority_end_product_sync_queue.rb | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb diff --git a/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb b/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb new file mode 100644 index 00000000000..bfff7830cc2 --- /dev/null +++ b/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb @@ -0,0 +1,6 @@ +class AddIndexOnLastBatchedAtAndStatusToPriorityEndProductSyncQueue < Caseflow::Migration + def change + add_safe_index :priority_end_product_sync_queue, [:last_batched_at], name: "index_priority_ep_sync_queue_on_last_batched_at", unique: false + add_safe_index :priority_end_product_sync_queue, [:status], name: "index_priority_ep_sync_queue_on_last_batched_at", unique: false + end +end From 90633811d74ede9c4c1f7edb50a282d0035d78d7 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Thu, 13 Jul 2023 22:07:35 -0400 Subject: [PATCH 4/8] APPEALS-24678 Added new scope 'needs_processing' to the Batch Process model. --- app/models/batch_processes/batch_process.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/app/models/batch_processes/batch_process.rb b/app/models/batch_processes/batch_process.rb index 99aa4df35b2..34c8fbdb0cf 100644 --- a/app/models/batch_processes/batch_process.rb +++ b/app/models/batch_processes/batch_process.rb @@ -11,6 +11,9 @@ class BatchProcess < CaseflowRecord BATCH_LIMIT = ENV["BATCH_LIMIT"].to_i scope :completed_batch_process_ids, -> { where(state: Constants.BATCH_PROCESS.completed).select(:batch_id) } + scope :needs_reprocessing, lambda { + where("created_at <= ? AND state <> ?", BatchProcess::ERROR_DELAY.hours.ago, Constants.BATCH_PROCESS.completed) + } enum state: { Constants.BATCH_PROCESS.pre_processing.to_sym => Constants.BATCH_PROCESS.pre_processing, @@ -20,7 +23,6 @@ class BatchProcess < CaseflowRecord } class << self - # A method for overriding, for the purpose of finding the records that # need to be batched. This method should return the records found. def find_records @@ -41,7 +43,6 @@ def process_batch! # no-op, can be overwritten end - private # Instance var methods @@ -58,7 +59,6 @@ def increment_failed @failed_count += 1 end - # State update Methods def batch_processing! update!(state: Constants.BATCH_PROCESS.processing, started_at: Time.zone.now) @@ -71,7 +71,6 @@ def batch_complete! ended_at: Time.zone.now) end - # When a record and error is sent to this method, it updates the record and checks to see # if the record should be declared stuck. If the records should be stuck, it calls the # declare_record_stuck method (Found in priority_end_product_sync_queue.rb). From c8b5a4e5f6734405932cbb32c563409e25a71f3f Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Thu, 13 Jul 2023 22:10:35 -0400 Subject: [PATCH 5/8] APPEALS-24678 Created BatchProcessRescueJob. --- .../batch_process_rescue_job.rb | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 app/jobs/batch_processes/batch_process_rescue_job.rb diff --git a/app/jobs/batch_processes/batch_process_rescue_job.rb b/app/jobs/batch_processes/batch_process_rescue_job.rb new file mode 100644 index 00000000000..7db1d63e5a7 --- /dev/null +++ b/app/jobs/batch_processes/batch_process_rescue_job.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +# This job will search for and reprocess unfinished Batch Processes nightly. +# Search Criteria is for Batch Processes that are in an unfinished state ('PRE_PROCESSING', 'PROCESSING') & +# have a created_at date/time that is greater than the ERROR_DELAY defined within batch_process.rb +class BatchProcessRescueJob < CaseflowJob + queue_with_priority :low_priority + + before_perform do |job| + JOB_ATTR = job + end + + def perform + batches = BatchProcess.needs_reprocessing + if batches.any? + batches.each do |batch| + begin + batch.process_batch! + rescue StandardError => error + Rails.logger.error("Error: #{error.inspect}, Job ID: #{JOB_ATTR&.job_id}, Job Time: #{Time.zone.now}") + capture_exception(error: error, + extra: { job_id: JOB_ATTR&.job_id.to_s, + job_time: Time.zone.now.to_s }) + next + end + end + else + Rails.logger.info("No Unfinished Batches Could Be Identified. Time: #{Time.zone.now}.") + end + end +end From dde2fbd81547c422f0972f0c9954c209eefb0de4 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Thu, 13 Jul 2023 22:11:27 -0400 Subject: [PATCH 6/8] APPEALS-24678 Created RSPEC for BatchProcessRescueJob. --- .../batch_process_rescue_job_spec.rb | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 spec/jobs/batch_processes/batch_process_rescue_job_spec.rb diff --git a/spec/jobs/batch_processes/batch_process_rescue_job_spec.rb b/spec/jobs/batch_processes/batch_process_rescue_job_spec.rb new file mode 100644 index 00000000000..9d796377b1d --- /dev/null +++ b/spec/jobs/batch_processes/batch_process_rescue_job_spec.rb @@ -0,0 +1,136 @@ +# frozen_string_literal: true + +require "./app/jobs/batch_processes/batch_process_rescue_job.rb" + +describe BatchProcessRescueJob, type: :job do + before do + Timecop.freeze(Time.utc(2022, 1, 1, 12, 0, 0)) + end + + let!(:end_product_establishments_one) do + create_list(:end_product_establishment, 2, :active_hlr_with_cleared_vbms_ext_claim) + end + + let!(:pepsq_records_one) do + PopulateEndProductSyncQueueJob.perform_now + end + + let!(:first_batch_process) do + BatchProcessPriorityEpSyncJob.perform_now + end + + let!(:end_product_establishments_two) do + create_list(:end_product_establishment, 2, :active_hlr_with_cleared_vbms_ext_claim) + end + + let!(:pepsq_records_two) do + PopulateEndProductSyncQueueJob.perform_now + end + + let!(:second_batch_process) do + BatchProcessPriorityEpSyncJob.perform_now + end + + let!(:batch_process_one) do + BatchProcess.first + end + + let!(:batch_process_two) do + BatchProcess.second + end + + subject { BatchProcessRescueJob.perform_now } + + describe "#perform" do + context "when all batch processes are 'COMPLETED'" do + before do + subject + end + it "all batch processes remain unchanged and do NOT reprocess" do + expect(batch_process_one).to eq(batch_process_one.reload) + expect(batch_process_two).to eq(batch_process_two.reload) + end + end + + context "when all batch processes are 'COMPLETED' but one has a created_at time more than 12 hours ago" do + before do + batch_process_one.update!(created_at: Time.zone.now - 16.hours) + subject + end + it "all batch processes remain unchanged and do NOT reprocess" do + expect(batch_process_one).to eq(batch_process_one.reload) + expect(batch_process_two).to eq(batch_process_two.reload) + end + end + + context "when a batch process has a state of 'PRE_PROCESSING' & a created_at less than 12 hours ago" do + before do + batch_process_one.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 2.hours) + subject + end + it "the batch process will remain unchanged and will NOT reprocess" do + expect(batch_process_one).to eq(batch_process_one.reload) + end + end + + context "when a batch process has a state of 'PRE_PROCESSING' & a created_at more than 12 hours ago" do + before do + batch_process_one.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 16.hours) + subject + end + it "the batch process will reprocess" do + expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.pre_processing) + expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed) + end + end + + context "when a batch process has a state of 'PROCESSING' & a created_at less than 12 hours ago" do + before do + batch_process_one.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 2.hours) + subject + end + it "the batch process will remain unchanged and will NOT reprocess" do + expect(batch_process_one).to eq(batch_process_one.reload) + end + end + + context "when a batch process has a state of 'PROCESSING' & a created_at more than 12 hours ago" do + before do + batch_process_one.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 16.hours) + subject + end + it "the batch process will reprocess" do + expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.processing) + expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed) + end + end + + context "when two batch processes have a state of 'PRE_PROCESSING' & a created_at more than 12 hours ago" do + before do + batch_process_one.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 16.hours) + batch_process_two.update!(state: Constants.BATCH_PROCESS.pre_processing, created_at: Time.zone.now - 16.hours) + subject + end + it "both batch processes will reprocess" do + expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.pre_processing) + expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed) + expect(batch_process_two.state).to eq(Constants.BATCH_PROCESS.pre_processing) + expect(batch_process_two.reload.state).to eq(Constants.BATCH_PROCESS.completed) + end + end + + context "when two batch processes have a state of 'PROCESSING' & a created_at more than 12 hours ago" do + before do + batch_process_one.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 16.hours) + batch_process_two.update!(state: Constants.BATCH_PROCESS.processing, created_at: Time.zone.now - 16.hours) + subject + end + it "both batch processes will reprocess" do + expect(batch_process_one.state).to eq(Constants.BATCH_PROCESS.processing) + expect(batch_process_one.reload.state).to eq(Constants.BATCH_PROCESS.completed) + expect(batch_process_two.state).to eq(Constants.BATCH_PROCESS.processing) + expect(batch_process_two.reload.state).to eq(Constants.BATCH_PROCESS.completed) + end + end + end +end From 006187d0bee9f2dbc4a644251d1a9b592cf143e2 Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Fri, 14 Jul 2023 10:02:31 -0400 Subject: [PATCH 7/8] APPEALS-24678 Added indexing to Priority End Product Sync Queue table. Updated Schema. --- ...hed_at_and_status_to_priority_end_product_sync_queue.rb | 2 +- db/schema.rb | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb b/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb index bfff7830cc2..3a9bbe7a12f 100644 --- a/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb +++ b/db/migrate/20230711153654_add_index_on_last_batched_at_and_status_to_priority_end_product_sync_queue.rb @@ -1,6 +1,6 @@ class AddIndexOnLastBatchedAtAndStatusToPriorityEndProductSyncQueue < Caseflow::Migration def change add_safe_index :priority_end_product_sync_queue, [:last_batched_at], name: "index_priority_ep_sync_queue_on_last_batched_at", unique: false - add_safe_index :priority_end_product_sync_queue, [:status], name: "index_priority_ep_sync_queue_on_last_batched_at", unique: false + add_safe_index :priority_end_product_sync_queue, [:status], name: "index_priority_ep_sync_queue_on_status", unique: false end end diff --git a/db/schema.rb b/db/schema.rb index 1e32e47bbb1..4188f7251a6 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_06_30_134611) do +ActiveRecord::Schema.define(version: 2023_07_11_153654) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -222,12 +222,14 @@ create_table "batch_processes", primary_key: "batch_id", id: :uuid, default: -> { "uuid_generate_v4()" }, comment: "A generalized table for batching and processing records within caseflow", force: :cascade do |t| t.string "batch_type", null: false, comment: "Indicates what type of record is being batched" + t.datetime "created_at", null: false, comment: "Date and Time that batch was created." t.datetime "ended_at", comment: "The date/time that the batch finsished processing" t.integer "records_attempted", default: 0, comment: "The number of records in the batch attempting to be processed" t.integer "records_completed", default: 0, comment: "The number of records in the batch that completed processing successfully" t.integer "records_failed", default: 0, comment: "The number of records in the batch that failed processing" t.datetime "started_at", comment: "The date/time that the batch began processing" t.string "state", default: "PRE_PROCESSING", null: false, comment: "The state that the batch is currently in. PRE_PROCESSING, PROCESSING, PROCESSED" + t.datetime "updated_at", null: false, comment: "Date and Time that batch was last updated." t.index ["batch_type"], name: "index_batch_processes_on_batch_type" t.index ["records_failed"], name: "index_batch_processes_on_records_failed" t.index ["state"], name: "index_batch_processes_on_state" @@ -1370,8 +1372,11 @@ t.string "error_messages", default: [], comment: "Array of Error Message(s) containing Batch ID and specific error if a failure occurs", array: true t.datetime "last_batched_at", comment: "Date and Time the record was last batched" t.string "status", default: "NOT_PROCESSED", null: false, comment: "A status to indicate what state the record is in such as PROCESSING and PROCESSED" + t.datetime "updated_at", null: false, comment: "Date and Time the record was last updated." t.index ["batch_id"], name: "index_priority_end_product_sync_queue_on_batch_id" t.index ["end_product_establishment_id"], name: "index_priority_end_product_sync_queue_on_epe_id", unique: true + t.index ["last_batched_at"], name: "index_priority_ep_sync_queue_on_last_batched_at" + t.index ["status"], name: "index_priority_ep_sync_queue_on_status" end create_table "ramp_closed_appeals", id: :serial, comment: "Keeps track of legacy appeals that are closed or partially closed in VACOLS due to being transitioned to a RAMP election. This data can be used to rollback the RAMP Election if needed.", force: :cascade do |t| From 5c03d39f9bd56c1e26883b379e515497ff8e935f Mon Sep 17 00:00:00 2001 From: Jeffrey Aaron Willis Date: Fri, 14 Jul 2023 12:10:34 -0400 Subject: [PATCH 8/8] APPEALS-24678 Added job to whitelist. --- config/initializers/scheduled_jobs.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/initializers/scheduled_jobs.rb b/config/initializers/scheduled_jobs.rb index dcc071564d5..216a68504f4 100644 --- a/config/initializers/scheduled_jobs.rb +++ b/config/initializers/scheduled_jobs.rb @@ -1,9 +1,11 @@ require "./app/jobs/batch_processes/batch_process_priority_ep_sync_job.rb" +require "./app/jobs/batch_processes/batch_process_rescue_job.rb" SCHEDULED_JOBS = { "amo_metrics_report" => AMOMetricsReportJob, "annual_metrics" => AnnualMetricsReportJob, "batch_process_priority_ep_sync" => BatchProcessPriorityEpSyncJob, + "batch_process_rescue_job" => BatchProcessRescueJob, "calculate_dispatch_stats" => CalculateDispatchStatsJob, "create_establish_claim" => CreateEstablishClaimTasksJob, "data_integrity_checks" => DataIntegrityChecksJob,