Messaging integration (GCP PubSub, AWS SQS, Kafka, etc.) (#88)
Add messaging integration (consume requests and produce responses via a
messaging system).

Implemented via the [gocloud](https://gocloud.dev/) package to allow for
future cross-cloud support.

Also refactors configuration to use environment variables exclusively.

Fixes #86

---------

Co-authored-by: Sam Stoelinga <sammiestoel@gmail.com>
nstogner and samos123 authored Apr 1, 2024
1 parent 48df469 commit 66cc783
Showing 12 changed files with 1,147 additions and 140 deletions.
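
Before the file-by-file diff, here is a hedged sketch of how a client might drive this feature end to end: publish an OpenAI-style request onto the request topic, then receive Lingo's response from a subscription on the response topic. Only the gocloud calls (`pubsub.OpenTopic`, `pubsub.OpenSubscription`, `Send`, `Receive`, `Ack`, `Shutdown`) are the library's real API; the URLs, the `id` correlation key, and the message body schema are illustrative assumptions, not the wire format this commit defines.

```go
package main

import (
    "context"
    "fmt"
    "log"

    "gocloud.dev/pubsub"
    _ "gocloud.dev/pubsub/gcppubsub" // registers the gcppubsub:// URL scheme
)

func main() {
    ctx := context.Background()

    // Hypothetical queue names; Lingo would be configured with the mirror-image pair.
    reqTopic, err := pubsub.OpenTopic(ctx, "gcppubsub://projects/my-project/topics/lingo-requests")
    if err != nil {
        log.Fatal(err)
    }
    defer reqTopic.Shutdown(ctx)

    respSub, err := pubsub.OpenSubscription(ctx, "gcppubsub://projects/my-project/subscriptions/lingo-responses")
    if err != nil {
        log.Fatal(err)
    }
    defer respSub.Shutdown(ctx)

    // Assumed message shape: an OpenAI-style JSON body, correlated via metadata.
    err = reqTopic.Send(ctx, &pubsub.Message{
        Body:     []byte(`{"model":"text-embedding-ada-002","input":"Lingo rocks!"}`),
        Metadata: map[string]string{"id": "req-123"},
    })
    if err != nil {
        log.Fatal(err)
    }

    msg, err := respSub.Receive(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("response %s: %s\n", msg.Metadata["id"], msg.Body)
    msg.Ack()
}
```

Swapping the blank driver import (and the URL scheme) is all it would take to target SQS, Service Bus, Kafka, and the rest; that is the cross-cloud point of building on gocloud.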
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
 # Start from the latest golang base image
-FROM golang:1.21 as builder
+FROM golang:1.22 as builder
 
 # Set the Current Working Directory inside the container
 WORKDIR /app
2 changes: 1 addition & 1 deletion Makefile
@@ -9,7 +9,7 @@ test-unit:
 
 .PHONY: test-integration
 test-integration: envtest
-    KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./tests/integration -v
+    go clean -testcache; KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./tests/integration -v
 
 .PHONY: test-e2e
 test-e2e:
3 changes: 2 additions & 1 deletion README.md
@@ -1,9 +1,10 @@
 # Lingo
 
-Lingo is a lightweight, scale-from-zero ML model proxy that runs on Kubernetes. Lingo allows you to run text-completion and embedding servers in your own project without changing any of your OpenAI client code.
+Lingo is a lightweight, scale-from-zero ML model proxy that runs on Kubernetes. Lingo allows you to run text-completion and embedding servers in your own project without changing any of your OpenAI client code. Large-scale batch processing is as simple as configuring Lingo to consume from and publish to your favorite messaging system (AWS SQS, GCP PubSub, Azure Service Bus, Kafka, and more).
 
 🚀 Serve OSS LLMs on CPUs or GPUs
 ✅️ Compatible with the OpenAI API
+✉️ Plug-and-play with most messaging systems (Kafka, etc.)
 ⚖️ Scale from zero, autoscale based on load
 … Queue requests to avoid overloading models
 🛠️ Zero dependencies (no Istio, Knative, etc.)
137 changes: 99 additions & 38 deletions cmd/lingo/main.go
@@ -7,7 +7,7 @@ import (
     "log"
     "net/http"
     "os"
-    "strconv"
+    "strings"
     "sync"
     "time"
 
@@ -20,10 +20,12 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/log/zap"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 
+    "github.com/sethvargo/go-envconfig"
     "github.com/substratusai/lingo/pkg/autoscaler"
     "github.com/substratusai/lingo/pkg/deployments"
     "github.com/substratusai/lingo/pkg/endpoints"
     "github.com/substratusai/lingo/pkg/leader"
+    "github.com/substratusai/lingo/pkg/messenger"
     "github.com/substratusai/lingo/pkg/proxy"
     "github.com/substratusai/lingo/pkg/queue"
     "github.com/substratusai/lingo/pkg/stats"
@@ -48,35 +50,9 @@ func main() {
     }
 }
 
-func getEnvInt(key string, defaultValue int) int {
-    if envVar := os.Getenv(key); envVar != "" {
-        val, err := strconv.Atoi(envVar)
-        if err != nil {
-            log.Fatalf("invalid value for %s: %v", key, err)
-        }
-        return val
-    }
-    return defaultValue
-}
-
 func run() error {
-    namespace := os.Getenv("NAMESPACE")
-    if namespace == "" {
-        namespace = "default"
-    }
-
-    concurrency := getEnvInt("CONCURRENCY", 100)
-    scaleDownDelay := getEnvInt("SCALE_DOWN_DELAY", 30)
-    backendRetries := getEnvInt("BACKEND_RETRIES", 1)
-
-    var metricsAddr string
-    var probeAddr string
-    var concurrencyPerReplica int
-
-    flag.StringVar(&metricsAddr, "metrics-bind-address", ":8082", "The address the metric endpoint binds to.")
-    flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
-    flag.IntVar(&concurrencyPerReplica, "concurrency", concurrency, "the number of simultaneous requests that can be processed by each replica")
-    flag.IntVar(&scaleDownDelay, "scale-down-delay", scaleDownDelay, "seconds to wait before scaling down")
+    // Flags are only used to control logging.
+    // TODO: Migrate to env variables.
     opts := zap.Options{
         Development: true,
     }
@@ -85,18 +61,66 @@ func run() error {
 
     ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
 
+    var cfg struct {
+        Namespace string `env:"NAMESPACE, default=default"`
+
+        // Concurrency per replica.
+        Concurrency int `env:"CONCURRENCY, default=100"`
+
+        ScaleDownDelay int `env:"SCALE_DOWN_DELAY, default=30"`
+        BackendRetries int `env:"BACKEND_RETRIES, default=1"`
+
+        // MessengerURLs is a list of (comma-separated) URLs to listen for requests and send responses on.
+        //
+        // Format: <request-subscription1>|<response-topic1>,<request-subscription2>|<response-topic2>,...
+        //
+        // Examples:
+        //
+        // Google PubSub: "gcppubsub://projects/my-project/subscriptions/my-subscription|gcppubsub://projects/myproject/topics/mytopic"
+        // Amazon SQS-to-SQS: "awssqs://sqs.us-east-2.amazonaws.com/123456789012/myqueue1?region=us-east-2|awssqs://sqs.us-east-2.amazonaws.com/123456789012/myqueue2?region=us-east-2"
+        // Amazon SQS-to-SNS: "awssqs://sqs.us-east-2.amazonaws.com/123456789012/myqueue1?region=us-east-2|awssns:///arn:aws:sns:us-east-2:123456789012:mytopic?region=us-east-2"
+        // (NOTE: 3 slashes for SNS)
+        // Azure Service Bus: "azuresb://mytopic1?subscription=mysubscription|azuresb://mytopic2"
+        // Rabbit MQ: "rabbit://myqueue|rabbit://myexchange"
+        // NATS: "nats://example.mysubject1|nats://example.mysubject2"
+        // Kafka: "kafka://my-group?topic=my-topic1|kafka://my-topic2"
+        MessengerURLs []string `env:"MESSENGER_URLS"`
+
+        MetricsBindAddress     string `env:"METRICS_BIND_ADDRESS, default=:8082"`
+        HealthProbeBindAddress string `env:"HEALTH_PROBE_BIND_ADDRESS, default=:8081"`
+    }
+    if err := envconfig.Process(context.Background(), &cfg); err != nil {
+        return fmt.Errorf("parsing environment variables: %w", err)
+    }
+
+    type messengerURLPair struct {
+        requests  string
+        responses string
+    }
+    var messengerURLPairs []messengerURLPair
+    for _, s := range cfg.MessengerURLs {
+        parts := strings.Split(s, "|")
+        if len(parts) != 2 {
+            return fmt.Errorf("invalid subscription URL: %q", s)
+        }
+        messengerURLPairs = append(messengerURLPairs, messengerURLPair{
+            requests:  parts[0],
+            responses: parts[1],
+        })
+    }
+
     // TODO: Add Deployments to cache list.
     mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
         Scheme: scheme,
         Cache: cache.Options{
             DefaultNamespaces: map[string]cache.Config{
-                namespace: {},
+                cfg.Namespace: {},
             },
         },
         Metrics: metricsserver.Options{
-            BindAddress: metricsAddr,
+            BindAddress: cfg.MetricsBindAddress,
         },
-        HealthProbeBindAddress: probeAddr,
+        HealthProbeBindAddress: cfg.HealthProbeBindAddress,
         // LeaderElection is done in the Autoscaler.
         LeaderElection: false,
     })
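
The URL pairs parsed above are handed to `messenger.NewMessenger` (next hunks); under the hood, gocloud resolves each URL's scheme against whichever drivers are registered via blank imports. A minimal sketch of that resolution step, assuming the standard gocloud driver packages (the real wiring lives in pkg/messenger, which this page does not show):

```go
package main

import (
    "context"
    "log"

    "gocloud.dev/pubsub"
    // Blank imports register URL schemes with pubsub.DefaultURLMux.
    _ "gocloud.dev/pubsub/awssnssqs"    // awssqs://, awssns://
    _ "gocloud.dev/pubsub/azuresb"      // azuresb://
    _ "gocloud.dev/pubsub/gcppubsub"    // gcppubsub://
    _ "gocloud.dev/pubsub/kafkapubsub"  // kafka://
    _ "gocloud.dev/pubsub/natspubsub"   // nats://
    _ "gocloud.dev/pubsub/rabbitpubsub" // rabbit://
)

func main() {
    ctx := context.Background()

    // One MESSENGER_URLS pair, already split on "|" as in run() above.
    requestsURL := "gcppubsub://projects/my-project/subscriptions/my-subscription"
    responsesURL := "gcppubsub://projects/my-project/topics/my-topic"

    sub, err := pubsub.OpenSubscription(ctx, requestsURL) // requests are consumed here
    if err != nil {
        log.Fatalf("opening requests subscription: %v", err)
    }
    defer sub.Shutdown(ctx)

    topic, err := pubsub.OpenTopic(ctx, responsesURL) // responses are published here
    if err != nil {
        log.Fatalf("opening responses topic: %v", err)
    }
    defer topic.Shutdown(ctx)
}
```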
@@ -114,9 +138,9 @@ func run() error {
     if err != nil {
         return fmt.Errorf("getting hostname: %v", err)
     }
-    le := leader.NewElection(clientset, hostname, namespace)
+    le := leader.NewElection(clientset, hostname, cfg.Namespace)
 
-    queueManager := queue.NewManager(concurrencyPerReplica)
+    queueManager := queue.NewManager(cfg.Concurrency)
     metricsRegistry := prometheus.WrapRegistererWithPrefix("lingo_", metrics.Registry)
     queue.NewMetricsCollector(queueManager).MustRegister(metricsRegistry)
 
@@ -133,8 +157,8 @@ func run() error {
     if err != nil {
         return fmt.Errorf("setting up deployment manager: %w", err)
     }
-    deploymentManager.Namespace = namespace
-    deploymentManager.ScaleDownPeriod = time.Duration(scaleDownDelay) * time.Second
+    deploymentManager.Namespace = cfg.Namespace
+    deploymentManager.ScaleDownPeriod = time.Duration(cfg.ScaleDownDelay) * time.Second
     deployments.NewMetricsCollector(deploymentManager).MustRegister(metricsRegistry)
     if err := mgr.AddReadyzCheck("readyz", deploymentManager.ReadinessChecker); err != nil {
         return fmt.Errorf("setup readiness handler: %w", err)
@@ -148,21 +172,41 @@ func run() error {
     autoscaler.AverageCount = 10 // 10 * 3 seconds = 30 sec avg
     autoscaler.LeaderElection = le
     autoscaler.Deployments = deploymentManager
-    autoscaler.ConcurrencyPerReplica = concurrencyPerReplica
+    autoscaler.ConcurrencyPerReplica = cfg.Concurrency
     autoscaler.Queues = queueManager
     autoscaler.Endpoints = endpointManager
     go autoscaler.Start()
 
     proxy.MustRegister(metricsRegistry)
     proxyHandler := proxy.NewHandler(deploymentManager, endpointManager, queueManager)
-    proxyHandler.MaxRetries = backendRetries
+    proxyHandler.MaxRetries = cfg.BackendRetries
     proxyServer := &http.Server{Addr: ":8080", Handler: proxyHandler}
 
     statsHandler := &stats.Handler{
         Queues: queueManager,
     }
     statsServer := &http.Server{Addr: ":8083", Handler: statsHandler}
 
+    httpClient := &http.Client{
+        Timeout: 30 * time.Minute,
+    }
+    var msgrs []*messenger.Messenger
+    for i, msgURL := range messengerURLPairs {
+        msgr, err := messenger.NewMessenger(
+            ctx,
+            msgURL.requests,
+            msgURL.responses,
+            deploymentManager,
+            endpointManager,
+            queueManager,
+            httpClient,
+        )
+        if err != nil {
+            return fmt.Errorf("creating messenger[%d]: %w", i, err)
+        }
+        msgrs = append(msgrs, msgr)
+    }
+
     var wg sync.WaitGroup
     wg.Add(1)
     go func() {
@@ -190,6 +234,16 @@ func run() error {
             os.Exit(1)
         }
     }()
+    for i := range msgrs {
+        go func() {
+            setupLog.Info("Starting messenger", "index", i)
+            err := msgrs[i].Start(ctx)
+            if err != nil {
+                setupLog.Error(err, "starting messenger")
+                os.Exit(1)
+            }
+        }()
+    }
     defer func() {
         setupLog.Info("waiting on manager to stop")
         wg.Wait()
@@ -207,5 +261,12 @@ func run() error {
         return fmt.Errorf("listen and serve: %w", err)
     }
 
+    for i := range msgrs {
+        // TODO: Investigate if in-progress message handling will exit cleanly.
+        // One concern is that responses might not be sent and messages may hang
+        // in an un-Ack'd state if the context is cancelled.
+        msgrs[i].Stop(ctx)
+    }
+
     return nil
 }
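
For readers who stop at main.go: below is a rough, hypothetical sketch of the consume-request/produce-response loop a `messenger.Messenger` presumably runs. The real implementation is in pkg/messenger (one of the 12 changed files, not shown on this page); the metadata keys (`path`, `id`) and the error handling are assumptions, and only the gocloud and net/http calls are real APIs.

```go
package sketch

import (
    "bytes"
    "context"
    "io"
    "net/http"

    "gocloud.dev/pubsub"
)

// Serve pulls request messages, forwards them to an HTTP backend, and
// publishes the responses. Sketch only — not the code from pkg/messenger.
func Serve(ctx context.Context, sub *pubsub.Subscription, topic *pubsub.Topic, client *http.Client, backendURL string) error {
    for {
        msg, err := sub.Receive(ctx) // blocks until a request arrives
        if err != nil {
            return err // subscription shut down or context cancelled
        }

        // Assumed convention: the OpenAI-style path travels in metadata.
        resp, err := client.Post(backendURL+msg.Metadata["path"], "application/json", bytes.NewReader(msg.Body))
        if err != nil {
            msg.Ack() // assumption: drop on backend error; real code may retry
            continue
        }
        body, err := io.ReadAll(resp.Body)
        resp.Body.Close()
        if err != nil {
            msg.Ack()
            continue
        }

        // Echo the correlation ID so the producer can match the response.
        if err := topic.Send(ctx, &pubsub.Message{
            Body:     body,
            Metadata: map[string]string{"id": msg.Metadata["id"]},
        }); err != nil {
            return err
        }
        msg.Ack() // at-least-once: ack only after the response is sent
    }
}
```

One detail worth noting in the startup hunk above: the goroutines capture the loop variable `i`, which is safe under Go 1.22's per-iteration loop-variable semantics (hence the Dockerfile bump), assuming the module's `go` directive is raised to match. The TODO in the final hunk flags the mirror-image shutdown concern: if the context is cancelled mid-request, un-Ack'd messages are redelivered by most brokers rather than lost.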