Unclosed AIOKafkaConnection that does not cause the consumer to crash #1019

Open
YraganTron opened this issue Jun 26, 2024 · 1 comment

@YraganTron

Describe the bug
I have a consumer group made up of the pods of my service. Pods appear and disappear depending on load. At some point a new pod started, and on startup it logged the error Unclosed AIOKafkaConnection. The service then launched successfully, but the consumer did not receive a single message. Meanwhile, the check of the connection from the consumer to the broker reported that the connection was established. The check is a method that is called once every 10 seconds:

    async def ping(self) -> bool:
        return await self._client.ready(self._client.get_random_node())

This way I ended up with a consumer that did not fail but did not actually work either, while Kafka still considered it part of the group.
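A readiness check on a random node only shows that a broker connection can be used; it says nothing about whether the consumer holds partitions or whether its consume loop is moving. Below is a minimal sketch of a stricter probe. `LivenessProbe`, `mark_progress` and `max_idle_s` are hypothetical names invented for illustration. The only thing assumed of the consumer is an `assignment()` method returning the set of assigned partitions, as `AIOKafkaConsumer` provides. It is not certain that this probe would have caught the state described above.

```python
import time


class LivenessProbe:
    """Hypothetical helper, not part of aiokafka.

    Reports unhealthy when the consumer has no partitions assigned or
    when the application has not marked progress for too long.
    """

    def __init__(self, consumer, max_idle_s=60.0, clock=time.monotonic):
        self._consumer = consumer      # any object exposing assignment()
        self._max_idle_s = max_idle_s  # allowed gap between progress marks
        self._clock = clock            # injectable for testing
        self._last_progress = clock()

    def mark_progress(self):
        """Call from the consume loop whenever it makes progress."""
        self._last_progress = self._clock()

    def healthy(self):
        # An empty assignment means the consumer cannot receive anything.
        if not self._consumer.assignment():
            return False
        # A stale progress mark means the consume loop has stalled.
        return self._clock() - self._last_progress <= self._max_idle_s
```

What counts as progress is an application decision: on a quiet topic, marking only on received messages would raise false alarms, so marking each completed fetch call may be the safer choice.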

Here is the full event log, including the Kafka log

12:12:01.736 - a new pod my-pod-8f45d9497-xvtnx
12:12:02.319 - Unclosed AIOKafkaConnection error in my-pod-8f45d9497-xvtnx pod
12:17:45 - pod my-pod-8f45d9497-xvtnx died

Kafka logs:
[12:12:01,799] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group device-shadow-prod in Stable state. Created a new member id aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[12:12:01,799] INFO [GroupCoordinator 1]: Preparing to rebalance group device-shadow-prod in state PreparingRebalance with old generation 3128 (__consumer_offsets-5) (reason: Adding new member aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[12:12:02,273] INFO [GroupCoordinator 1]: Stabilized group device-shadow-prod generation 3129 (__consumer_offsets-5) with 2 members (kafka.coordinator.group.GroupCoordinator)
[12:12:02,274] INFO [GroupCoordinator 1]: Assignment received from leader aiokafka-0.7.2-46313310-5ad0-463a-adc4-fa5cdeac7d7a for group device-shadow-prod for generation 3129. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[12:17:49,359] INFO [GroupCoordinator 1]: Member aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba in group device-shadow-prod has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[12:17:49,359] INFO [GroupCoordinator 1]: Preparing to rebalance group device-shadow-prod in state PreparingRebalance with old generation 3129 (__consumer_offsets-5) (reason: removing member aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[12:17:50,453] INFO [GroupCoordinator 1]: Stabilized group device-shadow-prod generation 3130 (__consumer_offsets-5) with 1 members (kafka.coordinator.group.GroupCoordinator)
[12:17:50,454] INFO [GroupCoordinator 1]: Assignment received from leader aiokafka-0.7.2-46313310-5ad0-463a-adc4-fa5cdeac7d7a for group device-shadow-prod for generation 3130. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

I'm using the default consumer settings. It looks as though the second rebalance happened once max_poll_interval_ms expired, although it came ~345 seconds after the consumer joined rather than 300. Even so, this does not explain why the consumer was not actually working.
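The elapsed time can be checked directly against the coordinator log. This small sketch takes the two timestamps from the Kafka log above (generation 3129 stabilized, then the member removed) and compares the gap with the 300 seconds mentioned; the variable names are only illustrative.

```python
from datetime import datetime

# Timestamps copied from the GroupCoordinator log lines above.
FMT = "%H:%M:%S,%f"
stabilized = datetime.strptime("12:12:02,273", FMT)  # generation 3129 stabilized
removed = datetime.strptime("12:17:49,359", FMT)     # member removed from group

elapsed_s = (removed - stabilized).total_seconds()
overshoot_s = elapsed_s - 300.0  # 300 s is the max_poll_interval_ms value cited

print(f"elapsed: {elapsed_s:.3f} s, beyond 300 s: {overshoot_s:.3f} s")
```

The gap comes out at roughly 347 seconds, in line with the ~345 estimate and well past 300.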

Expected behaviour
I expected an exception to be raised after the Unclosed AIOKafkaConnection error or, if the error is not critical, I expected the consumer to keep working.

Environment (please complete the following information):

  • aiokafka version 0.7.2
  • Kafka Broker version 3.6.0

Reproducible example
I tried for a long time to reproduce the Unclosed AIOKafkaConnection error without it ending in an exception, and I could not. Looking at the consumer's code, I do not even have a guess as to how exactly this could have happened. I would be glad to have any suggestions.

@YraganTron
Author

It seems that the only place where a connection is dropped before the consumer is running is on exit from this call in the start method:

    async def start(self):
        await self._client.bootstrap()
        # Here

The next assumption is that calling the connection's close method does not close the connection immediately. See the related comment at the end of the method:

    def close(self, reason=None, exc=None):
        self.log.debug("Closing connection at %s:%s", self._host, self._port)
        if self._reader is not None:
            self._writer.close()
            self._writer = self._reader = None
            if not self._read_task.done():
                self._read_task.cancel()
                self._read_task = None
            for _, _, fut in self._requests:
                if not fut.done():
                    error = Errors.KafkaConnectionError(
                        f"Connection at {self._host}:{self._port} closed"
                    )
                    if exc is not None:
                        error.__cause__ = exc
                        error.__context__ = exc
                    fut.set_exception(error)
            self._requests = collections.deque()
            if self._on_close_cb is not None:
                self._on_close_cb(self, reason)
                self._on_close_cb = None
        if self._idle_handle is not None:
            self._idle_handle.cancel()

        # transport.close() will close socket, but not right ahead. Return
        # a future in case we need to wait on it.
        return self._closed_fut

There is already a workaround in the form of the future self._closed_fut: awaiting it guarantees that the socket has been closed. It seems that the bootstrap method does not wait for this future.
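To illustrate the pattern being described, here is a toy model of a connection whose close() only schedules the teardown and returns a future. `ToyConnection` and `socket_open` are hypothetical stand-ins written for this sketch, not aiokafka code; the sketch only shows why the caller has to await the returned future before treating the socket as closed.

```python
import asyncio


class ToyConnection:
    """Stand-in for a connection whose close() completes asynchronously."""

    def __init__(self, loop):
        self._loop = loop
        self._closed_fut = loop.create_future()
        self.socket_open = True

    def _finish_close(self):
        # Runs on a later loop iteration, like a transport closing its socket.
        self.socket_open = False
        if not self._closed_fut.done():
            self._closed_fut.set_result(None)

    def close(self):
        # Teardown is only scheduled here; the socket is still open on return.
        self._loop.call_soon(self._finish_close)
        return self._closed_fut


async def demo():
    conn = ToyConnection(asyncio.get_running_loop())
    closed_fut = conn.close()
    open_right_after_close = conn.socket_open  # not closed yet
    await asyncio.wait_for(closed_fut, timeout=1.0)
    return open_right_after_close, conn.socket_open


result = asyncio.run(demo())
print(result)
```

In this model the socket is still open immediately after close() returns and is closed only once the future resolves. Whether awaiting the future inside bootstrap would remove the Unclosed AIOKafkaConnection warning in aiokafka is the open question of this issue.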
