Skip to content

Commit

Permalink
Scrape config and probe support in target allocator (#3394)
Browse files Browse the repository at this point in the history
* Enable scrape config and probe support in TA

* chlog

* fix the stopping

* remove log

* downgrade to 1.22, oops

* comments
  • Loading branch information
jaronoff97 authored Nov 5, 2024
1 parent ad83cb9 commit 05228b9
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 36 deletions.
16 changes: 16 additions & 0 deletions .chloggen/scrape-config-probe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: enables support for pulling scrape config and probe CRDs in the target allocator

# One or more tracking issues related to the change
issues: [1842]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
6 changes: 5 additions & 1 deletion cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ type Config struct {
type PrometheusCRConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
PodMonitorSelector *metav1.LabelSelector `yaml:"pod_monitor_selector,omitempty"`
PodMonitorNamespaceSelector *metav1.LabelSelector `yaml:"pod_monitor_namespace_selector,omitempty"`
ServiceMonitorSelector *metav1.LabelSelector `yaml:"service_monitor_selector,omitempty"`
ServiceMonitorNamespaceSelector *metav1.LabelSelector `yaml:"service_monitor_namespace_selector,omitempty"`
PodMonitorNamespaceSelector *metav1.LabelSelector `yaml:"pod_monitor_namespace_selector,omitempty"`
ScrapeConfigSelector *metav1.LabelSelector `yaml:"scrape_config_selector,omitempty"`
ScrapeConfigNamespaceSelector *metav1.LabelSelector `yaml:"scrape_config_namespace_selector,omitempty"`
ProbeSelector *metav1.LabelSelector `yaml:"probe_selector,omitempty"`
ProbeNamespaceSelector *metav1.LabelSelector `yaml:"probe_namespace_selector,omitempty"`
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
}

Expand Down
107 changes: 89 additions & 18 deletions cmd/otel-allocator/watcher/promOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
gokitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
Expand Down Expand Up @@ -53,6 +53,9 @@ const (
)

func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
// TODO: Remove this after go 1.23 upgrade
promLogger := level.NewFilter(gokitlog.NewLogfmtLogger(os.Stderr), level.AllowWarn())
slogger := slog.New(logr.ToSlogHandler(logger))
var resourceSelector *prometheus.ResourceSelector
mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
if err != nil {
Expand All @@ -79,18 +82,20 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
Spec: monitoringv1.PrometheusSpec{
CommonPrometheusFields: monitoringv1.CommonPrometheusFields{
ScrapeInterval: monitoringv1.Duration(cfg.PrometheusCR.ScrapeInterval.String()),
ServiceMonitorSelector: cfg.PrometheusCR.ServiceMonitorSelector,
PodMonitorSelector: cfg.PrometheusCR.PodMonitorSelector,
ServiceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
PodMonitorNamespaceSelector: cfg.PrometheusCR.PodMonitorNamespaceSelector,
ServiceMonitorSelector: cfg.PrometheusCR.ServiceMonitorSelector,
ServiceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
ScrapeConfigSelector: cfg.PrometheusCR.ScrapeConfigSelector,
ScrapeConfigNamespaceSelector: cfg.PrometheusCR.ScrapeConfigNamespaceSelector,
ProbeSelector: cfg.PrometheusCR.ProbeSelector,
ProbeNamespaceSelector: cfg.PrometheusCR.ProbeNamespaceSelector,
ServiceDiscoveryRole: &serviceDiscoveryRole,
},
},
}

promOperatorLogger := level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowWarn())
promOperatorSlogLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
generator, err := prometheus.NewConfigGenerator(promOperatorLogger, prom, true)
generator, err := prometheus.NewConfigGenerator(promLogger, prom, true)

if err != nil {
return nil, err
Expand All @@ -108,21 +113,21 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
logger.Error(err, "Retrying namespace informer creation in promOperator CRD watcher")
return true
}, func() error {
nsMonInf, err = getNamespaceInformer(ctx, map[string]struct{}{v1.NamespaceAll: {}}, promOperatorLogger, clientset, operatorMetrics)
nsMonInf, err = getNamespaceInformer(ctx, map[string]struct{}{v1.NamespaceAll: {}}, promLogger, clientset, operatorMetrics)
return err
})
if getNamespaceInformerErr != nil {
logger.Error(getNamespaceInformerErr, "Failed to create namespace informer in promOperator CRD watcher")
return nil, getNamespaceInformerErr
}

resourceSelector, err = prometheus.NewResourceSelector(promOperatorSlogLogger, prom, store, nsMonInf, operatorMetrics, eventRecorder)
resourceSelector, err = prometheus.NewResourceSelector(slogger, prom, store, nsMonInf, operatorMetrics, eventRecorder)
if err != nil {
logger.Error(err, "Failed to create resource selector in promOperator CRD watcher")
}

return &PrometheusCRWatcher{
logger: logger,
logger: slogger,
kubeMonitoringClient: mClient,
k8sClient: clientset,
informers: monitoringInformers,
Expand All @@ -133,13 +138,15 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
kubeConfigPath: cfg.KubeConfigFilePath,
podMonitorNamespaceSelector: cfg.PrometheusCR.PodMonitorNamespaceSelector,
serviceMonitorNamespaceSelector: cfg.PrometheusCR.ServiceMonitorNamespaceSelector,
scrapeConfigNamespaceSelector: cfg.PrometheusCR.ScrapeConfigNamespaceSelector,
probeNamespaceSelector: cfg.PrometheusCR.ProbeNamespaceSelector,
resourceSelector: resourceSelector,
store: store,
}, nil
}

