Skip to content

Commit

Permalink
Add support for scaling based on Nakadi
Browse files Browse the repository at this point in the history
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
  • Loading branch information
mikkeloscar committed Nov 18, 2023
1 parent ae5c4af commit c1f6369
Show file tree
Hide file tree
Showing 5 changed files with 541 additions and 1 deletion.
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,52 @@ you need to define a `key` or other `tag` with a "star" query syntax like
metric label definitions. If both annotations and corresponding label is
defined, then the annotation takes precedence.


## Nakadi collector

The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/)
`consumer_lag_seconds` or `unconsumed_events`.

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
annotations:
metric-config.external.nakadi-consumer.nakadi/interval: "60s" # optional
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: custom-metrics-consumer
minReplicas: 0
maxReplicas: 8 # should match number of partitions for the event type
metrics:
- type: External
external:
metric:
name: nakadi-consumer
selector:
matchLabels:
type: nakadi
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
event-type: "custom.event" # optional field to filter if subscription includes multiple event types.
metric-type: "consumer-lag-seconds|unconsumed-events"
target:
# value is compatible with the consumer-lag-seconds metric type.
# It describes the amount of consumer lag in seconds before scaling
# additionally up.
# if an event-type has multiple partitions the value of
# consumer-lag-seconds is the max of all the partitions.
value: "600" # 10m
# averageValue is compatible with unconsumed-events metric type.
# This means for every 30 unconsumed events a pod is scaled up.
# unconsumed-events is the sum of of unconsumed_events over all
# partitions.
averageValue: "30"
type: AverageValue
```

## HTTP Collector

The http collector allows collecting metrics from an external endpoint specified in the HPA.
Expand Down
124 changes: 124 additions & 0 deletions pkg/collector/nakadi_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package collector

import (
"context"
"fmt"
"time"

"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
// NakadiMetricType defines the metric type for metrics based on Nakadi
// subscriptions.
NakadiMetricType = "nakadi"
nakadiSubscriptionIDKey = "subscription-id"
nakadiEventTypeKey = "event-type"
nakadiMetricTypeKey = "metric-type"
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
)

// NakadiCollectorPlugin defines a plugin for creating collectors that can get
// unconsumed events from Nakadi.
type NakadiCollectorPlugin struct {
nakadi nakadi.Nakadi
}

// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin.
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
return &NakadiCollectorPlugin{
nakadi: nakadi,
}, nil
}

// NewCollector initializes a new Nakadi collector from the specified HPA.
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewNakadiCollector(c.nakadi, hpa, config, interval)
}

// NakadiCollector defines a collector that is able to collect metrics from
// Nakadi.
type NakadiCollector struct {
nakadi nakadi.Nakadi
interval time.Duration
subscriptionID string
eventType string
nakadiMetricType string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
namespace string
}

// NewNakadiCollector initializes a new NakadiCollector.
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for nakadi is not specified")
}

subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
if !ok {
return nil, fmt.Errorf("subscription-id not specified on metric")
}

metricType, ok := config.Config[nakadiMetricTypeKey]
if !ok {
return nil, fmt.Errorf("metric-type not specified on metric")
}

if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents {
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
}

return &NakadiCollector{
nakadi: nakadi,
interval: interval,
subscriptionID: subscriptionID,
// this field is optional
eventType: config.Config[nakadiEventTypeKey],
nakadiMetricType: metricType,
metric: config.Metric,
metricType: config.Type,
namespace: hpa.Namespace,
}, nil
}

// GetMetrics returns a list of collected metrics for the Nakadi subscription ID.
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) {
var value int
var err error
switch c.nakadiMetricType {
case nakadiMetricTypeConsumerLagSeconds:
value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID, c.eventType)
if err != nil {
return nil, err
}
case nakadiMetricTypeUnconsumedEvents:
value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID, c.eventType)
if err != nil {
return nil, err
}
}

metricValue := CollectedMetric{
Namespace: c.namespace,
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
}

return []CollectedMetric{metricValue}, nil
}

// Interval returns the interval at which the collector should run.
func (c *NakadiCollector) Interval() time.Duration {
return c.interval
}
144 changes: 144 additions & 0 deletions pkg/nakadi/nakadi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package nakadi

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
)

// Nakadi defines an interface for talking to the Nakadi API.
type Nakadi interface {
ConsumerLagSeconds(ctx context.Context, subscriptionID, eventType string) (int, error)
UnconsumedEvents(ctx context.Context, subscriptionID, eventType string) (int, error)
}

// Client defines client for interfacing with the Nakadi API.
type Client struct {
nakadiEndpoint string
http *http.Client
}

// NewNakadiClient initializes a new Nakadi Client.
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
return &Client{
nakadiEndpoint: nakadiEndpoint,
http: client,
}
}

func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID, eventType string) (int, error) {
stats, err := c.stats(ctx, subscriptionID, eventType)
if err != nil {
return 0, err
}

var maxConsumerLagSeconds int
for _, eventType := range stats {
for _, partition := range eventType.Partitions {
if partition.ConsumerLagSeconds > maxConsumerLagSeconds {
maxConsumerLagSeconds = partition.ConsumerLagSeconds
}
}
}

return maxConsumerLagSeconds, nil
}

func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID, eventType string) (int, error) {
stats, err := c.stats(ctx, subscriptionID, eventType)
if err != nil {
return 0, err
}

var unconsumedEvents int
for _, eventType := range stats {
for _, partition := range eventType.Partitions {
unconsumedEvents += partition.UnconsumedEvents
}
}

return unconsumedEvents, nil
}

type statsResp struct {
Items []statsEventType `json:"items"`
}

type statsEventType struct {
EventType string `json:"event_type"`
Partitions []statsPartition `json:"partitions"`
}

type statsPartition struct {
Partiton string `json:"partition"`
State string `json:"state"`
UnconsumedEvents int `json:"unconsumed_events"`
ConsumerLagSeconds int `json:"consumer_lag_seconds"`
StreamID string `json:"stream_id"`
AssignmentType string `json:"assignment_type"`
}

// stats returns the Nakadi stats for a given subscription ID.
//
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
func (c *Client) stats(ctx context.Context, subscriptionID, eventType string) ([]statsEventType, error) {
endpoint, err := url.Parse(c.nakadiEndpoint)
if err != nil {
return nil, err
}

endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)

q := endpoint.Query()
q.Set("show_time_lag", "true")
endpoint.RawQuery = q.Encode()

req, err := http.NewRequest(http.MethodGet, endpoint.String(), nil)
if err != nil {
return nil, err
}

resp, err := c.http.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

d, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
}

var result statsResp
err = json.Unmarshal(d, &result)
if err != nil {
return nil, err
}

var eventTypes []statsEventType
if eventType != "" {
for _, et := range result.Items {
et := et
if eventType == et.EventType {
eventTypes = append(eventTypes, et)
break
}
}
} else {
eventTypes = result.Items
}

if len(eventTypes) == 0 {
return nil, errors.New("expected at least 1 event-type, 0 returned")
}

return eventTypes, nil
}
Loading

0 comments on commit c1f6369

Please sign in to comment.