Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: context cleanup and graceful shutdown #16

Merged
merged 2 commits into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,9 @@ https://lucid.app/documents/view/c9b677a7-2b60-4bec-89c7-43d10e2a262e

https://lucid.app/documents/view/9fb65517-add6-48f5-a35b-e8c5835a9762

## TODOs (roughly ordered by priority)
## TODOs

### Major

- [ ] Metrics
- [ ] Performance tests
- [ ] Clean up logging in `go-sqs` and `go-cas`
- [ ] Graceful shutdown
- [ ] Prepare Anchor Worker code to post anchor results to a queue

### Minor

- [ ] Clarify contexts being used in various spots - operation ctx vs. server ctx

### Development

Expand Down
2 changes: 1 addition & 1 deletion ci/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func pushImage(ctx context.Context, client *dagger.Client, container *dagger.Con
}

func getEcrToken(ctx context.Context) string {
awsCfg, err := config.AwsConfig()
awsCfg, err := config.AwsConfig(ctx)
if err != nil {
log.Fatalf("build: error creating aws cfg: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion ci/cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func createJob(ctx context.Context) error {
}
}); err != nil {
return err
} else if awsCfg, err := config.AwsConfig(); err != nil {
} else if awsCfg, err := config.AwsConfig(ctx); err != nil {
return err
} else {
envTag := os.Getenv(Env_EnvTag)
Expand Down
122 changes: 83 additions & 39 deletions cmd/cas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"

"github.com/joho/godotenv"

Expand All @@ -31,9 +33,12 @@ func main() {
}
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)

awsCfg, err := config.AwsConfig()
// Set up a server context
serverCtx, serverCtxCancel := context.WithCancel(context.Background())

awsCfg, err := config.AwsConfig(serverCtx)
if err != nil {
log.Fatalf("error creating aws cfg: %v", err)
log.Fatalf("main: error creating aws cfg: %v", err)
}

