Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for scaling based on Nakadi #636

Merged
merged 2 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,92 @@ you need to define a `key` or other `tag` with a "star" query syntax like
metric label definitions. If both annotations and corresponding label is
defined, then the annotation takes precedence.


## Nakadi collector

The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/)
`consumer_lag_seconds` or `unconsumed_events`.
mikkeloscar marked this conversation as resolved.
Show resolved Hide resolved

### Supported metrics

| Metric Type | Description | Type | K8s Versions |
mikkeloscar marked this conversation as resolved.
Show resolved Hide resolved
| ------------ | ------- | -- | -- |
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi subscription | External | `>=1.24` |
mikkeloscar marked this conversation as resolved.
Show resolved Hide resolved
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi subscription | External | `>=1.24` |
mikkeloscar marked this conversation as resolved.
Show resolved Hide resolved

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
annotations:
# metric-config.<metricType>.<metricName>.<collectorType>/<configKey>
metric-config.external.my-nakadi-consumer.nakadi/interval: "60s" # optional
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: custom-metrics-consumer
minReplicas: 0
maxReplicas: 8 # should match number of partitions for the event type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can improve it in the future that autoscaler does that on its own. it is possible to know number of partitions for Subscription

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we need some abstraction on top of HPA to hide this setting for users. Not saying we should not, but it's more complex than what we can implement with the native resource.

metrics:
- type: External
external:
metric:
name: my-nakadi-consumer
selector:
matchLabels:
type: nakadi
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
Copy link
Member

@AlexanderYastrebov AlexanderYastrebov Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think subscription is uniquely identified by the owning_application, event_types and consumer_group https://nakadi.io/manual.html#creating-subscriptions so maybe (as a future improvement) this metric handler could also support these three fields and figure out subscription id by listing subscriptions https://nakadi.io/manual.html#getting-and-listing-subscriptions

This could be handy to define the same logical subscription (same owning_application, event_types and consumer_group) in staging and production environments (instead of templating subscription-id value).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jFYI: this could be a feature improvement but not prioritized for now. But it's simple to add later even if we don't do it in the first iteration.

