Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry refactor to allow for batching to retry. #617

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/pkg/egress/syslog/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewHTTPSWriter(
}
}

func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
func (w *HTTPSWriter) sendHttpRequest(msg []byte) error {
req := fasthttp.AcquireRequest()
req.SetRequestURI(w.url.String())
req.Header.SetMethod("POST")
Expand All @@ -63,8 +63,6 @@ func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode())
}

w.egressMetric.Add(msgCount)

return nil
}

Expand All @@ -76,10 +74,11 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error {
}

for _, msg := range msgs {
err = w.sendHttpRequest(msg, 1)
err = w.sendHttpRequest(msg)
if err != nil {
return err
}
w.egressMetric.Add(1)
}

return nil
Expand Down
16 changes: 12 additions & 4 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const BATCHSIZE = 256 * 1024

type HTTPSBatchWriter struct {
HTTPSWriter
*Retryer
msgs chan []byte
batchSize int
sendInterval time.Duration
Expand All @@ -27,6 +28,7 @@ func NewHTTPSBatchWriter(
tlsConf *tls.Config,
egressMetric metrics.Counter,
c *Converter,
retryer *Retryer,
) egress.WriteCloser {
client := httpClient(netConf, tlsConf)
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
Expand All @@ -43,6 +45,7 @@ func NewHTTPSBatchWriter(
sendInterval: 1 * time.Second,
egrMsgCount: 0,
msgs: make(chan []byte),
Retryer: retryer,
}
go BatchWriter.startSender()
return BatchWriter
Expand Down Expand Up @@ -74,6 +77,13 @@ func (w *HTTPSBatchWriter) startSender() {
msgCount = 0
t.Reset(w.sendInterval)
}
sendBatch := func() {
err := w.Retryer.Retry(msgBatch.Bytes(), w.sendHttpRequest)
if err == nil {
w.egressMetric.Add(msgCount)
}
reset()
}
for {
select {
case msg := <-w.msgs:
Expand All @@ -84,14 +94,12 @@ func (w *HTTPSBatchWriter) startSender() {
} else {
msgCount++
if length >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
sendBatch()
}
}
case <-t.C:
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
sendBatch()
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/pkg/egress/syslog/https_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ var _ = Describe("HTTPS_batch", func() {
"test-app-id",
"test-hostname",
)
retryer := syslog.NewBackoffRetryer(
b,
syslog.ExponentialDuration,
2,
)
writer = syslog.NewHTTPSBatchWriter(
b,
netConf,
skipSSLTLSConfig,
&metricsHelpers.SpyMetric{},
c,
retryer,
)
})

Expand Down
52 changes: 52 additions & 0 deletions src/pkg/egress/syslog/retryer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package syslog

import (
"log"
"time"

"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
)

// RetryWriter wraps a WriteCloser and will retry writes if the first fails.
type Retryer struct {
retryDuration RetryDuration
maxRetries int
binding *URLBinding
}

func NewBackoffRetryer(
urlBinding *URLBinding,
retryDuration RetryDuration,
maxRetries int,
) *Retryer {
return &Retryer{
retryDuration: retryDuration,
maxRetries: maxRetries,
binding: urlBinding,
}
}

// Write will retry writes unitl maxRetries has been reached.
func (r *Retryer) Retry(message []byte, fn func(msg []byte) error) error {
logTemplate := "failed to write to %s, retrying in %s, err: %s"

var err error

for i := 0; i < r.maxRetries; i++ {
err = fn(message)
if err == nil {
return nil
}

if egress.ContextDone(r.binding.Context) {
return err
}

sleepDuration := r.retryDuration(i)
log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err)

time.Sleep(sleepDuration)
}

return err
}
6 changes: 6 additions & 0 deletions src/pkg/egress/syslog/writer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
}
converter := NewConverter(o...)

retryer := NewBackoffRetryer(
ub,
ExponentialDuration,
maxRetries)

var w egress.WriteCloser
switch ub.URL.Scheme {
case "https":
Expand All @@ -113,6 +118,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
tlsCfg,
egressMetric,
converter,
retryer,
)
case "syslog":
w = NewTCPWriter(
Expand Down
Loading