From 35a739271818f2eead408937580558e1174007b0 Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Mon, 12 Aug 2024 14:50:05 -0700 Subject: [PATCH] only send status updates when state meaningfully changes --- processor.go | 6 ++++- processor_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/processor.go b/processor.go index 9d0a6f6..4de9177 100644 --- a/processor.go +++ b/processor.go @@ -327,7 +327,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg log.Fatalf("Error serializing, aborting now to not lose work: %v", err) } - p.updateStatus() + // If we move a job back to the same state and there are no kick requests, no need to see a status + // update as the totals will be the same + if completedJob.PriorState != completedJob.Job.State || len(completedJob.KickRequests) > 0 { + p.updateStatus() + } if p.stateStorage.allJobsAreTerminal(r) && !p.stateStorage.hasExecutingJobs() { return diff --git a/processor_test.go b/processor_test.go index 10d9b7f..2092d10 100644 --- a/processor_test.go +++ b/processor_test.go @@ -482,6 +482,66 @@ func TestProcessor_StateCallback(t *testing.T) { } } +func TestNoStatusCounts(t *testing.T) { + oc := MyOverallContext{} + ac := MyAppContext{} + r := NewRun[MyOverallContext, MyJobContext]("job", oc) + r.AddJob(MyJobContext{ + Count: 0, + }) + states := []State[MyAppContext, MyOverallContext, MyJobContext]{ + State[MyAppContext, MyOverallContext, MyJobContext]{ + TriggerState: TRIGGER_STATE_NEW, + Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) { + jc.Count++ + if jc.Count < 10 { + return jc, TRIGGER_STATE_NEW, nil, nil + } + return jc, STATE_DONE, nil, nil + }, + Concurrency: 1, + }, + State[MyAppContext, MyOverallContext, MyJobContext]{ + TriggerState: STATE_DONE, + Terminal: true, + }, + } + + testSl := testStatusListener{ + t: t, + expectedStatuses: [][]StatusCount{ + { + StatusCount{ + State: STATE_DONE, + Completed: 0, + Terminal: true, + }, + StatusCount{ + State: TRIGGER_STATE_NEW, + Executing: 1, + }, + }, + { + StatusCount{ + State: STATE_DONE, + Completed: 1, + Terminal: true, + }, + StatusCount{ + State: TRIGGER_STATE_NEW, + }, + }, + }, + } + + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, &testSl) + assert.NoError(t, err) + + err = p.Exec(context.Background(), r) + time.Sleep(1 * time.Second) + assert.Equal(t, 2, testSl.cur) +} + func TestProcessor_Retries(t *testing.T) { t.Parallel() oc := MyOverallContext{}