Skip to content

Commit

Permalink
Recreate subscription on receive message error and add pubsub e2e test (
Browse files Browse the repository at this point in the history
#116)

Co-authored-by: Nick Stogner <nicholas.stogner@gmail.com>
  • Loading branch information
samos123 and nstogner authored Jul 11, 2024
1 parent c4dc1da commit a7c24a7
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 87 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Dockerfile
skaffold.yaml
deploy/
tests/
32 changes: 22 additions & 10 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,30 @@ jobs:
run: make test-integration

e2e:
permissions:
contents: 'read'
id-token: 'write'
strategy:
matrix:
replicas: ["1", "3"]
test_cases:
- { requests: 60, expected_replicas: 1 }
- { requests: 300, expected_replicas: 2 }
- { case: "openai", lingo_replicas: "1", requests: "60", expected_replicas: "1" }
- { case: "openai", lingo_replicas: "3", requests: "60", expected_replicas: "1" }
- { case: "openai", lingo_replicas: "1", requests: "300", expected_replicas: "2" }
- { case: "openai", lingo_replicas: "3", requests: "300", expected_replicas: "2" }
- { case: "pubsub"}
runs-on: ubuntu-latest
name: E2E Lingo.replicas=${{ matrix.replicas }} requests=${{ matrix.test_cases.requests }} expected_replicas=${{ matrix.test_cases.expected_replicas }}
name: E2E ${{ toJson(matrix.test_cases) }}
steps:
- name: Checkout code
uses: actions/checkout@v2

- uses: 'google-github-actions/auth@v2'
with:
project_id: 'substratus-integration-tests'
workload_identity_provider: 'projects/489627518739/locations/global/workloadIdentityPools/github-identities/providers/github-com'
create_credentials_file: true
export_environment_variables: true
- name: Set project ID
run: echo "PROJECT_ID=$GCP_PROJECT" >> $GITHUB_ENV
- name: Install kind
run: |
# For AMD64 / x86_64
Expand All @@ -54,9 +66,9 @@ jobs:
chmod +x skaffold
sudo mv skaffold /usr/local/bin
- name: Run e2e tests
- name: Run e2e tests ${{ toJson(matrix.test_cases) }}
env:
REPLICAS: ${{ matrix.replicas }}
REQUESTS: ${{ matrix.test_cases.requests }}
EXPECTED_REPLICAS: ${{ matrix.test_cases.expected_replicas }}
run: make test-e2e
OPENAI_LINGO_REPLICAS: ${{ matrix.test_cases.lingo_replicas }}
OPENAI_REQUEST_COUNT: ${{ matrix.test_cases.requests }}
OPENAI_EXPECTED_REPLICAS: ${{ matrix.test_cases.expected_replicas }}
run: make test-e2e-${{ matrix.test_cases.case }}
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ test-unit:
test-integration: envtest
go clean -testcache; KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./tests/integration -v

# Make sure to set:
# export PROJECT_ID=$(gcloud config get-value project)
# export GCP_PUBSUB_KEYFILE_PATH=/tmp/lingo-test-pubsub-client.keyfile.json
.PHONY: test-e2e
test-e2e:
./tests/e2e/test.sh
test-e2e: test-e2e-openai test-e2e-pubsub

.PHONY: test-e2e-openai
test-e2e-openai:
TEST_CASE=openai_client ./tests/e2e/test.sh

.PHONY: test-e2e-pubsub
test-e2e-pubsub:
TEST_CASE=gcp_pubsub ./tests/e2e/test.sh

## Location to install dependencies to
LOCALBIN ?= $(shell pwd)/bin
Expand Down
5 changes: 5 additions & 0 deletions deploy/lingo_configmap_env.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: lingo-env
namespace: default
13 changes: 12 additions & 1 deletion deploy/lingo_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
app: lingo
namespace: default
spec:
replicas: 3
replicas: 1
selector:
matchLabels:
app: lingo
Expand All @@ -24,6 +24,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
envFrom:
- configMapRef:
name: lingo-env
volumeMounts:
- name: secrets
mountPath: /secrets
readinessProbe:
httpGet:
path: /readyz
Expand All @@ -35,3 +41,8 @@ spec:
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
volumes:
- name: secrets
secret:
secretName: lingo-secrets
optional: true
24 changes: 24 additions & 0 deletions hack/create-test-pubsub-keyfile.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/bin/bash

set -xe

SA_NAME=${SA_NAME:-lingo-test-pubsub-client}
GCP_PUBSUB_KEYFILE_PATH=${GCP_PUBSUB_KEYFILE_PATH:-/tmp/${SA_NAME}.keyfile.json}

# Create SA if it doesn't exist
if ! gcloud iam service-accounts list | grep -q $SA_NAME; then
gcloud iam service-accounts create $SA_NAME
fi

# Get the email of the SA
sa_email=$(gcloud iam service-accounts list --format="value(email)" --filter="name:$SA_NAME")

# Add PubSub roles
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$sa_email --role=roles/pubsub.publisher
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$sa_email --role=roles/pubsub.subscriber

# Create a keyfile for the SA
if [[ ! -e "$GCP_PUBSUB_KEYFILE_PATH" ]]; then
gcloud iam service-accounts keys create --iam-account=$sa_email $GCP_PUBSUB_KEYFILE_PATH
fi

48 changes: 45 additions & 3 deletions pkg/messenger/messager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -31,8 +32,9 @@ type Messenger struct {
MaxHandlers int
ErrorMaxBackoff time.Duration

requests *pubsub.Subscription
responses *pubsub.Topic
requestsURL string
requests *pubsub.Subscription
responses *pubsub.Topic

consecutiveErrorsMtx sync.RWMutex
consecutiveErrors int
Expand Down Expand Up @@ -64,6 +66,7 @@ func NewMessenger(
Endpoints: endpoints,
Queues: queues,
HTTPC: httpClient,
requestsURL: requestsURL,
requests: requests,
responses: responses,
MaxHandlers: maxHandlers,
Expand All @@ -74,11 +77,50 @@ func NewMessenger(
func (m *Messenger) Start(ctx context.Context) error {
sem := make(chan struct{}, m.MaxHandlers)

var restartAttempt int
const maxRestartAttempts = 20
const maxRestartBackoff = 10 * time.Second

recvLoop:
for {
msg, err := m.requests.Receive(ctx)
if err != nil {
return err
if errors.Is(err, context.Canceled) {
return err
}

if restartAttempt > maxRestartAttempts {
log.Printf("Error receiving message: %v. Restarted subscription %d times, giving up.",
err, restartAttempt)
return err
}

// If there is a non-recoverable error, recreate the
// subscription and continue receiving messages.
// This is important so existing handlers can continue.
log.Printf("Error receiving message: %v", err)
// Shutdown isn't strictly necessary, but it's good practice.
shutdownErr := m.requests.Shutdown(ctx)
if shutdownErr != nil {
log.Printf("Error shutting down requests topic: %v. Continuing to recreate subscription.",
shutdownErr)
}
restartWait := min(time.Duration(restartAttempt)*time.Second, maxRestartBackoff)
log.Printf("Waiting %v before recreating requests subscription %v", restartWait, m.requestsURL)
time.Sleep(restartWait)

var subErr error
m.requests, subErr = pubsub.OpenSubscription(ctx, m.requestsURL)
if subErr != nil {
log.Printf("Error recreating requests subscription %v: %v",
m.requestsURL, subErr)
return subErr
}

restartAttempt++
continue
} else {
restartAttempt = 0
}

log.Println("Received message:", msg.LoggableID)
Expand Down
1 change: 1 addition & 0 deletions skaffold.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ manifests:
rawYaml:
- deploy/backend_deploy.yaml
- deploy/backend_service.yaml
- deploy/lingo_configmap_env.yaml
- deploy/lingo_serviceaccount.yaml
- deploy/lingo_role.yaml
- deploy/lingo_rolebinding.yaml
Expand Down
124 changes: 124 additions & 0 deletions tests/e2e/gcp_pubsub/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env bash

set -xe

# NOTE: The keyfile created by hack/create-test-pubsub-keyfile.sh defaults to:
# GOOGLE_APPLICATION_CREDENTIALS=/tmp/lingo-test-pubsub-client.keyfile.json
if [ -z "$GOOGLE_APPLICATION_CREDENTIALS" ]; then
echo "GOOGLE_APPLICATION_CREDENTIALS not set. Exiting."
exit 1
fi

if [ -z "$PROJECT_ID" ]; then
echo "PROJECT_ID not set. Exiting."
exit 1
fi

random_id=$(openssl rand -hex 2)
test_prefix=lingo-test-${random_id}
requests_topic=${test_prefix}-requests
responses_topic=${test_prefix}-responses
requests_subscription=${test_prefix}-requests-sub
responses_subscription=${test_prefix}-responses-sub

function cleanup() {
kubectl logs -l app=lingo
if [ "$CLEANUP_PUBSUB" != "false" ]; then
if gcloud pubsub subscriptions list | grep -q $requests_subscription; then
gcloud pubsub subscriptions delete $requests_subscription --quiet
fi
if gcloud pubsub subscriptions list | grep -q $responses_subscription; then
gcloud pubsub subscriptions delete $responses_subscription --quiet
fi
if gcloud pubsub topics list | grep -q $requests_topic; then
gcloud pubsub topics delete $requests_topic --quiet
fi
if gcloud pubsub topics list | grep -q $responses_topic; then
gcloud pubsub topics delete $responses_topic --quiet
fi
fi
}

trap cleanup EXIT

# Create pubsub topics
if ! gcloud pubsub topics list | grep -q $requests_topic; then
gcloud pubsub topics create $requests_topic
fi
if ! gcloud pubsub topics list | grep -q $responses_topic; then
gcloud pubsub topics create $responses_topic
fi

# Create pubsub subscriptions
if ! gcloud pubsub subscriptions list | grep -q $requests_subscription; then
gcloud pubsub subscriptions create $requests_subscription --topic=$requests_topic
fi
if ! gcloud pubsub subscriptions list | grep -q $responses_subscription; then
gcloud pubsub subscriptions create $responses_subscription --topic=$responses_topic
fi

# Configure Lingo to use the pubsub topics and subscriptions.
messenger_urls="gcppubsub://projects/$PROJECT_ID/subscriptions/$requests_subscription|gcppubsub://projects/$PROJECT_ID/topics/$responses_topic|10"
configmap_patch=$(cat <<EOF
{"data": {"GOOGLE_APPLICATION_CREDENTIALS": "/secrets/gcp-keyfile.json", "MESSENGER_URLS": "$messenger_urls"}}
EOF
)

kubectl patch configmap lingo-env --patch "$configmap_patch"
kubectl create secret generic lingo-secrets --from-file=gcp-keyfile.json=$GOOGLE_APPLICATION_CREDENTIALS -oyaml --dry-run=client | kubectl apply -f -
kubectl delete pods -l app=lingo
# Wait for pods to become ready again.
kubectl wait --for=condition=ready --timeout=60s pod -l app=lingo

function test_message() {
# Send a request to the requests topic and expect a response on the responses topic.
meta_msg_id=$(date +%s)
msg=$(cat <<EOF
{"path":"/v1/embeddings", "metadata":{"test-id":"$meta_msg_id"}, "body": {"model": "text-embedding-ada-002", "input": "Lingo rocks!"}}
EOF
)
published_msg_id=$(gcloud pubsub topics publish $requests_topic \
--message="$msg" \
--format=json | jq -r '.messageIds[0]'
)
expected_msg=$(cat <<EOF
"metadata":{"test-id":"$meta_msg_id"},"status_code":200
EOF
)

# Wait for the response message to be published to the responses topic.
msg_path="/tmp/substratus-test-pubsub-message-$meta_msg_id.txt"
set +e
for i in {1..120}; do
gcloud pubsub subscriptions pull $responses_subscription --auto-ack --filter="message.attributes.request_message_id = \"$published_msg_id\"" > $msg_path
if [ -s $msg_path ]; then
echo "Received response message"
break
fi
sleep 10
done
set -e
cat $msg_path | grep -q "$expected_msg"
rm $msg_path
}

# Test that Lingo processes a message.

test_message

# Test that Lingo restarts the subscription if an error occurs without Lingo itself restarting
# and goes on to process a message.

gcloud pubsub subscriptions delete $requests_subscription
sleep 60
kubectl logs -l app=lingo --tail=-1 | grep -q 'recreating requests subscription'
gcloud pubsub subscriptions create $requests_subscription --topic=$requests_topic

test_message

# Make sure no restarts occurred.
lingo_restart_count=$(kubectl get pods -l app=lingo -o jsonpath='{.items[*].status.containerStatuses[*].restartCount}')
if [ "$lingo_restart_count" -ne 0 ]; then
echo "Expected 0 restarts, got $lingo_restart_count"
exit 1
fi
Loading

0 comments on commit a7c24a7

Please sign in to comment.