diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index d0c4174..dfd2af3 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -20,7 +20,7 @@ jobs: run: go build ./... - name: Test - run: go test ./... + run: go test ./... -race - name: Lint uses: golangci/golangci-lint-action@v6 diff --git a/cmd/main.go b/cmd/main.go index ffc9ad6..f6685ea 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,8 +1,11 @@ package main import ( + "context" "fmt" "os" + "os/signal" + "syscall" "github.com/grafana/flagger-k6-webhook/pkg" "github.com/grafana/flagger-k6-webhook/pkg/k6" @@ -14,13 +17,15 @@ import ( ) const ( - defaultPort = 8000 + defaultPort = 8000 + defaultMaxConcurrentTests = 1000 - flagCloudToken = "cloud-token" - flagLogLevel = "log-level" - flagListenPort = "listen-port" - flagSlackToken = "slack-token" - flagKubernetesClient = "kubernetes-client" + flagCloudToken = "cloud-token" + flagLogLevel = "log-level" + flagListenPort = "listen-port" + flagSlackToken = "slack-token" + flagKubernetesClient = "kubernetes-client" + flagMaxConcurrentTests = "max-concurrent-tests" kubernetesClientNone = "none" kubernetesClientInCluster = "in-cluster" @@ -33,6 +38,8 @@ func main() { } func run(args []string) error { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() app := cli.NewApp() app.Name = "flagger-k6-webhook" app.Usage = "Launches k6 load testing from a flagger webhook" @@ -63,12 +70,18 @@ func run(args []string) error { Value: kubernetesClientNone, Usage: fmt.Sprintf("Kubernetes client to use: '%s' or '%s'", kubernetesClientInCluster, kubernetesClientNone), }, + &cli.IntFlag{ + Name: flagMaxConcurrentTests, + EnvVars: []string{"MAX_CONCURRENT_TESTS"}, + Value: defaultMaxConcurrentTests, + }, } - return app.Run(args) + return app.RunContext(ctx, args) } func launchServer(c *cli.Context) error { + ctx := c.Context logLevel, err := log.ParseLevel(c.String(flagLogLevel)) if err != nil { return err @@ -95,5 +108,5 @@ func launchServer(c *cli.Context) error { log.Info("not creating a kubernetes client") } - return pkg.Listen(client, kubeClient, slackClient, c.Int(flagListenPort)) + return pkg.Listen(ctx, client, kubeClient, slackClient, c.Int(flagListenPort), c.Int(flagMaxConcurrentTests)) } diff --git a/pkg/handlers/launch.go b/pkg/handlers/launch.go index 05e88b1..29f1105 100644 --- a/pkg/handlers/launch.go +++ b/pkg/handlers/launch.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/flagger-k6-webhook/pkg/k6" "github.com/grafana/flagger-k6-webhook/pkg/slack" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -24,6 +25,8 @@ const ( emojiSuccess = ":large_green_circle:" emojiWarning = ":warning:" emojiFailure = ":red_circle:" + + metricTestDurationName = "launch_test_duration" ) // https://regex101.com/r/OZwd8Y/1 @@ -144,23 +147,132 @@ type launchHandler struct { lastFailureTime map[string]time.Time lastFailureTimeMutex sync.Mutex + processToWaitFor chan k6.TestRun + waitForProcessesDone chan struct{} + ctx context.Context + + availableTestRuns chan struct{} + + metricsRegistry *prometheus.Registry + metricTestDuration *prometheus.SummaryVec + // mockables sleep func(time.Duration) } +type LaunchHandler interface { + http.Handler + Wait() +} + // NewLaunchHandler returns an handler that launches a k6 load test. -func NewLaunchHandler(client k6.Client, kubeClient kubernetes.Interface, slackClient slack.Client) (http.Handler, error) { +func NewLaunchHandler(ctx context.Context, client k6.Client, kubeClient kubernetes.Interface, slackClient slack.Client, maxConcurrentTests int) (LaunchHandler, error) { if slackClient == nil { return nil, errors.New("unexpected state. Slack client is nil") } - return &launchHandler{ - client: client, - kubeClient: kubeClient, - slackClient: slackClient, - lastFailureTime: make(map[string]time.Time), - sleep: time.Sleep, - }, nil + h := &launchHandler{ + client: client, + kubeClient: kubeClient, + slackClient: slackClient, + lastFailureTime: make(map[string]time.Time), + sleep: time.Sleep, + processToWaitFor: make(chan k6.TestRun, maxConcurrentTests), + waitForProcessesDone: make(chan struct{}, 1), + ctx: ctx, + } + h.availableTestRuns = make(chan struct{}, maxConcurrentTests) + for range maxConcurrentTests { + h.availableTestRuns <- struct{}{} + } + + metricMaxConcurrentTests := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "launch_max_concurrent_tests", + Help: "The maximum number of concurrent tests", + }) + metricMaxConcurrentTests.Set(float64(maxConcurrentTests)) + if err := prometheus.Register(metricMaxConcurrentTests); err != nil { + log.Warnf("Failed to register new metric: %s", err.Error()) + } + + metricAvailableConcurrentTests := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "launch_available_concurrent_tests", + Help: "The current number of available concurrent tests. If 0 then new requests will be rejected", + }, func() float64 { + return float64(len(h.availableTestRuns)) + }) + if err := prometheus.Register(metricAvailableConcurrentTests); err != nil { + log.Warnf("Failed to register new metric: %s", err.Error()) + } + + // metricTestDuration is an internal metric that we use to calculate the + // expected wait time in case the maximum number of concurrent tests is + // reached: + metricTestDuration := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: metricTestDurationName, + Help: "Durations of the executed k6 test run in seconds", + Objectives: map[float64]float64{0.5: float64(30)}, + }, []string{"exit_code"}) + h.metricTestDuration = metricTestDuration + h.metricsRegistry = prometheus.NewRegistry() + _ = h.metricsRegistry.Register(h.metricTestDuration) + + go h.waitForProcesses(ctx) + return h, nil +} + +// Wait is blocking until all subprocesses have terminated. This should only be +// used if the passed context can (and is) canceled. +func (h *launchHandler) Wait() { + <-h.waitForProcessesDone + log.Debug("launch handler finished") +} + +// waitForProcesses handles incoming processes and waits for them to complete. +// This way we can avoid k6 jobs where we do not need the results to become +// zombie processes. +func (h *launchHandler) waitForProcesses(ctx context.Context) { + defer func() { + h.waitForProcessesDone <- struct{}{} + }() + wg := sync.WaitGroup{} +loop: + for { + select { + case cmd := <-h.processToWaitFor: + wg.Add(1) + go func() { + h.waitForProcess(cmd) + wg.Done() + }() + case <-ctx.Done(): + break loop + } + } + wg.Wait() +} + +func (h *launchHandler) waitForProcess(cmd k6.TestRun) { + if cmd == nil { + log.Warnf("nil as testrun passed") + return + } + pid := cmd.PID() + log.WithField("pid", pid).Debug("waiting for testrun to exit") + _ = cmd.Wait() + h.trackExecutionDuration(cmd) + log.WithField("pid", pid).Debugf("testrun exited") + + h.availableTestRuns <- struct{}{} +} + +// registerProcessCleanup adds a handler to the process so that it will +// eventually be closed and its resources returned. +// +// Note that this method can actually block which will, in turn, cause the +// calling HTTP handler to be blocked. +func (h *launchHandler) registerProcessCleanup(cmd k6.TestRun) { + h.processToWaitFor <- cmd } func (h *launchHandler) getLastFailureTime(payload *launchPayload) (time.Time, bool) { @@ -213,8 +325,36 @@ func (h *launchHandler) buildEnvVars(payload *launchPayload) (map[string]string, return envVars, nil } +func (h *launchHandler) getWaitTime() int64 { + families, err := h.metricsRegistry.Gather() + if err != nil { + return 60 + } + for _, family := range families { + if family.GetName() == metricTestDurationName { + for _, metric := range family.GetMetric() { + for _, quantile := range metric.GetSummary().GetQuantile() { + if quantile.GetQuantile() == 0.5 { + result := quantile.GetValue() + return int64(result) + } + } + } + } + } + return 60 +} + func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { cmdLog := createLogEntry(req) + select { + case <-h.availableTestRuns: + default: + cmdLog.Warn("Maximum concurrent test runs reached. Rejecting request.") + resp.Header().Set("Retry-After", fmt.Sprintf("%d", h.getWaitTime())) + http.Error(resp, "Maximum concurrent test runs reached", http.StatusTooManyRequests) + return + } logIfError := func(err error) { if err != nil { cmdLog.Error(err) @@ -253,8 +393,18 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { return } + ctx, cancelCtx := context.WithCancel(context.Background()) + defer func() { + if payload.Metadata.WaitForResults { + cancelCtx() + } + }() + go func() { + h.propagateCancel(req, payload, cancelCtx) + }() + cmdLog.Info("launching k6 test") - cmd, err := h.client.Start(payload.Metadata.Script, payload.Metadata.UploadToCloud, envVars, &buf) + cmd, err := h.client.Start(ctx, payload.Metadata.Script, payload.Metadata.UploadToCloud, envVars, &buf) if err != nil { fail(fmt.Sprintf("error while launching the test: %v", err)) return @@ -268,6 +418,7 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { logIfError(err) logIfError(h.slackClient.AddFileToThreads(slackMessages, "k6-results.txt", buf.String())) fail(fmt.Sprintf("error while waiting for test to start: %v", waitErr)) + h.registerProcessCleanup(cmd) return } @@ -275,6 +426,7 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { url, err := getCloudURL(buf.String()) if err != nil { fail(err.Error()) + h.registerProcessCleanup(cmd) return } slackContext += fmt.Sprintf("\nCloud URL: <%s>", url) @@ -287,12 +439,17 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { if !payload.Metadata.WaitForResults { cmdLog.Infof("the load test for %s.%s was launched successfully!", payload.Name, payload.Namespace) + h.registerProcessCleanup(cmd) return } // Wait for the test to finish and write the output to slack + defer func() { + h.availableTestRuns <- struct{}{} + }() cmdLog.Info("waiting for the results") err = cmd.Wait() + h.trackExecutionDuration(cmd) logIfError(h.slackClient.AddFileToThreads(slackMessages, "k6-results.txt", buf.String())) // Load testing failed, log the output @@ -309,6 +466,29 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { cmdLog.Infof("the load test for %s.%s succeeded!", payload.Name, payload.Namespace) } +func (h *launchHandler) trackExecutionDuration(cmd k6.TestRun) { + if dur := cmd.ExecutionDuration(); dur != 0 { + h.metricTestDuration.With(prometheus.Labels{"exit_code": fmt.Sprintf("%d", cmd.ExitCode())}).Observe(float64(dur / time.Second)) + } +} + +func (h *launchHandler) propagateCancel(req *http.Request, payload *launchPayload, cancelCtx context.CancelFunc) { + if payload.Metadata.WaitForResults { + select { + case <-req.Context().Done(): + cancelCtx() + case <-h.ctx.Done(): + cancelCtx() + } + } else { + // If we are not waiting for the results then we should only cancel + // if the global context is done: + <-h.ctx.Done() + cancelCtx() + } + log.Info("canceling process") +} + func (h *launchHandler) waitForOutputPath(cmdLog *log.Entry, buf *bytes.Buffer) error { for i := 0; i < 10; i++ { if strings.Contains(buf.String(), "output:") { diff --git a/pkg/handlers/launch_test.go b/pkg/handlers/launch_test.go index b411bb9..e489d25 100644 --- a/pkg/handlers/launch_test.go +++ b/pkg/handlers/launch_test.go @@ -1,6 +1,7 @@ package handlers import ( + "context" "errors" "fmt" "io" @@ -8,6 +9,7 @@ import ( "net/http" "net/http/httptest" "os" + "os/exec" "strings" "testing" "time" @@ -15,6 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/grafana/flagger-k6-webhook/pkg/k6" "github.com/grafana/flagger-k6-webhook/pkg/mocks" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -75,14 +78,14 @@ func TestNewLaunchPayload(t *testing.T) { name: "set values", request: &http.Request{ Body: ioutil.NopCloser(strings.NewReader(`{ - "name": "test", - "namespace": "test", - "phase": "pre-rollout", + "name": "test", + "namespace": "test", + "phase": "pre-rollout", "metadata": { - "script": "my-script", - "upload_to_cloud": "true", - "wait_for_results": "false", - "slack_channels": "test,test2", + "script": "my-script", + "upload_to_cloud": "true", + "wait_for_results": "false", + "slack_channels": "test,test2", "min_failure_delay": "3m", "kubernetes_secrets": "{\"TEST_VAR\": \"secret/key\"}", "env_vars": "{\"TEST_VAR2\": \"value\"}" @@ -175,13 +178,15 @@ func TestLaunchAndWaitCloud(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandler(t) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) // Expected calls // * Start the run fullResults, resultParts := getTestOutputFromFile(t, test.k6OutputFile) var bufferWriter io.Writer - k6Client.EXPECT().Start("my-script", true, nil, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", true, nil, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { bufferWriter = outputWriter outputWriter.Write([]byte(resultParts[0])) return testRun, nil @@ -229,13 +234,15 @@ func TestLaunchAndWaitCloud(t *testing.T) { func TestSlackFailuresDontAbort(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandler(t) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) // Expected calls // * Start the run fullResults, resultParts := getTestOutput(t) var bufferWriter io.Writer - k6Client.EXPECT().Start("my-script", true, nil, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", true, nil, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { bufferWriter = outputWriter outputWriter.Write([]byte(resultParts[0])) return testRun, nil @@ -268,13 +275,15 @@ func TestSlackFailuresDontAbort(t *testing.T) { func TestLaunchAndWaitLocal(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandler(t) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) // Expected calls // * Start the run fullResults, resultParts := getTestOutput(t) var bufferWriter io.Writer - k6Client.EXPECT().Start("my-script", false, nil, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", false, nil, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { bufferWriter = outputWriter outputWriter.Write([]byte(resultParts[0])) return testRun, nil @@ -335,13 +344,15 @@ func TestLaunchAndWaitLocal(t *testing.T) { func TestLaunchAndWaitAndGetError(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandler(t) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) // Expected calls // * Start the run fullResults, resultParts := getTestOutput(t) var bufferWriter io.Writer - k6Client.EXPECT().Start("my-script", false, nil, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", false, nil, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { bufferWriter = outputWriter outputWriter.Write([]byte(resultParts[0])) return testRun, nil @@ -402,7 +413,15 @@ func TestLaunchAndWaitAndGetError(t *testing.T) { func TestLaunchNeverStarted(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandler(t) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) + + testRun.EXPECT().PID().Return(-1).AnyTimes() + testRun.EXPECT().Kill().Return(nil).AnyTimes() + testRun.EXPECT().Wait().Return(nil).AnyTimes() + testRun.EXPECT().Exited().Return(true).AnyTimes() + var sleepCalls []time.Duration sleepMock := func(d time.Duration) { sleepCalls = append(sleepCalls, d) @@ -411,7 +430,7 @@ func TestLaunchNeverStarted(t *testing.T) { // Expected calls // * Start the run (process fails and prints out an error) - k6Client.EXPECT().Start("my-script", false, nil, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", false, nil, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { outputWriter.Write([]byte("failed to run (k6 error)")) return testRun, nil }) @@ -446,12 +465,19 @@ func TestLaunchNeverStarted(t *testing.T) { func TestLaunchWithoutWaiting(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandler(t) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) + + testRun.EXPECT().PID().Return(-1).AnyTimes() + testRun.EXPECT().Kill().Return(nil).AnyTimes() + testRun.EXPECT().Wait().Return(nil).AnyTimes() + testRun.EXPECT().Exited().Return(true).AnyTimes() // Expected calls // * Start the run _, resultParts := getTestOutput(t) - k6Client.EXPECT().Start("my-script", false, nil, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", false, nil, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { outputWriter.Write([]byte(resultParts[0])) return testRun, nil }) @@ -478,7 +504,9 @@ func TestLaunchWithoutWaiting(t *testing.T) { func TestBadPayload(t *testing.T) { // Initialize controller - _, _, _, _, handler := setupHandler(t) + _, cancel, _, _, _, _, handler := setupHandler(t, 100) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) // Make request request := &http.Request{ @@ -573,16 +601,18 @@ func TestEnvVars(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // Initialize controller - _, k6Client, slackClient, testRun, handler := setupHandlerWithKubernetesObjects(t, tc.kubernetesObjects...) + _, cancel, _, k6Client, slackClient, testRun, handler := setupHandlerWithKubernetesObjects(t, 100, tc.kubernetesObjects...) if tc.nilKubeClient { handler.kubeClient = nil } + t.Cleanup(handler.Wait) + t.Cleanup(cancel) if tc.expectedCode == 200 { // Expected calls // * Start the run var bufferWriter io.Writer - k6Client.EXPECT().Start("my-script", false, tc.expectedEnvVars, gomock.Any()).DoAndReturn(func(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + k6Client.EXPECT().Start(gomock.Any(), "my-script", false, tc.expectedEnvVars, gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { bufferWriter = outputWriter outputWriter.Write([]byte(resultParts[0])) return testRun, nil @@ -605,11 +635,11 @@ func TestEnvVars(t *testing.T) { // Make request request := &http.Request{ Body: ioutil.NopCloser(strings.NewReader(fmt.Sprintf(`{ - "name": "test-name", - "namespace": "test-space", - "phase": "pre-rollout", + "name": "test-name", + "namespace": "test-space", + "phase": "pre-rollout", "metadata": { - "script": "my-script", + "script": "my-script", "kubernetes_secrets": "%s", "env_vars": "%s" } @@ -626,11 +656,111 @@ func TestEnvVars(t *testing.T) { } -func setupHandler(t *testing.T) (*gomock.Controller, *mocks.MockK6Client, *mocks.MockSlackClient, *mocks.MockK6TestRun, *launchHandler) { - return setupHandlerWithKubernetesObjects(t) +func TestProcessHandler(t *testing.T) { + t.Run("waits on processes", func(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + _, cancel, ctrl, _, _, _, handler := setupHandler(t, 100) + // Now let's produce a handful of test runs and check that they are waited + // on + for range 10 { + <-handler.availableTestRuns + tr := mocks.NewMockK6TestRun(ctrl) + tr.EXPECT().PID().Return(-1).AnyTimes() + tr.EXPECT().Kill().Return(nil).AnyTimes() + // Wait is called exactly once by the process handler + tr.EXPECT().Wait().Return(nil).Times(1) + tr.EXPECT().Exited().Return(true).AnyTimes() + tr.EXPECT().ExitCode().Return(0).AnyTimes() + tr.EXPECT().ExecutionDuration().Return(time.Minute).AnyTimes() + handler.registerProcessCleanup(tr) + } + time.Sleep(time.Second * 2) + t.Log("Cancelling handler") + cancel() + handler.Wait() + }) + + t.Run("kills process if handler is closed", func(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + ctx, cancelCtx, _, _, _, _, handler := setupHandler(t, 100) + cmd := exec.CommandContext(ctx, "sleep", "10") + require.NoError(t, cmd.Start()) + <-handler.availableTestRuns + handler.registerProcessCleanup(&k6.DefaultTestRun{Cmd: cmd}) + + // Also register a process that will be done by the time we are closing + // the handler: + cmdSuccess := exec.Command("true") + require.NoError(t, cmdSuccess.Start()) + <-handler.availableTestRuns + handler.registerProcessCleanup(&k6.DefaultTestRun{Cmd: cmdSuccess}) + + // Yield so that the handler can actually pick up the process: + time.Sleep(time.Second) + + cancelCtx() + handler.Wait() + require.False(t, cmd.ProcessState.Success()) + require.True(t, cmdSuccess.ProcessState.Success()) + }) } -func setupHandlerWithKubernetesObjects(t *testing.T, expectedKubernetesObjects ...runtime.Object) (*gomock.Controller, *mocks.MockK6Client, *mocks.MockSlackClient, *mocks.MockK6TestRun, *launchHandler) { +// If we get too many concurrent test requests, a 429 should be returned by the +// ServeHTTP method. +func Test429OnExcessiveRequests(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + // Initialize controller + _, cancel, ctrl, k6Client, slackClient, testRun1, handler := setupHandler(t, 1) + t.Cleanup(handler.Wait) + t.Cleanup(cancel) + + slackClient.EXPECT().SendMessages(nil, gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + slackClient.EXPECT().AddFileToThreads(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + _, resultParts := getTestOutputFromFile(t, "testdata/k6-output.txt") + + // The first request should go through but the second should be rejected + // with a 429 response as it would exceed the concurrent testrun limit: + request1 := &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"name": "hello", "namespace": "default", "phase": "somephase", "metadata": {"upload_to_cloud": "false", "wait_for_results": "false", "script": "import { sleep } from 'k6'; export default function() { sleep(10) }"}}`)), + } + + var bufferWriter1 io.Writer + k6Client.EXPECT().Start(gomock.Any(), gomock.Any(), false, gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (k6.TestRun, error) { + bufferWriter1 = outputWriter + outputWriter.Write([]byte(resultParts[0])) + return testRun1, nil + }).AnyTimes() + testRun1.EXPECT().PID().Return(-1).AnyTimes() + testRun1.EXPECT().Wait().DoAndReturn(func() error { + time.Sleep(time.Second * 2) + bufferWriter1.Write([]byte("running" + resultParts[1])) + return nil + }).AnyTimes() + rr1 := httptest.NewRecorder() + handler.ServeHTTP(rr1, request1) + require.Equal(t, 200, rr1.Code) + + testRun2 := mocks.NewMockK6TestRun(ctrl) + request2 := &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"name": "hello", "namespace": "default", "phase": "somephase", "metadata": {"upload_to_cloud": "false", "wait_for_results": "false", "script": "import { sleep } from 'k6'; export default function() { sleep(10) }"}}`)), + } + + // All these mock calls should actually never happen as the request is rejected right away + k6Client.EXPECT().Start(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + testRun2.EXPECT().PID().Return(-1).Times(0) + testRun2.EXPECT().Wait().Times(0) + + rr2 := httptest.NewRecorder() + handler.ServeHTTP(rr2, request2) + require.Equal(t, 429, rr2.Code) +} + +func setupHandler(t *testing.T, maxConcurrentTests int) (context.Context, context.CancelFunc, *gomock.Controller, *mocks.MockK6Client, *mocks.MockSlackClient, *mocks.MockK6TestRun, *launchHandler) { + return setupHandlerWithKubernetesObjects(t, maxConcurrentTests) +} + +func setupHandlerWithKubernetesObjects(t *testing.T, maxConcurrentTests int, expectedKubernetesObjects ...runtime.Object) (context.Context, context.CancelFunc, *gomock.Controller, *mocks.MockK6Client, *mocks.MockSlackClient, *mocks.MockK6TestRun, *launchHandler) { t.Helper() mockCtrl := gomock.NewController(t) @@ -638,11 +768,18 @@ func setupHandlerWithKubernetesObjects(t *testing.T, expectedKubernetesObjects . kubeClient := fake.NewSimpleClientset(expectedKubernetesObjects...) slackClient := mocks.NewMockSlackClient(mockCtrl) testRun := mocks.NewMockK6TestRun(mockCtrl) - handler, err := NewLaunchHandler(k6Client, kubeClient, slackClient) + + // For now we do not test the ExecutionDuration and so can set a default + // value here: + testRun.EXPECT().ExecutionDuration().Return(time.Minute).AnyTimes() + testRun.EXPECT().ExitCode().Return(0).AnyTimes() + + ctx, cancel := context.WithCancel(context.Background()) + handler, err := NewLaunchHandler(ctx, k6Client, kubeClient, slackClient, maxConcurrentTests) handler.(*launchHandler).sleep = func(d time.Duration) {} require.NoError(t, err) - return mockCtrl, k6Client, slackClient, testRun, handler.(*launchHandler) + return ctx, cancel, mockCtrl, k6Client, slackClient, testRun, handler.(*launchHandler) } func getTestOutput(t *testing.T) ([]byte, []string) { diff --git a/pkg/k6/cmd.go b/pkg/k6/cmd.go index 83fcc97..f1ca85b 100644 --- a/pkg/k6/cmd.go +++ b/pkg/k6/cmd.go @@ -1,11 +1,13 @@ package k6 import ( + "context" "fmt" "io" "os" "os/exec" "strings" + "time" log "github.com/sirupsen/logrus" ) @@ -19,7 +21,60 @@ func NewLocalRunnerClient(token string) (Client, error) { return client, nil } -func (c *LocalRunnerClient) Start(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (TestRun, error) { +type DefaultTestRun struct { + *exec.Cmd + startedAt time.Time + exitedAt time.Time +} + +func (tr *DefaultTestRun) Start() error { + tr.startedAt = time.Now() + return tr.Cmd.Start() +} + +func (tr *DefaultTestRun) Wait() error { + defer func() { + tr.exitedAt = time.Now() + }() + return tr.Cmd.Wait() +} + +func (tr *DefaultTestRun) ExitCode() int { + if tr.Cmd != nil && tr.Cmd.ProcessState != nil { + return tr.Cmd.ProcessState.ExitCode() + } + return -1 +} + +func (tr *DefaultTestRun) ExecutionDuration() time.Duration { + if tr.startedAt.IsZero() || tr.exitedAt.IsZero() { + return time.Duration(0) + } + return tr.exitedAt.Sub(tr.startedAt) +} + +func (tr *DefaultTestRun) Kill() error { + if tr.Cmd != nil && tr.Cmd.Process != nil { + return tr.Cmd.Process.Kill() + } + return nil +} + +func (tr *DefaultTestRun) PID() int { + if tr.Cmd != nil && tr.Cmd.Process != nil { + return tr.Cmd.Process.Pid + } + return -1 +} + +func (tr *DefaultTestRun) Exited() bool { + if tr.Cmd != nil && tr.Cmd.ProcessState != nil { + return tr.Cmd.ProcessState.Exited() + } + return false +} + +func (c *LocalRunnerClient) Start(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (TestRun, error) { tempFile, err := os.CreateTemp("", "k6-script") if err != nil { return nil, fmt.Errorf("could not create a tempfile for the script: %w", err) @@ -34,7 +89,7 @@ func (c *LocalRunnerClient) Start(scriptContent string, upload bool, envVars map } args = append(args, tempFile.Name()) - cmd := c.cmd("k6", args...) + cmd := c.cmd(ctx, "k6", args...) cmd.Stdout = outputWriter cmd.Stderr = outputWriter @@ -44,11 +99,12 @@ func (c *LocalRunnerClient) Start(scriptContent string, upload bool, envVars map } log.Debugf("launching 'k6 %s'", strings.Join(args, " ")) - return cmd, cmd.Start() + run := &DefaultTestRun{Cmd: cmd} + return run, run.Start() } -func (c *LocalRunnerClient) cmd(name string, arg ...string) *exec.Cmd { - cmd := exec.Command(name, arg...) +func (c *LocalRunnerClient) cmd(ctx context.Context, name string, arg ...string) *exec.Cmd { + cmd := exec.CommandContext(ctx, name, arg...) cmd.Env = append(os.Environ(), "K6_CLOUD_TOKEN="+c.token) return cmd diff --git a/pkg/k6/interface.go b/pkg/k6/interface.go index a8ffb3b..a2a30c8 100644 --- a/pkg/k6/interface.go +++ b/pkg/k6/interface.go @@ -3,13 +3,20 @@ package k6 //go:generate mockgen -destination=../mocks/mock_k6_client.go -package=mocks -mock_names=Client=MockK6Client,TestRun=MockK6TestRun github.com/grafana/flagger-k6-webhook/pkg/k6 Client,TestRun import ( + "context" "io" + "time" ) type Client interface { - Start(scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (TestRun, error) + Start(ctx context.Context, scriptContent string, upload bool, envVars map[string]string, outputWriter io.Writer) (TestRun, error) } type TestRun interface { Wait() error + Kill() error + PID() int + Exited() bool + ExitCode() int + ExecutionDuration() time.Duration } diff --git a/pkg/mocks/mock_k6_client.go b/pkg/mocks/mock_k6_client.go index 2bdb6a4..f929894 100644 --- a/pkg/mocks/mock_k6_client.go +++ b/pkg/mocks/mock_k6_client.go @@ -5,8 +5,10 @@ package mocks import ( + context "context" io "io" reflect "reflect" + time "time" gomock "github.com/golang/mock/gomock" k6 "github.com/grafana/flagger-k6-webhook/pkg/k6" @@ -36,18 +38,18 @@ func (m *MockK6Client) EXPECT() *MockK6ClientMockRecorder { } // Start mocks base method. -func (m *MockK6Client) Start(arg0 string, arg1 bool, arg2 map[string]string, arg3 io.Writer) (k6.TestRun, error) { +func (m *MockK6Client) Start(arg0 context.Context, arg1 string, arg2 bool, arg3 map[string]string, arg4 io.Writer) (k6.TestRun, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Start", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "Start", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(k6.TestRun) ret1, _ := ret[1].(error) return ret0, ret1 } // Start indicates an expected call of Start. -func (mr *MockK6ClientMockRecorder) Start(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockK6ClientMockRecorder) Start(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockK6Client)(nil).Start), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockK6Client)(nil).Start), arg0, arg1, arg2, arg3, arg4) } // MockK6TestRun is a mock of TestRun interface. @@ -73,6 +75,76 @@ func (m *MockK6TestRun) EXPECT() *MockK6TestRunMockRecorder { return m.recorder } +// ExecutionDuration mocks base method. +func (m *MockK6TestRun) ExecutionDuration() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ExecutionDuration") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// ExecutionDuration indicates an expected call of ExecutionDuration. +func (mr *MockK6TestRunMockRecorder) ExecutionDuration() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecutionDuration", reflect.TypeOf((*MockK6TestRun)(nil).ExecutionDuration)) +} + +// ExitCode mocks base method. +func (m *MockK6TestRun) ExitCode() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ExitCode") + ret0, _ := ret[0].(int) + return ret0 +} + +// ExitCode indicates an expected call of ExitCode. +func (mr *MockK6TestRunMockRecorder) ExitCode() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExitCode", reflect.TypeOf((*MockK6TestRun)(nil).ExitCode)) +} + +// Exited mocks base method. +func (m *MockK6TestRun) Exited() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Exited") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Exited indicates an expected call of Exited. +func (mr *MockK6TestRunMockRecorder) Exited() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exited", reflect.TypeOf((*MockK6TestRun)(nil).Exited)) +} + +// Kill mocks base method. +func (m *MockK6TestRun) Kill() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Kill") + ret0, _ := ret[0].(error) + return ret0 +} + +// Kill indicates an expected call of Kill. +func (mr *MockK6TestRunMockRecorder) Kill() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Kill", reflect.TypeOf((*MockK6TestRun)(nil).Kill)) +} + +// PID mocks base method. +func (m *MockK6TestRun) PID() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PID") + ret0, _ := ret[0].(int) + return ret0 +} + +// PID indicates an expected call of PID. +func (mr *MockK6TestRunMockRecorder) PID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PID", reflect.TypeOf((*MockK6TestRun)(nil).PID)) +} + // Wait mocks base method. func (m *MockK6TestRun) Wait() error { m.ctrl.T.Helper() diff --git a/pkg/server.go b/pkg/server.go index 4435a18..80f4323 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -1,8 +1,10 @@ package pkg import ( + "context" "fmt" "net/http" + "time" "github.com/grafana/flagger-k6-webhook/pkg/handlers" "github.com/grafana/flagger-k6-webhook/pkg/k6" @@ -14,8 +16,14 @@ import ( "k8s.io/client-go/kubernetes" ) -func Listen(client k6.Client, kubeClient kubernetes.Interface, slackClient slack.Client, port int) error { - launchHandler, err := handlers.NewLaunchHandler(client, kubeClient, slackClient) +func Listen(ctx context.Context, client k6.Client, kubeClient kubernetes.Interface, slackClient slack.Client, port int, maxProcessHandlers int) error { + launcherCtx, cancelLaunchCtx := context.WithCancel(ctx) + launchHandler, err := handlers.NewLaunchHandler(launcherCtx, client, kubeClient, slackClient, maxProcessHandlers) + defer func() { + logrus.Debug("shutting down launch handler") + cancelLaunchCtx() + launchHandler.Wait() + }() if err != nil { return err } @@ -23,10 +31,24 @@ func Listen(client k6.Client, kubeClient kubernetes.Interface, slackClient slack serveAddress := fmt.Sprintf(":%d", port) logrus.Info("starting server at " + serveAddress) - http.HandleFunc("/health", handlers.HandleHealth) - http.Handle("/metrics", promhttp.Handler()) + mux := http.NewServeMux() + srv := http.Server{ + Handler: mux, + Addr: serveAddress, + } + + go func() { + <-ctx.Done() + cancelLaunchCtx() + timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + _ = srv.Shutdown(timeoutCtx) + }() + + mux.HandleFunc("/health", handlers.HandleHealth) + mux.Handle("/metrics", promhttp.Handler()) - http.Handle("/launch-test", + mux.Handle("/launch-test", promhttp.InstrumentHandlerCounter( promauto.NewCounterVec( prometheus.CounterOpts{ @@ -39,5 +61,5 @@ func Listen(client k6.Client, kubeClient kubernetes.Interface, slackClient slack ), ) - return http.ListenAndServe(serveAddress, nil) + return srv.ListenAndServe() }