Skip to content

Commit

Permalink
Merge pull request #5 from thesilentg/Status
Browse files Browse the repository at this point in the history
Send initial status update
  • Loading branch information
gaffo authored Jul 26, 2024
2 parents 8e5b993 + 135677e commit f3f6b4b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
33 changes: 19 additions & 14 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s stateStorage[AC, OC, JC]) getStatusCounts() []StatusCount {
type Processor[AC any, OC any, JC any] struct {
appContext AC
serializer Serializer[OC, JC]
stateThing stateStorage[AC, OC, JC]
stateStorage stateStorage[AC, OC, JC]
statusListener StatusListener
returnChan chan Return[JC]
wg sync.WaitGroup
Expand All @@ -229,12 +229,12 @@ type Return[JC any] struct {
func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) (*Processor[AC, OC, JC], error) {
p := &Processor[AC, OC, JC]{
appContext: ac,
stateThing: newStateStorageFromStates(states),
stateStorage: newStateStorageFromStates(states),
serializer: serializer,
statusListener: statusListener,
}

if err := p.stateThing.validate(); err != nil {
if err := p.stateStorage.validate(); err != nil {
return nil, err
}

Expand All @@ -257,13 +257,18 @@ func (p *Processor[AC, OC, JC]) init() {
func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error {
p.init()

if p.stateThing.allJobsAreTerminal(r) {
if p.stateStorage.allJobsAreTerminal(r) {
// Send one status update so that if there are listeners they can render the correct values
for _, job := range r.Jobs {
p.stateStorage.completeJob(job)
}
p.statusListener.StatusUpdate(p.stateStorage.getStatusCounts())
slog.Info("AllJobsTerminal")
return nil
}

// create the workers
for _, s := range p.stateThing.states {
for _, s := range p.stateStorage.states {
// Terminal states don't need to recieve jobs, they're just done
if s.Terminal {
continue
Expand All @@ -289,7 +294,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg

// Enqueue the jobs to start
for _, job := range r.Jobs {
p.stateThing.processJob(job)
p.stateStorage.processJob(job)
}

// Send the initial status update with the state of all the jobs
Expand All @@ -301,11 +306,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
return
case completedJob := <-p.returnChan:
// If the prior state of the completed job was at capacity, we now have space for one more
p.stateThing.runNextWaitingJob(completedJob.PriorState)
p.stateStorage.runNextWaitingJob(completedJob.PriorState)

// Update the run with the new state
r.UpdateJob(completedJob.Job)
p.stateThing.processJob(completedJob.Job)
p.stateStorage.processJob(completedJob.Job)

// Start any of the new jobs that need kicking
for idx, kickRequest := range completedJob.KickRequests {
Expand All @@ -316,7 +321,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
StateErrors: map[string][]string{},
}
r.UpdateJob(job)
p.stateThing.processJob(job)
p.stateStorage.processJob(job)
}

if err := p.serializer.Serialize(*r); err != nil {
Expand All @@ -325,21 +330,21 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg

p.updateStatus()

if p.stateThing.allJobsAreTerminal(r) && !p.stateThing.hasExecutingJobs() {
if p.stateStorage.allJobsAreTerminal(r) && !p.stateStorage.hasExecutingJobs() {
return
}
}
}
}

func (p *Processor[AC, OC, JC]) updateStatus() {
p.statusListener.StatusUpdate(p.stateThing.getStatusCounts())
p.statusListener.StatusUpdate(p.stateStorage.getStatusCounts())
}

func (p *Processor[AC, OC, JC]) shutdown() {
// close all of the channels
for _, state := range p.stateThing.states {
p.stateThing.closeJobChannelForState(state.TriggerState)
for _, state := range p.stateStorage.states {
p.stateStorage.closeJobChannelForState(state.TriggerState)
}
// close ourselves down
close(p.returnChan)
Expand Down Expand Up @@ -402,7 +407,7 @@ func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC
ac: p.appContext,
oc: overallContext,
state: state,
jobChan: p.stateThing.getJobChannelForState(state.TriggerState),
jobChan: p.stateStorage.getJobChannelForState(state.TriggerState),
returnChan: p.returnChan,
i: i,
wg: wg,
Expand Down
33 changes: 31 additions & 2 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,50 @@ func TestProcessorAllTerminal(t *testing.T) {
Count: 0,
})
}
for i := 0; i < 5; i++ {
r.AddJobWithState(MyJobContext{
Count: 0,
}, STATE_DONE_TWO)
}
states := []State[MyAppContext, MyOverallContext, MyJobContext]{
State[MyAppContext, MyOverallContext, MyJobContext]{
{
TriggerState: STATE_DONE_TWO,
Terminal: true,
},
{
TriggerState: TRIGGER_STATE_NEW,
Terminal: true,
},
}

p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil)
testSl := testStatusListener{
t: t,
expectedStatuses: [][]StatusCount{
{
StatusCount{
State: STATE_DONE_TWO,
Completed: 5,
Terminal: true,
},
StatusCount{
State: TRIGGER_STATE_NEW,
Completed: 10,
Terminal: true,
},
},
},
}

p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, &testSl)
assert.NoError(t, err)

start := time.Now()
err = p.Exec(context.Background(), r)
delta := time.Since(start)
require.NoError(t, err)
assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel")
// Should have gotten an update
assert.Equal(t, 1, testSl.cur)
}

func TestProcessorTwoSequentialJobs(t *testing.T) {
Expand Down

0 comments on commit f3f6b4b

Please sign in to comment.