Long-running consumer stops receiving messages and its state is not closed #1037

Open
Tinanuaa opened this issue Aug 1, 2024 · 4 comments

Tinanuaa commented Aug 1, 2024

Describe the bug
I have a long-running FastAPI app that starts a consumer on app start and consumes messages from Kafka. The consumer works fine for about a week and then suddenly stops receiving messages, even though the consumer is not closed. This has happened twice. I checked the logs, and in both cases the common error before the consumer stopped consuming was Failed fetch messages from 1001: [Error 7] RequestTimedOutError. After I restarted the service, the consumer picked up from the offset where it had stopped and reconsumed all the missed messages.

Expected behaviour
A long-running consumer should keep running and consuming messages for a long time.

Environment (please complete the following information):

  • aiokafka version: 0.10.0 and 0.11.0
  • Kafka Broker version: 3.1.0
  • Other information: aiokafka 0.7.2 works fine

Reproducible example

I use the following script to reproduce the problem. The time.sleep(40) call blocks the event loop long enough for the request to time out. In versions 0.10.0 and 0.11.0, conn.send() then raises a timeout exception, which makes client.send() raise RequestTimedOutError. This does not happen in aiokafka 0.7.2. I went through issues #983 and #802 and found that if conn.send uses asyncio.wait_for instead of utils.wait_for, the consumer keeps working after the timeout. My guess is that these two functions differ in some way that causes the problem. I tried to find the root cause but failed, so I am just posting my findings here.

import asyncio
import logging
import time

import aiokafka


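async def consume_task(consumer):
    # Print every record the consumer yields; offsets are never committed.
    async for msg in consumer:
        print("New Message Received", msg)
        # await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        # Blocking sleep on purpose: it stalls the event loop for 40 s so that
        # in-flight requests time out.
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='test_topic',
            value=b'a message',
        )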


async def main():
    async with (
        aiokafka.AIOKafkaProducer(bootstrap_servers='kafka:29092') as producer,
        aiokafka.AIOKafkaConsumer(
            'test_topic',
            bootstrap_servers='kafka:29092',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        # Diagnostics only: these are private aiokafka attributes and may change
        # between versions. Each callback reports when a background task exits.
        consumer._fetcher._fetch_task.add_done_callback(lambda a: print("fetch task done"))
        consumer._client._sync_task.add_done_callback(lambda a: print("client sync task done"))
        consumer._coordinator._heartbeat_task.add_done_callback(lambda a: print("coordinator heartbeat task done"))
        consumer._coordinator._commit_refresh_task.add_done_callback(lambda a: print("coordinator commit offset refresh task done"))
        await consumer.seek_to_end()

        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))

        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.ERROR,
        format='[%(asctime)s] p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
    )
    asyncio.run(main())

With the above script, I tested aiokafka 0.7.2, 0.10.0, and 0.11.0, plus 0.10.0 and 0.11.0 with a patched conn.py (in 0.10.0, line 453 changed from return wait_for(fut, self._request_timeout) to return asyncio.wait_for(fut, self._request_timeout)). The output of each run is shown below.

aiokafka 0.7.2

inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
coordinator heartbeat task done
sleeping
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=11, timestamp=1722407528852, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
coordinator commit offset refresh task done
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
sleeping
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=12, timestamp=1722407578861, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
inducing lag

aiokafka 0.10.0

inducing lag
sending message
Failed fetch messages from 1001: [Error 7] RequestTimedOutError
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
coordinator heartbeat task done
sleeping
coordinator commit offset refresh task done
fetch task done
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
sleeping
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
sleeping
inducing lag

aiokafka 0.11.0

sleeping
inducing lag
sending message
[2024-08-05 07:41:49,015] p2865 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/fetcher.py:693} ERROR - Failed fetch messages from 1001: [Error 7] RequestTimedOutError
[2024-08-05 07:41:49,018] p2865 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py:798} ERROR - Heartbeat session expired - marking coordinator dead
coordinator heartbeat task done
sleeping
coordinator commit offset refresh task done
inducing lag
sending message
[2024-08-05 07:42:39,027] p2865 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/fetcher.py:693} ERROR - Failed fetch messages from 1001: [Error 7] RequestTimedOutError
[2024-08-05 07:42:39,028] p2865 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py:798} ERROR - Heartbeat session expired - marking coordinator dead
sleeping
inducing lag
sending message
[2024-08-05 07:43:29,031] p2865 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/fetcher.py:693} ERROR - Failed fetch messages from 1001: [Error 7] RequestTimedOutError
[2024-08-05 07:43:29,032] p2865 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py:798} ERROR - Heartbeat session expired - marking coordinator dead
sleeping
inducing lag

aiokafka 0.10.0 after changing wait_for to asyncio.wait_for() in conn.py::send()

inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
coordinator heartbeat task done
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=20, timestamp=1722408250464, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
sleeping
coordinator commit offset refresh task done
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
sleeping
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=21, timestamp=1722408300480, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1001)for group some_group.
sleeping
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=22, timestamp=1722408350485, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())

aiokafka 0.11.0 after changing wait_for to asyncio.wait_for() in conn.py::send()

sleeping
inducing lag
sending message
[2024-08-05 07:48:29,823] p4507 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py:798} ERROR - Heartbeat session expired - marking coordinator dead
coordinator heartbeat task done
sleeping
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=522, timestamp=1722844109821, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
coordinator commit offset refresh task done
inducing lag
sending message
[2024-08-05 07:49:19,830] p4507 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py:798} ERROR - Heartbeat session expired - marking coordinator dead
sleeping
inducing lag
sending message
[2024-08-05 07:50:09,835] p4507 {/usr/local/lib/python3.11/site-packages/aiokafka/consumer/group_coordinator.py:798} ERROR - Heartbeat session expired - marking coordinator dead
sleeping
New Message Received ConsumerRecord(topic='test_topic', partition=0, offset=524, timestamp=1722844209833, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())
inducing lag
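
For context on why the repro script times out: time.sleep(40) is a blocking call, so while it runs the event loop cannot service heartbeats, fetches, or timers. The following is a minimal sketch, independent of aiokafka, that measures how long a background ticker is starved when a blocking call runs directly on the loop versus in a worker thread via asyncio.to_thread. The helper name max_tick_gap and the durations are illustrative assumptions, not part of the original report.

```python
import asyncio
import contextlib
import time


async def max_tick_gap(block_seconds: float, offload: bool) -> float:
    """Return the longest gap seen between ticks of a 10 ms background ticker."""
    gaps = []

    async def ticker():
        last = time.monotonic()
        while True:
            await asyncio.sleep(0.01)
            now = time.monotonic()
            gaps.append(now - last)
            last = now

    tick_task = asyncio.create_task(ticker())
    await asyncio.sleep(0.05)  # let the ticker record a few normal ticks

    if offload:
        # The blocking call runs in a worker thread; the loop stays responsive.
        await asyncio.to_thread(time.sleep, block_seconds)
    else:
        # The blocking call runs on the loop thread; nothing else can run.
        time.sleep(block_seconds)
        await asyncio.sleep(0.05)  # give the ticker a chance to record the gap

    tick_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await tick_task
    return max(gaps)


if __name__ == "__main__":
    print("blocked on loop:", asyncio.run(max_tick_gap(0.3, offload=False)))
    print("offloaded:", asyncio.run(max_tick_gap(0.3, offload=True)))
```

If the blocking work in a real service can be moved off the loop this way, the consumer's background tasks keep running. Whether that also avoids the stuck-consumer state described in this issue is not something the sketch demonstrates.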
Collaborator

ods commented Aug 3, 2024

Hi @Tinanuaa, thank you for reporting the issue. It's unclear from the report, have you tried version 0.11.0? The symptom looks pretty much like #983, fixed in the latest release.

Author

Tinanuaa commented Aug 5, 2024

> Hi @Tinanuaa, thank you for reporting the issue. It's unclear from the report, have you tried version 0.11.0? The symptom looks pretty much like #983, fixed in the latest release.

Yes, I tested version 0.11.0 as well and added the results to the issue description above. It seems that once the RequestTimedOutError happens, the consumer can no longer receive messages, so no "New Message Received" lines are printed to the console. After changing conn.send to use asyncio.wait_for, the consumer receives messages again and no RequestTimedOutError is printed.
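
Since the consumer stays open but silently stops yielding messages, one possible mitigation (not a fix for the underlying bug) is a watchdog that treats a long silence as a stall, so the service can restart the consumer itself. The sketch below assumes the topic normally receives messages more often than stall_timeout; on a quiet topic it would report false stalls. The names consume_with_watchdog, ConsumerStalled, and handle are hypothetical. It accepts any async iterator, which is how the repro script reads from the consumer.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


class ConsumerStalled(RuntimeError):
    """Raised when no message arrives within the allowed silence window."""


async def consume_with_watchdog(
    messages: AsyncIterator[object],
    handle: Callable[[object], Awaitable[None]],
    stall_timeout: float,
) -> None:
    """Process messages until the stream ends or stays silent too long."""
    iterator = messages.__aiter__()
    while True:
        try:
            # Bound the wait for the next message instead of waiting forever.
            msg = await asyncio.wait_for(iterator.__anext__(), stall_timeout)
        except StopAsyncIteration:
            return  # the stream ended normally
        except asyncio.TimeoutError:
            raise ConsumerStalled(
                f"no message received for {stall_timeout} seconds"
            ) from None
        await handle(msg)
```

A caller would catch ConsumerStalled, stop the old consumer, create a new one, and call consume_with_watchdog again, which mirrors the manual service restart described in the report.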


sidpremkumar commented Aug 21, 2024

I'm also experiencing the same issue

Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group python-model-io-event-processor.
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin

@Tinanuaa any luck? Also tried upgrading to 0.11 with no luck

@sidpremkumar

For anyone else who ends up here: it was so dumb. It just turned out to be the timeout settings, so try those first. I added all the timeout parameters below client_id and it fixed everything.

    import uuid

    from aiokafka import AIOKafkaConsumer

    KAFKA_URL = "localhost:9092"  # placeholder; replace with your broker address

    consumer = AIOKafkaConsumer(
        bootstrap_servers=[KAFKA_URL],
        group_id="something",
        client_id=f"something-{uuid.uuid4()}",
        # Increase session timeout (default is 10 seconds)
        session_timeout_ms=60000,
        # Increase heartbeat interval (default is 3 seconds)
        heartbeat_interval_ms=20000,
        # Increase max poll interval (default is 5 minutes)
        max_poll_interval_ms=600000,
        # Enable auto commit (if not already enabled)
        enable_auto_commit=True,
        # Increase auto commit interval (default is 5 seconds)
        auto_commit_interval_ms=15000,
        # Adjust fetch max wait time
        fetch_max_wait_ms=2000,
        # Increase request timeout
        request_timeout_ms=65000,
    )
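
When tuning these values, it may help to check how they relate to each other and to the longest pause the application can impose on the event loop. The helper below is a rough sketch, not part of aiokafka: check_timeouts and longest_stall_ms are hypothetical names. The one-third heartbeat rule follows the guidance commonly given for Kafka clients. The two stall comparisons are an assumption drawn from this thread, where a 40-second stall preceded the timeouts.

```python
from typing import List


def check_timeouts(
    session_timeout_ms: int,
    heartbeat_interval_ms: int,
    request_timeout_ms: int,
    longest_stall_ms: int,
) -> List[str]:
    """Return human-readable warnings for risky timeout combinations."""
    warnings = []
    # Guidance for Kafka clients: heartbeat at most one third of the session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append("heartbeat_interval_ms exceeds one third of session_timeout_ms")
    # Assumption from this thread: a stall at least as long as a timeout can trigger it.
    if longest_stall_ms >= session_timeout_ms:
        warnings.append("a stall this long can expire the group session")
    if longest_stall_ms >= request_timeout_ms:
        warnings.append("a stall this long can time out in-flight requests")
    return warnings


if __name__ == "__main__":
    # Values from the comment above, checked against a 40 s stall.
    print(check_timeouts(60000, 20000, 65000, longest_stall_ms=40000))
    # Shorter example timeouts checked against the same stall.
    print(check_timeouts(10000, 3000, 40000, longest_stall_ms=40000))
```

Raising the timeouts above the longest stall hides the symptom, as in the configuration above. Keeping blocking work off the event loop addresses the stall itself.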
