
Expose ignore.default.for.nullables #342

Open · wants to merge 4 commits into master

Conversation

andyhuynh3

Problem

confluentinc/schema-registry#2326 introduced the ignore.default.for.nullables Avro converter config property. However, the storage connectors currently cannot take advantage of it, since it is not an exposed config. For example, when using the S3 sink connector, null values are still replaced with defaults, as detailed in this issue. Because the config is not exposed, ignore.default.for.nullables always comes in with its default of false:

[2024-02-08 00:58:35,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] AvroDataConfig values:
	allow.optional.map.keys = false
	connect.meta.data = true
	discard.type.doc.default = false
	enhanced.avro.schema.support = true
	generalized.sum.type.support = false
	ignore.default.for.nullables = false
	schemas.cache.config = 1000
	scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
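
To illustrate the mechanics (a minimal sketch under my own assumptions, not the connector's actual code): the sink builds its AvroData from an AvroDataConfig populated only with the keys the connector exposes, so an unexposed key never reaches it and the AvroDataConfig default of false wins.

import java.util.Map;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;

public class WhyAlwaysFalse {
  public static void main(String[] args) {
    // Hypothetical pass-through map: only keys the connector exposes make it
    // here (values mirror the AvroDataConfig log block above).
    Map<String, Object> exposedProps = Map.of(
        "connect.meta.data", "true",
        "enhanced.avro.schema.support", "true",
        "schemas.cache.config", "1000");

    // ignore.default.for.nullables was never copied in, so AvroDataConfig
    // falls back to its declared default of false.
    AvroData avroData = new AvroData(new AvroDataConfig(exposedProps));
  }
}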

Solution

Expose the ignore.default.for.nullables option so that it can be configured on the storage connectors, as sketched below.
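
The change amounts to declaring the key in the connector's config definition and letting it flow through to AvroDataConfig. Roughly (a sketch of the shape of the change; the actual class and constant names live in kafka-connect-storage-common):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ExposeIgnoreDefaults {
  public static final String IGNORE_DEFAULT_FOR_NULLABLES_CONFIG =
      "ignore.default.for.nullables";

  // Declare the key so Kafka Connect accepts and forwards it; defaulting to
  // false preserves the existing behavior for anyone not setting it.
  public static ConfigDef withIgnoreDefaults(ConfigDef base) {
    return base.define(
        IGNORE_DEFAULT_FOR_NULLABLES_CONFIG,
        Type.BOOLEAN,
        false,
        Importance.LOW,
        "Whether to ignore a nullable field's default value when the actual "
            + "value is null, instead of substituting the default.");
  }
}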

Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

Test Strategy

I rebuilt kafka-connect-storage-core-11.2.4.jar with the changes included in this PR, then ran some manual tests with the S3 connector to confirm that the option takes effect. Here's what my S3 sink settings look like:

{
   "connector.class":"io.confluent.connect.s3.S3SinkConnector",
   "tasks.max":"1",
   "errors.deadletterqueue.context.headers.enable":"true",
   "errors.deadletterqueue.topic.name":"db_ingestion_dead_letter_queue",
   "errors.deadletterqueue.topic.replication.factor":"1",
   "filename.offset.zero.pad.widthrotate_interval_ms":"12",
   "flush.size":"500000",
   "locale":"en",
   "partition.duration.ms":"60000",
   "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
   "path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd/'\''hour'\''=HH",
   "retry.backoff.ms":"5000",
   "rotate.interval.ms":"15000",
   "rotate.schedule.interval.ms":"60000",
   "s3.bucket.name":"my-bucket",
   "s3.part.size":"5242880",
   "s3.region":"us-west-2",
   "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
   "schema.compability":"NONE ",
   "storage.class":"io.confluent.connect.s3.storage.S3Storage",
   "timezone":"UTC",
   "topics.dir":"developer/kafka-connect-avro/data/raw",
   "topics.regex":"dbzium\\.inventory\\..+",
   "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "key.converter.auto.registry.schemas": "true",
   "key.converter.ignore.default.for.nullables": "true",
   "schema.name.adjustment.mode":"avro",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "value.converter.auto.registry.schemas": "true",
   "value.converter.ignore.default.for.nullables": "true",
   "ignore.default.for.nullables": "true"
}
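
Note that ignore.default.for.nullables appears three times above: under the key.converter. and value.converter. prefixes (consumed by the two AvroConverter instances) and once at the top level, which is the connector-level key this PR exposes so that it reaches the Avro data conversion used when writing files.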

After starting the connector, I see that the ignore.default.for.nullables setting was correctly applied based on the logs below:

[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] AvroDataConfig values:
	allow.optional.map.keys = false
	connect.meta.data = true
	discard.type.doc.default = false
	enhanced.avro.schema.support = true
	generalized.sum.type.support = false
	ignore.default.for.nullables = true
	schemas.cache.config = 1000
	scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

Release Plan

@andyhuynh3 andyhuynh3 requested a review from a team as a code owner February 8, 2024 17:08

cla-assistant bot commented Feb 8, 2024

CLA assistant check
All committers have signed the CLA.

@raphaelauv

I don't understand why we need to add an ignore.default.for.nullables option here.

Do you know why deserialization in the confluent-schema-registry lib with the value.converter.ignore.default.for.nullables option is not working?

@andyhuynh3
Author

andyhuynh3 commented Feb 13, 2024

I do not, but I suppose it's similar to why this PR was needed to expose the scrub.invalid.names config.

The config takes effect and does work with producers (e.g. Debezium), but I wasn't able to get it working with the S3 sink until I introduced the changes in this PR.
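
A plausible explanation (hedged; I haven't traced the full code path) is that two separate AvroData instances are involved: the one the AvroConverter builds from the value.converter.* keys when records are read off the topic, and the one the sink's Avro format builds from the connector's own configuration when writing files, which is the instance this PR makes configurable. Sketched:

import java.util.Map;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;

public class TwoAvroDataInstances {
  // Converter side: configured by the Connect framework from the
  // value.converter.* keys; used when deserializing topic records.
  static AvroData converterSide(Map<String, ?> converterProps) {
    return new AvroData(new AvroDataConfig(converterProps));
  }

  // Sink-format side: built from the connector's own configuration when
  // writing Avro files to S3. Before this PR, ignore.default.for.nullables
  // had no way to reach this instance.
  static AvroData formatSide(Map<String, ?> connectorProps) {
    return new AvroData(new AvroDataConfig(connectorProps));
  }
}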
