Skip to content

Commit

Permalink
Support bulk enqueue with differing class and options
Browse files Browse the repository at this point in the history
Allow bulk enqueue of multiple different job classes and differing job
options in a single `.bulk_enqueue` block. Each job can now differ by
job class, queue, priority, run at and tags (in addition to args and
kwargs).
  • Loading branch information
dtcristo committed Mar 6, 2024
1 parent ae9508c commit 4835936
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 98 deletions.
4 changes: 1 addition & 3 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -849,13 +849,11 @@ Que.bulk_enqueue do
end
```

The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query.
The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. `job_options` may be provided to `.bulk_enqueue` as defaults for the entire block. Alternatively, `job_options` may be individually provided to `.enqueue` and will take priority over block options.

Limitations:

- ActiveJob is not supported
- All jobs must use the same job class
- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`)
- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`)
- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll

Expand Down
153 changes: 78 additions & 75 deletions lib/que/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,18 @@ class Job

SQL[:bulk_insert_jobs] =
%{
WITH args_and_kwargs as (
SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb)
)
INSERT INTO public.que_jobs
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
SELECT
coalesce($1, 'default')::text,
coalesce($2, 100)::smallint,
coalesce($3, now())::timestamptz,
$4::text,
args_and_kwargs.args,
args_and_kwargs.kwargs,
coalesce($6, '{}')::jsonb,
coalesce(queue, 'default')::text,
coalesce(priority, 100)::smallint,
coalesce(run_at, now())::timestamptz,
job_class::text,
coalesce(args, '[]')::jsonb,
coalesce(kwargs, '{}')::jsonb,
coalesce(data, '{}')::jsonb,
#{Que.job_schema_version}
FROM args_and_kwargs
FROM json_populate_recordset(null::que_jobs, $1)
RETURNING *
}

Expand Down Expand Up @@ -82,6 +79,9 @@ def enqueue(*args)

job_options = kwargs.delete(:job_options) || {}

job_class = job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job")

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
Expand All @@ -94,28 +94,40 @@ def enqueue(*args)
end
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args: args,
kwargs: kwargs,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if Thread.current[:que_jobs_to_bulk_insert]
# Don't resolve class settings during `.enqueue`, only resolve them
# during `._bulk_enqueue_insert` so they can be overwritten by specifying
# them in `.bulk_enqueue`.
attrs = {
queue: job_options[:queue],
priority: job_options[:priority],
run_at: job_options[:run_at],
job_class: job_class == 'Que::Job' ? nil : job_class,
args: args,
kwargs: kwargs,
data: job_options[:tags] && { tags: job_options[:tags] },
klass: self,
}

if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
end

raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}

Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
new({})
elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
return new({})
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
job_class: job_class,
args: args,
kwargs: kwargs,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
attrs.merge!(
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
Expand Down Expand Up @@ -144,16 +156,13 @@ def bulk_enqueue(job_options: {}, notify: false)
jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
return [] if jobs_attrs.empty?
raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
_bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify)
ensure
Thread.current[:que_jobs_to_bulk_insert] = nil
end

def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }
def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false)
raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) }

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
Expand All @@ -167,49 +176,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
end
end

args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
args_and_kwargs.merge(
args: args_and_kwargs.fetch(:args, []),
kwargs: args_and_kwargs.fetch(:kwargs, {}),
)
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args_and_kwargs_array: args_and_kwargs_array,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
args_and_kwargs_array.map do |args_and_kwargs|
_run_attrs(
attrs.merge(
args: args_and_kwargs.fetch(:args),
kwargs: args_and_kwargs.fetch(:kwargs),
),
jobs_attrs = jobs_attrs.map do |attrs|
klass = attrs[:klass] || self

attrs = {
queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue,
priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority),
run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at),
job_class: attrs[:job_class] || job_options[:job_class] || klass.name,
args: attrs[:args] || [],
kwargs: attrs[:kwargs] || {},
data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}),
klass: klass
}

if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously)
klass._run_attrs(
attrs.reject { |k| k == :klass }.merge(
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
)
)
nil
else
attrs
end
else
attrs.merge!(
args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
data: Que.serialize_json(attrs[:data]),
)
values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
)
end
values_array.map(&method(:new))
end
end.compact

values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
[Que.serialize_json(jobs_attrs.map { |attrs| attrs.reject { |k| k == :klass } })]
)
end
values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) }
end

def run(*args)
Expand Down Expand Up @@ -237,7 +240,7 @@ def resolve_que_setting(setting, *args)
end
end

private
protected

def _run_attrs(attrs)
attrs[:error_count] = 0
Expand Down
Loading

0 comments on commit 4835936

Please sign in to comment.