metric-type: "consumer-lag-seconds|unconsumed-events"
target:
# value is compatible with the consumer-lag-seconds metric type.
# It describes the amount of consumer lag in seconds before scaling
# additionally up.
# if an event-type has multiple partitions the value of
# consumer-lag-seconds is the max of all the partitions.
value: "600" # 10m
# averageValue is compatible with unconsumed-events metric type.
# This means for every 30 unconsumed events a pod is scaled up.
mikkeloscar marked this conversation as resolved.
Show resolved Hide resolved
# unconsumed-events is the sum of of unconsumed_events over all
# partitions.
averageValue: "30"
type: AverageValue
```

The `subscription-id` is the ID of the relevant consumer subscription. The
mikkeloscar marked this conversation as resolved.
Show resolved Hide resolved
`metric-type` indicates whether to scale on `consumer-lag-seconds` or
`unconsumed-events` as outlined below.

`unconsumed-events` - is the total number of unconsumed events over all
partitions. When using this `metric-type` you should also use the target
`averageValue` which indicates the number of events which can be handled per
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that really per pod or per the whole the stack? so if I have 2 pods and average number goes beyond 30 then a pod is added. it means it is 30 per 2 pods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is how you define it. The autoscaler will always scale higher rather than lower. So if the averageTarget is 30 and you have 31 unconsumed events, it will scale to 2 pods, but it will not scale higher until 61 unconsumed events where it then needs 3 pods.

pod. To best estimate the number of events per pods, you need to understand the
average time for processing an event as well as the rate of events.

*Example*: You have an event type producing 100 events per second between 00:00
and 08:00. Between 08:01 to 23:59 it produces 400 events per second.
Let's assume that on average a single pod can consume 100 events per second,
then we can define 100 as `averageValue` and the HPA would scale to 1 between
00:00 and 08:00, and scale to 4 between 08:01 and 23:59. If there for some
reason is a short spike of 800 events per second, then it would scale to 8 pods
to process those events until the rate goes down again.

`consumer-lag-seconds` - describes the age of the oldest unconsumed event for
a subscription. If the event type has multiple partitions the lag is defined as
the max age over all partitions. When using this `metric-type` you should use
the target `value` to indicate the max lag (in seconds) before the HPA should
scale.

*Example*: You have a subscription with a defined SLO of "99.99 of events are
consumed within 30 min.". In this case you can define a target `value` of e.g.
20 min. (1200s) (to include a safety buffer) such that the HPA only scales up
from 1 to 2 if the target of 20 min. is breached and it needs to work faster
with more consumers.
For this case you should also account for the average time for processing an
event when defining the target.


## HTTP Collector

The http collector allows collecting metrics from an external endpoint specified in the HPA.
Expand Down
120 changes: 120 additions & 0 deletions pkg/collector/nakadi_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package collector

import (
"context"
"fmt"
"time"

"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
// NakadiMetricType defines the metric type for metrics based on Nakadi
// subscriptions.
NakadiMetricType = "nakadi"
nakadiSubscriptionIDKey = "subscription-id"
nakadiMetricTypeKey = "metric-type"
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
)

// NakadiCollectorPlugin defines a plugin for creating collectors that can get
// unconsumed events from Nakadi.
type NakadiCollectorPlugin struct {
nakadi nakadi.Nakadi
}

// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin.
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
return &NakadiCollectorPlugin{
nakadi: nakadi,
}, nil
}

// NewCollector initializes a new Nakadi collector from the specified HPA.
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewNakadiCollector(c.nakadi, hpa, config, interval)
}

// NakadiCollector defines a collector that is able to collect metrics from
// Nakadi.
type NakadiCollector struct {
nakadi nakadi.Nakadi
interval time.Duration
subscriptionID string
nakadiMetricType string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
namespace string
}

// NewNakadiCollector initializes a new NakadiCollector.
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for nakadi is not specified")
}

subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
if !ok {
return nil, fmt.Errorf("subscription-id not specified on metric")
}

metricType, ok := config.Config[nakadiMetricTypeKey]
if !ok {
return nil, fmt.Errorf("metric-type not specified on metric")
}

if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents {
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
}

return &NakadiCollector{
nakadi: nakadi,
interval: interval,
subscriptionID: subscriptionID,
nakadiMetricType: metricType,
metric: config.Metric,
metricType: config.Type,
namespace: hpa.Namespace,
}, nil
}

// GetMetrics returns a list of collected metrics for the Nakadi subscription ID.
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) {
var value int64
var err error
switch c.nakadiMetricType {
case nakadiMetricTypeConsumerLagSeconds:
value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID)
if err != nil {
return nil, err
}
case nakadiMetricTypeUnconsumedEvents:
value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID)
if err != nil {
return nil, err
}
}

metricValue := CollectedMetric{
Namespace: c.namespace,
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Now(),
Value: *resource.NewQuantity(value, resource.DecimalSI),
},
}

return []CollectedMetric{metricValue}, nil
}

// Interval returns the interval at which the collector should run.
func (c *NakadiCollector) Interval() time.Duration {
return c.interval
}
124 changes: 124 additions & 0 deletions pkg/nakadi/nakadi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package nakadi

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
)

// Nakadi defines an interface for talking to the Nakadi API.
type Nakadi interface {
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
}

// Client defines client for interfacing with the Nakadi API.
type Client struct {
nakadiEndpoint string
http *http.Client
}

// NewNakadiClient initializes a new Nakadi Client.
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
return &Client{
nakadiEndpoint: nakadiEndpoint,
http: client,
}
}

func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
stats, err := c.stats(ctx, subscriptionID)
if err != nil {
return 0, err
}

var maxConsumerLagSeconds int64
for _, eventType := range stats {
for _, partition := range eventType.Partitions {
maxConsumerLagSeconds = max(maxConsumerLagSeconds, partition.ConsumerLagSeconds)
}
}

return maxConsumerLagSeconds, nil
}

func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
stats, err := c.stats(ctx, subscriptionID)
if err != nil {
return 0, err
}

var unconsumedEvents int64
for _, eventType := range stats {
for _, partition := range eventType.Partitions {
unconsumedEvents += partition.UnconsumedEvents
}
}

return unconsumedEvents, nil
}

type statsResp struct {
Items []statsEventType `json:"items"`
}

type statsEventType struct {
EventType string `json:"event_type"`
Partitions []statsPartition `json:"partitions"`
}

type statsPartition struct {
Partiton string `json:"partition"`
State string `json:"state"`
UnconsumedEvents int64 `json:"unconsumed_events"`
ConsumerLagSeconds int64 `json:"consumer_lag_seconds"`
StreamID string `json:"stream_id"`
AssignmentType string `json:"assignment_type"`
}

// stats returns the Nakadi stats for a given subscription ID.
//
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
endpoint, err := url.Parse(c.nakadiEndpoint)
if err != nil {
return nil, err
}

endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)

q := endpoint.Query()
q.Set("show_time_lag", "true")
endpoint.RawQuery = q.Encode()

resp, err := c.http.Get(endpoint.String())
if err != nil {
return nil, err
}
defer resp.Body.Close()

d, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
}

var result statsResp
err = json.Unmarshal(d, &result)
if err != nil {
return nil, err
}

if len(result.Items) == 0 {
return nil, errors.New("expected at least 1 event-type, 0 returned")
}

return result.Items, nil
}
Loading
Loading