
Investigate using Confluent Cloud API to obtain Consumer Group's current_offset data #114

Open
detro opened this issue Jan 7, 2024 · 1 comment

detro commented Jan 7, 2024

This would enable Kommitted to work with Confluent Cloud. Unfortunately, the API doesn't seem to return the timestamp of when the current_offset value was read, so we will have to work on the assumption that it reflects the wall clock at the time of the request.
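A minimal sketch of that assumption in Rust (type and field names are illustrative, not Kommitted's actual ones):

```rust
use chrono::{DateTime, Utc};

/// Hypothetical shape of a lag reading taken via the Confluent Cloud API.
struct ConsumerGroupLagReading {
    group: String,
    topic: String,
    partition: u32,
    current_offset: i64,
    /// The API does not report when `current_offset` was read,
    /// so we assume "now", i.e. the wall clock at response time.
    read_at: DateTime<Utc>,
}

impl ConsumerGroupLagReading {
    fn stamped_now(group: String, topic: String, partition: u32, current_offset: i64) -> Self {
        Self { group, topic, partition, current_offset, read_at: Utc::now() }
    }
}
```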

The "Emitter -> Register" design choice still works pretty well here: it will require to introduce a new Emitter that loops over the list of currently known Consumer Groups. That list can be obtained via the consumer_groups module, and then multiple reads will have to be arranged via API.

Potentially, to save requests, the listKafkaConsumerLags API could be used, as it returns the lags for all the partitions consumed by a group in a single call.

API doc:

API path:

/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags/{topic_name}/partitions/{partition_id}

Request example:

```bash
curl --request GET \
  --url https://pkc-00000.region.provider.confluent.cloud/kafka/v3/clusters/cluster-1/consumer-groups/consumer-group-1/lags/topic-1/partitions/0 \
  --header 'Authorization: Basic REPLACE_BASIC_AUTH'
```

Response example:

```json
{
  "kind": "KafkaConsumerLag",
  "metadata": {
    "self": "https://pkc-00000.region.provider.confluent.cloud/kafka/v3/clusters/cluster-1/consumer-groups/consumer-group-1/lags/topic-1/partitions/1",
    "resource_name": "crn:///kafka=cluster-1/consumer-group=consumer-group-1/lag=topic-1/partition=1"
  },
  "cluster_id": "cluster-1",
  "consumer_group_id": "consumer-group-1",
  "topic_name": "topic-1",
  "partition_id": 1,
  "consumer_id": "consumer-1",
  "instance_id": "consumer-instance-1",
  "client_id": "client-1",
  "current_offset": 1,
  "log_end_offset": 101,
  "lag": 100
}
```
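For illustration, a sketch of fetching and deserializing that response with reqwest and serde; the struct mirrors the JSON above (serde skips the fields not listed), and the URL and credentials are placeholders:

```rust
use serde::Deserialize;

/// Mirrors the relevant fields of the KafkaConsumerLag response above;
/// unknown fields ("kind", "metadata", ...) are ignored by serde by default.
#[derive(Debug, Deserialize)]
struct KafkaConsumerLag {
    cluster_id: String,
    consumer_group_id: String,
    topic_name: String,
    partition_id: u32,
    current_offset: i64,
    log_end_offset: i64,
    lag: i64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "https://pkc-00000.region.provider.confluent.cloud/kafka/v3/clusters/cluster-1/consumer-groups/consumer-group-1/lags/topic-1/partitions/0";
    let lag: KafkaConsumerLag = reqwest::Client::new()
        .get(url)
        // Same Basic auth as the curl example: API key and secret.
        .basic_auth("REPLACE_API_KEY", Some("REPLACE_API_SECRET"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("{lag:?}");
    Ok(())
}
```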

detro commented Jan 12, 2024

First few notes

  1. The consumer_groups module must have its own ConsumerGroupsRegister, and return that from ::init() instead of the ConsumerGroups channel receiver
  2. This new Register has to be configurable: how long after a consumer group stops being reported by the Kafka API do we keep it around in the data (e.g. if the application suffers a total failure, we don't want the CG Lag info to disappear immediately; see the sketch after this list)
  3. The Lag Register should use the new Register as the source of truth for the set of known Groups, instead of relying on the data it contains
  4. An entire "emit handling branch" in the Lag Register can be removed: there is no more need to populate its internal structure if the new Register is the source of truth
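A minimal sketch of point 2, assuming the register tracks when each group was last reported and evicts it only after a configurable grace period (names are illustrative, not Kommitted's actual API):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct ConsumerGroupsRegister {
    /// Group id -> last time the group was seen in a Kafka API response.
    last_seen: HashMap<String, Instant>,
    /// How long a group survives after it stops being reported.
    retention: Duration,
}

impl ConsumerGroupsRegister {
    fn new(retention: Duration) -> Self {
        Self { last_seen: HashMap::new(), retention }
    }

    /// Record the set of groups reported by the latest Kafka API poll.
    fn observe(&mut self, groups: impl IntoIterator<Item = String>) {
        let now = Instant::now();
        for g in groups {
            self.last_seen.insert(g, now);
        }
    }

    /// Drop groups that have not been reported within the retention window,
    /// so a total application failure does not erase lag info immediately.
    fn evict_expired(&mut self) {
        let now = Instant::now();
        let retention = self.retention;
        self.last_seen.retain(|_, seen| now.duration_since(*seen) <= retention);
    }

    /// The current source of truth for the set of known groups.
    fn known_groups(&self) -> impl Iterator<Item = &String> {
        self.last_seen.keys()
    }
}
```

In this shape, observe() would be called from the existing Kafka API polling loop and evict_expired() on a periodic tick, keeping eviction policy entirely inside the register.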

These changes are valid and doable even before we start consuming the Confluent API.

Once the changes above are done, a new emitter, alternative to the KonsumerOffsetsDataEmitter, can be created: it would also consume the new ConsumerGroupsRegister and loop over the list of Groups. It will (see the sketch after this list):

  1. list groups
  2. for each group, query the Confluent Cloud API
  3. create and emit OffsetCommit objects
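A rough sketch of that loop, reusing the illustrative ConsumerGroupsRegister and KafkaConsumerLag types from the sketches above; fetch_consumer_lags and OffsetCommit are hypothetical stand-ins for the actual Confluent Cloud call (e.g. listKafkaConsumerLags) and for Kommitted's real type:

```rust
use tokio::sync::mpsc::Sender;

// Hypothetical stand-in for Kommitted's OffsetCommit type.
struct OffsetCommit {
    group: String,
    topic: String,
    partition: u32,
    offset: i64,
}

// Stand-in for the Confluent Cloud call, e.g. listKafkaConsumerLags:
// GET /kafka/v3/clusters/{cluster_id}/consumer-groups/{group}/lags
async fn fetch_consumer_lags(group: &str) -> reqwest::Result<Vec<KafkaConsumerLag>> {
    todo!("call the API and deserialize the response's `data` array")
}

async fn run_confluent_cloud_emitter(
    register: &ConsumerGroupsRegister,
    tx: Sender<OffsetCommit>,
) -> Result<(), Box<dyn std::error::Error>> {
    // 1. list groups, using the new ConsumerGroupsRegister as source of truth
    for group in register.known_groups() {
        // 2. for each group, query the Confluent Cloud API
        let lags = fetch_consumer_lags(group).await?;
        // 3. create and emit OffsetCommit objects
        for lag in lags {
            tx.send(OffsetCommit {
                group: lag.consumer_group_id,
                topic: lag.topic_name,
                partition: lag.partition_id,
                offset: lag.current_offset,
            })
            .await?;
        }
    }
    Ok(())
}
```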

This emitter would be slotted in as an alternative to the KonsumerOffsetsDataEmitter, based on command line arguments.
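For example, the choice could be exposed as a CLI flag; a sketch with clap, where the flag name and variants are hypothetical, not Kommitted's actual arguments:

```rust
use clap::{Parser, ValueEnum};

#[derive(Parser)]
struct Cli {
    /// Where to source consumer group offsets from.
    #[arg(long, value_enum, default_value = "konsumer-offsets")]
    offsets_source: OffsetsSource,
}

#[derive(Clone, Copy, Debug, ValueEnum)]
enum OffsetsSource {
    /// Consume the __consumer_offsets internal topic (KonsumerOffsetsDataEmitter).
    KonsumerOffsets,
    /// Poll the Confluent Cloud REST API (the new emitter).
    ConfluentCloudApi,
}

fn main() {
    let cli = Cli::parse();
    match cli.offsets_source {
        OffsetsSource::KonsumerOffsets => { /* spawn KonsumerOffsetsDataEmitter */ }
        OffsetsSource::ConfluentCloudApi => { /* spawn the Confluent Cloud emitter */ }
    }
}
```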