type PrometheusCRWatcher struct {
logger logr.Logger
logger *slog.Logger
kubeMonitoringClient monitoringclient.Interface
k8sClient kubernetes.Interface
informers map[string]*informers.ForResource
Expand All @@ -150,12 +157,13 @@ type PrometheusCRWatcher struct {
kubeConfigPath string
podMonitorNamespaceSelector *metav1.LabelSelector
serviceMonitorNamespaceSelector *metav1.LabelSelector
scrapeConfigNamespaceSelector *metav1.LabelSelector
probeNamespaceSelector *metav1.LabelSelector
resourceSelector *prometheus.ResourceSelector
store *assets.StoreBuilder
}

func getNamespaceInformer(ctx context.Context, allowList map[string]struct{}, promOperatorLogger log.Logger, clientset kubernetes.Interface, operatorMetrics *operator.Metrics) (cache.SharedIndexInformer, error) {

func getNamespaceInformer(ctx context.Context, allowList map[string]struct{}, promOperatorLogger gokitlog.Logger, clientset kubernetes.Interface, operatorMetrics *operator.Metrics) (cache.SharedIndexInformer, error) {
kubernetesVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
return nil, err
Expand Down Expand Up @@ -196,9 +204,21 @@ func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informe
return nil, err
}

probeInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ProbeName))
if err != nil {
return nil, err
}

scrapeConfigInformers, err := informers.NewInformersForResource(factory, promv1alpha1.SchemeGroupVersion.WithResource(promv1alpha1.ScrapeConfigName))
if err != nil {
return nil, err
}

return map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
monitoringv1.ProbeName: probeInformers,
promv1alpha1.ScrapeConfigName: scrapeConfigInformers,
}, nil
}

Expand All @@ -210,7 +230,7 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch

if w.nsInformer != nil {
go w.nsInformer.Run(w.stopChannel)
if ok := cache.WaitForNamedCacheSync("namespace", w.stopChannel, w.nsInformer.HasSynced); !ok {
if ok := w.WaitForNamedCacheSync("namespace", w.nsInformer.HasSynced); !ok {
success = false
}

Expand All @@ -228,10 +248,12 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
for name, selector := range map[string]*metav1.LabelSelector{
"PodMonitorNamespaceSelector": w.podMonitorNamespaceSelector,
"ServiceMonitorNamespaceSelector": w.serviceMonitorNamespaceSelector,
"ProbeNamespaceSelector": w.probeNamespaceSelector,
"ScrapeConfigNamespaceSelector": w.scrapeConfigNamespaceSelector,
} {
sync, err := k8sutil.LabelSelectionHasChanged(old.Labels, cur.Labels, selector)
if err != nil {
w.logger.Error(err, "Failed to check label selection between namespaces while handling namespace updates", "selector", name)
w.logger.Error("Failed to check label selection between namespaces while handling namespace updates", "selector", name, "error", err)
return
}

Expand All @@ -252,8 +274,9 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
for name, resource := range w.informers {
resource.Start(w.stopChannel)

if ok := cache.WaitForNamedCacheSync(name, w.stopChannel, resource.HasSynced); !ok {
success = false
if ok := w.WaitForNamedCacheSync(name, resource.HasSynced); !ok {
w.logger.Info("skipping informer", "informer", name)
continue
}

// only send an event notification if there isn't one already
Expand Down Expand Up @@ -342,6 +365,16 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
return nil, err
}

probeInstances, err := w.resourceSelector.SelectProbes(ctx, w.informers[monitoringv1.ProbeName].ListAllByNamespace)
if err != nil {
return nil, err
}

scrapeConfigInstances, err := w.resourceSelector.SelectScrapeConfigs(ctx, w.informers[promv1alpha1.ScrapeConfigName].ListAllByNamespace)
if err != nil {
return nil, err
}

generatedConfig, err := w.configGenerator.GenerateServerConfiguration(
"30s",
"",
Expand All @@ -352,8 +385,8 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
nil,
serviceMonitorInstances,
podMonitorInstances,
map[string]*monitoringv1.Probe{},
map[string]*promv1alpha1.ScrapeConfig{},
probeInstances,
scrapeConfigInstances,
w.store,
nil,
nil,
Expand Down Expand Up @@ -384,3 +417,41 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
return promCfg, nil
}
}

// WaitForNamedCacheSync adds a timeout to the informer's wait for the cache to be ready.
// If the PrometheusCRWatcher is unable to load an informer within 15 seconds, the method is
// cancelled and returns false. A successful informer load will return true. This method also
// will be cancelled if the target allocator's stopChannel is called before it returns.
//
// This method is inspired by the upstream prometheus-operator implementation, with a shorter timeout
// and support for the PrometheusCRWatcher's stopChannel.
// https://github.com/prometheus-operator/prometheus-operator/blob/293c16c854ce69d1da9fdc8f0705de2d67bfdbfa/pkg/operator/operator.go#L433
func (w *PrometheusCRWatcher) WaitForNamedCacheSync(controllerName string, inf cache.InformerSynced) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t := time.NewTicker(time.Second * 5)
defer t.Stop()

go func() {
for {
select {
case <-t.C:
w.logger.Debug("cache sync not yet completed")
case <-ctx.Done():
return
case <-w.stopChannel:
w.logger.Warn("stop received, shutting down cache syncing")
cancel()
return
}
}
}()

ok := cache.WaitForNamedCacheSync(controllerName, ctx.Done(), inf)
if !ok {
w.logger.Error("failed to sync cache")
} else {
w.logger.Debug("successfully synced cache")
}

return ok
}
Loading

0 comments on commit 05228b9

Please sign in to comment.