Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alec k/appeals 22705 #18874

Merged
merged 50 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
81a5d16
APPEALS-22705 batch_priority_end_product_sync
AKeyframe Jun 20, 2023
e2ee1d2
APPEALS-22705 batch_priority... and process_priortiy... methods rough…
AKeyframe Jun 21, 2023
e3a05d9
APPEALS-22705 job creation
AKeyframe Jun 21, 2023
6f783c9
APPEALS-22705 reworked batching query for error delay, job creation, …
AKeyframe Jun 21, 2023
2c18bbd
APPEALS-22705 rough rspec tests
AKeyframe Jun 21, 2023
817e8bf
APPEALS-22705 reworked batch PEPSQ call, rspec tests
AKeyframe Jun 22, 2023
709ebb9
Merge branch 'feature/APPEALS-21351' of https://github.com/department…
AKeyframe Jun 22, 2023
8c5359f
APPEALS-22705 batch_ep_sync! error fixes, process_ep_sync! testing
AKeyframe Jun 22, 2023
2199a4a
APPEALS-22705 commiting to save progress to pull in changes from feature
AKeyframe Jun 26, 2023
e640dc7
APPEALS-22705 ENV, code cleanup and comments, error handling debugging
AKeyframe Jun 27, 2023
2529195
APPEALS-22705 error handling fixes, cleaning up code
AKeyframe Jun 27, 2023
508eddd
Merge branch 'feature/APPEALS-21351' of https://github.com/department…
AKeyframe Jun 27, 2023
8524fa7
APPEALS-22705 Constants added, small refactor, reworked PEPSQ queuery…
AKeyframe Jun 28, 2023
da6b731
Merge branch 'feature/APPEALS-21351' of https://github.com/department…
AKeyframe Jun 28, 2023
64b74d2
APPEALS-22705 removed manual uuid generation
AKeyframe Jun 28, 2023
ec3b23a
APPEALS-22705 Refactor of Batch Process and BatchProcessPriorityEPSyn…
Aaron-Willis Jun 29, 2023
727c8bf
APPEALS-22705 Removed byebugs.
Aaron-Willis Jun 29, 2023
b76d80a
APPEALS-22705 - Fixed error_out_record method in batch_process
Aaron-Willis Jun 29, 2023
0442f51
APPEALS-22705 Added Child class BatchProcessPriorityEpSync that inher…
Aaron-Willis Jun 29, 2023
8a81a99
APPEALS-22705 Decoupled Batch Creation method from batch build method.
Aaron-Willis Jun 29, 2023
ceca7ca
APPEALS-22705 added return statement with batched_records in perform …
Aaron-Willis Jun 29, 2023
ebbc926
APPEALS-22705 Removed 'require-relative' statement from batch_process…
Aaron-Willis Jun 29, 2023
fb44fae
Merge branch 'AlecK/APPEALS-22705' of https://github.com/department-o…
AKeyframe Jun 27, 2023
645d055
APPEALS-22705 scheduled_jobs fix
AKeyframe Jun 29, 2023
9f81567
Possible rspec fix for APPEALS-22704s PENDING not a valid status error
AKeyframe Jun 29, 2023
8f3a6b6
linter warning fixes
AKeyframe Jun 29, 2023
28ea19f
APPEALS-22705 find_records query adjusted, fixed 22704 spec error
AKeyframe Jun 29, 2023
4eaa120
APPEALS-22705 find_records fix, create_batch rspec
AKeyframe Jun 30, 2023
e4698cf
Merge branch 'feature/APPEALS-21351' of https://github.com/department…
AKeyframe Jun 30, 2023
c9e72ce
added rspec and factories
elilogbro Jul 3, 2023
b3123b8
Merge branch 'AlecK/APPEALS-22705' of https://github.com/department-o…
AKeyframe Jun 28, 2023
6cc8100
cleaned up files, completed pepsq rspec
elilogbro Jul 5, 2023
d6ef782
APPEALS-22705 added PEPSQ and BatchProcess jobs to whitelist
AKeyframe Jun 29, 2023
0b75af1
refactored PEPSQ and EPE factories to not auto create VbmsExtClaim ob…
elilogbro Jul 5, 2023
42fef15
Merge branch 'AlecK/APPEALS-22705' of https://github.com/department-o…
elilogbro Jul 5, 2023
88cc72e
added comments and refactored spec files
elilogbro Jul 5, 2023
817d714
added more verbose comments
elilogbro Jul 5, 2023
b4c82ad
APPEALS-22705 Refactored find_records method to use scoped queries. …
Aaron-Willis Jul 6, 2023
1af2585
APPEALS-22705 Refactored find_records query using scopes.
Aaron-Willis Jul 6, 2023
d0e9392
APPEALS-22705 cleanup and comments
AKeyframe Jun 29, 2023
bac2d39
APPEALS-22705 Updated RSPEC for find_records, create_batch!, and proc…
Aaron-Willis Jul 7, 2023
4d70f4f
APPEALS-22705 find_records spec working
AKeyframe Jun 30, 2023
99a2d82
APPEALS-22705 batch_processes and priority_end_product_sync_queu rspe…
AKeyframe Jul 12, 2023
3e6df3d
APPEALS-22705 Updated RSPEC. Updated End Product Establishment Facto…
Aaron-Willis Jul 12, 2023
78d631e
APPEALS-22705 Updated RSPEC. Updated End Product Establishment Facto…
Aaron-Willis Jul 12, 2023
8e5c4eb
APPEALS-22705 Updated Several RSPEC Files.
Aaron-Willis Jul 13, 2023
9b56fbc
added two additional EPE factory traits for vbms seeding
elilogbro Jul 13, 2023
8a810d3
APPEALS-22705 rspec batch_limit fix
AKeyframe Jul 13, 2023
aaf65e3
Merge branch 'feature/APPEALS-21351' into AlecK/APPEALS-22705
jtsangVA Jul 13, 2023
62d2177
APPEALS-22705 Fixed Caseflow Stuck Records assocation. Refactored RS…
Aaron-Willis Jul 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions app/jobs/batch_processes/batch_process_priority_ep_sync_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

