From a3d8beb8fff09298c23f82e35c5f498a1abd1424 Mon Sep 17 00:00:00 2001 From: David Cristofaro Date: Tue, 5 Mar 2024 11:38:22 +1100 Subject: [PATCH 1/3] Fix bundler in CI [Bundler 2.5 dropped support for Ruby 2.7](https://github.com/rubygems/rubygems/releases/tag/bundler-v2.5.0). Since we are testing with Ruby 2.7, we should pin the version of Bundler to 2.4.x since this is the last version that's compatible with 2.7. --- .github/workflows/tests.yml | 2 +- Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 41c814ef..77965ddc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -45,7 +45,7 @@ jobs: run: | sudo apt-get -yqq install libpq-dev postgresql-client createdb que-test - gem install bundler + gem install bundler --version '~> 2.4.22' bundle install --jobs 4 --retry 3 USE_RAILS=true bundle exec rake test bundle exec rake test diff --git a/Dockerfile b/Dockerfile index cc85d0da..f10a61f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ RUN apt-get update \ && apt-get install -y libpq-dev \ && rm -rf /var/lib/apt/lists/* -ENV RUBY_BUNDLER_VERSION 2.3.7 +ENV RUBY_BUNDLER_VERSION 2.4.22 RUN gem install bundler -v $RUBY_BUNDLER_VERSION ENV BUNDLE_PATH /usr/local/bundle From ae9508c029d69e99346817f009c683a9bab6109b Mon Sep 17 00:00:00 2001 From: David Cristofaro Date: Mon, 4 Mar 2024 13:11:51 +1100 Subject: [PATCH 2/3] Refactor bulk enqueue spec Prepare spec for bulk enqueue refactor to support bulk enqueue of jobs with differing job class and job options. --- spec/que/job.bulk_enqueue_spec.rb | 124 +++++++++++++++--------------- 1 file changed, 62 insertions(+), 62 deletions(-) diff --git a/spec/que/job.bulk_enqueue_spec.rb b/spec/que/job.bulk_enqueue_spec.rb index 4a1b85b8..51a4982b 100644 --- a/spec/que/job.bulk_enqueue_spec.rb +++ b/spec/que/job.bulk_enqueue_spec.rb @@ -4,14 +4,14 @@ describe Que::Job, '.bulk_enqueue' do def assert_enqueue( - expected_queue: 'default', - expected_priority: 100, - expected_run_at: Time.now, - expected_job_class: Que::Job, - expected_result_class: nil, + expected_queues: ['default'] * 10, + expected_priorities: [100] * 10, + expected_run_ats: [Time.now] * 10, + expected_job_classes: [Que::Job] * 10, + expected_result_classes: [nil] * 10, expected_args:, expected_kwargs:, - expected_tags: nil, + expected_tags: [nil] * 10, expected_count:, &enqueue_block ) @@ -24,24 +24,24 @@ def assert_enqueue( results.each_with_index do |result, i| assert_kind_of Que::Job, result - assert_instance_of (expected_result_class || expected_job_class), result + assert_instance_of (expected_result_classes[i] || expected_job_classes[i]), result - assert_equal expected_priority, result.que_attrs[:priority] + assert_equal expected_priorities[i], result.que_attrs[:priority] assert_equal expected_args[i], result.que_attrs[:args] assert_equal expected_kwargs[i], result.que_attrs[:kwargs] - if expected_tags.nil? + if expected_tags[i].nil? assert_equal({}, result.que_attrs[:data]) else - assert_equal expected_tags, result.que_attrs[:data][:tags] + assert_equal expected_tags[i], result.que_attrs[:data][:tags] end end jobs_dataset.order(:id).each_with_index do |job, i| - assert_equal expected_queue, job[:queue] - assert_equal expected_priority, job[:priority] - assert_in_delta job[:run_at], expected_run_at, QueSpec::TIME_SKEW - assert_equal expected_job_class.to_s, job[:job_class] + assert_equal expected_queues[i], job[:queue] + assert_equal expected_priorities[i], job[:priority] + assert_in_delta job[:run_at], expected_run_ats[i], QueSpec::TIME_SKEW + assert_equal expected_job_classes[i].to_s, job[:job_class] assert_equal expected_args[i], job[:args] assert_equal expected_kwargs[i], job[:kwargs] end @@ -185,7 +185,7 @@ def assert_enqueue( expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: '3' }, { five: '6' }], - expected_job_class: NamespacedJobNamespace::NamespacedJob, + expected_job_classes: [NamespacedJobNamespace::NamespacedJob, NamespacedJobNamespace::NamespacedJob], ) do Que.bulk_enqueue do NamespacedJobNamespace::NamespacedJob.enqueue(1, two: '3') @@ -207,7 +207,7 @@ def assert_enqueue( expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: '3' }, { five: 'six' }], - expected_queue: 'special_queue_name', + expected_queues: ['special_queue_name', 'special_queue_name'], ) do Que.bulk_enqueue(job_options: { queue: 'special_queue_name' }) do Que.enqueue(1, two: '3') @@ -222,7 +222,7 @@ def assert_enqueue( expected_count: 2, expected_args: [[1], [2]], expected_kwargs: [{}, {}], - expected_run_at: Time.now + 60, + expected_run_ats: [Time.now + 60, Time.now + 60], ) do Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do Que.enqueue(1) @@ -236,7 +236,7 @@ def assert_enqueue( expected_count: 2, expected_args: [[1], [2]], expected_kwargs: [{}, {}], - expected_priority: 4 + expected_priorities: [4, 4] ) do Que.bulk_enqueue(job_options: { priority: 4 }) do Que.enqueue(1) @@ -250,8 +250,8 @@ def assert_enqueue( expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 60, - expected_priority: 4, + expected_run_ats: [Time.now + 60, Time.now + 60], + expected_priorities: [4, 4], ) do Que.bulk_enqueue(job_options: { run_at: Time.now + 60, priority: 4 }) do Que.enqueue(1, two: "3") @@ -268,8 +268,8 @@ def assert_enqueue( { two: "3", run_at: Time.utc(2050).to_s, priority: 10 }, { five: "six" } ], - expected_run_at: Time.now, - expected_priority: 15, + expected_run_ats: [Time.now, Time.now], + expected_priorities: [15, 15], ) do Que.bulk_enqueue(job_options: { priority: 15 }) do Que.enqueue(1, two: "3", run_at: Time.utc(2050), priority: 10) @@ -292,7 +292,7 @@ def assert_enqueue( expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_tags: ["tag_1", "tag_2"], + expected_tags: [["tag_1", "tag_2"], ["tag_1", "tag_2"]], ) do Que.bulk_enqueue(job_options: { tags: ["tag_1", "tag_2"] }) do Que.enqueue(1, two: "3") @@ -309,7 +309,7 @@ def assert_enqueue( { two: "3", tags: ["tag_1", "tag_2"] }, { five: "six" }, ], - expected_tags: nil, + expected_tags: [nil, nil], ) do Que.bulk_enqueue do Que.enqueue(1, two: "3", tags: ["tag_1", "tag_2"]) @@ -354,8 +354,8 @@ class MyJobClass < Que::Job; end expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_job_class: MyJobClass, - expected_result_class: Que::Job, + expected_job_classes: [MyJobClass, MyJobClass], + expected_result_classes: [Que::Job, Que::Job], ) do Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do Que.enqueue(1, two: "3") @@ -392,8 +392,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_priority: 3, - expected_job_class: PriorityDefaultJob, + expected_priorities: [3, 3], + expected_job_classes: [PriorityDefaultJob, PriorityDefaultJob], ) do Que.bulk_enqueue do PriorityDefaultJob.enqueue(1, two: "3") @@ -405,8 +405,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_priority: 4, - expected_job_class: PriorityDefaultJob, + expected_priorities: [4, 4], + expected_job_classes: [PriorityDefaultJob, PriorityDefaultJob], ) do Que.bulk_enqueue(job_options: { priority: 4 }) do PriorityDefaultJob.enqueue(1, two: "3") @@ -420,8 +420,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_priority: 3, - expected_job_class: PrioritySubclassJob + expected_priorities: [3, 3], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] ) do Que.bulk_enqueue do PrioritySubclassJob.enqueue(1, two: "3") @@ -433,8 +433,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_priority: 4, - expected_job_class: PrioritySubclassJob + expected_priorities: [4, 4], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] ) do Que.bulk_enqueue(job_options: { priority: 4 }) do PrioritySubclassJob.enqueue(1, two: "3") @@ -451,8 +451,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_priority: 60, - expected_job_class: PrioritySubclassJob + expected_priorities: [60, 60], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] ) do Que.bulk_enqueue do PrioritySubclassJob.enqueue(1, two: "3") @@ -464,8 +464,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_priority: 4, - expected_job_class: PrioritySubclassJob + expected_priorities: [4, 4], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] ) do Que.bulk_enqueue(job_options: { priority: 4 }) do PrioritySubclassJob.enqueue(1, two: "3") @@ -484,8 +484,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 30, - expected_job_class: RunAtDefaultJob + expected_run_ats: [Time.now + 30, Time.now + 30], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] ) do Que.bulk_enqueue do RunAtDefaultJob.enqueue(1, two: "3") @@ -497,8 +497,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 60, - expected_job_class: RunAtDefaultJob + expected_run_ats: [Time.now + 60, Time.now + 60], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] ) do Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do RunAtDefaultJob.enqueue(1, two: "3") @@ -512,8 +512,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 30, - expected_job_class: RunAtDefaultJob + expected_run_ats: [Time.now + 30, Time.now + 30], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] ) do Que.bulk_enqueue do RunAtDefaultJob.enqueue(1, two: "3") @@ -525,8 +525,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 60, - expected_job_class: RunAtDefaultJob + expected_run_ats: [Time.now + 60, Time.now + 60], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] ) do Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do RunAtDefaultJob.enqueue(1, two: "3") @@ -543,8 +543,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 90, - expected_job_class: RunAtSubclassJob + expected_run_ats: [Time.now + 90, Time.now + 90], + expected_job_classes: [RunAtSubclassJob, RunAtSubclassJob] ) do Que.bulk_enqueue do RunAtSubclassJob.enqueue(1, two: "3") @@ -556,8 +556,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_run_at: Time.now + 60, - expected_job_class: RunAtSubclassJob + expected_run_ats: [Time.now + 60, Time.now + 60], + expected_job_classes: [RunAtSubclassJob, RunAtSubclassJob] ) do Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do RunAtSubclassJob.enqueue(1, two: "3") @@ -576,8 +576,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_queue: 'queue_1', - expected_job_class: QueueDefaultJob + expected_queues: ['queue_1', 'queue_1'], + expected_job_classes: [QueueDefaultJob, QueueDefaultJob] ) do Que.bulk_enqueue do QueueDefaultJob.enqueue(1, two: "3") @@ -589,8 +589,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_queue: 'queue_3', - expected_job_class: QueueDefaultJob + expected_queues: ['queue_3', 'queue_3'], + expected_job_classes: [QueueDefaultJob, QueueDefaultJob] ) do Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do QueueDefaultJob.enqueue(1, two: "3") @@ -604,8 +604,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_queue: 'queue_1', - expected_job_class: QueueSubclassJob + expected_queues: ['queue_1', 'queue_1'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] ) do Que.bulk_enqueue do QueueSubclassJob.enqueue(1, two: "3") @@ -617,8 +617,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_queue: 'queue_3', - expected_job_class: QueueSubclassJob + expected_queues: ['queue_3', 'queue_3'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] ) do Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do QueueSubclassJob.enqueue(1, two: "3") @@ -635,8 +635,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_queue: 'queue_2', - expected_job_class: QueueSubclassJob + expected_queues: ['queue_2', 'queue_2'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] ) do Que.bulk_enqueue do QueueSubclassJob.enqueue(1, two: "3") @@ -648,8 +648,8 @@ class QueueSubclassJob < QueueDefaultJob expected_count: 2, expected_args: [[1], [4]], expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_queue: 'queue_3', - expected_job_class: QueueSubclassJob + expected_queues: ['queue_3', 'queue_3'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] ) do Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do QueueSubclassJob.enqueue(1, two: "3") From 483593603a3daa64c6b4fc9cf60651c411b12275 Mon Sep 17 00:00:00 2001 From: David Cristofaro Date: Mon, 4 Mar 2024 13:11:55 +1100 Subject: [PATCH 3/3] Support bulk enqueue with differing class and options Allow bulk enqueue of multiple different job classes and differing job options in a single `.bulk_enqueue` block. Each job can now differ by job class, queue, priority, run at and tags (in addition to args and kwargs). --- docs/README.md | 4 +- lib/que/job.rb | 153 ++++++++++++++------------- spec/que/job.bulk_enqueue_spec.rb | 168 ++++++++++++++++++++++++++---- 3 files changed, 227 insertions(+), 98 deletions(-) diff --git a/docs/README.md b/docs/README.md index 4b5e1f46..25ef9283 100644 --- a/docs/README.md +++ b/docs/README.md @@ -849,13 +849,11 @@ Que.bulk_enqueue do end ``` -The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. +The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. `job_options` may be provided to `.bulk_enqueue` as defaults for the entire block. Alternatively, `job_options` may be individually provided to `.enqueue` and will take priority over block options. Limitations: - ActiveJob is not supported -- All jobs must use the same job class -- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`) - The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`) - The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll diff --git a/lib/que/job.rb b/lib/que/job.rb index 6093b162..50f48ba6 100644 --- a/lib/que/job.rb +++ b/lib/que/job.rb @@ -29,21 +29,18 @@ class Job SQL[:bulk_insert_jobs] = %{ - WITH args_and_kwargs as ( - SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb) - ) INSERT INTO public.que_jobs (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) SELECT - coalesce($1, 'default')::text, - coalesce($2, 100)::smallint, - coalesce($3, now())::timestamptz, - $4::text, - args_and_kwargs.args, - args_and_kwargs.kwargs, - coalesce($6, '{}')::jsonb, + coalesce(queue, 'default')::text, + coalesce(priority, 100)::smallint, + coalesce(run_at, now())::timestamptz, + job_class::text, + coalesce(args, '[]')::jsonb, + coalesce(kwargs, '{}')::jsonb, + coalesce(data, '{}')::jsonb, #{Que.job_schema_version} - FROM args_and_kwargs + FROM json_populate_recordset(null::que_jobs, $1) RETURNING * } @@ -82,6 +79,9 @@ def enqueue(*args) job_options = kwargs.delete(:job_options) || {} + job_class = job_options[:job_class] || name || + raise(Error, "Can't enqueue an anonymous subclass of Que::Job") + if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})" @@ -94,28 +94,40 @@ def enqueue(*args) end end - attrs = { - queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, - priority: job_options[:priority] || resolve_que_setting(:priority), - run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args: args, - kwargs: kwargs, - data: job_options[:tags] ? { tags: job_options[:tags] } : {}, - job_class: \ - job_options[:job_class] || name || - raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), - } - if Thread.current[:que_jobs_to_bulk_insert] + # Don't resolve class settings during `.enqueue`, only resolve them + # during `._bulk_enqueue_insert` so they can be overwritten by specifying + # them in `.bulk_enqueue`. + attrs = { + queue: job_options[:queue], + priority: job_options[:priority], + run_at: job_options[:run_at], + job_class: job_class == 'Que::Job' ? nil : job_class, + args: args, + kwargs: kwargs, + data: job_options[:tags] && { tags: job_options[:tags] }, + klass: self, + } + if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' raise Que::Error, "Que.bulk_enqueue does not support ActiveJob." end - raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {} - Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs - new({}) - elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) + return new({}) + end + + attrs = { + queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, + priority: job_options[:priority] || resolve_que_setting(:priority), + run_at: job_options[:run_at] || resolve_que_setting(:run_at), + job_class: job_class, + args: args, + kwargs: kwargs, + data: job_options[:tags] ? { tags: job_options[:tags] } : {}, + } + + if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) attrs.merge!( args: Que.deserialize_json(Que.serialize_json(attrs[:args])), kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), @@ -144,16 +156,13 @@ def bulk_enqueue(job_options: {}, notify: false) jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] return [] if jobs_attrs.empty? - raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one? - args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) } - klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class]) - klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify) + _bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify) ensure Thread.current[:que_jobs_to_bulk_insert] = nil end - def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) - raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) } + def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false) + raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) } if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT @@ -167,49 +176,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) end end - args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs| - args_and_kwargs.merge( - args: args_and_kwargs.fetch(:args, []), - kwargs: args_and_kwargs.fetch(:kwargs, {}), - ) - end - - attrs = { - queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, - priority: job_options[:priority] || resolve_que_setting(:priority), - run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args_and_kwargs_array: args_and_kwargs_array, - data: job_options[:tags] ? { tags: job_options[:tags] } : {}, - job_class: \ - job_options[:job_class] || name || - raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), - } - - if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) - args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array))) - args_and_kwargs_array.map do |args_and_kwargs| - _run_attrs( - attrs.merge( - args: args_and_kwargs.fetch(:args), - kwargs: args_and_kwargs.fetch(:kwargs), - ), + jobs_attrs = jobs_attrs.map do |attrs| + klass = attrs[:klass] || self + + attrs = { + queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue, + priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority), + run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at), + job_class: attrs[:job_class] || job_options[:job_class] || klass.name, + args: attrs[:args] || [], + kwargs: attrs[:kwargs] || {}, + data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}), + klass: klass + } + + if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously) + klass._run_attrs( + attrs.reject { |k| k == :klass }.merge( + args: Que.deserialize_json(Que.serialize_json(attrs[:args])), + kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), + data: Que.deserialize_json(Que.serialize_json(attrs[:data])), + ) ) + nil + else + attrs end - else - attrs.merge!( - args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]), - data: Que.serialize_json(attrs[:data]), - ) - values_array = - Que.transaction do - Que.execute('SET LOCAL que.skip_notify TO true') unless notify - Que.execute( - :bulk_insert_jobs, - attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data), - ) - end - values_array.map(&method(:new)) - end + end.compact + + values_array = + Que.transaction do + Que.execute('SET LOCAL que.skip_notify TO true') unless notify + Que.execute( + :bulk_insert_jobs, + [Que.serialize_json(jobs_attrs.map { |attrs| attrs.reject { |k| k == :klass } })] + ) + end + values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) } end def run(*args) @@ -237,7 +240,7 @@ def resolve_que_setting(setting, *args) end end - private + protected def _run_attrs(attrs) attrs[:error_count] = 0 diff --git a/spec/que/job.bulk_enqueue_spec.rb b/spec/que/job.bulk_enqueue_spec.rb index 51a4982b..1b2523e6 100644 --- a/spec/que/job.bulk_enqueue_spec.rb +++ b/spec/que/job.bulk_enqueue_spec.rb @@ -194,6 +194,24 @@ def assert_enqueue( end end + it "should be able to handle multiple different job classes" do + class MyJobClass < Que::Job; end + class MyJobOtherClass < Que::Job; end + + assert_enqueue( + expected_count: 3, + expected_args: [[1], [4], []], + expected_kwargs: [{ two: '3' }, { five: '6' }, {}], + expected_job_classes: [MyJobClass, MyJobOtherClass, Que::Job], + ) do + Que.bulk_enqueue do + MyJobClass.enqueue(1, two: '3') + MyJobOtherClass.enqueue(4, five: '6') + Que.enqueue + end + end + end + it "should error appropriately on an anonymous job subclass" do klass = Class.new(Que::Job) error = assert_raises(Que::Error) { Que.bulk_enqueue { klass.enqueue } } @@ -278,14 +296,6 @@ def assert_enqueue( end end - it "should raise when job_options are passed to .enqueue rather than .bulk_enqueue" do - assert_raises_with_message(Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue") do - Que.bulk_enqueue do - Que.enqueue(1, two: "3", job_options: { priority: 15 }) - end - end - end - describe "when enqueuing jobs with tags" do it "should be able to specify tags on a case-by-case basis" do assert_enqueue( @@ -301,6 +311,20 @@ def assert_enqueue( end end + it "should be respect tags passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_tags: [["tag_3", "tag_4"], ["tag_1", "tag_2"]], + ) do + Que.bulk_enqueue(job_options: { tags: ["tag_1", "tag_2"] }) do + Que.enqueue(1, two: "3", job_options: { tags: ["tag_3", "tag_4"] }) + Que.enqueue(4, five: "six") + end + end + end + it "should no longer fall back to using tags specified at the top level if not specified in job_options" do assert_enqueue( expected_count: 2, @@ -347,19 +371,39 @@ def assert_enqueue( end end - it "should respect a job class defined as a string" do - class MyJobClass < Que::Job; end + describe "job class string" do + it "should respect a job class defined as a string" do + class MyJobClass < Que::Job; end - assert_enqueue( - expected_count: 2, - expected_args: [[1], [4]], - expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_job_classes: [MyJobClass, MyJobClass], - expected_result_classes: [Que::Job, Que::Job], - ) do - Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do - Que.enqueue(1, two: "3") - Que.enqueue(4, five: "six") + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_job_classes: [MyJobClass, MyJobClass], + expected_result_classes: [Que::Job, Que::Job], + ) do + Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do + Que.enqueue(1, two: "3") + Que.enqueue(4, five: "six") + end + end + end + + it "should respect a job class defined as a string passed to .enqueue" do + class MyJobClass < Que::Job; end + class MyJobOtherClass < Que::Job; end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_job_classes: [MyJobOtherClass, MyJobClass], + expected_result_classes: [Que::Job, Que::Job], + ) do + Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do + Que.enqueue(1, two: "3", job_options: { job_class: 'MyJobOtherClass' }) + Que.enqueue(4, five: "six") + end end end end @@ -443,6 +487,34 @@ class QueueSubclassJob < QueueDefaultJob end end + it "should respect a priority passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priorities: [2, 3], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] + ) do + Que.bulk_enqueue do + PrioritySubclassJob.enqueue(1, two: "3", job_options: { priority: 2 }) + PrioritySubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priorities: [2, 4], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] + ) do + Que.bulk_enqueue(job_options: { priority: 4 }) do + PrioritySubclassJob.enqueue(1, two: "3", job_options: { priority: 2 }) + PrioritySubclassJob.enqueue(4, five: "six") + end + end + end + it "should respect an overridden priority in a job class" do begin PrioritySubclassJob.priority = 60 @@ -535,6 +607,34 @@ class QueueSubclassJob < QueueDefaultJob end end + it "should respect a run_at passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_ats: [Time.now + 10, Time.now + 30], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] + ) do + Que.bulk_enqueue do + RunAtDefaultJob.enqueue(1, two: "3", job_options: { run_at: Time.now + 10 }) + RunAtDefaultJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_ats: [Time.now + 10, Time.now + 60], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do + RunAtDefaultJob.enqueue(1, two: "3", job_options: { run_at: Time.now + 10 }) + RunAtDefaultJob.enqueue(4, five: "six") + end + end + end + it "should respect an overridden run_at in a job class" do begin RunAtSubclassJob.run_at = -> {Time.now + 90} @@ -627,6 +727,34 @@ class QueueSubclassJob < QueueDefaultJob end end + it "should respect a queue passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queues: ['queue_4', 'queue_1'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] + ) do + Que.bulk_enqueue do + QueueSubclassJob.enqueue(1, two: "3", job_options: { queue: 'queue_4' }) + QueueSubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queues: ['queue_4', 'queue_3'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] + ) do + Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do + QueueSubclassJob.enqueue(1, two: "3", job_options: { queue: 'queue_4' }) + QueueSubclassJob.enqueue(4, five: "six") + end + end + end + it "should respect an overridden queue in a job class" do begin QueueSubclassJob.queue = :queue_2