anchorDb := db.NewAnchorDb(db.AnchorDbOpts{
Expand All @@ -49,31 +54,30 @@ func main() {
dbAwsCfg := awsCfg
stateDbEndpoint := os.Getenv("DB_AWS_ENDPOINT")
if len(stateDbEndpoint) > 0 {
log.Printf("using custom state db endpoint: %s", stateDbEndpoint)
dbAwsCfg, err = config.AwsConfigWithOverride(stateDbEndpoint)
log.Printf("main: using custom state db endpoint: %s", stateDbEndpoint)
dbAwsCfg, err = config.AwsConfigWithOverride(serverCtx, stateDbEndpoint)
if err != nil {
log.Fatalf("failed to create aws cfg: %v", err)
log.Fatalf("main: failed to create aws cfg: %v", err)
}
}

// HTTP clients
dynamoDbClient := dynamodb.NewFromConfig(dbAwsCfg)
sqsClient := sqs.NewFromConfig(awsCfg)

stateDb := ddb.NewStateDb(dynamoDbClient)
jobDb := ddb.NewJobDb(dynamoDbClient)
stateDb := ddb.NewStateDb(serverCtx, dynamoDbClient)
jobDb := ddb.NewJobDb(serverCtx, dynamoDbClient)

discordHandler, err := notifs.NewDiscordHandler()
if err != nil {
log.Fatalf("failed to create discord handler: %v", err)
log.Fatalf("main: failed to create discord handler: %v", err)
}

collectorHost := os.Getenv("COLLECTOR_HOST")
metricService, err := metrics.NewMetricService(context.Background(), collectorHost)
metricService, err := metrics.NewMetricService(serverCtx, collectorHost)
if err != nil {
log.Fatalf("failed to create metric service: %v", err)
log.Fatalf("main: failed to create metric service: %v", err)
}
defer metricService.Shutdown(context.Background())

// Flow:
// ====
Expand All @@ -92,66 +96,106 @@ func main() {
// Queue publishers

// Create the DLQ and prepare the redrive policy for the other queues
deadLetterQueue, err := queue.NewPublisher(queue.QueueType_DLQ, sqsClient, nil)
deadLetterQueue, err := queue.NewPublisher(serverCtx, queue.QueueType_DLQ, sqsClient, nil)
if err != nil {
log.Fatalf("failed to create dead-letter queue: %v", err)
log.Fatalf("main: failed to create dead-letter queue: %v", err)
}
dlqArn, err := queue.GetQueueArn(deadLetterQueue.QueueUrl, sqsClient)
dlqArn, err := queue.GetQueueArn(serverCtx, deadLetterQueue.QueueUrl, sqsClient)
if err != nil {
log.Fatalf("failed to fetch dead-letter queue arn: %v", err)
log.Fatalf("main: failed to fetch dead-letter queue arn: %v", err)
}
redrivePolicy := &queue.QueueRedrivePolicy{
DeadLetterTargetArn: dlqArn,
MaxReceiveCount: queue.QueueMaxReceiveCount,
}
validateQueue, err := queue.NewPublisher(queue.QueueType_Validate, sqsClient, redrivePolicy)
validateQueue, err := queue.NewPublisher(serverCtx, queue.QueueType_Validate, sqsClient, redrivePolicy)
if err != nil {
log.Fatalf("failed to create validate queue: %v", err)
log.Fatalf("main: failed to create validate queue: %v", err)
}
readyQueue, err := queue.NewPublisher(queue.QueueType_Ready, sqsClient, redrivePolicy)
readyQueue, err := queue.NewPublisher(serverCtx, queue.QueueType_Ready, sqsClient, redrivePolicy)
if err != nil {
log.Fatalf("failed to create ready queue: %v", err)
log.Fatalf("main: failed to create ready queue: %v", err)
}
batchQueue, err := queue.NewPublisher(queue.QueueType_Batch, sqsClient, redrivePolicy)
batchQueue, err := queue.NewPublisher(serverCtx, queue.QueueType_Batch, sqsClient, redrivePolicy)
if err != nil {
log.Fatalf("failed to create batch queue: %v", err)
log.Fatalf("main: failed to create batch queue: %v", err)
}
statusQueue, err := queue.NewPublisher(queue.QueueType_Status, sqsClient, redrivePolicy)
statusQueue, err := queue.NewPublisher(serverCtx, queue.QueueType_Status, sqsClient, redrivePolicy)
if err != nil {
log.Fatalf("failed to create status queue: %v", err)
log.Fatalf("main: failed to create status queue: %v", err)
}
// TODO: Could this become recursive since the failure handler also consumes from the DLQ? The inability to handle
// failures could put messages back in the DLQ that are then re-consumed by the handler.
failureQueue, err := queue.NewPublisher(queue.QueueType_Failure, sqsClient, redrivePolicy)
failureQueue, err := queue.NewPublisher(serverCtx, queue.QueueType_Failure, sqsClient, redrivePolicy)
if err != nil {
log.Fatalf("failed to create failure queue: %v", err)
log.Fatalf("main: failed to create failure queue: %v", err)
}
// Start the queue consumers. These consumers will be responsible for scaling event processing up based on load, and
// also maintaining backpressure on the queues.

// Create the queue consumers. These consumers will be responsible for scaling event processing up based on load,
// and also maintaining backpressure on the queues.

// The Failure handling Service reads from the Failure and Dead-Letter queues
failureHandlingService := services.NewFailureHandlingService(discordHandler)
queue.NewConsumer(failureQueue, failureHandlingService.Failure).Start()
queue.NewConsumer(deadLetterQueue, failureHandlingService.DLQ).Start()
dlqConsumer := queue.NewConsumer(deadLetterQueue, failureHandlingService.DLQ)
failureConsumer := queue.NewConsumer(failureQueue, failureHandlingService.Failure)

// The Status Service reads from the Status queue and updates the Anchor DB
statusService := services.NewStatusService(anchorDb)
queue.NewConsumer(statusQueue, statusService.Status).Start()
statusConsumer := queue.NewConsumer(statusQueue, statusService.Status)

// The Batching Service reads from the Ready queue and posts to the Batch queue
batchingService := services.NewBatchingService(batchQueue, metricService)
queue.NewConsumer(readyQueue, batchingService.Batch).Start()
batchingService := services.NewBatchingService(serverCtx, batchQueue, metricService)
batchingConsumer := queue.NewConsumer(readyQueue, batchingService.Batch)

// The Validation Service reads from the Validate queue and posts to the Ready and Status queues
validationService := services.NewValidationService(stateDb, readyQueue, statusQueue, metricService)
queue.NewConsumer(validateQueue, validationService.Validate).Start()
validationConsumer := queue.NewConsumer(validateQueue, validationService.Validate)

// Start the polling services last
wg := sync.WaitGroup{}
wg.Add(1)
// Monitor the Batch queue and spawn anchor workers accordingly
go services.NewWorkerService(queue.NewMonitor(batchQueue.QueueUrl, sqsClient), jobDb).Run(context.Background())
// Poll the Anchor DB for unprocessed requests and post to the Validate queue
// go services.NewRequestPoller(anchorDb, stateDb, validateQueue).Run(context.Background())
wg.Add(2)

// Set up graceful shutdown
go func() {
defer wg.Done()

interruptCh := make(chan os.Signal, 1)
signal.Notify(interruptCh, syscall.SIGTERM)
<-interruptCh
log.Println("main: shutdown started")

// Shut down services in the order in which data goes through the pipeline:
// - validation
// - batching
// - status updates
// - failure handling
// - DLQ
validationConsumer.Shutdown()
batchingConsumer.Shutdown()
statusConsumer.Shutdown()
failureConsumer.Shutdown()
dlqConsumer.Shutdown()

// Flush metrics
metricService.Shutdown(serverCtx)

// Cancel the server context
serverCtxCancel()

log.Println("main: shutdown complete")
}()

// Start pipeline components in the opposite order in which data goes through
go func() {
defer wg.Done()
// Monitor the Batch queue and spawn anchor workers accordingly
services.NewWorkerService(queue.NewMonitor(batchQueue.QueueUrl, sqsClient), jobDb).Run(serverCtx)
}()

dlqConsumer.Start()
failureConsumer.Start()
statusConsumer.Start()
batchingConsumer.Start()
validationConsumer.Start()

wg.Wait()
}
24 changes: 13 additions & 11 deletions common/aws/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,31 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

func AwsConfigWithOverride(customEndpoint string) (aws.Config, error) {
ctx, cancel := context.WithTimeout(context.Background(), models.DefaultHttpWaitTime)
defer cancel()

func AwsConfigWithOverride(ctx context.Context, customEndpoint string) (aws.Config, error) {
endpointResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: customEndpoint,
SigningRegion: os.Getenv("AWS_REGION"),
}, nil
})
return config.LoadDefaultConfig(ctx, config.WithEndpointResolverWithOptions(endpointResolver))

httpCtx, httpCancel := context.WithTimeout(ctx, models.DefaultHttpWaitTime)
defer httpCancel()

return config.LoadDefaultConfig(httpCtx, config.WithEndpointResolverWithOptions(endpointResolver))
}

func AwsConfig() (aws.Config, error) {
func AwsConfig(ctx context.Context) (aws.Config, error) {
awsEndpoint := os.Getenv("AWS_ENDPOINT")
if len(awsEndpoint) > 0 {
log.Printf("config: using custom global aws endpoint: %s", awsEndpoint)
return AwsConfigWithOverride(awsEndpoint)
return AwsConfigWithOverride(ctx, awsEndpoint)
}
// Load the default configuration
ctx, cancel := context.WithTimeout(context.Background(), models.DefaultHttpWaitTime)
defer cancel()

return config.LoadDefaultConfig(ctx, config.WithRegion(os.Getenv("AWS_REGION")))
httpCtx, httpCancel := context.WithTimeout(ctx, models.DefaultHttpWaitTime)
defer httpCancel()

// Load the default configuration
return config.LoadDefaultConfig(httpCtx, config.WithRegion(os.Getenv("AWS_REGION")))
}
16 changes: 8 additions & 8 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ type JobDatabase struct {
jobTable string
}

func NewJobDb(ddbClient *dynamodb.Client) *JobDatabase {
func NewJobDb(ctx context.Context, ddbClient *dynamodb.Client) *JobDatabase {
jobTable := "ceramic-" + os.Getenv("ENV") + "-ops"
jdb := JobDatabase{ddbClient, jobTable}
if err := jdb.createJobTable(); err != nil {
if err := jdb.createJobTable(ctx); err != nil {
log.Fatalf("job: table creation failed: %v", err)
}
return &jdb
}

func (jdb *JobDatabase) createJobTable() error {
func (jdb *JobDatabase) createJobTable(ctx context.Context) error {
createJobTableInput := dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
{
Expand Down Expand Up @@ -58,10 +58,10 @@ func (jdb *JobDatabase) createJobTable() error {
WriteCapacityUnits: aws.Int64(1),
},
}
return createTable(jdb.ddbClient, &createJobTableInput)
return createTable(ctx, jdb.ddbClient, &createJobTableInput)
}

func (jdb *JobDatabase) CreateJob() error {
func (jdb *JobDatabase) CreateJob(ctx context.Context) error {
newJob := map[string]interface{}{
models.JobParam_Id: uuid.New().String(),
models.JobParam_Ts: time.Now(),
Expand All @@ -79,10 +79,10 @@ func (jdb *JobDatabase) CreateJob() error {
if err != nil {
return err
} else {
ctx, cancel := context.WithTimeout(context.Background(), models.DefaultHttpWaitTime)
defer cancel()
httpCtx, httpCancel := context.WithTimeout(ctx, models.DefaultHttpWaitTime)
defer httpCancel()

_, err = jdb.ddbClient.PutItem(ctx, &dynamodb.PutItemInput{
_, err = jdb.ddbClient.PutItem(httpCtx, &dynamodb.PutItemInput{
TableName: aws.String(jdb.jobTable),
Item: attributeValues,
})
Expand Down
Loading