diff --git a/processor.go b/processor.go index f2f734e..9d0a6f6 100644 --- a/processor.go +++ b/processor.go @@ -223,7 +223,6 @@ type Return[JC any] struct { PriorState string Job Job[JC] KickRequests []KickRequest[JC] - Error error } func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) (*Processor[AC, OC, JC], error) { @@ -382,13 +381,20 @@ func (s *StateExec[AC, OC, JC]) Run() { s.state.RateLimit.Wait(s.ctx) slog.Info("LimiterAllowed", "worker", s.i, "state", s.state.TriggerState, "job", j.Id) } + priorState := j.State // Execute the job rtn := Return[JC]{ - PriorState: j.State, + PriorState: priorState, } slog.Info("Executing job", "job", j.Id, "state", s.state.TriggerState) - j.C, j.State, rtn.KickRequests, rtn.Error = s.state.Exec(s.ctx, s.ac, s.oc, j.C) - slog.Info("Execution complete", "job", j.Id, "state", s.state.TriggerState, "newState", j.State, "error", rtn.Error, "kickRequests", len(rtn.KickRequests)) + var err error + j.C, j.State, rtn.KickRequests, err = s.state.Exec(s.ctx, s.ac, s.oc, j.C) + if err != nil { + j.StateErrors[priorState] = append(j.StateErrors[priorState], err.Error()) + slog.Info("Execution complete", "job", j.Id, "state", s.state.TriggerState, "newState", j.State, "error", err, "kickRequests", len(rtn.KickRequests)) + } else { + slog.Info("Execution complete", "job", j.Id, "state", s.state.TriggerState, "newState", j.State, "kickRequests", len(rtn.KickRequests)) + } rtn.Job = j slog.Info("Returning job", "job", j.Id, "newState", j.State) diff --git a/processor_test.go b/processor_test.go index 7ce3564..10d9b7f 100644 --- a/processor_test.go +++ b/processor_test.go @@ -2,6 +2,7 @@ package jorb import ( "context" + "errors" "fmt" "io" "log" @@ -716,10 +717,14 @@ func TestProcessor_Serialization(t *testing.T) { State[MyAppContext, MyOverallContext, MyJobContext]{ TriggerState: TRIGGER_STATE_NEW, Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) { + if jc.Count == 1 { + return jc, STATE_DONE, nil, errors.New("errored again") + } + //log.Println("Processing New") jc.Count += 1 time.Sleep(time.Second) - return jc, STATE_DONE, nil, nil + return jc, TRIGGER_STATE_NEW, nil, errors.New("errored") }, Terminal: false, Concurrency: 10, @@ -738,10 +743,11 @@ func TestProcessor_Serialization(t *testing.T) { err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) - assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") + assert.Less(t, delta, time.Second*4, "Should take less than 4 seconds when run in parallel") for _, j := range r.Jobs { assert.Equal(t, 1, j.C.Count, "Job Count should be 1") + assert.Equal(t, map[string][]string{TRIGGER_STATE_NEW: {"errored", "errored again"}}, j.StateErrors) } // Now reload the job