Skip to content

Commit

Permalink
First retry implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklas-dohrn committed Oct 10, 2024
1 parent 800aa49 commit f00fb11
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 8 deletions.
7 changes: 3 additions & 4 deletions src/pkg/egress/syslog/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewHTTPSWriter(
}
}

func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
func (w *HTTPSWriter) sendHttpRequest(msg []byte) error {
req := fasthttp.AcquireRequest()
req.SetRequestURI(w.url.String())
req.Header.SetMethod("POST")
Expand All @@ -63,8 +63,6 @@ func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode())
}

w.egressMetric.Add(msgCount)

return nil
}

Expand All @@ -76,10 +74,11 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error {
}

for _, msg := range msgs {
err = w.sendHttpRequest(msg, 1)
err = w.sendHttpRequest(msg)
if err != nil {
return err
}
w.egressMetric.Add(1)
}

return nil
Expand Down
16 changes: 12 additions & 4 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const BATCHSIZE = 256 * 1024

type HTTPSBatchWriter struct {
HTTPSWriter
*Retryer
msgs chan []byte
batchSize int
sendInterval time.Duration
Expand All @@ -27,6 +28,7 @@ func NewHTTPSBatchWriter(
tlsConf *tls.Config,
egressMetric metrics.Counter,
c *Converter,
retryer *Retryer,
) egress.WriteCloser {
client := httpClient(netConf, tlsConf)
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
Expand All @@ -43,6 +45,7 @@ func NewHTTPSBatchWriter(
sendInterval: 1 * time.Second,
egrMsgCount: 0,
msgs: make(chan []byte),
Retryer: retryer,
}
go BatchWriter.startSender()
return BatchWriter
Expand Down Expand Up @@ -74,6 +77,13 @@ func (w *HTTPSBatchWriter) startSender() {
msgCount = 0
t.Reset(w.sendInterval)
}
sendBatch := func() {
err := w.Retryer.Retry(msgBatch.Bytes(), w.sendHttpRequest)
if err == nil {
w.egressMetric.Add(msgCount)
}
reset()
}
for {
select {
case msg := <-w.msgs:
Expand All @@ -84,14 +94,12 @@ func (w *HTTPSBatchWriter) startSender() {
} else {
msgCount++
if length >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
sendBatch()
}
}
case <-t.C:
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
sendBatch()
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/pkg/egress/syslog/https_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ var _ = Describe("HTTPS_batch", func() {
"test-app-id",
"test-hostname",
)
retryer := syslog.NewBackoffRetryer(
b,
syslog.ExponentialDuration,
2,
)
writer = syslog.NewHTTPSBatchWriter(
b,
netConf,
skipSSLTLSConfig,
&metricsHelpers.SpyMetric{},
c,
retryer,
)
})

Expand Down
52 changes: 52 additions & 0 deletions src/pkg/egress/syslog/retryer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package syslog

import (
"log"
"time"

"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
)

// RetryWriter wraps a WriteCloser and will retry writes if the first fails.
type Retryer struct {
retryDuration RetryDuration
maxRetries int
binding *URLBinding
}

func NewBackoffRetryer(
urlBinding *URLBinding,
retryDuration RetryDuration,
maxRetries int,
) *Retryer {
return &Retryer{
retryDuration: retryDuration,
maxRetries: maxRetries,
binding: urlBinding,
}
}

// Write will retry writes unitl maxRetries has been reached.
func (r *Retryer) Retry(message []byte, fn func(msg []byte) error) error {
logTemplate := "failed to write to %s, retrying in %s, err: %s"

var err error

for i := 0; i < r.maxRetries; i++ {
err = fn(message)
if err == nil {
return nil
}

if egress.ContextDone(r.binding.Context) {
return err
}

sleepDuration := r.retryDuration(i)
log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err)

time.Sleep(sleepDuration)
}

return err
}
6 changes: 6 additions & 0 deletions src/pkg/egress/syslog/writer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
}
converter := NewConverter(o...)

retryer := NewBackoffRetryer(
ub,
ExponentialDuration,
maxRetries)

var w egress.WriteCloser
switch ub.URL.Scheme {
case "https":
Expand All @@ -113,6 +118,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
tlsCfg,
egressMetric,
converter,
retryer,
)
case "syslog":
w = NewTCPWriter(
Expand Down

0 comments on commit f00fb11

Please sign in to comment.