Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators can suppress warnings for invalid drains #422

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions jobs/loggr-syslog-agent/spec
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,7 @@ properties:
logging.format.timestamp:
description: "Format for timestamp in component logs. Valid values are 'deprecated' and 'rfc3339'."
default: "deprecated"

warn_on_invalid_drains:
description: "Whether to output log warnings on invalid drains"
default: true
1 change: 1 addition & 0 deletions jobs/loggr-syslog-agent/templates/bpm.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"DEBUG_METRICS" => "#{p("metrics.debug")}",
"PPROF_PORT" => "#{p("metrics.pprof_port")}",
"USE_RFC3339" => "#{p("logging.format.timestamp") == "rfc3339"}",
"WARN_ON_INVALID_DRAINS" => "#{p("warn_on_invalid_drains")}",
}
}
if_p("drain_cipher_suites") do | ciphers |
Expand Down
1 change: 1 addition & 0 deletions src/cmd/syslog-agent/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Config struct {
DrainTrustedCAFile string `env:"DRAIN_TRUSTED_CA_FILE, report"`
DefaultDrainMetadata bool `env:"DEFAULT_DRAIN_METADATA, report"`
IdleDrainTimeout time.Duration `env:"IDLE_DRAIN_TIMEOUT, report"`
WarnOnInvalidDrains bool `env:"WARN_ON_INVALID_DRAINS, report"`

GRPC GRPC
Cache Cache
Expand Down
1 change: 1 addition & 0 deletions src/cmd/syslog-agent/app/syslog_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewSyslogAgent(
&cfg.Cache.Blacklist,
bindings.NewBindingFetcher(cfg.BindingsPerAppLimit, cacheClient, m, l),
m,
cfg.WarnOnInvalidDrains,
l,
)
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata)
Expand Down
21 changes: 14 additions & 7 deletions src/pkg/ingress/bindings/filtered_binding_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ type metricsClient interface {
type FilteredBindingFetcher struct {
ipChecker IPChecker
br binding.Fetcher
warn bool
logger *log.Logger
invalidDrains metrics.Gauge
blacklistedDrains metrics.Gauge
}

func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, lc *log.Logger) *FilteredBindingFetcher {
func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger) *FilteredBindingFetcher {
opt := metrics.WithMetricLabels(map[string]string{"unit": "total"})

invalidDrains := m.NewGauge(
Expand All @@ -44,10 +45,10 @@ func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient,
"Count of blacklisted drains encountered in last binding fetch.",
opt,
)

return &FilteredBindingFetcher{
ipChecker: c,
br: b,
warn: warn,
logger: lc,
invalidDrains: invalidDrains,
blacklistedDrains: blacklistedDrains,
Expand All @@ -71,7 +72,7 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) {
u, err := url.Parse(b.Drain.Url)
if err != nil {
invalidDrains += 1
f.logger.Printf("Cannot parse syslog drain url for application %s", b.AppId)
f.printWarning("Cannot parse syslog drain url for application %s", b.AppId)
continue
}

Expand All @@ -80,28 +81,28 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) {
anonymousUrl.RawQuery = ""

if invalidScheme(u.Scheme) {
f.logger.Printf("Invalid scheme %s in syslog drain url %s for application %s", u.Scheme, anonymousUrl.String(), b.AppId)
f.printWarning("Invalid scheme %s in syslog drain url %s for application %s", u.Scheme, anonymousUrl.String(), b.AppId)
continue
}

if len(u.Host) == 0 {
invalidDrains += 1
f.logger.Printf("No hostname found in syslog drain url %s for application %s", anonymousUrl.String(), b.AppId)
f.printWarning("No hostname found in syslog drain url %s for application %s", anonymousUrl.String(), b.AppId)
continue
}

ip, err := f.ipChecker.ResolveAddr(u.Host)
if err != nil {
invalidDrains += 1
f.logger.Printf("Cannot resolve ip address for syslog drain with url %s for application %s", anonymousUrl.String(), b.AppId)
f.printWarning("Cannot resolve ip address for syslog drain with url %s for application %s", anonymousUrl.String(), b.AppId)
continue
}

err = f.ipChecker.CheckBlacklist(ip)
if err != nil {
invalidDrains += 1
blacklistedDrains += 1
f.logger.Printf("Resolved ip address for syslog drain with url %s for application %s is blacklisted", anonymousUrl.String(), b.AppId)
f.printWarning("Resolved ip address for syslog drain with url %s for application %s is blacklisted", anonymousUrl.String(), b.AppId)
continue
}

Expand All @@ -113,6 +114,12 @@ func (f *FilteredBindingFetcher) FetchBindings() ([]syslog.Binding, error) {
return newBindings, nil
}

func (f FilteredBindingFetcher) printWarning(format string, v ...any) {
if f.warn {
f.logger.Printf(format, v...)
}
}

func invalidScheme(scheme string) bool {
for _, s := range allowedSchemes {
if s == scheme {
Expand Down
106 changes: 97 additions & 9 deletions src/pkg/ingress/bindings/filtered_binding_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
}
bindingReader := &SpyBindingReader{bindings: input}

filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, log)
filter = bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log)
actual, err := filter.FetchBindings()

Expect(err).ToNot(HaveOccurred())
Expand All @@ -43,7 +43,7 @@ var _ = Describe("FilteredBindingFetcher", func() {
It("returns an error if the binding reader cannot fetch bindings", func() {
bindingReader := &SpyBindingReader{nil, errors.New("Woops")}

filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, log)
filter := bindings.NewFilteredBindingFetcher(&spyIPChecker{}, bindingReader, metrics, true, log)
actual, err := filter.FetchBindings()

Expect(err).To(HaveOccurred())
Expand All @@ -52,17 +52,23 @@ var _ = Describe("FilteredBindingFetcher", func() {

Context("when syslog drain is unparsable", func() {
var logBuffer bytes.Buffer
var warn bool

BeforeEach(func() {
logBuffer = bytes.Buffer{}
log.SetOutput(&logBuffer)
warn = true
})

JustBeforeEach(func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "://"}},
}

log.SetOutput(&logBuffer)
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
})
Expand All @@ -75,21 +81,38 @@ var _ = Describe("FilteredBindingFetcher", func() {
Expect(logBuffer.String()).Should(MatchRegexp("Cannot parse syslog drain url for application"))
Expect(metrics.GetMetric("invalid_drains", map[string]string{"unit": "total"}).Value()).To(Equal(1.0))
})

Context("when configured not to warn", func() {
BeforeEach(func() {
warn = false
})
It("doesn't log the warning", func() {
_, err := filter.FetchBindings()
Expect(err).ToNot(HaveOccurred())
Expect(logBuffer.String()).ToNot(MatchRegexp("Cannot parse syslog drain url for application"))
})
})
})

Context("when drain has no host", func() {
var logBuffer bytes.Buffer
var warn bool

BeforeEach(func() {
logBuffer = bytes.Buffer{}
log.SetOutput(&logBuffer)
warn = true
})

JustBeforeEach(func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "https:///path"}},
}

log.SetOutput(&logBuffer)
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
})
Expand All @@ -102,12 +125,24 @@ var _ = Describe("FilteredBindingFetcher", func() {
Expect(logBuffer.String()).Should(MatchRegexp("No hostname found in syslog drain url (.*) for application"))
Expect(metrics.GetMetric("invalid_drains", map[string]string{"unit": "total"}).Value()).To(Equal(1.0))
})

Context("when configured not to warn", func() {
BeforeEach(func() {
warn = false
})
It("doesn't log the warning", func() {
_, err := filter.FetchBindings()
Expect(err).ToNot(HaveOccurred())
Expect(logBuffer.String()).ToNot(MatchRegexp("No hostname found"))
})
})
})

