fix: Register processes for cleanup (#153)
* fix: Register processes for cleanup

* Make maxProcessHandlers configurable

* Add tests and improve handler cleanup

* Start k6 with a Context to make killing it easier

* Run tests with race detector

* Fix linting issues

* Introduce a top-level context

* Cancel test runs with WaitForResults if the HTTP request is canceled

* Fix cyclomatic complexity issue in ServeHTTP

* Introduce maxConcurrentTests limit at the ServeHTTP level (a sketch of the pattern follows the change summary below)

* Fix linting issue

* Tune internal queue for processes to be handled

* Use http.StatusTooManyRequests

Co-authored-by: Iain Lane <iain@orangesquash.org.uk>

* Log if the maximum concurrent tests limit is reached

* Calculate Retry-After value from the median testrun times

* Fix linting issue

* Make metric name a constant

---------

Co-authored-by: Iain Lane <iain@orangesquash.org.uk>
zerok and iainlane authored Jun 25, 2024
1 parent e78395d commit 127f1b9
Showing 8 changed files with 552 additions and 65 deletions.
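The concurrency limit introduced by this commit (maxConcurrentTests, http.StatusTooManyRequests, a Retry-After hint) follows the common Go pattern of a buffered channel used as a counting semaphore. Below is a minimal, self-contained sketch of that pattern, not code from this repository: the handler name, the limit of 2, and the fixed 60-second Retry-After value are illustrative, and the real handler derives the retry hint from the median test duration and returns the slot from a background goroutine once the k6 process has exited.

package main

import (
	"net/http"
	"strconv"
	"time"
)

// limitedHandler rejects requests once maxConcurrent test runs are in flight.
// slots is a buffered channel used as a counting semaphore: one token per
// allowed concurrent run.
type limitedHandler struct {
	slots chan struct{}
}

func newLimitedHandler(maxConcurrent int) *limitedHandler {
	h := &limitedHandler{slots: make(chan struct{}, maxConcurrent)}
	for i := 0; i < maxConcurrent; i++ {
		h.slots <- struct{}{} // fill the semaphore with available slots
	}
	return h
}

func (h *limitedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	select {
	case <-h.slots: // take a slot if one is free
	default: // none free: reject and tell the client when to retry
		w.Header().Set("Retry-After", strconv.Itoa(60)) // illustrative; the commit computes this from test durations
		http.Error(w, "Maximum concurrent test runs reached", http.StatusTooManyRequests)
		return
	}
	// Return the slot when the work is done. In the commit the slot comes back
	// from a goroutine that waits on the k6 process instead.
	defer func() { h.slots <- struct{}{} }()

	time.Sleep(100 * time.Millisecond) // placeholder for launching and awaiting a k6 run
	w.WriteHeader(http.StatusOK)
}

func main() {
	_ = http.ListenAndServe(":8000", newLimitedHandler(2))
}

Taking the token in a non-blocking select is what turns the semaphore into a hard 429 rejection instead of queueing the request.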
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
@@ -20,7 +20,7 @@ jobs:
run: go build ./...

- name: Test
run: go test ./...
run: go test ./... -race

- name: Lint
uses: golangci/golangci-lint-action@v6
29 changes: 21 additions & 8 deletions cmd/main.go
@@ -1,8 +1,11 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/grafana/flagger-k6-webhook/pkg"
"github.com/grafana/flagger-k6-webhook/pkg/k6"
@@ -14,13 +17,15 @@
)

const (
defaultPort = 8000
defaultPort = 8000
defaultMaxConcurrentTests = 1000

flagCloudToken = "cloud-token"
flagLogLevel = "log-level"
flagListenPort = "listen-port"
flagSlackToken = "slack-token"
flagKubernetesClient = "kubernetes-client"
flagCloudToken = "cloud-token"
flagLogLevel = "log-level"
flagListenPort = "listen-port"
flagSlackToken = "slack-token"
flagKubernetesClient = "kubernetes-client"
flagMaxConcurrentTests = "max-concurrent-tests"

kubernetesClientNone = "none"
kubernetesClientInCluster = "in-cluster"
@@ -33,6 +38,8 @@ func main() {
}

func run(args []string) error {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
app := cli.NewApp()
app.Name = "flagger-k6-webhook"
app.Usage = "Launches k6 load testing from a flagger webhook"
@@ -63,12 +70,18 @@ func run(args []string) error {
Value: kubernetesClientNone,
Usage: fmt.Sprintf("Kubernetes client to use: '%s' or '%s'", kubernetesClientInCluster, kubernetesClientNone),
},
&cli.IntFlag{
Name: flagMaxConcurrentTests,
EnvVars: []string{"MAX_CONCURRENT_TESTS"},
Value: defaultMaxConcurrentTests,
},
}

return app.Run(args)
return app.RunContext(ctx, args)
}

func launchServer(c *cli.Context) error {
ctx := c.Context
logLevel, err := log.ParseLevel(c.String(flagLogLevel))
if err != nil {
return err
@@ -95,5 +108,5 @@ func launchServer(c *cli.Context) error {
log.Info("not creating a kubernetes client")
}

return pkg.Listen(client, kubeClient, slackClient, c.Int(flagListenPort))
return pkg.Listen(ctx, client, kubeClient, slackClient, c.Int(flagListenPort), c.Int(flagMaxConcurrentTests))
}
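For context, the cmd/main.go change above wires a single top-level context through the CLI framework (judging by cli.NewApp, EnvVars, and c.Context, this appears to be urfave/cli/v2) so that SIGINT or SIGTERM cancels every running test. A condensed, hypothetical sketch of that wiring, not the project's actual main function:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/urfave/cli/v2"
)

func main() {
	// ctx is canceled as soon as the process receives SIGINT or SIGTERM.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	app := cli.NewApp()
	app.Action = func(c *cli.Context) error {
		// c.Context derives from ctx; long-running work should watch it.
		<-c.Context.Done()
		return nil
	}

	// RunContext propagates ctx to every cli.Context the app creates.
	if err := app.RunContext(ctx, os.Args); err != nil {
		os.Exit(1)
	}
}

Running this and pressing Ctrl-C makes the action return promptly, which is the behaviour launchServer now relies on when it reads c.Context.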
198 changes: 189 additions & 9 deletions pkg/handlers/launch.go
@@ -15,6 +15,7 @@ import (

"github.com/grafana/flagger-k6-webhook/pkg/k6"
"github.com/grafana/flagger-k6-webhook/pkg/slack"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -24,6 +25,8 @@ const (
emojiSuccess = ":large_green_circle:"
emojiWarning = ":warning:"
emojiFailure = ":red_circle:"

metricTestDurationName = "launch_test_duration"
)

// https://regex101.com/r/OZwd8Y/1
@@ -144,23 +147,132 @@ type launchHandler struct {
lastFailureTime map[string]time.Time
lastFailureTimeMutex sync.Mutex

processToWaitFor chan k6.TestRun
waitForProcessesDone chan struct{}
ctx context.Context

availableTestRuns chan struct{}

metricsRegistry *prometheus.Registry
metricTestDuration *prometheus.SummaryVec

// mockables
sleep func(time.Duration)
}

type LaunchHandler interface {
http.Handler
Wait()
}

// NewLaunchHandler returns a handler that launches a k6 load test.
func NewLaunchHandler(client k6.Client, kubeClient kubernetes.Interface, slackClient slack.Client) (http.Handler, error) {
func NewLaunchHandler(ctx context.Context, client k6.Client, kubeClient kubernetes.Interface, slackClient slack.Client, maxConcurrentTests int) (LaunchHandler, error) {
if slackClient == nil {
return nil, errors.New("unexpected state. Slack client is nil")
}

return &launchHandler{
client: client,
kubeClient: kubeClient,
slackClient: slackClient,
lastFailureTime: make(map[string]time.Time),
sleep: time.Sleep,
}, nil
h := &launchHandler{
client: client,
kubeClient: kubeClient,
slackClient: slackClient,
lastFailureTime: make(map[string]time.Time),
sleep: time.Sleep,
processToWaitFor: make(chan k6.TestRun, maxConcurrentTests),
waitForProcessesDone: make(chan struct{}, 1),
ctx: ctx,
}
h.availableTestRuns = make(chan struct{}, maxConcurrentTests)
for range maxConcurrentTests {
h.availableTestRuns <- struct{}{}
}

metricMaxConcurrentTests := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "launch_max_concurrent_tests",
Help: "The maximum number of concurrent tests",
})
metricMaxConcurrentTests.Set(float64(maxConcurrentTests))
if err := prometheus.Register(metricMaxConcurrentTests); err != nil {
log.Warnf("Failed to register new metric: %s", err.Error())
}

metricAvailableConcurrentTests := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "launch_available_concurrent_tests",
Help: "The current number of available concurrent tests. If 0 then new requests will be rejected",
}, func() float64 {
return float64(len(h.availableTestRuns))
})
if err := prometheus.Register(metricAvailableConcurrentTests); err != nil {
log.Warnf("Failed to register new metric: %s", err.Error())
}

// metricTestDuration is an internal metric that we use to calculate the
// expected wait time in case the maximum number of concurrent tests is
// reached:
metricTestDuration := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metricTestDurationName,
Help: "Durations of the executed k6 test run in seconds",
Objectives: map[float64]float64{0.5: float64(30)},
}, []string{"exit_code"})
h.metricTestDuration = metricTestDuration
h.metricsRegistry = prometheus.NewRegistry()
_ = h.metricsRegistry.Register(h.metricTestDuration)

go h.waitForProcesses(ctx)
return h, nil
}

// Wait blocks until all subprocesses have terminated. It should only be
// used if the passed context can be (and is) canceled.
func (h *launchHandler) Wait() {
<-h.waitForProcessesDone
log.Debug("launch handler finished")
}

// waitForProcesses receives started processes and waits for them to complete.
// This prevents k6 runs whose results we do not need from becoming zombie
// processes.
func (h *launchHandler) waitForProcesses(ctx context.Context) {
defer func() {
h.waitForProcessesDone <- struct{}{}
}()
wg := sync.WaitGroup{}
loop:
for {
select {
case cmd := <-h.processToWaitFor:
wg.Add(1)
go func() {
h.waitForProcess(cmd)
wg.Done()
}()
case <-ctx.Done():
break loop
}
}
wg.Wait()
}

func (h *launchHandler) waitForProcess(cmd k6.TestRun) {
if cmd == nil {
log.Warnf("nil as testrun passed")
return
}
pid := cmd.PID()
log.WithField("pid", pid).Debug("waiting for testrun to exit")
_ = cmd.Wait()
h.trackExecutionDuration(cmd)
log.WithField("pid", pid).Debugf("testrun exited")

h.availableTestRuns <- struct{}{}
}

// registerProcessCleanup queues the process so that it is eventually waited
// on and its resources are released.
//
// Note that this method can block, which will, in turn, block the calling
// HTTP handler.
func (h *launchHandler) registerProcessCleanup(cmd k6.TestRun) {
h.processToWaitFor <- cmd
}

func (h *launchHandler) getLastFailureTime(payload *launchPayload) (time.Time, bool) {
@@ -213,8 +325,36 @@ func (h *launchHandler) buildEnvVars(payload *launchPayload) (map[string]string,
return envVars, nil
}

func (h *launchHandler) getWaitTime() int64 {
families, err := h.metricsRegistry.Gather()
if err != nil {
return 60
}
for _, family := range families {
if family.GetName() == metricTestDurationName {
for _, metric := range family.GetMetric() {
for _, quantile := range metric.GetSummary().GetQuantile() {
if quantile.GetQuantile() == 0.5 {
result := quantile.GetValue()
return int64(result)
}
}
}
}
}
return 60
}

func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
cmdLog := createLogEntry(req)
select {
case <-h.availableTestRuns:
default:
cmdLog.Warn("Maximum concurrent test runs reached. Rejecting request.")
resp.Header().Set("Retry-After", fmt.Sprintf("%d", h.getWaitTime()))
http.Error(resp, "Maximum concurrent test runs reached", http.StatusTooManyRequests)
return
}
logIfError := func(err error) {
if err != nil {
cmdLog.Error(err)
@@ -253,8 +393,18 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
return
}

ctx, cancelCtx := context.WithCancel(context.Background())
defer func() {
if payload.Metadata.WaitForResults {
cancelCtx()
}
}()
go func() {
h.propagateCancel(req, payload, cancelCtx)
}()

cmdLog.Info("launching k6 test")
cmd, err := h.client.Start(payload.Metadata.Script, payload.Metadata.UploadToCloud, envVars, &buf)
cmd, err := h.client.Start(ctx, payload.Metadata.Script, payload.Metadata.UploadToCloud, envVars, &buf)
if err != nil {
fail(fmt.Sprintf("error while launching the test: %v", err))
return
@@ -268,13 +418,15 @@
logIfError(err)
logIfError(h.slackClient.AddFileToThreads(slackMessages, "k6-results.txt", buf.String()))
fail(fmt.Sprintf("error while waiting for test to start: %v", waitErr))
h.registerProcessCleanup(cmd)
return
}

if payload.Metadata.UploadToCloud {
url, err := getCloudURL(buf.String())
if err != nil {
fail(err.Error())
h.registerProcessCleanup(cmd)
return
}
slackContext += fmt.Sprintf("\nCloud URL: <%s>", url)
@@ -287,12 +439,17 @@

if !payload.Metadata.WaitForResults {
cmdLog.Infof("the load test for %s.%s was launched successfully!", payload.Name, payload.Namespace)
h.registerProcessCleanup(cmd)
return
}

// Wait for the test to finish and write the output to slack
defer func() {
h.availableTestRuns <- struct{}{}
}()
cmdLog.Info("waiting for the results")
err = cmd.Wait()
h.trackExecutionDuration(cmd)
logIfError(h.slackClient.AddFileToThreads(slackMessages, "k6-results.txt", buf.String()))

// Load testing failed, log the output
Expand All @@ -309,6 +466,29 @@ func (h *launchHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
cmdLog.Infof("the load test for %s.%s succeeded!", payload.Name, payload.Namespace)
}

func (h *launchHandler) trackExecutionDuration(cmd k6.TestRun) {
if dur := cmd.ExecutionDuration(); dur != 0 {
h.metricTestDuration.With(prometheus.Labels{"exit_code": fmt.Sprintf("%d", cmd.ExitCode())}).Observe(float64(dur / time.Second))
}
}

func (h *launchHandler) propagateCancel(req *http.Request, payload *launchPayload, cancelCtx context.CancelFunc) {
if payload.Metadata.WaitForResults {
select {
case <-req.Context().Done():
cancelCtx()
case <-h.ctx.Done():
cancelCtx()
}
} else {
// If we are not waiting for the results then we should only cancel
// if the global context is done:
<-h.ctx.Done()
cancelCtx()
}
log.Info("canceling process")
}

func (h *launchHandler) waitForOutputPath(cmdLog *log.Entry, buf *bytes.Buffer) error {
for i := 0; i < 10; i++ {
if strings.Contains(buf.String(), "output:") {
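The heart of the pkg/handlers/launch.go change is a single reaper goroutine: every started k6 process is queued to it and waited on there, so runs whose results nobody reads never become zombies, and all of them are reaped before shutdown completes. A stripped-down sketch of that pattern with plain os/exec standing in for the k6 client; every name here is illustrative and the "sleep" command assumes a Unix-like system.

package main

import (
	"context"
	"log"
	"os/exec"
	"sync"
	"time"
)

// reaper waits on every registered command so that finished children are
// always reaped, even when no HTTP handler is waiting for their results.
type reaper struct {
	queue chan *exec.Cmd
	done  chan struct{}
}

func newReaper(ctx context.Context, queueSize int) *reaper {
	r := &reaper{queue: make(chan *exec.Cmd, queueSize), done: make(chan struct{})}
	go r.loop(ctx)
	return r
}

func (r *reaper) register(cmd *exec.Cmd) { r.queue <- cmd }

func (r *reaper) loop(ctx context.Context) {
	defer close(r.done)
	var wg sync.WaitGroup
	for {
		select {
		case cmd := <-r.queue:
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Wait releases the child's resources; its error is only logged.
				if err := cmd.Wait(); err != nil {
					log.Printf("process %d exited: %v", cmd.Process.Pid, err)
				}
			}()
		case <-ctx.Done():
			wg.Wait() // let every already-registered command finish
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	r := newReaper(ctx, 16)

	// CommandContext kills the child if ctx is canceled, mirroring how the
	// commit now starts k6 with a context.
	cmd := exec.CommandContext(ctx, "sleep", "10")
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	r.register(cmd)

	time.Sleep(100 * time.Millisecond) // give the reaper a moment to pick the command up
	cancel()                           // simulate shutdown: kills the child and stops the loop
	<-r.done                           // block until everything is reaped, like LaunchHandler.Wait
}

In the actual handler this queue is processToWaitFor, registerProcessCleanup is the enqueue, and the slot on availableTestRuns is handed back from waitForProcess once Wait returns.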