class BatchProcessPriorityEpSyncJob < CaseflowJob
queue_with_priority :low_priority

before_perform do |job|
JOB_ATTR = job
end

def perform
begin
batch = ActiveRecord::Base.transaction do
# The find records method NEEDS to remain within the transation block.
# to ensure that the table lock is not released until records have been batched.
records_to_batch = BatchProcessPriorityEpSync.find_records
next if records_to_batch.empty?

BatchProcessPriorityEpSync.create_batch!(records_to_batch)
end

if batch
batch.process_batch!
else
Rails.logger.info("No Records Available to Batch. Time: #{Time.zone.now}")
end
rescue StandardError => error
Rails.logger.error("Error: #{error.inspect}, Job ID: #{JOB_ATTR&.job_id}, Job Time: #{Time.zone.now}")
capture_exception(error: error,
extra: { job_id: JOB_ATTR&.job_id.to_s,
job_time: Time.zone.now.to_s })
end
end
end
4 changes: 2 additions & 2 deletions app/jobs/populate_end_product_sync_queue_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def find_priority_end_product_establishments_to_sync
limit #{ENV['END_PRODUCT_QUEUE_BATCH_LIMIT']};
SQL

ActiveRecord::Base.connection.exec_query(get_batch).rows.flatten
ActiveRecord::Base.connection.exec_query(ActiveRecord::Base.sanitize_sql(get_batch)).rows.flatten
end