Context("when syslog drain has unsupported scheme", func() {
var (
input []syslog.Binding
logBuffer bytes.Buffer
warn bool
)

BeforeEach(func() {
Expand All @@ -120,12 +155,19 @@ var _ = Describe("FilteredBindingFetcher", func() {
{AppId: "app-id", Hostname: "unknown", Drain: syslog.Drain{Url: "blah://10.10.10.10"}},
}

logBuffer = bytes.Buffer{}
log.SetOutput(&logBuffer)
warn = true

metrics = metricsHelpers.NewMetricsRegistry()
})

JustBeforeEach(func() {
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
})
Expand All @@ -138,23 +180,39 @@ var _ = Describe("FilteredBindingFetcher", func() {
Expect(logBuffer.String()).Should(MatchRegexp("Invalid scheme (.*) in syslog drain url (.*) for application"))
Expect(metrics.GetMetric("invalid_drains", map[string]string{"unit": "total"}).Value()).To(Equal(0.0))
})
Context("when configured not to warn", func() {
BeforeEach(func() {
warn = false
})
It("doesn't log the warning", func() {
_, err := filter.FetchBindings()
Expect(err).ToNot(HaveOccurred())
Expect(logBuffer.String()).ToNot(MatchRegexp("Invalid scheme"))
})
})
})

Context("when the drain host fails to resolve", func() {
var logBuffer bytes.Buffer
var warn bool

BeforeEach(func() {
logBuffer = bytes.Buffer{}
log.SetOutput(&logBuffer)
warn = true
})

JustBeforeEach(func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}},
}

log.SetOutput(&logBuffer)
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{
resolveAddrError: errors.New("resolve error"),
},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
})
Expand All @@ -167,24 +225,42 @@ var _ = Describe("FilteredBindingFetcher", func() {
Expect(logBuffer.String()).Should(MatchRegexp("Cannot resolve ip address for syslog drain with url"))
Expect(metrics.GetMetric("invalid_drains", map[string]string{"unit": "total"}).Value()).To(Equal(1.0))
})

