Skip to content

Commit

Permalink
Merge pull request #7 from thesilentg/status2
Browse files Browse the repository at this point in the history
Only send status updates when state meaningfully changes
  • Loading branch information
gaffo authored Aug 12, 2024
2 parents 2f2ad66 + 35a7392 commit ec5cc41
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
6 changes: 5 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
log.Fatalf("Error serializing, aborting now to not lose work: %v", err)
}

p.updateStatus()
// If we move a job back to the same state and there are no kick requests, no need to see a status
// update as the totals will be the same
if completedJob.PriorState != completedJob.Job.State || len(completedJob.KickRequests) > 0 {
p.updateStatus()
}

if p.stateStorage.allJobsAreTerminal(r) && !p.stateStorage.hasExecutingJobs() {
return
Expand Down
60 changes: 60 additions & 0 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,66 @@ func TestProcessor_StateCallback(t *testing.T) {
}
}

func TestNoStatusCounts(t *testing.T) {
oc := MyOverallContext{}
ac := MyAppContext{}
r := NewRun[MyOverallContext, MyJobContext]("job", oc)
r.AddJob(MyJobContext{
Count: 0,
})
states := []State[MyAppContext, MyOverallContext, MyJobContext]{
State[MyAppContext, MyOverallContext, MyJobContext]{
TriggerState: TRIGGER_STATE_NEW,
Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) {
jc.Count++
if jc.Count < 10 {
return jc, TRIGGER_STATE_NEW, nil, nil
}
return jc, STATE_DONE, nil, nil
},
Concurrency: 1,
},
State[MyAppContext, MyOverallContext, MyJobContext]{
TriggerState: STATE_DONE,
Terminal: true,
},
}

testSl := testStatusListener{
t: t,
expectedStatuses: [][]StatusCount{
{
StatusCount{
State: STATE_DONE,
Completed: 0,
Terminal: true,
},
StatusCount{
State: TRIGGER_STATE_NEW,
Executing: 1,
},
},
{
StatusCount{
State: STATE_DONE,
Completed: 1,
Terminal: true,
},
StatusCount{
State: TRIGGER_STATE_NEW,
},
},
},
}

p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, &testSl)
assert.NoError(t, err)

err = p.Exec(context.Background(), r)
time.Sleep(1 * time.Second)
assert.Equal(t, 2, testSl.cur)
}

func TestProcessor_Retries(t *testing.T) {
t.Parallel()
oc := MyOverallContext{}
Expand Down

0 comments on commit ec5cc41

Please sign in to comment.