Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support executing start multiple times from same instance of producer #24

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

tiny-dancer
Copy link

@tiny-dancer tiny-dancer commented Sep 6, 2019

Ran into an issue in lambda where every execution after the cold start would fail due to panic: close of closed channel. Therefore leading to a 50% failure rate.

Root cause was defining the producer as a global variable outside the Handler function. This pull request assumes Start() desires idempotency. Please correct me if that is not desired and the appropriate solution is to call .New for every execution within the Handler.

kinesisProducer = producer.New(&producer.Config{
		StreamName:    kinesisStream,
		Client:        kinesisClient,
		BatchCount:    500,
		FlushInterval: 60 * time.Second,
		Logger:        &kplogrus.Logger{Logger: logs}, 
	})

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	kinesisProducer.Start() // <-- always fail after initial cold start

	var wg sync.WaitGroup
	wg.Add(1)

	// concurrently handle failures
	go processPutFailures(&wg)

	processSqsEvent(sqsEvent)

	kinesisProducer.Stop()

	wg.Wait()
        ...
}

@tiny-dancer tiny-dancer mentioned this pull request Oct 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant