Skip to content

A Go broker module that integrates with Apache Kafka, Redis, and AWS SNS/SQS.

Notifications You must be signed in to change notification settings

rafaelsouzaribeiro/golang-broker

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

how to use aws sns and sqs?


install aws cli

run docker container:

sudo docker pull localstack/localstack
sudo docker container run -it -d -p 4566:4566 localstack/localstack start

How can I set up the local development environment?


aws configure
AWS Access Key ID [None]: fakeAccessKeyId
AWS Secret Access Key [None]: fakeSecretAccessKey
Default region name [us-east-1]: us-east-1
Default output format [None]: json

aws configure --profile localstack
AWS Access Key ID [None]: nome_perfil_novo
AWS Secret Access Key [None]: senha_perfil_novo
Default region name [None]: us-east-1
Default output format [None]: json

How to create topic and queue?


Create topic:

aws --endpoint-url=http://localhost:4566 sns create-topic --name my-topic

Create queue

aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name my-queue --region us-east-1

Create QueueArn

aws --endpoint-url=http://localhost:4566 sqs get-queue-attributes --queue-url http://localhost:4566/000000000000/my-queue --attribute-names QueueArn --region us-east-1

Subscribe SNS Topic to SQS Queue Endpoint

aws --endpoint-url=http://localhost:4566 sns subscribe --topic-arn $TOPIC_ARN --protocol sqs --notification-endpoint $QUEUE_ARN --region us-east-1

To use SQN and SNS, follow the code below:

Sqs:
configs := payload.SNSSQSMessage{
	Endpoint: aws.String("http://localhost:4566"),
	Region:   aws.String("us-east-1"),
	QueueURL: "http://localhost:4566/000000000000/my-queue",
}

messageChan := make(chan payload.SNSSQSMessage)

factory := factory.ISQSSNSBroker(&configs)
go factory.Receive(messageChan)

for message := range messageChan {
	fmt.Printf("Received message: %s Message Id: %s Topic: %s Time: %s\n",
		message.Message, message.MessageId, message.TopicArn, message.Timestamp)
}

select {}
	

SNS:

configs := payload.SNSSQSMessage{
	Endpoint: aws.String("http://localhost:4566"),
	Region:   aws.String("us-east-1"),
	Message:  "Message Test",
	TopicArn: "arn:aws:sns:us-east-1:000000000000:my-topic",
}

var wg sync.WaitGroup
wg.Add(1)

factory := factory.ISQSSNSBroker(&configs)

go func() {
	factory.Send()
	wg.Done()
}()

wg.Wait()

To use apache Kafka, follow the code below:


Consumer:
func main() {

	data := payload.Message{
		Topics:    &[]string{"contact-adm-insert", "testar"},
		Topic:     "contact-adm-insert",
		GroupID:   "contacts",
		Partition: 0,
		Offset:    -1,
	}
	canal := make(chan payload.Message)
	broker := factory.NewBroker(factory.Kafka, "springboot:9092")
	go broker.Consumer(&data, canal)
	go broker.ListenPartition(&data, canal)

	for msgs := range canal {
		printMessage(&msgs)
	}

	close(canal)

	select {}

}

func printMessage(msgs *payload.Message) {
	fmt.Printf("topic: %s, Message: %s, Partition: %d, Key: %s, time: %s\n", msgs.Topic, msgs.Value, msgs.Partition, msgs.Key, msgs.Time.Format("2006-01-02 15:04:05"))

	println("Headers:")
	for _, header := range *msgs.Headers {
		fmt.Printf("Key: %s, Value: %s\n", header.Key, header.Value)
	}
}

Producer:

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		Producer()
		wg.Done()
	}()

	wg.Wait()
}

func Producer() {

	message := payload.Message{
		Value: []byte("Testar"),
		Topic: "contact-adm-insert",
		Headers: &[]payload.Header{
			{
				Key:   "your-header-key1",
				Value: "your-header-value1",
			},
			{
				Key:   "your-header-key2",
				Value: "your-header-value2",
			},
		},
	}

	pro := factory.NewBroker(factory.Kafka, "springboot:9092")
	pro.SendMessage(&message)

}

To use Redis, follow the code below:


Consumer:
func main() {

	data := payload.Message{
		Topics: &[]string{"contact-adm-insert", "testar"},
	}
	canal := make(chan payload.Message)
	broker := factory.NewBroker(factory.Redis, "springboot:6379")
	go broker.Consumer(&data, canal)

	for msgs := range canal {
		printMessage(&msgs)
	}

	close(canal)

	select {}

}

func printMessage(msgs *payload.Message) {
	fmt.Printf("topic: %s, Message: %s\n", msgs.Topic, msgs.Value)

	println("Headers:")
	for _, header := range *msgs.Headers {
		fmt.Printf("Key: %s, Value: %s\n", header.Key, header.Value)
	}
}

Producer:

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		Producer()
		wg.Done()
	}()

	wg.Wait()
}

func Producer() {

	message := payload.Message{
		Value: []byte("testar"),
		Topic: "contact-adm-insert",
		Headers: &[]payload.Header{
			{
				Key:   "your-header-key1",
				Value: "your-header-value1",
			},
			{
				Key:   "your-header-key2",
				Value: "your-header-value2",
			},
		},
	}

	pro := factory.NewBroker(factory.Redis, "springboot:6379")
	pro.SendMessage(&message)

}

About

A Go broker module that integrates with Apache Kafka, Redis, and AWS SNS/SQS.

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages