-
Notifications
You must be signed in to change notification settings - Fork 112
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for scaling based on Nakadi
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
- Loading branch information
1 parent
ae5c4af
commit 8db240c
Showing
5 changed files
with
411 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package collector | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" | ||
autoscalingv2 "k8s.io/api/autoscaling/v2" | ||
"k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
) | ||
|
||
const ( | ||
// NakadiMetricType defines the metric type for metrics based on Nakadi | ||
// subscriptions. | ||
NakadiMetricType = "nakadi" | ||
nakadiSubscriptionIDKey = "subscription-id" | ||
nakadiMetricTypeKey = "metric-type" | ||
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds" | ||
nakadiMetricTypeUnconsumedEvents = "unconsumed-events" | ||
) | ||
|
||
// NakadiCollectorPlugin defines a plugin for creating collectors that can get | ||
// unconsumed events from Nakadi. | ||
type NakadiCollectorPlugin struct { | ||
nakadi nakadi.Nakadi | ||
} | ||
|
||
// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin. | ||
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) { | ||
return &NakadiCollectorPlugin{ | ||
nakadi: nakadi, | ||
}, nil | ||
} | ||
|
||
// NewCollector initializes a new Nakadi collector from the specified HPA. | ||
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { | ||
return NewNakadiCollector(c.nakadi, hpa, config, interval) | ||
} | ||
|
||
// NakadiCollector defines a collector that is able to collect metrics from | ||
// Nakadi. | ||
type NakadiCollector struct { | ||
nakadi nakadi.Nakadi | ||
interval time.Duration | ||
subscriptionID string | ||
nakadiMetricType string | ||
metric autoscalingv2.MetricIdentifier | ||
metricType autoscalingv2.MetricSourceType | ||
namespace string | ||
} | ||
|
||
// NewNakadiCollector initializes a new NakadiCollector. | ||
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { | ||
if config.Metric.Selector == nil { | ||
return nil, fmt.Errorf("selector for nakadi is not specified") | ||
} | ||
|
||
subscriptionID, ok := config.Config[nakadiSubscriptionIDKey] | ||
if !ok { | ||
return nil, fmt.Errorf("subscription-id not specified on metric") | ||
} | ||
|
||
metricType, ok := config.Config[nakadiMetricTypeKey] | ||
if !ok { | ||
return nil, fmt.Errorf("metric-type not specified on metric") | ||
} | ||
|
||
if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents { | ||
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType) | ||
} | ||
|
||
return &NakadiCollector{ | ||
nakadi: nakadi, | ||
interval: interval, | ||
subscriptionID: subscriptionID, | ||
nakadiMetricType: metricType, | ||
metric: config.Metric, | ||
metricType: config.Type, | ||
namespace: hpa.Namespace, | ||
}, nil | ||
} | ||
|
||
// GetMetrics returns a list of collected metrics for the Nakadi subscription ID. | ||
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) { | ||
var value int | ||
var err error | ||
switch c.nakadiMetricType { | ||
case nakadiMetricTypeConsumerLagSeconds: | ||
value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
case nakadiMetricTypeUnconsumedEvents: | ||
value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
metricValue := CollectedMetric{ | ||
Namespace: c.namespace, | ||
Type: c.metricType, | ||
External: external_metrics.ExternalMetricValue{ | ||
MetricName: c.metric.Name, | ||
MetricLabels: c.metric.Selector.MatchLabels, | ||
Timestamp: metav1.Time{Time: time.Now().UTC()}, | ||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI), | ||
}, | ||
} | ||
|
||
return []CollectedMetric{metricValue}, nil | ||
} | ||
|
||
// Interval returns the interval at which the collector should run. | ||
func (c *NakadiCollector) Interval() time.Duration { | ||
return c.interval | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
package nakadi | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
) | ||
|
||
// Nakadi defines an interface for talking to the Nakadi API. | ||
type Nakadi interface { | ||
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int, error) | ||
UnconsumedEvents(ctx context.Context, subscriptionID string) (int, error) | ||
} | ||
|
||
// Client defines client for interfacing with the Nakadi API. | ||
type Client struct { | ||
nakadiEndpoint string | ||
http *http.Client | ||
} | ||
|
||
// NewNakadiClient initializes a new Nakadi Client. | ||
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client { | ||
return &Client{ | ||
nakadiEndpoint: nakadiEndpoint, | ||
http: client, | ||
} | ||
} | ||
|
||
func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int, error) { | ||
stats, err := c.stats(ctx, subscriptionID) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
var maxConsumerLagSeconds int | ||
for _, partition := range stats.Partitions { | ||
if partition.ConsumerLagSeconds > maxConsumerLagSeconds { | ||
maxConsumerLagSeconds = partition.ConsumerLagSeconds | ||
} | ||
} | ||
|
||
return maxConsumerLagSeconds, nil | ||
} | ||
|
||
func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int, error) { | ||
stats, err := c.stats(ctx, subscriptionID) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
var unconsumedEvents int | ||
for _, partition := range stats.Partitions { | ||
unconsumedEvents += partition.UnconsumedEvents | ||
} | ||
|
||
return unconsumedEvents, nil | ||
} | ||
|
||
type statsResp struct { | ||
Items []statsSubscription `json:"items"` | ||
} | ||
|
||
type statsSubscription struct { | ||
Partitions []statsPartition `json:"partitions"` | ||
} | ||
|
||
type statsPartition struct { | ||
Partiton string `json:"partition"` | ||
State string `json:"state"` | ||
UnconsumedEvents int `json:"unconsumed_events"` | ||
ConsumerLagSeconds int `json:"consumer_lag_seconds"` | ||
StreamID string `json:"stream_id"` | ||
AssignmentType string `json:"assignment_type"` | ||
} | ||
|
||
// stats returns the Nakadi stats for a given subscription ID. | ||
// | ||
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get | ||
func (c *Client) stats(ctx context.Context, subscriptionID string) (*statsSubscription, error) { | ||
endpoint, err := url.Parse(c.nakadiEndpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) | ||
|
||
q := endpoint.Query() | ||
q.Set("show_time_lag", "true") | ||
endpoint.RawQuery = q.Encode() | ||
|
||
req, err := http.NewRequest(http.MethodGet, endpoint.String(), nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
resp, err := c.http.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer resp.Body.Close() | ||
|
||
d, err := io.ReadAll(resp.Body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) | ||
} | ||
|
||
var result statsResp | ||
err = json.Unmarshal(d, &result) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(result.Items) != 1 { | ||
return nil, fmt.Errorf("expected 1 subscription, %d returned", len(result.Items)) | ||
} | ||
|
||
return &result.Items[0], nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package nakadi | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestQuery(tt *testing.T) { | ||
client := &http.Client{} | ||
for _, ti := range []struct { | ||
msg string | ||
status int | ||
responseBody string | ||
err error | ||
unconsumedEvents int | ||
consumerLagSeconds int | ||
}{ | ||
{ | ||
msg: "test getting back a single data point", | ||
status: http.StatusOK, | ||
responseBody: `{ | ||
"items": [ | ||
{ | ||
"event_type": "example-event", | ||
"partitions": [ | ||
{ | ||
"partition": "0", | ||
"state": "assigned", | ||
"unconsumed_events": 4, | ||
"consumer_lag_seconds": 2, | ||
"stream_id": "example-id", | ||
"assignment_type": "auto" | ||
}, | ||
{ | ||
"partition": "0", | ||
"state": "assigned", | ||
"unconsumed_events": 5, | ||
"consumer_lag_seconds": 1, | ||
"stream_id": "example-id", | ||
"assignment_type": "auto" | ||
} | ||
] | ||
} | ||
] | ||
}`, | ||
unconsumedEvents: 9, | ||
consumerLagSeconds: 2, | ||
}, | ||
{ | ||
msg: "test call with invalid response", | ||
status: http.StatusInternalServerError, | ||
responseBody: `{"error": 500}`, | ||
err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"), | ||
}, | ||
{ | ||
msg: "test getting back a single data point", | ||
status: http.StatusOK, | ||
responseBody: `{ | ||
"items": [] | ||
}`, | ||
err: errors.New("expected 1 subscription, 0 returned"), | ||
}, | ||
} { | ||
tt.Run(ti.msg, func(t *testing.T) { | ||
ts := httptest.NewServer(http.HandlerFunc( | ||
func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(ti.status) | ||
_, err := w.Write([]byte(ti.responseBody)) | ||
assert.NoError(t, err) | ||
}), | ||
) | ||
defer ts.Close() | ||
|
||
nakadiClient := NewNakadiClient(ts.URL, client) | ||
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id") | ||
assert.Equal(t, ti.err, err) | ||
assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds) | ||
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id") | ||
assert.Equal(t, ti.err, err) | ||
assert.Equal(t, ti.unconsumedEvents, unconsumedEvents) | ||
}) | ||
} | ||
|
||
} |
Oops, something went wrong.