def insert_into_priority_sync_queue(batch)
batch.each do |ep_id|
PriorityEndProductSyncQueue.create(
PriorityEndProductSyncQueue.create!(
end_product_establishment_id: ep_id
)
end
Expand Down
53 changes: 0 additions & 53 deletions app/models/batch_process.rb

This file was deleted.

96 changes: 96 additions & 0 deletions app/models/batch_processes/batch_process.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

class BatchProcess < CaseflowRecord
self.inheritance_column = :batch_type
has_many :priority_end_product_sync_queue, foreign_key: "batch_id", primary_key: "batch_id"
has_many :end_product_establishments, through: :priority_end_product_sync_queue
after_initialize :init_counters

ERROR_LIMIT = ENV["MAX_ERRORS_BEFORE_STUCK"].to_i
ERROR_DELAY = ENV["ERROR_DELAY"].to_i
BATCH_LIMIT = ENV["BATCH_LIMIT"].to_i

scope :completed_batch_process_ids, -> { where(state: Constants.BATCH_PROCESS.completed).select(:batch_id) }

enum state: {
Constants.BATCH_PROCESS.pre_processing.to_sym => Constants.BATCH_PROCESS.pre_processing,
Constants.BATCH_PROCESS.processing.to_sym => Constants.BATCH_PROCESS.processing,
Constants.BATCH_PROCESS.completed.to_sym => Constants.BATCH_PROCESS.completed

}

class << self

# A method for overriding, for the purpose of finding the records that
# need to be batched. This method should return the records found.
def find_records
# no-op, can be overwritten
end

# A method for orverriding, for the purpose of creating the batch and
# associating the batch_id with the records gathered by the find_records method.
def create_batch!(record)
# no-op, can be overwritten
end
end

# A method for overriding, for the purpose of processing the batch created
# in the create_batch method. Processing can be anything, an example of which
# is syncing up the records within the batch between caseflow and vbms.
def process_batch!
# no-op, can be overwritten
end


private

# Instance var methods
def init_counters
@completed_count = 0
@failed_count = 0
end

def increment_completed
@completed_count += 1
end

def increment_failed
@failed_count += 1
end


# State update Methods
def batch_processing!
update!(state: Constants.BATCH_PROCESS.processing, started_at: Time.zone.now)
end

def batch_complete!
update!(state: Constants.BATCH_PROCESS.completed,
records_failed: @failed_count,
records_completed: @completed_count,
ended_at: Time.zone.now)
end


# When a record and error is sent to this method, it updates the record and checks to see
# if the record should be declared stuck. If the records should be stuck, it calls the
# declare_record_stuck method (Found in priority_end_product_sync_queue.rb).
# Otherwise, the record is updated with status: error and the error message is added to
# error_messages.
#
# As a general method, it's assumed the record has a batch_id and error_messages
# column within the associated table.
def error_out_record!(record, error)
increment_failed
error_array = record.error_messages || []
error_array.push("Error: #{error.inspect} - Batch ID: #{record.batch_id} - Time: #{Time.zone.now}.")

if error_array.length >= ERROR_LIMIT
record.declare_record_stuck!
else
record.status_error!(error_array)
end

Rails.logger.error(error.inspect)
end
end
63 changes: 63 additions & 0 deletions app/models/batch_processes/batch_process_priority_ep_sync.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

class BatchProcessPriorityEpSync < BatchProcess
class << self
# Finds the records within the PEPSQ table that need to be batched and returns
# a total number of records equal to the BATCH_LIMIT constant
def find_records
PriorityEndProductSyncQueue.completed_or_unbatched.not_synced_or_stuck.batchable.batch_limit.lock
end

# This method takes the records from find_records as an agrument.
# Creates a new batch record within the batch_processes table. Then the batch
# information is assigned to each record. Returns the newly created batch record.
def create_batch!(records)
new_batch = BatchProcessPriorityEpSync.create!(batch_type: name,
state: Constants.BATCH_PROCESS.pre_processing,
records_attempted: records.count)

new_batch.assign_batch_to_queued_records!(records)
new_batch
end
end

# Updates the batches status to processing then loops through each record within
# the batch. Each records status is updated to processing, then the sync! method is
# attempted. If the record fails, the error_out_record! method is called.
def process_batch!
batch_processing!

priority_end_product_sync_queue.each do |record|
record.status_processing!
epe = record.end_product_establishment

begin
epe.sync!
epe.reload

if epe.vbms_ext_claim.nil?
fail Caseflow::Error::PriorityEndProductSyncError, "Claim Not In VBMS_EXT_CLAIM."
elsif epe.synced_status != epe.vbms_ext_claim&.level_status_code
fail Caseflow::Error::PriorityEndProductSyncError, "EPE synced_status does not match VBMS."
end
rescue StandardError => error
error_out_record!(record, error)
next
end

record.status_sync!
increment_completed
end

batch_complete!
end

# Assigns the batch_id (line 20) to every record that needs to be associated with the batch
def assign_batch_to_queued_records!(records)
records.each do |pepsq_record|
pepsq_record.update!(batch_id: batch_id,
AKeyframe marked this conversation as resolved.
Show resolved Hide resolved
status: Constants.PRIORITY_EP_SYNC.pre_processing,
last_batched_at: Time.zone.now)
end
end
end
3 changes: 1 addition & 2 deletions app/models/end_product_establishment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class EndProductEstablishment < CaseflowRecord
# # It is NOT recommended to go below 0.01. (default: 0.1)
# :expire => 10 # Specify in seconds when the lock should be considered stale when something went wrong
# # with the one who held the lock and failed to unlock. (default: 10)
auto_mutex :sync!, block: 60, expire: 100, after_failure: lambda { Rails.logger.error('failed to acquire lock! EPE sync is being called by another process. Please try again later.') }
# auto_mutex :sync!, block: 60, expire: 100, after_failure: lambda { Rails.logger.error('failed to acquire lock! EPE sync is being called by another process. Please try again later.') }

# allow @veteran to be assigned to save upstream calls
attr_writer :veteran
Expand Down Expand Up @@ -211,7 +211,6 @@ def cancel_unused_end_product!
end

def sync!
sleep(1)
# There is no need to sync end_product_status if the status
# is already inactive since an EP can never leave that state
return true unless status_active?
Expand Down
43 changes: 42 additions & 1 deletion app/models/priority_queues/priority_end_product_sync_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,46 @@ class PriorityEndProductSyncQueue < CaseflowRecord
self.table_name = "priority_end_product_sync_queue"

belongs_to :end_product_establishment
belongs_to :batch_process, foreign_key: "batch_id"
belongs_to :batch_process, foreign_key: "batch_id", primary_key: "batch_id"
has_many :caseflow_stuck_records, as: :stuck_record

scope :completed_or_unbatched, -> { where(batch_id: [nil, BatchProcess.completed_batch_process_ids]) }
scope :batchable, -> { where("last_batched_at IS NULL OR last_batched_at <= ?", BatchProcess::ERROR_DELAY.hours.ago) }
scope :batch_limit, -> { limit(BatchProcess::BATCH_LIMIT) }
scope :not_synced_or_stuck, lambda {
where.not(status: [Constants.PRIORITY_EP_SYNC.synced, Constants.PRIORITY_EP_SYNC.stuck])
}

enum status: {
Constants.PRIORITY_EP_SYNC.not_processed.to_sym => Constants.PRIORITY_EP_SYNC.not_processed,
Constants.PRIORITY_EP_SYNC.pre_processing.to_sym => Constants.PRIORITY_EP_SYNC.pre_processing,
Constants.PRIORITY_EP_SYNC.processing.to_sym => Constants.PRIORITY_EP_SYNC.processing,
Constants.PRIORITY_EP_SYNC.synced.to_sym => Constants.PRIORITY_EP_SYNC.synced,
Constants.PRIORITY_EP_SYNC.error.to_sym => Constants.PRIORITY_EP_SYNC.error,
Constants.PRIORITY_EP_SYNC.stuck.to_sym => Constants.PRIORITY_EP_SYNC.stuck
}

# Status Update methods
def status_processing!
update!(status: Constants.PRIORITY_EP_SYNC.processing)
end

def status_sync!
update!(status: Constants.PRIORITY_EP_SYNC.synced)
end

def status_error!(errors)
update!(status: Constants.PRIORITY_EP_SYNC.error,
error_messages: errors)
end

# Method will update the status of the record to STUCK
# While also create a record within the caseflow_stuck_records table
# for later manual review.
def declare_record_stuck!
update!(status: Constants.PRIORITY_EP_SYNC.stuck)
CaseflowStuckRecord.create!(stuck_record: self,
error_messages: error_messages,
determined_stuck_at: Time.zone.now)
end
end
5 changes: 5 additions & 0 deletions client/constants/BATCH_PROCESS.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"pre_processing": "PRE_PROCESSING",
"processing": "PROCESSING",
"completed": "COMPLETED"
}
8 changes: 8 additions & 0 deletions client/constants/PRIORITY_EP_SYNC.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"not_processed": "NOT_PROCESSED",
"pre_processing": "PRE_PROCESSING",
"processing": "PROCESSING",
"synced": "SYNCED",
"error": "ERROR",
"stuck": "STUCK"
}
6 changes: 6 additions & 0 deletions config/environments/development.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
ENV["AWS_ACCESS_KEY_ID"] ||= "dummykeyid"
ENV["AWS_SECRET_ACCESS_KEY"] ||= "dummysecretkey"

