Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof of concept of using TestStepInputOutput that hides using channels behind interface. #144

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/runner/base_test_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *BaseTestSuite) TearDownTest() {
}

func (s *BaseTestSuite) RegisterStateFullStep(
runFunction func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
runFunction func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters,
resumeState json.RawMessage) (json.RawMessage, error),
validateFunction func(ctx xcontext.Context, params test.TestStepParameters) error) error {
Expand Down
14 changes: 7 additions & 7 deletions pkg/runner/job_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func (s *JobRunnerSuite) TestSimpleJobStartFinish() {
var resultTargets []*target.Target

require.NoError(s.T(), s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
return teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error {
return teststeps.ForEachTarget(stateFullStepName, ctx, io, func(ctx xcontext.Context, target *target.Target) error {
assert.NotNil(s.T(), target)
mu.Lock()
defer mu.Unlock()
Expand Down Expand Up @@ -125,9 +125,9 @@ func (s *JobRunnerSuite) TestJobWithTestRetry() {
var callsCount int

require.NoError(s.T(), s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
return teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error {
return teststeps.ForEachTarget(stateFullStepName, ctx, io, func(ctx xcontext.Context, target *target.Target) error {
assert.NotNil(s.T(), target)
mu.Lock()
defer mu.Unlock()
Expand Down Expand Up @@ -456,7 +456,7 @@ func (s *JobRunnerSuite) TestResumeStateBadJobId() {
const stateFullStepName = "statefull"

type stateFullStep struct {
runFunction func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
runFunction func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error)
validateFunction func(ctx xcontext.Context, params test.TestStepParameters) error
}
Expand All @@ -467,7 +467,7 @@ func (sfs *stateFullStep) Name() string {

func (sfs *stateFullStep) Run(
ctx xcontext.Context,
ch test.TestStepChannels,
io test.TestStepInputOutput,
ev testevent.Emitter,
stepsVars test.StepsVariables,
params test.TestStepParameters,
Expand All @@ -476,7 +476,7 @@ func (sfs *stateFullStep) Run(
if sfs.runFunction == nil {
return nil, fmt.Errorf("stateFullStep run is not initialised")
}
return sfs.runFunction(ctx, ch, ev, stepsVars, params, resumeState)
return sfs.runFunction(ctx, io, ev, stepsVars, params, resumeState)
}

func (sfs *stateFullStep) ValidateParameters(ctx xcontext.Context, params test.TestStepParameters) error {
Expand Down
76 changes: 33 additions & 43 deletions pkg/runner/step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type StepResult struct {
type StepRunner struct {
mu sync.Mutex

input chan *target.Target
targetsCh chan targetInput
inputWg sync.WaitGroup
activeTargets map[string]*stepTargetInfo

Expand Down Expand Up @@ -64,22 +64,13 @@ func (str *resultNotifier) postResult(err error) {
}

type stepTargetInfo struct {
targetInEmitted bool
result *resultNotifier
}

func (sti *stepTargetInfo) acquireTargetInEmission() bool {
if sti.targetInEmitted {
return false
}
sti.targetInEmitted = true
return true
result *resultNotifier
}

// NewStepRunner creates a new StepRunner object
func NewStepRunner() *StepRunner {
return &StepRunner{
input: make(chan *target.Target),
targetsCh: make(chan targetInput),
activeTargets: make(map[string]*stepTargetInfo),
notifyStopped: newResultNotifier(),
stopped: make(chan struct{}),
Expand All @@ -105,8 +96,7 @@ func (sr *StepRunner) Run(
var resumedTargetsResults []ChanNotifier
for _, resumeTarget := range resumeStateTargets {
targetInfo := &stepTargetInfo{
targetInEmitted: true,
result: newResultNotifier(),
result: newResultNotifier(),
}
sr.activeTargets[resumeTarget.ID] = targetInfo
resumedTargetsResults = append(resumedTargetsResults, targetInfo.result)
Expand All @@ -131,9 +121,23 @@ func (sr *StepRunner) Run(
}

stepOut := make(chan test.TestStepResult)
stepIO := newTestStepInputOutput(sr.targetsCh, func(_ctx xcontext.Context, tgt target.Target, err error) error {
var resultErr error
select {
case stepOut <- test.TestStepResult{Target: &tgt, Err: err}:
return nil
case <-_ctx.Done():
resultErr = _ctx.Err()
case <-ctx.Done():
resultErr = ctx.Err()
}
ctx.Debugf("canceled while reporting target '%s' result: %v", tgt.ID, err)
return resultErr
})

go func() {
defer finish()
sr.runningLoop(ctx, sr.input, stepOut, bundle, stepsVariables, ev, resumeState)
sr.runningLoop(ctx, stepIO, stepOut, bundle, stepsVariables, ev, resumeState)
ctx.Debugf("Running loop finished")
}()

Expand Down Expand Up @@ -169,6 +173,12 @@ func (sr *StepRunner) addTarget(
return nil, fmt.Errorf("step runner was stopped")
}

onTargetConsumed := func() {
if err := emitEvent(ctx, ev, target.EventTargetIn, tgt, nil); err != nil {
sr.setErrLocked(ctx, fmt.Errorf("failed to report target injection: %w", err))
}
}

targetInfo, err := func() (*stepTargetInfo, error) {
targetInfo, err := func() (*stepTargetInfo, error) {
sr.mu.Lock()
Expand All @@ -190,17 +200,7 @@ func (sr *StepRunner) addTarget(

defer sr.inputWg.Done()
select {
case sr.input <- tgt:
// we should always emit TargetIn before TargetOut or TargetError
// we have a race condition that outputLoop may receive result for this target first
// in that case we will emit TargetIn in outputLoop and should not emit it here
sr.mu.Lock()
if targetInfo.acquireTargetInEmission() {
if err := emitEvent(ctx, ev, target.EventTargetIn, tgt, nil); err != nil {
sr.setErrLocked(ctx, fmt.Errorf("failed to report target injection: %w", err))
}
}
sr.mu.Unlock()
case sr.targetsCh <- targetInput{tgt: *tgt, onConsumed: onTargetConsumed}:
return targetInfo, nil
case <-stopped:
return nil, fmt.Errorf("step runner was stopped")
Expand Down Expand Up @@ -273,7 +273,7 @@ func (sr *StepRunner) Stop() {
}

sr.inputWg.Wait()
close(sr.input)
close(sr.targetsCh)
}

func (sr *StepRunner) outputLoop(
Expand Down Expand Up @@ -314,37 +314,28 @@ func (sr *StepRunner) outputLoop(
}
ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID)

shouldEmitTargetIn, targetResult, err := func() (bool, *resultNotifier, error) {
targetResult, err := func() (*resultNotifier, error) {
sr.mu.Lock()
defer sr.mu.Unlock()

info, found := sr.activeTargets[res.Target.ID]
if !found {
return false, nil, &cerrors.ErrTestStepReturnedUnexpectedResult{
return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{
StepName: testStepLabel,
Target: res.Target.ID,
}
}
if info == nil {
return false, nil, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}
return nil, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}
}
sr.activeTargets[res.Target.ID] = nil

shouldEmitTargetIn := info.acquireTargetInEmission()
return shouldEmitTargetIn, info.result, nil
return info.result, nil
}()
if err != nil {
sr.setErr(ctx, err)
return
}

if shouldEmitTargetIn {
if err := emitEvent(ctx, ev, target.EventTargetIn, res.Target, nil); err != nil {
sr.setErr(ctx, fmt.Errorf("failed to report target injection: %w", err))
return
}
}

if res.Err == nil {
err = emitEvent(ctx, ev, target.EventTargetOut, res.Target, nil)
} else {
Expand All @@ -365,7 +356,7 @@ func (sr *StepRunner) outputLoop(

func (sr *StepRunner) runningLoop(
ctx xcontext.Context,
stepIn <-chan *target.Target,
stepIO *testStepInputOutput,
stepOut chan test.TestStepResult,
bundle test.TestStepBundle,
stepsVariables test.StepsVariables,
Expand Down Expand Up @@ -397,8 +388,7 @@ func (sr *StepRunner) runningLoop(
}
}()

inChannels := test.TestStepChannels{In: stepIn, Out: stepOut}
return bundle.TestStep.Run(ctx, inChannels, ev, stepsVariables, bundle.Parameters, resumeState)
return bundle.TestStep.Run(ctx, stepIO, ev, stepsVariables, bundle.Parameters, resumeState)
}()
ctx.Debugf("TestStep finished '%v', rs: '%s'", err, string(resultResumeState))

Expand Down
23 changes: 13 additions & 10 deletions pkg/runner/step_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (s *StepRunnerSuite) TestRunningStep() {
var obtainedResumeState json.RawMessage

err := s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
obtainedResumeState = resumeState
_, err := teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error {
_, err := teststeps.ForEachTarget(stateFullStepName, ctx, io, func(ctx xcontext.Context, target *target.Target) error {
require.NotNil(s.T(), target)

mu.Lock()
Expand Down Expand Up @@ -129,9 +129,9 @@ func (s *StepRunnerSuite) TestAddSameTargetSequentiallyTimes() {
const inputTargetID = "input_target_id"

err := s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
_, err := teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error {
_, err := teststeps.ForEachTarget(stateFullStepName, ctx, io, func(ctx xcontext.Context, target *target.Target) error {
require.NotNil(s.T(), target)
require.Equal(s.T(), inputTargetID, target.ID)
return nil
Expand Down Expand Up @@ -184,11 +184,14 @@ func (s *StepRunnerSuite) TestAddTargetReturnsErrorIfFailsToInput() {
}
}()
err := s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, io test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
<-hangCh
for range ch.In {
require.Fail(s.T(), "unexpected input")
for {
tgt, err := io.Get(ctx)
require.NoError(s.T(), err)
require.Nil(s.T(), tgt, "unexpected input")
break
}
return nil, nil
},
Expand Down Expand Up @@ -244,7 +247,7 @@ func (s *StepRunnerSuite) TestStepPanics() {
defer cancel()

err := s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, ch test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
panic("panic")
},
Expand Down Expand Up @@ -296,9 +299,9 @@ func (s *StepRunnerSuite) TestCornerCases() {
defer cancel()

err := s.RegisterStateFullStep(
func(ctx xcontext.Context, ch test.TestStepChannels, ev testevent.Emitter,
func(ctx xcontext.Context, in test.TestStepInputOutput, ev testevent.Emitter,
stepsVars test.StepsVariables, params test.TestStepParameters, resumeState json.RawMessage) (json.RawMessage, error) {
_, err := teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error {
_, err := teststeps.ForEachTarget(stateFullStepName, ctx, in, func(ctx xcontext.Context, target *target.Target) error {
return fmt.Errorf("should not be called")
})
return nil, err
Expand Down
29 changes: 2 additions & 27 deletions pkg/runner/test_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/linuxboot/contest/tests/common"
"github.com/linuxboot/contest/tests/common/goroutine_leak_check"
"github.com/linuxboot/contest/tests/plugins/teststeps/badtargets"
"github.com/linuxboot/contest/tests/plugins/teststeps/channels"
"github.com/linuxboot/contest/tests/plugins/teststeps/hanging"
"github.com/linuxboot/contest/tests/plugins/teststeps/noreturn"
"github.com/linuxboot/contest/tests/plugins/teststeps/panicstep"
Expand Down Expand Up @@ -86,7 +85,6 @@ func (s *TestRunnerSuite) SetupTest() {
events []event.Name
}{
{badtargets.Name, badtargets.New, badtargets.Events},
{channels.Name, channels.New, channels.Events},
{hanging.Name, hanging.New, hanging.Events},
{noreturn.Name, noreturn.New, noreturn.Events},
{panicstep.Name, panicstep.New, panicstep.Events},
Expand Down Expand Up @@ -332,29 +330,6 @@ func (s *TestRunnerSuite) TestStepPanics() {
require.Contains(s.T(), s.MemoryStorage.GetStepEvents(ctx, testName, "Step1"), "step Step1 paniced")
}

// A misbehaving step that closes its output channel.
func (s *TestRunnerSuite) TestStepClosesChannels() {
ctx, cancel := logrusctx.NewContext(logger.LevelDebug)
defer cancel()

tr := newTestRunner()
_, _, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Second,
[]*target.Target{tgt("T1")},
[]test.TestStepBundle{
s.NewStep(ctx, "Step1", channels.Name, nil),
},
)
require.Error(s.T(), err)
require.IsType(s.T(), &cerrors.ErrTestStepClosedChannels{}, err)
require.Equal(s.T(), `
{[1 1 SimpleTest 0 Step1][Target{ID: "T1"} TargetIn]}
{[1 1 SimpleTest 0 Step1][Target{ID: "T1"} TargetOut]}
`, s.MemoryStorage.GetTargetEvents(ctx, testName, "T1"))
require.Equal(s.T(), `
{[1 1 SimpleTest 0 Step1][(*Target)(nil) TestError &"\"test step Step1 closed output channels (api violation)\""]}
`, s.MemoryStorage.GetStepEvents(ctx, testName, "Step1"))
}

// A misbehaving step that yields a result for a target that does not exist.
func (s *TestRunnerSuite) TestStepYieldsResultForNonexistentTarget() {
ctx, cancel := logrusctx.NewContext(logger.LevelDebug)
Expand Down Expand Up @@ -480,13 +455,13 @@ func (s *TestRunnerSuite) TestVariables() {
)
require.NoError(s.T(), s.RegisterStateFullStep(
func(ctx xcontext.Context,
ch test.TestStepChannels,
io test.TestStepInputOutput,
ev testevent.Emitter,
stepsVars test.StepsVariables,
params test.TestStepParameters,
resumeState json.RawMessage,
) (json.RawMessage, error) {
_, err := teststeps.ForEachTargetWithResume(ctx, ch, resumeState, 1,
_, err := teststeps.ForEachTargetWithResume(ctx, io, resumeState, 1,
func(ctx xcontext.Context, target *teststeps.TargetWithData) error {
require.NoError(s.T(), stepsVars.Add(target.Target.ID, "target_id", target.Target.ID))

Expand Down
Loading