diff --git a/CHANGELOG.md b/CHANGELOG.md index 31e6c091..c6535146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state used a single `UPDATE` per job. This change should significantly reduce database and pool contention on high volume systems when jobs get retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617). + ## [0.12.0] - 2024-09-23 ⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version: diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 8afd27a4..60a771ec 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -48,7 +48,7 @@ type CompleterJobUpdated struct { // but is a minimal interface with the functions needed for completers to work // to more easily facilitate mocking. type PartialExecutor interface { - JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) + JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) } @@ -220,13 +220,13 @@ type BatchCompleter struct { baseservice.BaseService startstop.BaseStartStop - asyncCompleter *AsyncCompleter // used for non-complete completions - completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation - disableSleep bool // disable sleep in testing - maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted + completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation + disableSleep bool // disable sleep in testing + maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted exec PartialExecutor setStateParams map[int64]*batchCompleterSetState setStateParamsMu sync.RWMutex + setStateStartTimes map[int64]time.Time subscribeCh SubscribeChan waitOnBacklogChan chan struct{} waitOnBacklogWaiting bool @@ -239,18 +239,17 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s ) return baseservice.Init(archetype, &BatchCompleter{ - asyncCompleter: NewAsyncCompleter(archetype, exec, subscribeCh), - completionMaxSize: completionMaxSize, - exec: exec, - maxBacklog: maxBacklog, - setStateParams: make(map[int64]*batchCompleterSetState), - subscribeCh: subscribeCh, + completionMaxSize: completionMaxSize, + exec: exec, + maxBacklog: maxBacklog, + setStateParams: make(map[int64]*batchCompleterSetState), + setStateStartTimes: make(map[int64]time.Time), + subscribeCh: subscribeCh, }) } func (c *BatchCompleter) ResetSubscribeChan(subscribeCh SubscribeChan) { c.subscribeCh = subscribeCh - c.asyncCompleter.subscribeCh = subscribeCh } func (c *BatchCompleter) Start(ctx
context.Context) error { @@ -263,13 +262,10 @@ func (c *BatchCompleter) Start(ctx context.Context) error { panic("subscribeCh must be non-nil") } - if err := c.asyncCompleter.Start(ctx); err != nil { - return err - } - go func() { started() defer stopped() // this defer should come first so it's first out + defer close(c.subscribeCh) c.Logger.DebugContext(ctx, c.Name+": Run loop started") defer c.Logger.DebugContext(ctx, c.Name+": Run loop stopped") @@ -327,17 +323,22 @@ func (c *BatchCompleter) Start(ctx context.Context) error { } func (c *BatchCompleter) handleBatch(ctx context.Context) error { - var setStateBatch map[int64]*batchCompleterSetState + var ( + setStateBatch map[int64]*batchCompleterSetState + setStateStartTimes map[int64]time.Time + ) func() { c.setStateParamsMu.Lock() defer c.setStateParamsMu.Unlock() setStateBatch = c.setStateParams + setStateStartTimes = c.setStateStartTimes // Don't bother resetting the map if there's nothing to process, // allowing the completer to idle efficiently. if len(setStateBatch) > 0 { c.setStateParams = make(map[int64]*batchCompleterSetState) + c.setStateStartTimes = make(map[int64]time.Time) } else { // Set nil to avoid a data race below in case the map is set as a // new job comes in. @@ -351,34 +352,39 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { // Complete a sub-batch with retries. Also helps reduce visual noise and // increase readability of loop below. - completeSubBatch := func(batchID []int64, batchFinalizedAt []time.Time) ([]*rivertype.JobRow, error) { + completeSubBatch := func(batchParams *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { start := time.Now() defer func() { - c.Logger.DebugContext(ctx, c.Name+": Completed sub-batch of job(s)", "duration", time.Since(start), "num_jobs", len(batchID)) + c.Logger.DebugContext(ctx, c.Name+": Completed sub-batch of job(s)", "duration", time.Since(start), "num_jobs", len(batchParams.ID)) }() return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - return c.exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ - ID: batchID, - FinalizedAt: batchFinalizedAt, - }) + return c.exec.JobSetStateIfRunningMany(ctx, batchParams) }) } // This could be written more simply using multiple `sliceutil.Map`s, but // it's done this way to allocate as few new slices as necessary. 
- mapIDsAndFinalizedAt := func(setStateBatch map[int64]*batchCompleterSetState) ([]int64, []time.Time) { - var ( - batchIDs = make([]int64, len(setStateBatch)) - batchFinalizedAt = make([]time.Time, len(setStateBatch)) - i int - ) + mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams { + params := &riverdriver.JobSetStateIfRunningManyParams{ + ID: make([]int64, len(setStateBatch)), + ErrData: make([][]byte, len(setStateBatch)), + FinalizedAt: make([]*time.Time, len(setStateBatch)), + MaxAttempts: make([]*int, len(setStateBatch)), + ScheduledAt: make([]*time.Time, len(setStateBatch)), + State: make([]rivertype.JobState, len(setStateBatch)), + } + var i int for _, setState := range setStateBatch { - batchIDs[i] = setState.Params.ID - batchFinalizedAt[i] = *setState.Params.FinalizedAt + params.ID[i] = setState.Params.ID + params.ErrData[i] = setState.Params.ErrData + params.FinalizedAt[i] = setState.Params.FinalizedAt + params.MaxAttempts[i] = setState.Params.MaxAttempts + params.ScheduledAt[i] = setState.Params.ScheduledAt + params.State[i] = setState.Params.State i++ } - return batchIDs, batchFinalizedAt + return params } // Tease apart enormous batches into sub-batches. @@ -387,15 +393,23 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { // doesn't allocate any additional memory in case the entire batch is // smaller than the sub-batch maximum size (which will be the common case). var ( - batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch) - jobRows []*rivertype.JobRow + params = mapBatch(setStateBatch) + jobRows []*rivertype.JobRow ) c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch)) if len(setStateBatch) > c.completionMaxSize { jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch)) for i := 0; i < len(setStateBatch); i += c.completionMaxSize { - endIndex := min(i+c.completionMaxSize, len(batchID)) // beginning of next sub-batch or end of slice - jobRowsSubBatch, err := completeSubBatch(batchID[i:endIndex], batchFinalizedAt[i:endIndex]) + endIndex := min(i+c.completionMaxSize, len(params.ID)) // beginning of next sub-batch or end of slice + subBatch := &riverdriver.JobSetStateIfRunningManyParams{ + ID: params.ID[i:endIndex], + ErrData: params.ErrData[i:endIndex], + FinalizedAt: params.FinalizedAt[i:endIndex], + MaxAttempts: params.MaxAttempts[i:endIndex], + ScheduledAt: params.ScheduledAt[i:endIndex], + State: params.State[i:endIndex], + } + jobRowsSubBatch, err := completeSubBatch(subBatch) if err != nil { return err } @@ -403,7 +417,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { } } else { var err error - jobRows, err = completeSubBatch(batchID, batchFinalizedAt) + jobRows, err = completeSubBatch(params) if err != nil { return err } @@ -411,7 +425,8 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated { setState := setStateBatch[jobRow.ID] - setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(*setState.Params.FinalizedAt) + startTime := setStateStartTimes[jobRow.ID] + setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(startTime) return CompleterJobUpdated{Job: jobRow, JobStats: setState.Stats} }) @@ -432,13 +447,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { } func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params 
*riverdriver.JobSetStateIfRunningParams) error { - // Send completions other than setting to `complete` to an async completer. - // We consider this okay because these are expected to be much more rare, so - // only optimizing `complete` will yield huge speed gains. - if params.State != rivertype.JobStateCompleted { - return c.asyncCompleter.JobSetStateIfRunning(ctx, stats, params) - } - + now := c.Time.NowUTC() // If we've built up too much of a backlog because the completer's fallen // behind, block completions until the complete loop's had a chance to catch // up. @@ -448,16 +457,11 @@ func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta defer c.setStateParamsMu.Unlock() c.setStateParams[params.ID] = &batchCompleterSetState{params, stats} + c.setStateStartTimes[params.ID] = now return nil } -func (c *BatchCompleter) Stop() { - c.BaseStartStop.Stop() - c.asyncCompleter.Stop() - // subscribeCh already closed by asyncCompleter.Stop ^ -} - func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { c.setStateParamsMu.RLock() var ( diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 942e4bed..8e237fe4 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -25,25 +25,25 @@ import ( ) type partialExecutorMock struct { - JobSetCompleteIfRunningManyCalled bool - JobSetCompleteIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) - JobSetStateIfRunningCalled bool - JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) - mu sync.Mutex + JobSetStateIfRunningManyCalled bool + JobSetStateIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) + JobSetStateIfRunningCalled bool + JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) + mu sync.Mutex } // NewPartialExecutorMock returns a new mock with all mock functions set to call // down into the given real executor. 
func NewPartialExecutorMock(exec riverdriver.Executor) *partialExecutorMock { return &partialExecutorMock{ - JobSetCompleteIfRunningManyFunc: exec.JobSetCompleteIfRunningMany, - JobSetStateIfRunningFunc: exec.JobSetStateIfRunning, + JobSetStateIfRunningManyFunc: exec.JobSetStateIfRunningMany, + JobSetStateIfRunningFunc: exec.JobSetStateIfRunning, } } -func (m *partialExecutorMock) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { - m.setCalled(func() { m.JobSetCompleteIfRunningManyCalled = true }) - return m.JobSetCompleteIfRunningManyFunc(ctx, params) +func (m *partialExecutorMock) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + m.setCalled(func() { m.JobSetStateIfRunningManyCalled = true }) + return m.JobSetStateIfRunningManyFunc(ctx, params) } func (m *partialExecutorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { @@ -325,7 +325,8 @@ func TestAsyncCompleter(t *testing.T) { return NewAsyncCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) }, func(completer *AsyncCompleter) { completer.disableSleep = true }, - func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec }) + func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec }, + ) } func TestBatchCompleter(t *testing.T) { @@ -336,7 +337,8 @@ func TestBatchCompleter(t *testing.T) { return NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh) }, func(completer *BatchCompleter) { completer.disableSleep = true }, - func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec }) + func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec }, + ) ctx := context.Background() @@ -728,11 +730,11 @@ func testCompleter[TCompleter JobCompleter]( } execMock := NewPartialExecutorMock(bundle.exec) - execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { if err := maybeError(); err != nil { return nil, err } - return bundle.exec.JobSetCompleteIfRunningMany(ctx, params) + return bundle.exec.JobSetStateIfRunningMany(ctx, params) } execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { if err := maybeError(); err != nil { @@ -751,7 +753,7 @@ func testCompleter[TCompleter JobCompleter]( // Make sure our mocks were really called. The specific function called // will depend on the completer under test, so okay as long as one or // the other was. - require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) // Job still managed to complete despite the errors. 
requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) @@ -767,7 +769,7 @@ func testCompleter[TCompleter JobCompleter]( disableSleep(completer) execMock := NewPartialExecutorMock(bundle.exec) - execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { return nil, context.Canceled } execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { @@ -788,7 +790,7 @@ func testCompleter[TCompleter JobCompleter]( // Make sure our mocks were really called. The specific function called // will depend on the completer under test, so okay as long as one or // the other was. - require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) // Job is still running because the completer is forced to give up // immediately on certain types of errors like where a pool is closed. @@ -805,7 +807,7 @@ func testCompleter[TCompleter JobCompleter]( disableSleep(completer) execMock := NewPartialExecutorMock(bundle.exec) - execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { return nil, puddle.ErrClosedPool } execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { @@ -826,7 +828,7 @@ func testCompleter[TCompleter JobCompleter]( // Make sure our mocks were really called. The specific function called // will depend on the completer under test, so okay as long as one or // the other was. - require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) // Job is still running because the completer is forced to give up // immediately on certain types of errors like where a pool is closed. 
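The completer changes above and the driver tests that follow both build the same column-oriented parameter struct. As a rough sketch of how the new batched call is intended to be used (not part of this patch; the `finishJobs` helper and `example` package are purely illustrative), several finished jobs can be collapsed into a single `JobSetStateIfRunningMany` round trip:

```go
// Sketch only (assumed wiring): collapse several single-job state changes
// into one JobSetStateIfRunningMany call so the whole batch is applied in a
// single UPDATE, whether individual jobs completed, errored, snoozed, were
// cancelled, or were discarded.
package example

import (
	"context"
	"time"

	"github.com/riverqueue/river/riverdriver"
	"github.com/riverqueue/river/rivertype"
)

func finishJobs(ctx context.Context, exec riverdriver.Executor, finished []*riverdriver.JobSetStateIfRunningParams) ([]*rivertype.JobRow, error) {
	many := &riverdriver.JobSetStateIfRunningManyParams{
		ID:          make([]int64, len(finished)),
		ErrData:     make([][]byte, len(finished)),
		FinalizedAt: make([]*time.Time, len(finished)),
		MaxAttempts: make([]*int, len(finished)),
		ScheduledAt: make([]*time.Time, len(finished)),
		State:       make([]rivertype.JobState, len(finished)),
	}
	for i, params := range finished {
		// Each per-job params struct contributes one entry to every
		// column-oriented slice; nil entries mean "don't update that field".
		many.ID[i] = params.ID
		many.ErrData[i] = params.ErrData
		many.FinalizedAt[i] = params.FinalizedAt
		many.MaxAttempts[i] = params.MaxAttempts
		many.ScheduledAt[i] = params.ScheduledAt
		many.State[i] = params.State
	}
	return exec.JobSetStateIfRunningMany(ctx, many)
}
```

The per-job `JobSetStateIfRunningParams` values would come from the existing constructors such as `riverdriver.JobSetStateCompleted(id, finalizedAt)` or `riverdriver.JobSetStateErrorRetryable(id, scheduledAt, errData)`, which is exactly what the `riverdrivertest` helper below does before handing the batch to the driver.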
diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index ce93868c..a4cf1add 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1944,6 +1944,293 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) + setStateManyParams := func(params ...*riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams { + batchParams := &riverdriver.JobSetStateIfRunningManyParams{} + for _, param := range params { + batchParams.ID = append(batchParams.ID, param.ID) + batchParams.ErrData = append(batchParams.ErrData, param.ErrData) + batchParams.FinalizedAt = append(batchParams.FinalizedAt, param.FinalizedAt) + batchParams.MaxAttempts = append(batchParams.MaxAttempts, param.MaxAttempts) + batchParams.ScheduledAt = append(batchParams.ScheduledAt, param.ScheduledAt) + batchParams.State = append(batchParams.State, param.State) + } + + return batchParams + } + + t.Run("JobSetStateIfRunningMany_JobSetStateCompleted", func(t *testing.T) { + t.Parallel() + + t.Run("CompletesARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateCompleted(job.ID, now))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateCompleted, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCompleted, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) + }) + + t.Run("DoesNotCompleteARetryableJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key"), + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateCompleted(job.ID, now))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateRetryable, jobAfter.State) + require.Nil(t, jobAfter.FinalizedAt) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) + }) + }) + + t.Run("JobSetStateIfRunningMany_JobSetStateErrored", func(t *testing.T) { + t.Parallel() + + t.Run("SetsARunningJobToRetryable", func(t *testing.T) { + t.Parallel() + + exec, _ :=
setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now)))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateRetryable, jobAfter.State) + require.WithinDuration(t, now, jobAfter.ScheduledAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) + + // validate error payload: + require.Len(t, jobAfter.Errors, 1) + require.Equal(t, now, jobAfter.Errors[0].At) + require.Equal(t, 1, jobAfter.Errors[0].Attempt) + require.Equal(t, "fake error", jobAfter.Errors[0].Error) + require.Equal(t, "foo.go:123\nbar.go:456", jobAfter.Errors[0].Trace) + }) + + t.Run("DoesNotTouchAlreadyRetryableJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRetryable), + ScheduledAt: ptrutil.Ptr(now.Add(10 * time.Second)), + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now)))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateRetryable, jobAfter.State) + require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State) + require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond) + }) + + t.Run("SetsAJobWithCancelAttemptedAtToCancelled", func(t *testing.T) { + // If a job has cancel_attempted_at in its metadata, it means that the user + // tried to cancel the job with the Cancel API but that the job + // finished/errored before the producer received the cancel notification. + // + // In this case, we want to move the job to cancelled instead of retryable + // so that the job is not retried. 
+ t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at":"%s"}`, time.Now().UTC().Format(time.RFC3339))), + State: ptrutil.Ptr(rivertype.JobStateRunning), + ScheduledAt: ptrutil.Ptr(now.Add(-10 * time.Second)), + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now)))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateCancelled, jobAfter.State) + require.NotNil(t, jobAfter.FinalizedAt) + // Loose assertion against FinalizedAt just to make sure it was set (it uses + // the database's now() instead of a passed-in time): + require.WithinDuration(t, time.Now().UTC(), *jobAfter.FinalizedAt, 2*time.Second) + // ScheduledAt should not be touched: + require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond) + + // Errors should still be appended to: + require.Len(t, jobAfter.Errors, 1) + require.Contains(t, jobAfter.Errors[0].Error, "fake error") + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State) + require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond) + }) + }) + + t.Run("JobSetStateIfRunningMany_JobSetStateCancelled", func(t *testing.T) { + t.Parallel() + + t.Run("CancelsARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + UniqueStates: 0xFF, + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateCancelled(job.ID, now, makeErrPayload(t, now)))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateCancelled, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) + }) + }) + + t.Run("JobSetStateIfRunningMany_JobSetStateDiscarded", func(t *testing.T) { + t.Parallel() + + t.Run("DiscardsARunningJob", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + UniqueStates: 0xFF, + }) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateDiscarded(job.ID, now, makeErrPayload(t, now)))) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, rivertype.JobStateDiscarded, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + require.Equal(t, "unique-key", string(jobAfter.UniqueKey)) + require.Equal(t, rivertype.JobStates(), jobAfter.UniqueStates) + + jobUpdated, err := exec.JobGetByID(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, jobUpdated.State) + }) + }) + + t.Run("JobSetStateIfRunningMany_MultipleJobsAtOnce", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + future := now.Add(10 * time.Second) + + job1 := 
testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams( + riverdriver.JobSetStateCompleted(job1.ID, now), + riverdriver.JobSetStateErrorRetryable(job2.ID, future, makeErrPayload(t, now)), + riverdriver.JobSetStateCancelled(job3.ID, now, makeErrPayload(t, now)), + )) + require.NoError(t, err) + completedJob := jobsAfter[0] + require.Equal(t, rivertype.JobStateCompleted, completedJob.State) + require.WithinDuration(t, now, *completedJob.FinalizedAt, time.Microsecond) + + retryableJob := jobsAfter[1] + require.Equal(t, rivertype.JobStateRetryable, retryableJob.State) + require.WithinDuration(t, future, retryableJob.ScheduledAt, time.Microsecond) + // validate error payload: + require.Len(t, retryableJob.Errors, 1) + require.Equal(t, now, retryableJob.Errors[0].At) + require.Equal(t, 1, retryableJob.Errors[0].Attempt) + require.Equal(t, "fake error", retryableJob.Errors[0].Error) + require.Equal(t, "foo.go:123\nbar.go:456", retryableJob.Errors[0].Trace) + + cancelledJob := jobsAfter[2] + require.Equal(t, rivertype.JobStateCancelled, cancelledJob.State) + require.WithinDuration(t, now, *cancelledJob.FinalizedAt, time.Microsecond) + }) + t.Run("JobUpdate", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 2cedb610..52af07f4 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -126,6 +126,7 @@ type Executor interface { JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error) JobSetCompleteIfRunningMany(ctx context.Context, params *JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*rivertype.JobRow, error) + JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error) LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error) LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error) @@ -355,6 +356,18 @@ func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, maxAttempts in return &JobSetStateIfRunningParams{ID: id, MaxAttempts: &maxAttempts, ScheduledAt: &scheduledAt, State: rivertype.JobStateAvailable} } +// JobSetStateIfRunningManyParams are parameters to update the state of +// currently running jobs. Use one of the constructors below to ensure a correct +// combination of parameters. 
+type JobSetStateIfRunningManyParams struct { + ID []int64 + ErrData [][]byte + FinalizedAt []*time.Time + MaxAttempts []*int + ScheduledAt []*time.Time + State []rivertype.JobState +} + type JobUpdateParams struct { ID int64 AttemptDoUpdate bool diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 77f16cda..d337907d 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -1283,6 +1283,133 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet return &i, err } +const jobSetStateIfRunningMany = `-- name: JobSetStateIfRunningMany :many +WITH job_input AS ( + SELECT + unnest($1::bigint[]) AS id, + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($2::text[])::river_job_state AS state, + unnest($3::boolean[]) AS finalized_at_do_update, + unnest($4::timestamptz[]) AS finalized_at, + unnest($5::boolean[]) AS errors_do_update, + unnest($6::jsonb[]) AS errors, + unnest($7::boolean[]) AS max_attempts_do_update, + unnest($8::int[]) AS max_attempts, + unnest($9::boolean[]) AS scheduled_at_do_update, + unnest($10::timestamptz[]) AS scheduled_at +), +job_to_update AS ( + SELECT + river_job.id, + job_input.state, + job_input.finalized_at, + job_input.errors, + job_input.max_attempts, + job_input.scheduled_at, + (job_input.state IN ('retryable', 'scheduled') AND river_job.metadata ? 'cancel_attempted_at') AS should_cancel, + job_input.finalized_at_do_update, + job_input.errors_do_update, + job_input.max_attempts_do_update, + job_input.scheduled_at_do_update + FROM river_job + JOIN job_input ON river_job.id = job_input.id + WHERE river_job.state = 'running' + FOR UPDATE +), +updated_job AS ( + UPDATE river_job + SET + state = CASE WHEN job_to_update.should_cancel THEN 'cancelled'::river_job_state + ELSE job_to_update.state END, + finalized_at = CASE WHEN job_to_update.should_cancel THEN now() + WHEN job_to_update.finalized_at_do_update THEN job_to_update.finalized_at + ELSE river_job.finalized_at END, + errors = CASE WHEN job_to_update.errors_do_update THEN array_append(river_job.errors, job_to_update.errors) + ELSE river_job.errors END, + max_attempts = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.max_attempts_do_update THEN job_to_update.max_attempts + ELSE river_job.max_attempts END, + scheduled_at = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.scheduled_at_do_update THEN job_to_update.scheduled_at + ELSE river_job.scheduled_at END + FROM job_to_update + WHERE river_job.id = job_to_update.id + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states +) +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +FROM river_job +WHERE id IN (SELECT id FROM job_input) + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, 
metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +FROM updated_job +` + +type JobSetStateIfRunningManyParams struct { + IDs []int64 + State []string + FinalizedAtDoUpdate []bool + FinalizedAt []time.Time + ErrorsDoUpdate []bool + Errors []string + MaxAttemptsDoUpdate []bool + MaxAttempts []int32 + ScheduledAtDoUpdate []bool + ScheduledAt []time.Time +} + +func (q *Queries) JobSetStateIfRunningMany(ctx context.Context, db DBTX, arg *JobSetStateIfRunningManyParams) ([]*RiverJob, error) { + rows, err := db.QueryContext(ctx, jobSetStateIfRunningMany, + pq.Array(arg.IDs), + pq.Array(arg.State), + pq.Array(arg.FinalizedAtDoUpdate), + pq.Array(arg.FinalizedAt), + pq.Array(arg.ErrorsDoUpdate), + pq.Array(arg.Errors), + pq.Array(arg.MaxAttemptsDoUpdate), + pq.Array(arg.MaxAttempts), + pq.Array(arg.ScheduledAtDoUpdate), + pq.Array(arg.ScheduledAt), + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + &i.UniqueStates, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobUpdate = `-- name: JobUpdate :one UPDATE river_job SET diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml index 0290d662..f8ebbee1 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml @@ -28,6 +28,7 @@ sql: emit_result_struct_pointers: true rename: + ids: "IDs" ttl: "TTL" overrides: diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 4e4e894e..ca8f0e1a 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -576,6 +576,49 @@ func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver return jobRowFromInternal(job) } +func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + setStateParams := &dbsqlc.JobSetStateIfRunningManyParams{ + IDs: params.ID, + Errors: make([]string, len(params.ID)), + ErrorsDoUpdate: make([]bool, len(params.ID)), + FinalizedAt: make([]time.Time, len(params.ID)), + FinalizedAtDoUpdate: make([]bool, len(params.ID)), + MaxAttempts: make([]int32, len(params.ID)), + MaxAttemptsDoUpdate: make([]bool, len(params.ID)), + ScheduledAt: make([]time.Time, len(params.ID)), + ScheduledAtDoUpdate: make([]bool, len(params.ID)), + State: make([]string, len(params.ID)), + } + + const defaultObject = "{}" + + for i := 0; i < len(params.ID); i++ { + setStateParams.Errors[i] = valutil.ValOrDefault(string(params.ErrData[i]), defaultObject) + if params.ErrData[i] != nil { + setStateParams.ErrorsDoUpdate[i] = true + } + if params.FinalizedAt[i] != nil { + setStateParams.FinalizedAtDoUpdate[i] = true + setStateParams.FinalizedAt[i] = *params.FinalizedAt[i] + } + if params.MaxAttempts[i] != nil { + setStateParams.MaxAttemptsDoUpdate[i] = true + 
setStateParams.MaxAttempts[i] = int32(*params.MaxAttempts[i]) //nolint:gosec + } + if params.ScheduledAt[i] != nil { + setStateParams.ScheduledAtDoUpdate[i] = true + setStateParams.ScheduledAt[i] = *params.ScheduledAt[i] + } + setStateParams.State[i] = string(params.State[i]) + } + + jobs, err := dbsqlc.New().JobSetStateIfRunningMany(ctx, e.dbtx, setStateParams) + if err != nil { + return nil, interpretError(err) + } + return mapSliceError(jobs, jobRowFromInternal) +} + func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) { job, err := dbsqlc.New().JobUpdate(ctx, e.dbtx, &dbsqlc.JobUpdateParams{ ID: params.ID, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 06c84c25..7703c15a 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -523,6 +523,66 @@ UNION SELECT * FROM updated_job; +-- name: JobSetStateIfRunningMany :many +WITH job_input AS ( + SELECT + unnest(@ids::bigint[]) AS id, + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest(@state::text[])::river_job_state AS state, + unnest(@finalized_at_do_update::boolean[]) AS finalized_at_do_update, + unnest(@finalized_at::timestamptz[]) AS finalized_at, + unnest(@errors_do_update::boolean[]) AS errors_do_update, + unnest(@errors::jsonb[]) AS errors, + unnest(@max_attempts_do_update::boolean[]) AS max_attempts_do_update, + unnest(@max_attempts::int[]) AS max_attempts, + unnest(@scheduled_at_do_update::boolean[]) AS scheduled_at_do_update, + unnest(@scheduled_at::timestamptz[]) AS scheduled_at +), +job_to_update AS ( + SELECT + river_job.id, + job_input.state, + job_input.finalized_at, + job_input.errors, + job_input.max_attempts, + job_input.scheduled_at, + (job_input.state IN ('retryable', 'scheduled') AND river_job.metadata ? 'cancel_attempted_at') AS should_cancel, + job_input.finalized_at_do_update, + job_input.errors_do_update, + job_input.max_attempts_do_update, + job_input.scheduled_at_do_update + FROM river_job + JOIN job_input ON river_job.id = job_input.id + WHERE river_job.state = 'running' + FOR UPDATE +), +updated_job AS ( + UPDATE river_job + SET + state = CASE WHEN job_to_update.should_cancel THEN 'cancelled'::river_job_state + ELSE job_to_update.state END, + finalized_at = CASE WHEN job_to_update.should_cancel THEN now() + WHEN job_to_update.finalized_at_do_update THEN job_to_update.finalized_at + ELSE river_job.finalized_at END, + errors = CASE WHEN job_to_update.errors_do_update THEN array_append(river_job.errors, job_to_update.errors) + ELSE river_job.errors END, + max_attempts = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.max_attempts_do_update THEN job_to_update.max_attempts + ELSE river_job.max_attempts END, + scheduled_at = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.scheduled_at_do_update THEN job_to_update.scheduled_at + ELSE river_job.scheduled_at END + FROM job_to_update + WHERE river_job.id = job_to_update.id + RETURNING river_job.* +) +SELECT * +FROM river_job +WHERE id IN (SELECT id FROM job_input) + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT * +FROM updated_job; + -- A generalized update for any property on a job. This brings in a large number -- of parameters and therefore may be more suitable for testing than production. 
-- name: JobUpdate :one diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 399300c7..00317e53 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -1261,6 +1261,130 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet return &i, err } +const jobSetStateIfRunningMany = `-- name: JobSetStateIfRunningMany :many +WITH job_input AS ( + SELECT + unnest($1::bigint[]) AS id, + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($2::text[])::river_job_state AS state, + unnest($3::boolean[]) AS finalized_at_do_update, + unnest($4::timestamptz[]) AS finalized_at, + unnest($5::boolean[]) AS errors_do_update, + unnest($6::jsonb[]) AS errors, + unnest($7::boolean[]) AS max_attempts_do_update, + unnest($8::int[]) AS max_attempts, + unnest($9::boolean[]) AS scheduled_at_do_update, + unnest($10::timestamptz[]) AS scheduled_at +), +job_to_update AS ( + SELECT + river_job.id, + job_input.state, + job_input.finalized_at, + job_input.errors, + job_input.max_attempts, + job_input.scheduled_at, + (job_input.state IN ('retryable', 'scheduled') AND river_job.metadata ? 'cancel_attempted_at') AS should_cancel, + job_input.finalized_at_do_update, + job_input.errors_do_update, + job_input.max_attempts_do_update, + job_input.scheduled_at_do_update + FROM river_job + JOIN job_input ON river_job.id = job_input.id + WHERE river_job.state = 'running' + FOR UPDATE +), +updated_job AS ( + UPDATE river_job + SET + state = CASE WHEN job_to_update.should_cancel THEN 'cancelled'::river_job_state + ELSE job_to_update.state END, + finalized_at = CASE WHEN job_to_update.should_cancel THEN now() + WHEN job_to_update.finalized_at_do_update THEN job_to_update.finalized_at + ELSE river_job.finalized_at END, + errors = CASE WHEN job_to_update.errors_do_update THEN array_append(river_job.errors, job_to_update.errors) + ELSE river_job.errors END, + max_attempts = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.max_attempts_do_update THEN job_to_update.max_attempts + ELSE river_job.max_attempts END, + scheduled_at = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.scheduled_at_do_update THEN job_to_update.scheduled_at + ELSE river_job.scheduled_at END + FROM job_to_update + WHERE river_job.id = job_to_update.id + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states +) +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +FROM river_job +WHERE id IN (SELECT id FROM job_input) + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +FROM updated_job +` + +type JobSetStateIfRunningManyParams struct { + IDs []int64 + State []string + FinalizedAtDoUpdate []bool + FinalizedAt []time.Time + ErrorsDoUpdate 
[]bool + Errors [][]byte + MaxAttemptsDoUpdate []bool + MaxAttempts []int32 + ScheduledAtDoUpdate []bool + ScheduledAt []time.Time +} + +func (q *Queries) JobSetStateIfRunningMany(ctx context.Context, db DBTX, arg *JobSetStateIfRunningManyParams) ([]*RiverJob, error) { + rows, err := db.Query(ctx, jobSetStateIfRunningMany, + arg.IDs, + arg.State, + arg.FinalizedAtDoUpdate, + arg.FinalizedAt, + arg.ErrorsDoUpdate, + arg.Errors, + arg.MaxAttemptsDoUpdate, + arg.MaxAttempts, + arg.ScheduledAtDoUpdate, + arg.ScheduledAt, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + &i.UniqueStates, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobUpdate = `-- name: JobUpdate :one UPDATE river_job SET diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml index 93fb53fc..17ff029c 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml +++ b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml @@ -29,6 +29,7 @@ sql: emit_result_struct_pointers: true rename: + ids: "IDs" ttl: "TTL" overrides: diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 3107f598..8b4e6d0b 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -460,6 +460,46 @@ func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver return jobRowFromInternal(job) } +func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + setStateParams := &dbsqlc.JobSetStateIfRunningManyParams{ + IDs: params.ID, + Errors: params.ErrData, + ErrorsDoUpdate: make([]bool, len(params.ID)), + FinalizedAt: make([]time.Time, len(params.ID)), + FinalizedAtDoUpdate: make([]bool, len(params.ID)), + MaxAttempts: make([]int32, len(params.ID)), + MaxAttemptsDoUpdate: make([]bool, len(params.ID)), + ScheduledAt: make([]time.Time, len(params.ID)), + ScheduledAtDoUpdate: make([]bool, len(params.ID)), + State: make([]string, len(params.ID)), + } + + for i := 0; i < len(params.ID); i++ { + if params.ErrData[i] != nil { + setStateParams.ErrorsDoUpdate[i] = true + } + if params.FinalizedAt[i] != nil { + setStateParams.FinalizedAtDoUpdate[i] = true + setStateParams.FinalizedAt[i] = *params.FinalizedAt[i] + } + if params.MaxAttempts[i] != nil { + setStateParams.MaxAttemptsDoUpdate[i] = true + setStateParams.MaxAttempts[i] = int32(*params.MaxAttempts[i]) //nolint:gosec + } + if params.ScheduledAt[i] != nil { + setStateParams.ScheduledAtDoUpdate[i] = true + setStateParams.ScheduledAt[i] = *params.ScheduledAt[i] + } + setStateParams.State[i] = string(params.State[i]) + } + + jobs, err := dbsqlc.New().JobSetStateIfRunningMany(ctx, e.dbtx, setStateParams) + if err != nil { + return nil, interpretError(err) + } + return mapSliceError(jobs, jobRowFromInternal) +} + func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) { job, err := dbsqlc.New().JobUpdate(ctx, e.dbtx, &dbsqlc.JobUpdateParams{ 
ID: params.ID,