Context("when configured not to warn", func() {
BeforeEach(func() {
warn = false
})
It("doesn't log the warning", func() {
_, err := filter.FetchBindings()
Expect(err).ToNot(HaveOccurred())
Expect(logBuffer.String()).ToNot(MatchRegexp("Cannot resolve ip address for syslog drain with url"))
})
})
})

Context("when the syslog drain has been blacklisted", func() {
var logBuffer bytes.Buffer
var warn bool

BeforeEach(func() {
logBuffer = bytes.Buffer{}
log.SetOutput(&logBuffer)
warn = true
})

JustBeforeEach(func() {
input := []syslog.Binding{
{AppId: "app-id", Hostname: "we.dont.care", Drain: syslog.Drain{Url: "syslog://some.invalid.host"}},
}

log.SetOutput(&logBuffer)
filter = bindings.NewFilteredBindingFetcher(
&spyIPChecker{
checkBlacklistError: errors.New("blacklist error"),
resolvedIP: net.ParseIP("127.0.0.1"),
},
&SpyBindingReader{bindings: input},
metrics,
warn,
log,
)
})
Expand All @@ -198,6 +274,18 @@ var _ = Describe("FilteredBindingFetcher", func() {
Expect(metrics.GetMetric("invalid_drains", map[string]string{"unit": "total"}).Value()).To(Equal(1.0))
Expect(metrics.GetMetric("blacklisted_drains", map[string]string{"unit": "total"}).Value()).To(Equal(1.0))
})

Context("when configured not to warn", func() {
BeforeEach(func() {
warn = false
})
It("doesn't log the warning", func() {
_, err := filter.FetchBindings()

Expect(err).ToNot(HaveOccurred())
Expect(logBuffer.String()).ToNot(MatchRegexp("Resolved ip address for syslog drain with url (.*) for application (.*) is blacklisted"))
})
})
})
})

Expand Down
Loading