feat: Consolidate enqueue and enqueue_at to use enqueue_all under the hood#103
feat: Consolidate enqueue and enqueue_at to use enqueue_all under the hood#103CelticMajora wants to merge 16 commits intomainfrom
Conversation
Implements ActiveJob 7.1's enqueue_all adapter contract so that ActiveJob.perform_all_later issues a single bulk INSERT rather than looping through per-job enqueues. Per the Rails bulk-enqueue contract, this skips ActiveJob's before/around/after_enqueue callbacks, sets provider_job_id and successfully_enqueued on each job, and returns the count of successfully enqueued jobs. Since insert_all bypasses ActiveRecord callbacks, Delayed::Job's before_save hooks (set_default_run_at, set_name) are replicated inline while building each row. Jobs configured to run inline (Delayed::Worker.delay_jobs != true) fall back to the per-job path so that invoke_job semantics are preserved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wraps per-job attribute-building inside Delayed.lifecycle run_callbacks(:enqueue, dj) so Delayed plugins (monitoring, instrumentation, etc.) continue to observe each job as it is bulk-enqueued via perform_all_later. The callback block invokes dj.hook(:enqueue) for parity with the single-enqueue path; JobWrapper explicitly no-ops that hook, as before. ActiveJob's own before/around/after_enqueue callbacks remain skipped, per the Rails 7.1 bulk-enqueue contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces two inline ActiveSupport::Notifications subscriptions with the shared `emit_notification` matcher from spec/helper.rb, matching the precedent in monitor_spec.rb. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the adapter's inline replication of Delayed::Job's before_save callbacks with a single `before_save_hooks` method on the model, and passes `record_timestamps: true` to insert_all so ActiveRecord stamps created_at/updated_at during the bulk insert rather than the adapter hoisting db_time_now and assigning them manually. Keeps the bulk path automatically in sync with any future before_save additions to Delayed::Job. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
insert_all with `returning: %w(id)` raises ArgumentError on MySQL (supports_insert_returning? is false unless MariaDB 10.5+). Dropping `returning:` alone would leave `result.rows` empty on those adapters, breaking the `provider_job_id` contract. Introduces a `bulk_enqueue_supported?` predicate guarding the bulk path on both `Delayed::Worker.delay_jobs == true` and `connection.supports_insert_returning?`. MySQL callers still get correct semantics via the per-job fallback; PostgreSQL and SQLite keep the single-INSERT fast path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…lity The bulk path relies on three ActiveJob/Rails 7.1+ APIs that don't exist in Rails 6.0/6.1/7.0: - insert_all(record_timestamps: true) (Rails 7.0+) - ActiveJob::Base#set instance method (AJ 7.1+) - successfully_enqueued= setter (AJ 7.1+) Since perform_all_later itself is AJ 7.1+, there's no caller for enqueue_all on earlier versions. Define the public method only when ActiveJob >= 7.1 and guard the spec block the same way. Private helpers stay defined unconditionally for simpler class layout; they are unreachable on <7.1 since only enqueue_all calls them. Verified against rails-6-0, rails-6-1, rails-7-0 appraisals. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| end | ||
|
|
||
| def coerce_scheduled_at(value) | ||
| value.is_a?(Numeric) ? Time.at(value) : value # rubocop:disable Rails/TimeZone |
There was a problem hiding this comment.
Does job.scheduled_at really need to be coerced? Seems reasonable to expect the implementation to construct a valid Time object.
There was a problem hiding this comment.
I wasn't sure if we needed to support old versions of ActiveJob where it was a numeric: https://apidock.com/rails/ActiveJob/Core/scheduled_at%3D
There was a problem hiding this comment.
Ah, looks like that landed in 7.1, so we'd need to restrict our activejob dependency in turn. The activejob dependency is technically optional, but based on our appraisals we still support >= 6.0... 🤔
I think we can consider changing support in a separate PR and keep the coercion in place here, if that makes sense to you.
| job.provider_job_id = id | ||
| job.successfully_enqueued = true |
There was a problem hiding this comment.
Setting successfully_enqueued actually diverges from the singleton _enqueue method, and makes me wonder if we should be setting successfully_enqueued down there as well.
Additionally, the way that _enqueue handles the provider ID assignment has always felt a little vestigial, as the current implementation cannot know the ID before it has persisted the job, at which point it would be pointlessly costly to issue an UPDATE to persist the ID into the ActiveJob attributes. As a result, the ActiveJob attributes always have a null provider_job_id upon read, so we make a point not to rely on it.
My thinking is that we could simplify this method significantly (and support MySQL) if we skip handling provider_job_id after persistence.
(IMO, if we actually wanted to set it in the ActiveJob payload, we should generate a uuidv7 up front and persist that both to the jobs table and the ActiveJob payload. But proper support starts to tug at other behaviors like retry_on, which can pass the same ActiveJob payload through multiple provider job rows, each with their own execution IDs.)
| # on <7.1. | ||
| if ActiveJob.gem_version >= Gem::Version.new('7.1') | ||
| def enqueue_all(jobs) | ||
| return 0 if jobs.empty? |
There was a problem hiding this comment.
It's not clear to me if enqueue_all is required to return anything: https://github.com/rails/rails/blob/eb126bb140127d3589ad9be093a845e24fc4f475/activejob/lib/active_job/enqueuing.rb#L19
There was a problem hiding this comment.
Other adapters I was looking at(ie: Solid Queue and Sidekiq) all returned the count, but it was unclear whether that was part of the contract.
smudge
left a comment
There was a problem hiding this comment.
TAFN -- Really excited for this! I left a few comments, but overall I'm looking to unify the implementation of enqueue and enqueue_all as much as possible. (Like, in theory, enqueue(job) could defined as enqueue_all([job]), and I'd like to see how close to that we can get!)
As it stands, my read is that the enqueue_all path makes an effort to replicate the lifecycle hooks of Delayed::Backend::Base#enqueue (e.g. using JobPreparer, calling the before_save methods, and running the :enqueue callback for each one), but the codepaths are not shared and would be subject to drift.
Additionally, pushing ourselves closer to a shared enqueue_all(one_or_more_jobs) implementation could also force some interesting questions about the necessity of some of these extra enqueue-time operations like the :enqueue lifecycle hook. I think there are some compelling arguments for dropping support for per-job operations/configurations that could instead be managed in bulk. (Like, if :enqueue as a lifecycle hook were to receive a list of jobs rather than a single job -- obv. a breaking change, but gets us the observability hook without the N+1 calls against each individual job object.)
I'm mostly spitballing in that last paragraph, but wanted to start a conversation, because we're in a territory where I think it would already be pretty reasonable to cut a new major version and break some older behaviors/assumptions.
Previously, calling enqueue_all on an adapter without INSERT RETURNING support (e.g. MySQL) or with Delayed::Worker.delay_jobs set to false silently fell back to enqueueing jobs one at a time. That fallback lived entirely inside the adapter and partially papered over a situation the caller probably wants to know about. Replace the fallback with an explicit typed raise: EnqueueAllNotSupportedError. ActiveJob.perform_all_later now propagates the exception to the caller rather than quietly doing the per-job thing. Callers who want per-job enqueue should stick with perform_later. Rename bulk_enqueue_supported? -> enqueue_all_supported? to match the error class. Drop the transactional enqueue_all_one_by_one helper since there is no fallback path to wrap anymore. Adjust specs: gate happy-path assertions on a 'when bulk enqueue is supported' context (skipped on adapters lacking INSERT RETURNING) and replace the former fallback tests with EnqueueAllNotSupportedError assertions. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Waiting on #104 to move forward |
Now that the ActiveJob adapter routes singleton `enqueue` through `enqueue_all([job])`, the `:enqueue` lifecycle event needs a shape that naturally accommodates both single- and bulk-enqueue calls. Reshape `Delayed::Lifecycle::EVENTS[:enqueue]` from `[:job]` to `[:jobs]` and fire it once per call with the array. - Singleton path (`Delayed::Backend::Base.enqueue_job`) passes `[job]`. - Bulk path (`ActiveJobAdapter#enqueue_all`) passes the input array, with `provider_job_id` assignment moved inside the callback block so `after(:enqueue)` subscribers see the populated ids. - The `delayed.job.enqueue` ActiveSupport::Notification stays under the same name; its payload now carries `:count` and `:jobs` (heterogeneous: `Delayed::Job` instances from the singleton path, ActiveJob instances from the bulk path). Note for subscribers: Ruby auto-splats array args into block parameters when the block has multiple required params. Use a single-param signature (`do |jobs, &block|`) to keep the array intact. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| } | ||
| end | ||
|
|
||
| def self.bulk_enqueue_tags(jobs) |
There was a problem hiding this comment.
TODO, determine what is valuable here
| unless Delayed::Worker.delay_jobs == true | ||
| raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with delay_jobs false" | ||
| end |
There was a problem hiding this comment.
Not confident on this restriction, but removing it makes the code quite a bit messier.
Implements
enqueue_allin order to support Rails 7.1 perform_all_laterKey decisions:
enqueue_after_transaction_commitWorker.delay_jobs == falseenqueuelifecycle hookprovider_job_idis not assigned in the mysql/sqlite world, only the postgres world/domain @smudge @effron
/platform @smudge @effron