Add support for JSON/camelcase conversion
malthe committed Jul 25, 2023
1 parent d2b6fec commit 3511242
Showing 3 changed files with 108 additions and 10 deletions.
@@ -306,6 +306,7 @@ public class ProtobufData {
private boolean useWrapperForRawPrimitives;
private boolean generateStructForNulls;
private boolean generateIndexForUnions;
private boolean useJsonFieldNames;

public ProtobufData() {
this(new ProtobufDataConfig.Builder().with(
@@ -332,6 +333,7 @@ public ProtobufData(ProtobufDataConfig protobufDataConfig) {
this.useWrapperForRawPrimitives = protobufDataConfig.useWrapperForRawPrimitives();
this.generateStructForNulls = protobufDataConfig.generateStructForNulls();
this.generateIndexForUnions = protobufDataConfig.generateIndexForUnions();
this.useJsonFieldNames = protobufDataConfig.useJsonFieldNames();
}

/**
@@ -519,10 +521,14 @@ private Object fromConnectData(
throw new DataException("Invalid message name: " + scopedStructName);
}
for (Field field : schema.fields()) {
String fieldName = scrubName(field.name());
Object fieldCtx = getFieldType(ctx, fieldName);
String fieldName = field.name();
if (useJsonFieldNames) {
fieldName = fromJsonCase(fieldName);
}
String scrubbedFieldName = scrubName(fieldName);
Object fieldCtx = getFieldType(ctx, scrubbedFieldName);
Object connectFieldVal = ignoreDefaultForNullables
? struct.getWithoutDefault(field.name()) : struct.get(field);
? struct.getWithoutDefault(fieldName) : struct.get(field);
Object fieldValue = fromConnectData(
fieldCtx,
field.schema(),
@@ -539,10 +545,10 @@ private Object fromConnectData(
fieldValue = union.getValue();
} else {
fieldDescriptor = messageBuilder.getDescriptorForType()
.findFieldByName(fieldName);
.findFieldByName(scrubbedFieldName);
}
if (fieldDescriptor == null) {
throw new DataException("Cannot find field with name " + fieldName);
throw new DataException("Cannot find field with name " + scrubbedFieldName);
}
if (fieldValue != null) {
messageBuilder.setField(fieldDescriptor, fieldValue);
@@ -727,12 +733,16 @@ private MessageDefinition messageDefinitionFromConnectSchema(
String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters()
.get(PROTOBUF_TYPE_TAG) : null;
int tag = fieldTag != null ? Integer.parseInt(fieldTag) : index++;
String fieldName = field.name();
if (useJsonFieldNames) {
fieldName = fromJsonCase(fieldName);
}
FieldDefinition fieldDef = fieldDefinitionFromConnectSchema(
ctx,
schema,
message,
fieldSchema,
scrubName(field.name()),
scrubName(fieldName),
tag
);
if (fieldDef != null) {
@@ -763,6 +773,22 @@ private MessageDefinition messageDefinitionFromConnectSchema(
return message.build();
}

private static String fromJsonCase(final String str) {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < str.length(); i++) {
char c = str.charAt(i);
if (Character.isUpperCase(c)) {
if (i != 0) {
sb.append("_");
}
sb.append(Character.toLowerCase(c));
} else {
sb.append(c);
}
}
return sb.toString();
}

private void oneofDefinitionFromConnectSchema(
FromConnectContext ctx,
DynamicSchema.Builder schema,
@@ -1363,7 +1389,7 @@ private void setStructField(
Struct result,
FieldDescriptor fieldDescriptor
) {
final String fieldName = fieldDescriptor.getName();
final String fieldName = useJsonFieldNames ? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
final Field field = schema.field(fieldName);
if ((isPrimitiveOrRepeated(fieldDescriptor) && !isOptional(fieldDescriptor))
|| (generateStructForNulls || message.hasField(fieldDescriptor))) {
@@ -1425,7 +1451,8 @@ private SchemaBuilder toConnectSchema(
// Already added field as oneof
continue;
}
builder.field(fieldDescriptor.getName(), toConnectSchema(ctx, fieldDescriptor));
final String fieldName = useJsonFieldNames ? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
builder.field(fieldName, toConnectSchema(ctx, fieldDescriptor));
}
}

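The fromJsonCase helper added above maps the camelCase names used on the Connect side back to Protobuf's snake_case convention; in the other direction the Connect schema picks up the camelCase names via FieldDescriptor.getJsonName(). A minimal standalone sketch of that round trip, assuming a hypothetical toJsonCase helper to illustrate the inverse (it is not part of this commit):

public class FieldNameCaseSketch {

  // Same logic as the fromJsonCase helper above: camelCase -> snake_case.
  static String fromJsonCase(String str) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < str.length(); i++) {
      char c = str.charAt(i);
      if (Character.isUpperCase(c)) {
        if (i != 0) {
          sb.append('_');
        }
        sb.append(Character.toLowerCase(c));
      } else {
        sb.append(c);
      }
    }
    return sb.toString();
  }

  // Hypothetical inverse for illustration: snake_case -> camelCase,
  // mirroring what protobuf reports through FieldDescriptor.getJsonName().
  static String toJsonCase(String str) {
    StringBuilder sb = new StringBuilder();
    boolean upperNext = false;
    for (char c : str.toCharArray()) {
      if (c == '_') {
        upperNext = true;
      } else {
        sb.append(upperNext ? Character.toUpperCase(c) : c);
        upperNext = false;
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(toJsonCase("test_string"));   // prints: testString
    System.out.println(fromJsonCase("testString"));  // prints: test_string
  }
}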
@@ -69,6 +69,11 @@ public class ProtobufDataConfig extends AbstractDataConfig {
public static final String GENERATE_INDEX_FOR_UNIONS_DOC = "Whether to suffix union "
+ "names with an underscore followed by an index";

public static final String JSON_FIELD_NAMES_CONFIG = "json.field.names";
public static final boolean JSON_FIELD_NAMES_DEFAULT = false;
public static final String JSON_FIELD_NAMES_DOC = "Whether to convert protobuf field names "
+ "to camelcase for internal data representation and vice-versa.";

public static ConfigDef baseConfigDef() {
return AbstractDataConfig.baseConfigDef()
.define(ENHANCED_PROTOBUF_SCHEMA_SUPPORT_CONFIG,
@@ -112,7 +117,12 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Type.BOOLEAN,
GENERATE_INDEX_FOR_UNIONS_DEFAULT,
ConfigDef.Importance.LOW,
GENERATE_INDEX_FOR_UNIONS_DOC
GENERATE_INDEX_FOR_UNIONS_DOC)
.define(JSON_FIELD_NAMES_CONFIG,
ConfigDef.Type.BOOLEAN,
JSON_FIELD_NAMES_DEFAULT,
ConfigDef.Importance.LOW,
JSON_FIELD_NAMES_DOC
);
}

@@ -156,6 +166,8 @@ public boolean generateIndexForUnions() {
return this.getBoolean(GENERATE_INDEX_FOR_UNIONS_CONFIG);
}

public boolean useJsonFieldNames() { return this.getBoolean(JSON_FIELD_NAMES_CONFIG); }

public static class Builder {

private final Map<String, Object> props = new HashMap<>();
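With the new json.field.names option in place, enabling it looks like any other converter property. A minimal configuration sketch, assuming the converter class io.confluent.connect.protobuf.ProtobufConverter and a placeholder Schema Registry URL; in a Connect worker or connector file the same flag would typically be set as value.converter.json.field.names=true:

import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.protobuf.ProtobufDataConfig;

import java.util.HashMap;
import java.util.Map;

public class EnableJsonFieldNames {
  public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("schema.registry.url", "http://localhost:8081");    // placeholder URL
    configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);  // "json.field.names"
    ProtobufConverter converter = new ProtobufConverter();
    converter.configure(configs, false);                            // false = value converter
    // The converter now exposes camelCase field names on Connect schemas
    // and converts them back to snake_case when writing Protobuf messages.
  }
}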
@@ -28,7 +28,8 @@
import io.confluent.kafka.serializers.protobuf.test.KeyTimestampValueOuterClass.KeyTimestampValue;
import io.confluent.kafka.serializers.protobuf.test.TestMessageProtos.TestMessage2;
import io.confluent.kafka.serializers.protobuf.test.TimestampValueOuterClass.TimestampValue;
import java.util.List;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -39,9 +40,12 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import io.confluent.connect.protobuf.test.Key;
@@ -281,6 +285,23 @@ public void testFromConnectDataForValue() {
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
}

@Test
public void testFromConnectDataForValueUseJsonFieldNames() {
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();

Map<String, Object> configs = new HashMap<>(SR_CONFIG);
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
converter.configure(configs, false);

SchemaAndValue schemaAndValue = getExpectedTestMessageWithJsonFieldNames();

byte[] result = converter.fromConnectData("my-topic",
schemaAndValue.schema(), schemaAndValue.value()
);

assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
}

@Test
public void testFromConnectDataForValueWithNamespace() {
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();
@@ -531,6 +552,44 @@ public void testToConnectDataForValue() throws Exception {
assertEquals(expected, result);
}

@Test
public void testToConnectDataForValueUseJsonFieldNames() throws Exception {
Map<String, Object> configs = new HashMap<>(SR_CONFIG);
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
converter.configure(configs, false);
// extra byte for message index
final byte[] input = concat(new byte[]{0, 0, 0, 0, 1, 0}, HELLO_WORLD_MESSAGE.toByteArray());
schemaRegistry.register("my-topic-value", getSchema(TestMessage.getDescriptor()));
SchemaAndValue result = converter.toConnectData("my-topic", input);

SchemaAndValue expected = getExpectedTestMessageWithJsonFieldNames();

assertEquals(expected.schema(), result.schema());
assertEquals(expected, result);
}

private SchemaAndValue getExpectedTestMessageWithJsonFieldNames() {
Struct testMessageStruct = getTestMessageStruct(TEST_MSG_STRING, 123);
Schema testMessageSchema = getTestMessageSchema();

final SchemaBuilder builder = SchemaBuilder.struct();
builder.name("TestMessage").version(1);
List values = new ArrayList<>();
for (Field field : testMessageSchema.fields()) {
String jsonFieldName = TestMessage.getDescriptor()
.findFieldByName(field.name()).getJsonName();
builder.field(jsonFieldName, field.schema());
values.add(testMessageStruct.get(field));
}
final Schema jsonSchema = builder.build();
final Struct jsonStruct = new Struct(jsonSchema);
final Iterator<Object> valuesIt = values.iterator();
for (Field field : jsonSchema.fields()) {
jsonStruct.put(field, valuesIt.next());
}
return new SchemaAndValue(jsonSchema, jsonStruct);
}

@Test
public void testToConnectDataForValueWithSecondMessage() throws Exception {
converter.configure(SR_CONFIG, false);
