diff --git a/events/events.go b/events/events.go deleted file mode 100644 index c287757137..0000000000 --- a/events/events.go +++ /dev/null @@ -1,92 +0,0 @@ -// Package events is for event streaming and storage -package events - -import ( - "encoding/json" - "errors" - "time" -) - -var ( - // DefaultStream is the default events stream implementation. - DefaultStream Stream - // DefaultStore is the default events store implementation. - DefaultStore Store -) - -var ( - // ErrMissingTopic is returned if a blank topic was provided to publish. - ErrMissingTopic = errors.New("Missing topic") - // ErrEncodingMessage is returned from publish if there was an error encoding the message option. - ErrEncodingMessage = errors.New("Error encoding message") -) - -// Stream is an event streaming interface. -type Stream interface { - Publish(topic string, msg interface{}, opts ...PublishOption) error - Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) -} - -// Store is an event store interface. -type Store interface { - Read(topic string, opts ...ReadOption) ([]*Event, error) - Write(event *Event, opts ...WriteOption) error -} - -type AckFunc func() error -type NackFunc func() error - -// Event is the object returned by the broker when you subscribe to a topic. -type Event struct { - // Timestamp of the event - Timestamp time.Time - // Metadata contains the values the event was indexed by - Metadata map[string]string - - ackFunc AckFunc - nackFunc NackFunc - // ID to uniquely identify the event - ID string - // Topic of event, e.g. "registry.service.created" - Topic string - // Payload contains the encoded message - Payload []byte -} - -// Unmarshal the events message into an object. -func (e *Event) Unmarshal(v interface{}) error { - return json.Unmarshal(e.Payload, v) -} - -// Ack acknowledges successful processing of the event in ManualAck mode. -func (e *Event) Ack() error { - return e.ackFunc() -} - -func (e *Event) SetAckFunc(f AckFunc) { - e.ackFunc = f -} - -// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode. -func (e *Event) Nack() error { - return e.nackFunc() -} - -func (e *Event) SetNackFunc(f NackFunc) { - e.nackFunc = f -} - -// Publish an event to a topic. -func Publish(topic string, msg interface{}, opts ...PublishOption) error { - return DefaultStream.Publish(topic, msg, opts...) -} - -// Consume to events. -func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { - return DefaultStream.Consume(topic, opts...) -} - -// Read events for a topic. -func Read(topic string, opts ...ReadOption) ([]*Event, error) { - return DefaultStore.Read(topic, opts...) -} diff --git a/events/memory.go b/events/memory.go deleted file mode 100644 index 5a1f78a25d..0000000000 --- a/events/memory.go +++ /dev/null @@ -1,242 +0,0 @@ -package events - -import ( - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/google/uuid" - "github.com/pkg/errors" - log "go-micro.dev/v5/logger" - "go-micro.dev/v5/store" -) - -// NewStream returns an initialized memory stream. -func NewStream(opts ...Option) (Stream, error) { - // parse the options - options := NewOptions(opts...) - - return &mem{store: store.NewMemoryStore(), options: options}, nil -} - -type subscriber struct { - Channel chan Event - - retryMap map[string]int - Group string - Topic string - retryLimit int - ackWait time.Duration - - sync.RWMutex - autoAck bool -} - -type mem struct { - options *Options - store store.Store - - subs []*subscriber - sync.RWMutex -} - -func (m *mem) Publish(topic string, msg interface{}, opts ...PublishOption) error { - // validate the topic - if len(topic) == 0 { - return ErrMissingTopic - } - - // parse the options - options := PublishOptions{ - Timestamp: time.Now(), - } - for _, o := range opts { - o(&options) - } - - // encode the message if it's not already encoded - var payload []byte - if p, ok := msg.([]byte); ok { - payload = p - } else { - p, err := json.Marshal(msg) - if err != nil { - return ErrEncodingMessage - } - payload = p - } - - // construct the event - event := &Event{ - ID: uuid.New().String(), - Topic: topic, - Timestamp: options.Timestamp, - Metadata: options.Metadata, - Payload: payload, - } - - // serialize the event to bytes - bytes, err := json.Marshal(event) - if err != nil { - return errors.Wrap(err, "Error encoding event") - } - - // write to the store - key := fmt.Sprintf("%v/%v", event.Topic, event.ID) - if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil { - return errors.Wrap(err, "Error writing event to store") - } - - // send to the subscribers async - go m.handleEvent(event) - - return nil -} - -func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { - // validate the topic - if len(topic) == 0 { - return nil, ErrMissingTopic - } - - // parse the options - options := ConsumeOptions{ - Group: uuid.New().String(), - AutoAck: true, - } - for _, o := range opts { - o(&options) - } - // TODO RetryLimit - - // setup the subscriber - sub := &subscriber{ - Channel: make(chan Event), - Topic: topic, - Group: options.Group, - retryMap: map[string]int{}, - autoAck: true, - retryLimit: options.GetRetryLimit(), - } - - if !options.AutoAck { - if options.AckWait == 0 { - return nil, fmt.Errorf("invalid AckWait passed, should be positive integer") - } - sub.autoAck = options.AutoAck - sub.ackWait = options.AckWait - } - - // register the subscriber - m.Lock() - m.subs = append(m.subs, sub) - m.Unlock() - - // lookup previous events if the start time option was passed - if options.Offset.Unix() > 0 { - go m.lookupPreviousEvents(sub, options.Offset) - } - - // return the channel - return sub.Channel, nil -} - -// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends -// them into the subscribers channel. -func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { - // lookup all events which match the topic (a blank topic will return all results) - recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) - if err != nil { - m.options.Logger.Logf(log.ErrorLevel, "Error looking up previous events: %v", err) - return - } - - // loop through the records and send it to the channel if it matches - for _, r := range recs { - var ev Event - if err := json.Unmarshal(r.Value, &ev); err != nil { - continue - } - if ev.Timestamp.Unix() < startTime.Unix() { - continue - } - sendEvent(&ev, sub) - } -} - -// handleEvents sends the event to any registered subscribers. -func (m *mem) handleEvent(ev *Event) { - m.RLock() - subs := m.subs - m.RUnlock() - logger := m.options.Logger - // filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message - // being sent to two subscribers with the same queue. - filteredSubs := map[string]*subscriber{} - - // filter down to subscribers who are interested in this topic - for _, sub := range subs { - if len(sub.Topic) == 0 || sub.Topic == ev.Topic { - filteredSubs[sub.Group] = sub - } - } - - // send the message to each channel async (since one channel might be blocked) - for _, sub := range filteredSubs { - go func(s *subscriber) { - if err := sendEvent(ev, s); err != nil { - logger.Log(log.ErrorLevel, err) - } - }(sub) - } -} - -func sendEvent(ev *Event, s *subscriber) error { - evCopy := *ev - if s.autoAck { - s.Channel <- evCopy - return nil - } - evCopy.SetAckFunc(ackFunc(s, evCopy)) - evCopy.SetNackFunc(nackFunc(s, evCopy)) - s.retryMap[evCopy.ID] = 0 - tick := time.NewTicker(s.ackWait) - defer tick.Stop() - for range tick.C { - s.Lock() - count, ok := s.retryMap[evCopy.ID] - s.Unlock() - if !ok { - // success - break - } - - if s.retryLimit > -1 && count > s.retryLimit { - s.Lock() - delete(s.retryMap, evCopy.ID) - s.Unlock() - return fmt.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) - } - s.Channel <- evCopy - s.Lock() - s.retryMap[evCopy.ID] = count + 1 - s.Unlock() - } - return nil -} - -func ackFunc(s *subscriber, evCopy Event) func() error { - return func() error { - s.Lock() - delete(s.retryMap, evCopy.ID) - s.Unlock() - return nil - } -} - -func nackFunc(s *subscriber, evCopy Event) func() error { - return func() error { - return nil - } -} diff --git a/events/options.go b/events/options.go deleted file mode 100644 index 00c6ef5290..0000000000 --- a/events/options.go +++ /dev/null @@ -1,173 +0,0 @@ -package events - -import ( - "context" - "time" - - "go-micro.dev/v5/logger" -) - -type Options struct { - Logger logger.Logger -} - -type Option func(o *Options) - -func NewOptions(opts ...Option) *Options { - options := Options{ - Logger: logger.DefaultLogger, - } - - for _, o := range opts { - o(&options) - } - - return &options -} - -type StoreOptions struct { - Backup Backup - Logger logger.Logger - TTL time.Duration -} - -type StoreOption func(o *StoreOptions) - -// WithLogger sets the underline logger. -func WithLogger(l logger.Logger) StoreOption { - return func(o *StoreOptions) { - o.Logger = l - } -} - -// PublishOptions contains all the options which can be provided when publishing an event. -type PublishOptions struct { - // Metadata contains any keys which can be used to query the data, for example a customer id - Metadata map[string]string - // Timestamp to set for the event, if the timestamp is a zero value, the current time will be used - Timestamp time.Time -} - -// PublishOption sets attributes on PublishOptions. -type PublishOption func(o *PublishOptions) - -// WithMetadata sets the Metadata field on PublishOptions. -func WithMetadata(md map[string]string) PublishOption { - return func(o *PublishOptions) { - o.Metadata = md - } -} - -// WithTimestamp sets the timestamp field on PublishOptions. -func WithTimestamp(t time.Time) PublishOption { - return func(o *PublishOptions) { - o.Timestamp = t - } -} - -// ConsumeOptions contains all the options which can be provided when subscribing to a topic. -type ConsumeOptions struct { - // Offset is the time from which the messages should be consumed from. If not provided then - // the messages will be consumed starting from the moment the Subscription starts. - Offset time.Time - // Group is the name of the consumer group, if two consumers have the same group the events - // are distributed between them - Group string - AckWait time.Duration - // RetryLimit indicates number of times a message is retried - RetryLimit int - // AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered. - // If false specifies that each message need ts to be manually acknowledged by the subscriber. - // If processing is successful the message should be ack'ed to remove the message from the stream. - // If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will - // remain on the stream to be processed again. - AutoAck bool - // CustomRetries indicates whether to use RetryLimit - CustomRetries bool -} - -// ConsumeOption sets attributes on ConsumeOptions. -type ConsumeOption func(o *ConsumeOptions) - -// WithGroup sets the consumer group to be part of when consuming events. -func WithGroup(q string) ConsumeOption { - return func(o *ConsumeOptions) { - o.Group = q - } -} - -// WithOffset sets the offset time at which to start consuming events. -func WithOffset(t time.Time) ConsumeOption { - return func(o *ConsumeOptions) { - o.Offset = t - } -} - -// WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received -// the message is requeued in case auto ack is turned off. -func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption { - return func(o *ConsumeOptions) { - o.AutoAck = ack - o.AckWait = ackWait - } -} - -// WithRetryLimit sets the RetryLimit field on ConsumeOptions. -// Set to -1 for infinite retries (default). -func WithRetryLimit(retries int) ConsumeOption { - return func(o *ConsumeOptions) { - o.RetryLimit = retries - o.CustomRetries = true - } -} - -func (s ConsumeOptions) GetRetryLimit() int { - if !s.CustomRetries { - return -1 - } - return s.RetryLimit -} - -// WriteOptions contains all the options which can be provided when writing an event to a store. -type WriteOptions struct { - // TTL is the duration the event should be recorded for, a zero value TTL indicates the event should - // be stored indefinitely - TTL time.Duration -} - -// WriteOption sets attributes on WriteOptions. -type WriteOption func(o *WriteOptions) - -// WithTTL sets the TTL attribute on WriteOptions. -func WithTTL(d time.Duration) WriteOption { - return func(o *WriteOptions) { - o.TTL = d - } -} - -// ReadOptions contains all the options which can be provided when reading events from a store. -type ReadOptions struct { - // Limit the number of results to return - Limit uint - // Context should contain all implementation specific options, using context.WithValue. - Context context.Context - // Offset the results by this number, useful for paginated queries - Offset uint -} - -// ReadOption sets attributes on ReadOptions. -type ReadOption func(o *ReadOptions) - -// ReadLimit sets the limit attribute on ReadOptions. -func ReadLimit(l uint) ReadOption { - return func(o *ReadOptions) { - o.Limit = l - } -} - -// ReadOffset sets the offset attribute on ReadOptions. -func ReadOffset(l uint) ReadOption { - return func(o *ReadOptions) { - o.Offset = l - } -} diff --git a/events/store.go b/events/store.go deleted file mode 100644 index 07398eac2a..0000000000 --- a/events/store.go +++ /dev/null @@ -1,129 +0,0 @@ -package events - -import ( - "encoding/json" - "time" - - "github.com/pkg/errors" - log "go-micro.dev/v5/logger" - "go-micro.dev/v5/store" -) - -const joinKey = "/" - -// NewStore returns an initialized events store. -func NewStore(opts ...StoreOption) Store { - // parse the options - var options StoreOptions - for _, o := range opts { - o(&options) - } - if options.TTL.Seconds() == 0 { - options.TTL = time.Hour * 24 - } - - options.Logger = log.LoggerOrDefault(options.Logger) - - // return the store - evs := &evStore{ - opts: options, - store: store.NewMemoryStore(), - } - if options.Backup != nil { - go evs.backupLoop() - } - return evs -} - -type evStore struct { - opts StoreOptions - store store.Store -} - -// Read events for a topic. -func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) { - // validate the topic - if len(topic) == 0 { - return nil, ErrMissingTopic - } - - // parse the options - options := ReadOptions{ - Offset: 0, - Limit: 250, - } - for _, o := range opts { - o(&options) - } - - // execute the request - recs, err := s.store.Read(topic+joinKey, - store.ReadPrefix(), - store.ReadLimit(options.Limit), - store.ReadOffset(options.Offset), - ) - if err != nil { - return nil, errors.Wrap(err, "Error reading from store") - } - - // unmarshal the result - result := make([]*Event, len(recs)) - for i, r := range recs { - var e Event - if err := json.Unmarshal(r.Value, &e); err != nil { - return nil, errors.Wrap(err, "Invalid event returned from stroe") - } - result[i] = &e - } - - return result, nil -} - -// Write an event to the store. -func (s *evStore) Write(event *Event, opts ...WriteOption) error { - // parse the options - options := WriteOptions{ - TTL: s.opts.TTL, - } - for _, o := range opts { - o(&options) - } - - // construct the store record - bytes, err := json.Marshal(event) - if err != nil { - return errors.Wrap(err, "Error mashaling event to JSON") - } - // suffix event ID with hour resolution for easy retrieval in batches - timeSuffix := time.Now().Format("2006010215") - - record := &store.Record{ - // key is such that reading by prefix indexes by topic and reading by suffix indexes by time - Key: event.Topic + joinKey + event.ID + joinKey + timeSuffix, - Value: bytes, - Expiry: options.TTL, - } - - // write the record to the store - if err := s.store.Write(record); err != nil { - return errors.Wrap(err, "Error writing to the store") - } - - return nil -} - -func (s *evStore) backupLoop() { - for { - err := s.opts.Backup.Snapshot(s.store) - if err != nil { - s.opts.Logger.Logf(log.ErrorLevel, "Error running backup %s", err) - } - - time.Sleep(1 * time.Hour) - } -} - -// Backup is an interface for snapshotting the events store to long term storage. -type Backup interface { - Snapshot(st store.Store) error -} diff --git a/events/store_test.go b/events/store_test.go deleted file mode 100644 index 817c1f83af..0000000000 --- a/events/store_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package events - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -func TestStore(t *testing.T) { - store := NewStore() - - t.Parallel() - - testData := []Event{ - {ID: uuid.New().String(), Topic: "foo"}, - {ID: uuid.New().String(), Topic: "foo"}, - {ID: uuid.New().String(), Topic: "bar"}, - } - - // write the records to the store - t.Run("Write", func(t *testing.T) { - t.Parallel() - for _, event := range testData { - err := store.Write(&event) - assert.Nilf(t, err, "Writing an event should not return an error") - } - }) - - // should not be able to read events from a blank topic - t.Run("ReadMissingTopic", func(t *testing.T) { - t.Parallel() - evs, err := store.Read("") - assert.Equal(t, err, ErrMissingTopic, "Reading a blank topic should return an error") - assert.Nil(t, evs, "No events should be returned") - }) - - // should only get the events from the topic requested - t.Run("ReadTopic", func(t *testing.T) { - t.Parallel() - evs, err := store.Read("foo") - assert.Nilf(t, err, "No error should be returned") - assert.Len(t, evs, 2, "Only the events for this topic should be returned") - }) - - // limits should be honored - t.Run("ReadTopicLimit", func(t *testing.T) { - t.Parallel() - evs, err := store.Read("foo", ReadLimit(1)) - assert.Nilf(t, err, "No error should be returned") - assert.Len(t, evs, 1, "The result should include no more than the read limit") - }) -} diff --git a/events/stream_test.go b/events/stream_test.go deleted file mode 100644 index f6c9fbfc7e..0000000000 --- a/events/stream_test.go +++ /dev/null @@ -1,250 +0,0 @@ -package events - -import ( - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -type testPayload struct { - Message string -} - -type testCase struct { - str Stream - name string -} - -func TestStream(t *testing.T) { - tcs := []testCase{} - - t.Parallel() - - stream, err := NewStream() - assert.Nilf(t, err, "NewStream should not return an error") - assert.NotNilf(t, stream, "NewStream should return a stream object") - tcs = append(tcs, testCase{str: stream, name: "memory"}) - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - runTestStream(t, tc.str) - }) - } -} - -func runTestStream(t *testing.T, stream Stream) { - // TestMissingTopic will test the topic validation on publish - t.Run("TestMissingTopic", func(t *testing.T) { - err := stream.Publish("", nil) - assert.Equalf(t, err, ErrMissingTopic, "Publishing to a blank topic should return an error") - }) - - // TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the - // same test topic. - t.Run("TestConsumeTopic", func(t *testing.T) { - t.Parallel() - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the subscriber - evChan, err := stream.Consume("test") - assert.Nilf(t, err, "Consume should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not received") - } - }() - - err = stream.Publish("test", payload, WithMetadata(metadata)) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(1) - - // wait for the subscriber to receive the message or timeout - wg.Wait() - }) - - // TestConsumeGroup will publish a message to a random topic. Two subscribers will then consume - // the message from the firehose topic with different queues. The second subscriber will be registered - // after the message is published to test durability. - t.Run("TestConsumeGroup", func(t *testing.T) { - t.Parallel() - - topic := uuid.New().String() - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the first subscriber - evChan1, err := stream.Consume(topic) - assert.Nilf(t, err, "Consume should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan1: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not received") - } - }() - - err = stream.Publish(topic, payload, WithMetadata(metadata)) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(2) - - // create the second subscriber - evChan2, err := stream.Consume(topic, - WithGroup("second_queue"), - WithOffset(time.Now().Add(time.Minute*-1)), - ) - assert.Nilf(t, err, "Consume should not return an error") - - go func() { - timeout := time.NewTimer(time.Second * 1) - - select { - case event, _ := <-evChan2: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not received") - } - }() - - // wait for the subscriber to receive the message or timeout - wg.Wait() - }) - - t.Run("AckingNacking", func(t *testing.T) { - t.Parallel() - - ch, err := stream.Consume("foobarAck", WithAutoAck(false, 5*time.Second)) - assert.NoError(t, err, "Unexpected error subscribing") - assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"})) - assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"})) - - ev := <-ch - ev.Ack() - ev = <-ch - nacked := ev.ID - ev.Nack() - select { - case ev = <-ch: - assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again") - assert.NoError(t, ev.Ack()) - case <-time.After(7 * time.Second): - t.Fatalf("Timed out waiting for message to be put back on queue") - } - }) - - t.Run("Retries", func(t *testing.T) { - t.Parallel() - - ch, err := stream.Consume("foobarRetries", WithAutoAck(false, 5*time.Second), WithRetryLimit(1)) - assert.NoError(t, err, "Unexpected error subscribing") - assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"})) - - ev := <-ch - id := ev.ID - ev.Nack() - ev = <-ch - assert.Equal(t, id, ev.ID, "Nacked message should have been received again") - ev.Nack() - select { - case ev = <-ch: - t.Fatalf("Unexpected event received") - case <-time.After(7 * time.Second): - } - }) - - t.Run("InfiniteRetries", func(t *testing.T) { - t.Parallel() - - ch, err := stream.Consume("foobarRetriesInf", WithAutoAck(false, 2*time.Second)) - assert.NoError(t, err, "Unexpected error subscribing") - assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"})) - - count := 0 - id := "" - for { - select { - case ev := <-ch: - if id != "" { - assert.Equal(t, id, ev.ID, "Nacked message should have been received again") - } - id = ev.ID - case <-time.After(3 * time.Second): - t.Fatalf("Unexpected event received") - } - - count++ - if count == 11 { - break - } - } - }) - - t.Run("twoSubs", func(t *testing.T) { - t.Parallel() - - ch1, err := stream.Consume("foobarTwoSubs1", WithAutoAck(false, 5*time.Second)) - assert.NoError(t, err, "Unexpected error subscribing to topic 1") - ch2, err := stream.Consume("foobarTwoSubs2", WithAutoAck(false, 5*time.Second)) - assert.NoError(t, err, "Unexpected error subscribing to topic 2") - - assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"})) - assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"})) - - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - ev := <-ch1 - assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic") - wg.Done() - }() - go func() { - ev := <-ch2 - assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic") - wg.Done() - }() - wg.Wait() - }) -}