Got "Failed to create consumer at reconnect" when stopping the Pulsar broker and restarting it later #1290

Open
zhangbiao2009 opened this issue Sep 25, 2024 · 3 comments

zhangbiao2009 commented Sep 25, 2024

Expected behavior

No error should occur.

Actual behavior

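[Screenshot not preserved. Per the steps below, the client logs "Failed to create consumer at reconnect".]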

Steps to reproduce

  1. Run the Go example program below.
  2. Stop the Pulsar broker.
  3. Watch the output of the Go example. Once you see the following error message, start the Pulsar broker again: error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
  4. The client now logs "Failed to create consumer at reconnect".

Example program to reproduce:

package main

import (
	"context"
	"fmt"
	"log"

	"net/http"
	_ "net/http/pprof"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {

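	// Serve the pprof handlers (registered by the blank import above) so that
	// running goroutines can be inspected under http://localhost:6060/debug/pprof/
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()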

	// Create a Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar client: %v", err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic",
		SubscriptionName: "my-subscription",
		Type:             pulsar.KeyShared,
	})
	if err != nil {
		fmt.Printf("Could not create consumer: %v\n", err)
		return
	}
	go func() {
		for {
			msg, err := consumer.Receive(context.Background())
			if err != nil {
				log.Printf("Error receiving message: %v", err)
				break
			}

			// Process the message
			fmt.Printf("Received message msgId: %v -- content: '%s'\n",
				msg.ID(), string(msg.Payload()))

			// Acknowledge the message
			consumer.Ack(msg)
		}
	}()

	// Keep the main function alive to allow the goroutine to run
	select {}
}

System configuration

MacBook Pro 2021
Go version: 1.23.0
Pulsar Go SDK version: 0.13.1

Pulsar broker version: latest Docker image, started with this command:
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  apachepulsar/pulsar:latest \
  bin/pulsar standalone

nodece (Member) commented Oct 1, 2024

my-topic does not exist.

zhangbiao2009 (Author) commented Oct 1, 2024

> my-topic does not exist.

@nodece Thanks for the reply.

I think I see what you mean: the reconnect fails because the topic no longer exists after I re-ran the docker command. The topic was stored locally inside the Docker container, so it was lost when I restarted the container. This error wouldn't happen in a real environment, because topics are stored in BookKeeper. Is that correct?

I also tried specifying a volume so that topics are stored persistently:
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -v pulsar_data:/pulsar/data \
  apachepulsar/pulsar:latest \
  bin/pulsar standalone
With the volume in place, the consumer reconnects successfully when I repeat my test.

However, I still think this is an issue. I checked the running goroutines when the Topic Not Found error happened, and consumer.Receive() was still blocked. If the consumer cannot reconnect successfully, consumer.Receive() should return an error; otherwise the caller can do nothing about it and simply stays stuck there. What do you think?

nodece (Member) commented Oct 2, 2024

We need to introduce a Go context here, which would let us break out of the receiving process.