# BatchProcess ENVs
# priority_end_product_sync
ENV["BATCH_LIMIT"] ||= "100" # Max number of records in a batch
ENV["ERROR_DELAY"] ||= "12" # In number of hours
ENV["MAX_ERRORS_BEFORE_STUCK"] ||= "3" # When record errors for X time, it's declared stuck

# Necessary vars needed to create virtual hearing links
# Used by VirtualHearings::LinkService
ENV["VIRTUAL_HEARING_PIN_KEY"] ||= "mysecretkey"
Expand Down
7 changes: 7 additions & 0 deletions config/environments/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@
ENV["AWS_ACCESS_KEY_ID"] ||= "dummykeyid"
ENV["AWS_SECRET_ACCESS_KEY"] ||= "dummysecretkey"

# BatchProcess ENVs
# priority_end_product_sync
ENV["BATCH_LIMIT"] ||= "100" # Max number of records in a batch
ENV["ERROR_DELAY"] ||= "12" # In number of hours
ENV["MAX_ERRORS_BEFORE_STUCK"] ||= "3" # When record errors for X time, it's declared stuck


config.active_job.queue_adapter = :test

# Disable SqlTracker from creating tmp/sql_tracker-*.json files -- https://github.com/steventen/sql_tracker/pull/10
Expand Down
4 changes: 4 additions & 0 deletions config/initializers/scheduled_jobs.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
require "./app/jobs/batch_processes/batch_process_priority_ep_sync_job.rb"

SCHEDULED_JOBS = {
"amo_metrics_report" => AMOMetricsReportJob,
"annual_metrics" => AnnualMetricsReportJob,
"batch_process_priority_ep_sync" => BatchProcessPriorityEpSyncJob,
"calculate_dispatch_stats" => CalculateDispatchStatsJob,
"create_establish_claim" => CreateEstablishClaimTasksJob,
"data_integrity_checks" => DataIntegrityChecksJob,
Expand All @@ -18,6 +21,7 @@
"monthly_metrics" => MonthlyMetricsReportJob,
"nightly_syncs" => NightlySyncsJob,
"out_of_service_reminder" => OutOfServiceReminderJob,
"populate_end_product_sync_queue" => PopulateEndProductSyncQueueJob,
"prepare_establish_claim" => PrepareEstablishClaimTasksJob,
"push_priority_appeals_to_judges" => PushPriorityAppealsToJudgesJob,
"quarterly_metrics" => QuarterlyMetricsReportJob,
Expand Down

This file was deleted.

Loading
Loading