From f00fb11977d10c05a2a0852d81ec6a4bd4163a90 Mon Sep 17 00:00:00 2001 From: nicklas dohrn Date: Mon, 7 Oct 2024 09:25:58 +0200 Subject: [PATCH] First retry implementation --- src/pkg/egress/syslog/https.go | 7 ++- src/pkg/egress/syslog/https_batch.go | 16 +++++-- src/pkg/egress/syslog/https_batch_test.go | 6 +++ src/pkg/egress/syslog/retryer.go | 52 +++++++++++++++++++++++ src/pkg/egress/syslog/writer_factory.go | 6 +++ 5 files changed, 79 insertions(+), 8 deletions(-) create mode 100644 src/pkg/egress/syslog/retryer.go diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index 581fb7776..d814e3dbd 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -43,7 +43,7 @@ func NewHTTPSWriter( } } -func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error { +func (w *HTTPSWriter) sendHttpRequest(msg []byte) error { req := fasthttp.AcquireRequest() req.SetRequestURI(w.url.String()) req.Header.SetMethod("POST") @@ -63,8 +63,6 @@ func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error { return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) } - w.egressMetric.Add(msgCount) - return nil } @@ -76,10 +74,11 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - err = w.sendHttpRequest(msg, 1) + err = w.sendHttpRequest(msg) if err != nil { return err } + w.egressMetric.Add(1) } return nil diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go index 56bc55731..be79abaa4 100644 --- a/src/pkg/egress/syslog/https_batch.go +++ b/src/pkg/egress/syslog/https_batch.go @@ -15,6 +15,7 @@ const BATCHSIZE = 256 * 1024 type HTTPSBatchWriter struct { HTTPSWriter + *Retryer msgs chan []byte batchSize int sendInterval time.Duration @@ -27,6 +28,7 @@ func NewHTTPSBatchWriter( tlsConf *tls.Config, egressMetric metrics.Counter, c *Converter, + retryer *Retryer, ) egress.WriteCloser { client := httpClient(netConf, tlsConf) binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme @@ -43,6 +45,7 @@ func NewHTTPSBatchWriter( sendInterval: 1 * time.Second, egrMsgCount: 0, msgs: make(chan []byte), + Retryer: retryer, } go BatchWriter.startSender() return BatchWriter @@ -74,6 +77,13 @@ func (w *HTTPSBatchWriter) startSender() { msgCount = 0 t.Reset(w.sendInterval) } + sendBatch := func() { + err := w.Retryer.Retry(msgBatch.Bytes(), w.sendHttpRequest) + if err == nil { + w.egressMetric.Add(msgCount) + } + reset() + } for { select { case msg := <-w.msgs: @@ -84,14 +94,12 @@ func (w *HTTPSBatchWriter) startSender() { } else { msgCount++ if length >= w.batchSize { - w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck - reset() + sendBatch() } } case <-t.C: if msgBatch.Len() > 0 { - w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck - reset() + sendBatch() } } } diff --git a/src/pkg/egress/syslog/https_batch_test.go b/src/pkg/egress/syslog/https_batch_test.go index 35789bcb6..149d05ab6 100644 --- a/src/pkg/egress/syslog/https_batch_test.go +++ b/src/pkg/egress/syslog/https_batch_test.go @@ -39,12 +39,18 @@ var _ = Describe("HTTPS_batch", func() { "test-app-id", "test-hostname", ) + retryer := syslog.NewBackoffRetryer( + b, + syslog.ExponentialDuration, + 2, + ) writer = syslog.NewHTTPSBatchWriter( b, netConf, skipSSLTLSConfig, &metricsHelpers.SpyMetric{}, c, + retryer, ) }) diff --git a/src/pkg/egress/syslog/retryer.go b/src/pkg/egress/syslog/retryer.go new file mode 100644 index 000000000..5d8700af5 --- /dev/null +++ b/src/pkg/egress/syslog/retryer.go @@ -0,0 +1,52 @@ +package syslog + +import ( + "log" + "time" + + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" +) + +// RetryWriter wraps a WriteCloser and will retry writes if the first fails. +type Retryer struct { + retryDuration RetryDuration + maxRetries int + binding *URLBinding +} + +func NewBackoffRetryer( + urlBinding *URLBinding, + retryDuration RetryDuration, + maxRetries int, +) *Retryer { + return &Retryer{ + retryDuration: retryDuration, + maxRetries: maxRetries, + binding: urlBinding, + } +} + +// Write will retry writes unitl maxRetries has been reached. +func (r *Retryer) Retry(message []byte, fn func(msg []byte) error) error { + logTemplate := "failed to write to %s, retrying in %s, err: %s" + + var err error + + for i := 0; i < r.maxRetries; i++ { + err = fn(message) + if err == nil { + return nil + } + + if egress.ContextDone(r.binding.Context) { + return err + } + + sleepDuration := r.retryDuration(i) + log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err) + + time.Sleep(sleepDuration) + } + + return err +} diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 8ec8249ab..43acf57a8 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -96,6 +96,11 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { } converter := NewConverter(o...) + retryer := NewBackoffRetryer( + ub, + ExponentialDuration, + maxRetries) + var w egress.WriteCloser switch ub.URL.Scheme { case "https": @@ -113,6 +118,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { tlsCfg, egressMetric, converter, + retryer, ) case "syslog": w = NewTCPWriter(