diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotJsonContainsPredicate.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotJsonContainsPredicate.java new file mode 100644 index 0000000000000..76e99febeaa0f --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotJsonContainsPredicate.java @@ -0,0 +1,186 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.Slice; +import io.trino.spi.block.IntArrayBlock; +import io.trino.spi.block.VariableWidthBlock; +import io.trino.spi.expression.Call; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Constant; +import io.trino.spi.expression.FunctionName; +import io.trino.spi.expression.Variable; + +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class PinotJsonContainsPredicate + implements PinotJsonPredicate +{ + private final String columnName; + private final String jsonPath; + private final List values; + private final boolean valuesContainsStrings; + private final String type; + + @JsonCreator + public PinotJsonContainsPredicate( + @JsonProperty("columnName") String columnName, + @JsonProperty("jsonPath") String jsonPath, + @JsonProperty("values") List values, + @JsonProperty("valuesContainsStrings") boolean valuesContainsStrings, + @JsonProperty("type") String type) + { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.jsonPath = requireNonNull(jsonPath, "jsonPath is null"); + this.values = requireNonNull(values, "values is null"); + this.valuesContainsStrings = valuesContainsStrings; + this.type = "contains"; + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public String getJsonPath() + { + return jsonPath; + } + + @JsonProperty + public List getValues() + { + return values; + } + + @JsonProperty + public boolean getValuesContainsStrings() + { + return valuesContainsStrings; + } + + @JsonProperty + public String getType() + { + return type; + } + + public PinotJsonContainsPredicate(Call call) + { + List containsCallArgs = call.getArguments(); + Constant arrayArg = (Constant) containsCallArgs.getFirst(); + values = new ArrayList<>(); + if (arrayArg.getValue() instanceof VariableWidthBlock stringArray) { + valuesContainsStrings = true; + for (int index = 0; index < stringArray.getPositionCount(); index++) { + values.add(stringArray.getSlice(index).toStringUtf8()); + } + } + else if (arrayArg.getValue() instanceof IntArrayBlock intArray) { + valuesContainsStrings = false; + for (int index = 0; index < intArray.getPositionCount(); index++) { + values.add(String.valueOf(intArray.getInt(index))); + } + } + else { + throw new IllegalArgumentException("Unsupported array argument type: " + arrayArg.getValue()); + } + + Call innerCall = (Call) containsCallArgs.get(1); + Call jsonExtractScalarCall = innerCall; + if (new FunctionName("$cast").equals(innerCall.getFunctionName())) { + jsonExtractScalarCall = (Call) innerCall.getArguments().getFirst(); + } + + List args = jsonExtractScalarCall.getArguments(); + columnName = ((Variable) args.get(0)).getName(); + jsonPath = ((Slice) ((Constant) args.get(1)).getValue()).toStringUtf8(); + type = "contains"; + } + + public static boolean supportsCall(Call call) + { + if (!new FunctionName("contains").equals(call.getFunctionName())) { + return false; + } + + List arguments = call.getArguments(); + ConnectorExpression arrayArg = arguments.get(0); + if (!(arrayArg instanceof Constant) || !(arguments.get(1) instanceof Call innerCall)) { + return false; + } + + Constant constant = (Constant) arrayArg; + if (!(constant.getValue() instanceof VariableWidthBlock || constant.getValue() instanceof IntArrayBlock)) { + return false; + } + + if (new FunctionName("$cast").equals(innerCall.getFunctionName())) { + List castArguments = innerCall.getArguments(); + if (!(castArguments.getFirst() instanceof Call jsonExtracatScalarCall)) { + return false; + } + return isSupportedJsonExtractScalarCall(jsonExtracatScalarCall); + } + else { + return isSupportedJsonExtractScalarCall(innerCall); + } + } + + private static boolean isSupportedJsonExtractScalarCall(Call call) + { + if (!new FunctionName("json_extract_scalar").equals(call.getFunctionName())) { + return false; + } + + List arguments = call.getArguments(); + if (!(arguments.get(0) instanceof Variable) || !(arguments.get(1) instanceof Constant)) { + return false; + } + + // TODO: resolve dependency issues to allow usage of io.trino.type.JsonType + // return arguments.get(0).getType() instanceof JsonType; + + return true; + } + + @Override + public String toPQL() + { + String escape = valuesContainsStrings ? "''" : ""; + String values = String.join(String.format("%s,%s", escape, escape), this.values); + return String.format("JSON_MATCH(%s, '\"%s\" in (%s%s%s)')", + columnName, jsonPath, escape, values, escape); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("columnName", columnName) + .add("jsonPath", jsonPath) + .add("values", values) + .add("valuesContainsStrings", valuesContainsStrings) + .add("type", type) + .toString(); + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotJsonPredicate.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotJsonPredicate.java new file mode 100644 index 0000000000000..1f11ea59a6941 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotJsonPredicate.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.trino.spi.expression.Call; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = PinotJsonContainsPredicate.class, name = "contains") +}) +public interface PinotJsonPredicate +{ + static boolean supportsCall(Call call) + { + return false; + } + + String toPQL(); +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java index 6e389cecde1e4..e452a1240b9c2 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java @@ -24,6 +24,7 @@ import io.trino.plugin.base.aggregation.AggregateFunctionRewriter; import io.trino.plugin.base.aggregation.AggregateFunctionRule; import io.trino.plugin.base.expression.ConnectorExpressionRewriter; +import io.trino.plugin.base.expression.ConnectorExpressions; import io.trino.plugin.pinot.client.PinotClient; import io.trino.plugin.pinot.query.AggregateExpression; import io.trino.plugin.pinot.query.DynamicTable; @@ -51,6 +52,7 @@ import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.expression.Call; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; @@ -60,6 +62,7 @@ import io.trino.spi.type.Type; import org.apache.pinot.spi.data.Schema; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -162,7 +165,7 @@ public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName if (tableName.getTableName().trim().contains("select ")) { DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient, typeConverter); - return new PinotTableHandle(tableName.getSchemaName(), dynamicTable.tableName(), false, TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable)); + return new PinotTableHandle(tableName.getSchemaName(), dynamicTable.tableName(), false, TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable), List.of()); } String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableNameIfExists(tableName.getTableName()); if (pinotTableName == null) { @@ -174,7 +177,8 @@ public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName getFromCache(pinotTableSchemaCache, pinotTableName).isEnableColumnBasedNullHandling(), TupleDomain.all(), OptionalLong.empty(), - Optional.empty()); + Optional.empty(), + List.of()); } @Override @@ -288,7 +292,8 @@ public Optional> applyLimit(Connect handle.enableNullHandling(), handle.constraint(), OptionalLong.of(limit), - dynamicTable); + dynamicTable, + List.of()); boolean singleSplit = dynamicTable.isPresent(); return Optional.of(new LimitApplicationResult<>(handle, singleSplit, false)); } @@ -330,7 +335,40 @@ else if (isFilterPushdownUnsupported(entry.getValue())) { remainingFilter = TupleDomain.withColumnDomains(unsupported); } - if (oldDomain.equals(newDomain)) { + ConnectorExpression expression = constraint.getExpression(); + List jsonPredicates = new ArrayList<>(); + List notHandledExpressions = new ArrayList<>(); + if (expression instanceof Call call) { + if (call.getFunctionName().getName().equals("$and")) { + List innerExpressions = ConnectorExpressions.extractConjuncts(constraint.getExpression()); + for (ConnectorExpression innerExpression : innerExpressions) { + if (innerExpression instanceof Call innerCall) { + Optional jsonPredicate = getJsonPredicate(innerCall); + if (jsonPredicate.isPresent()) { + jsonPredicates.add(jsonPredicate.get()); + } + else { + notHandledExpressions.add(innerExpression); + } + } + } + } + else { + Optional jsonPredicate = getJsonPredicate(call); + if (jsonPredicate.isPresent()) { + jsonPredicates.add(jsonPredicate.get()); + } + else { + notHandledExpressions.add(expression); + } + } + } + else { + notHandledExpressions.add(expression); + } + ConnectorExpression newExpression = ConnectorExpressions.and(notHandledExpressions); + + if (oldDomain.equals(newDomain) && expression.equals(newExpression)) { return Optional.empty(); } @@ -340,8 +378,23 @@ else if (isFilterPushdownUnsupported(entry.getValue())) { handle.enableNullHandling(), newDomain, handle.limit(), - handle.query()); - return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter, constraint.getExpression(), false)); + handle.query(), + jsonPredicates); + + return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter, newExpression, false)); + } + + private static PinotJsonContainsPredicate toJsonPredicate(Call call) + { + return new PinotJsonContainsPredicate(call); + } + + private Optional getJsonPredicate(Call call) + { + if (PinotJsonContainsPredicate.supportsCall(call)) { + return Optional.of(new PinotJsonContainsPredicate(call)); + } + return Optional.empty(); } // IS NULL and IS NOT NULL are handled differently in Pinot, pushing down would lead to inconsistent results. @@ -472,7 +525,8 @@ public Optional> applyAggrega tableHandle.enableNullHandling(), tableHandle.constraint(), tableHandle.limit(), - Optional.of(dynamicTable)); + Optional.of(dynamicTable), + tableHandle.jsonPredicates()); return Optional.of(new AggregationApplicationResult<>(tableHandle, projections.build(), resultAssignments.build(), ImmutableMap.of(), false)); } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java index 5188f4997fa4f..56a06cd68e06e 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java @@ -88,7 +88,7 @@ public ConnectorPageSource createPageSource( if (pinotTableHandle.query().isPresent()) { DynamicTable dynamicTable = pinotTableHandle.query().get(); pinotQueryInfo = new PinotQueryInfo(dynamicTable.tableName(), - extractPql(dynamicTable, pinotTableHandle.constraint()), + extractPql(dynamicTable, pinotTableHandle.constraint(), pinotTableHandle.jsonPredicates()), dynamicTable.groupingColumns().size()); } else { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java index 0219e73953067..f69fb9dccb003 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java @@ -18,6 +18,7 @@ import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.predicate.TupleDomain; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -30,7 +31,8 @@ public record PinotTableHandle( boolean enableNullHandling, TupleDomain constraint, OptionalLong limit, - Optional query) + Optional query, + List jsonPredicates) implements ConnectorTableHandle { public PinotTableHandle @@ -40,6 +42,7 @@ public record PinotTableHandle( requireNonNull(constraint, "constraint is null"); requireNonNull(limit, "limit is null"); requireNonNull(query, "query is null"); + requireNonNull(jsonPredicates, "jsonPredicates is null"); } @Override diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java index 3eb3a2a020465..2df98a06807e0 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java @@ -14,9 +14,11 @@ package io.trino.plugin.pinot.query; import io.trino.plugin.pinot.PinotColumnHandle; +import io.trino.plugin.pinot.PinotJsonPredicate; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.predicate.TupleDomain; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -31,7 +33,7 @@ private DynamicTablePqlExtractor() { } - public static String extractPql(DynamicTable table, TupleDomain tupleDomain) + public static String extractPql(DynamicTable table, TupleDomain tupleDomain, List jsonPredicates) { StringBuilder builder = new StringBuilder(); Map queryOptions = table.queryOptions(); @@ -62,7 +64,7 @@ public static String extractPql(DynamicTable table, TupleDomain tu builder.append(table.tableName()); builder.append(table.suffix().orElse("")); - Optional filter = getFilter(table.filter(), tupleDomain, false); + Optional filter = getFilter(table.filter(), tupleDomain, false, jsonPredicates); if (filter.isPresent()) { builder.append(" WHERE ") .append(filter.get()); @@ -73,7 +75,7 @@ public static String extractPql(DynamicTable table, TupleDomain tu .map(PinotColumnHandle::getExpression) .collect(joining(", "))); } - Optional havingClause = getFilter(table.havingExpression(), tupleDomain, true); + Optional havingClause = getFilter(table.havingExpression(), tupleDomain, true, List.of()); if (havingClause.isPresent()) { builder.append(" HAVING ") .append(havingClause.get()); @@ -95,9 +97,9 @@ public static String extractPql(DynamicTable table, TupleDomain tu return builder.toString(); } - private static Optional getFilter(Optional filter, TupleDomain tupleDomain, boolean forHavingClause) + private static Optional getFilter(Optional filter, TupleDomain tupleDomain, boolean forHavingClause, List jsonPredicates) { - Optional tupleFilter = getFilterClause(tupleDomain, Optional.empty(), forHavingClause); + Optional tupleFilter = getFilterClause(tupleDomain, Optional.empty(), forHavingClause, jsonPredicates); if (tupleFilter.isPresent() && filter.isPresent()) { return Optional.of(format("%s AND %s", encloseInParentheses(tupleFilter.get()), encloseInParentheses(filter.get()))); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java index 864e71db47660..96fe9cfd58cf2 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.trino.plugin.pinot.PinotColumnHandle; +import io.trino.plugin.pinot.PinotJsonPredicate; import io.trino.plugin.pinot.PinotTableHandle; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.predicate.Domain; @@ -96,11 +97,11 @@ private static String getTableName(PinotTableHandle tableHandle, Optional timePredicate) { - getFilterClause(tableHandle.constraint(), timePredicate, false) + getFilterClause(tableHandle.constraint(), timePredicate, false, List.of()) .ifPresent(filterClause -> pqlBuilder.append(" WHERE ").append(filterClause)); } - public static Optional getFilterClause(TupleDomain tupleDomain, Optional timePredicate, boolean forHavingClause) + public static Optional getFilterClause(TupleDomain tupleDomain, Optional timePredicate, boolean forHavingClause, List jsonPredicates) { checkState(!tupleDomain.isNone(), "Pinot does not support 1 = 0 syntax, as a workaround use != "); ImmutableList.Builder conjunctsBuilder = ImmutableList.builder(); @@ -116,6 +117,9 @@ public static Optional getFilterClause(TupleDomain tupleDo toPredicate(pinotColumnHandle, entry.getValue()).ifPresent(conjunctsBuilder::add); } } + for (PinotJsonPredicate jsonPredicate : jsonPredicates) { + conjunctsBuilder.add(jsonPredicate.toPQL()); + } List conjuncts = conjunctsBuilder.build(); if (!conjuncts.isEmpty()) { return Optional.of(Joiner.on(" AND ").join(conjuncts)); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotTpchTables.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotTpchTables.java index fcc36dc0912e3..b1a71e34d65ac 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotTpchTables.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotTpchTables.java @@ -128,6 +128,7 @@ private static void createOrdersTable(TestingKafka kafka, TestingPinotCluster pi .name("clerk").type().stringType().noDefault() .name("shippriority").type().intType().noDefault() .name("comment").type().stringType().noDefault() + .name("json").type().stringType().noDefault() .name("updated_at").type().longType().noDefault() .endRecord(); ImmutableList.Builder> ordersRowsBuilder = ImmutableList.builder(); @@ -143,6 +144,7 @@ private static void createOrdersTable(TestingKafka kafka, TestingPinotCluster pi .set("clerk", row.getField(6)) .set("shippriority", row.getField(7)) .set("comment", row.getField(8)) + .set("json", "{ \"string\": \"value\", \"int\": 1 }") .set("updated_at", INITIAL_UPDATED_AT.plusMillis(1000).toEpochMilli()) .build())); } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java index 6402217590ffa..889a71554558b 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java @@ -106,7 +106,7 @@ WHERE OR(AND("CancellationCode" IN ('strike', 'weather', 'pilot_bac'), ("Origin" AND(("DepDelayMinutes") < '10', ("Distance") >= '3', ("ArrDelay") > '4', ("SecurityDelay") < '5', ("LateAircraftDelay") <= '7'))\ LIMIT 60""".formatted(tableName); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expected); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expected); } @Test @@ -130,7 +130,7 @@ WHERE AND(("string_col") = 'string', ("long_col") = '12345678901',\ ("int_col") = '123456789', ("double_col") = '3.56', ("float_col") = '3.56', ("bytes_col") = 'abcd')\ LIMIT 60"""; DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expected); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expected); } @Test @@ -141,7 +141,7 @@ public void testDoubleWithScientificNotation() String expected = """ SELECT "string_col" FROM primitive_types_table WHERE ("double_col") = '350000.0' LIMIT 10"""; DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expected); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expected); } @Test @@ -157,7 +157,7 @@ public void testFilterWithCast() WHERE AND(("string_col") = '123', ("long_col") = '123')\ LIMIT 60"""; DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expected); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expected); } @Test @@ -178,7 +178,7 @@ WHERE AND((CASE WHEN equals("CancellationCode", 'strike') THEN '3' ELSE '4' END) ELSE 'burger' END) != 'salad') LIMIT 10""".formatted(tableName); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expected); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expected); } @Test @@ -196,7 +196,7 @@ public void testFilterWithPushdownConstraint() FROM realtimeOnly\ WHERE ("OriginCityName" = 'Catfish Paradise')\ LIMIT 60"""; - assertThat(extractPql(dynamicTable, tupleDomain)).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, tupleDomain, List.of())).isEqualTo(expectedPql); } @Test @@ -212,7 +212,7 @@ public void testFilterWithUdf() FROM realtimeOnly\ WHERE AND(("DivLongestGTimes") = '9.0', (exp("CarrierDelay")) > '5')\ LIMIT 60"""; - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); } @Test @@ -222,7 +222,7 @@ public void testSelectStarDynamicTable() String query = "SELECT * FROM %s LIMIT 70".formatted(tableName.toLowerCase(ENGLISH)); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = "SELECT %s FROM %s LIMIT 70".formatted(getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableName); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); } @Test @@ -233,7 +233,7 @@ public void testOfflineDynamicTable() String query = "SELECT * FROM %s LIMIT 70".formatted(tableNameWithSuffix); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = "SELECT %s FROM %s LIMIT 70".formatted(getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -245,7 +245,7 @@ public void testRealtimeOnlyDynamicTable() String query = "SELECT * FROM %s LIMIT 70".formatted(tableNameWithSuffix); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = "SELECT %s FROM %s LIMIT 70".formatted(getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -257,7 +257,7 @@ public void testLimitAndOffset() String query = "SELECT * FROM %s LIMIT 70, 40".formatted(tableNameWithSuffix); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = "SELECT %s FROM %s LIMIT 70, 40".formatted(getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -275,7 +275,7 @@ public void testRegexpLike() DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = """ SELECT "OriginCityName" FROM %s WHERE regexp_like("OriginCityName", '.*york.*') LIMIT 70""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -288,7 +288,7 @@ public void testTextMatch() DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = """ SELECT "OriginCityName" FROM %s WHERE text_match("OriginCityName", 'new and york') LIMIT 70""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -302,7 +302,7 @@ public void testJsonMatch() DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = """ SELECT "OriginCityName" FROM %s WHERE json_match("OriginCityName", '"$.name"=''new york''') LIMIT 70""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -327,7 +327,7 @@ SELECT datetimeconvert("DaysSinceEpoch", '1:SECONDS:EPOCH', '1:MILLISECONDS:EPOC FROM %s \ LIMIT 70""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -359,7 +359,7 @@ GROUP BY datetimeconvert("DaysSinceEpoch", '1:SECONDS:EPOCH', '1:MILLISECONDS:EP timeconvert("DaysSinceEpoch", 'SECONDS', 'MINUTES') \ LIMIT 70""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -375,7 +375,7 @@ SELECT plus("ArrDelay", '34') - "DaysSinceEpoch", "FlightNum"\ FROM %s\ ORDER BY "ArrDelay", "DaysSinceEpoch" DESC\ LIMIT 10""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -391,7 +391,7 @@ SELECT count(*)\ FROM %s\ ORDER BY count(*)\ LIMIT 10""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -407,7 +407,7 @@ SELECT plus("ArrDelay", '34') - "DaysSinceEpoch", "FlightNum"\ FROM %s\ ORDER BY plus("ArrDelay", '34') - "DaysSinceEpoch" DESC\ LIMIT 10""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -425,7 +425,7 @@ public void testQuotesInAlias() SELECT "non_quoted" AS "non""quoted"\ FROM %s\ LIMIT 50""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -443,7 +443,7 @@ public void testQuotesInColumnName() SELECT "qu""ot""ed"\ FROM %s\ LIMIT 50""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } @@ -466,7 +466,7 @@ public void testQueryOptions() SELECT "FlightNum" \ FROM %s \ LIMIT 50""".formatted(tableNameWithSuffix); - assertThat(extractPql(dynamicTable, TupleDomain.all())).isEqualTo(expectedPql); + assertThat(extractPql(dynamicTable, TupleDomain.all(), List.of())).isEqualTo(expectedPql); assertThat(dynamicTable.tableName()).isEqualTo(tableName); } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java index 7fbb1260b209f..f4bc49cd2ae5b 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java @@ -53,7 +53,7 @@ public void testSplitsBroker() SchemaTableName schemaTableName = new SchemaTableName("default", format("SELECT %s, %s FROM %s LIMIT %d", "AirlineID", "OriginStateName", "airlineStats", 100)); DynamicTable dynamicTable = buildFromPql(pinotMetadata, schemaTableName, mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); - PinotTableHandle pinotTableHandle = new PinotTableHandle("default", dynamicTable.tableName(), false, TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable)); + PinotTableHandle pinotTableHandle = new PinotTableHandle("default", dynamicTable.tableName(), false, TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable), List.of()); List splits = getSplitsHelper(pinotTableHandle, 1, false); assertSplits(splits, 1, BROKER); } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotTableHandle.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotTableHandle.java index d4d4be9c25211..8e2b76a8f094e 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotTableHandle.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotTableHandle.java @@ -18,6 +18,7 @@ import io.trino.spi.predicate.TupleDomain; import org.junit.jupiter.api.Test; +import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -55,6 +56,6 @@ public void testEquivalence() public static PinotTableHandle newTableHandle(String schemaName, String tableName) { - return new PinotTableHandle(schemaName, tableName, false, TupleDomain.all(), OptionalLong.empty(), Optional.empty()); + return new PinotTableHandle(schemaName, tableName, false, TupleDomain.all(), OptionalLong.empty(), Optional.empty(), List.of()); } } diff --git a/plugin/trino-pinot/src/test/resources/orders_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/orders_realtimeSpec.json index 007fe0b41ac21..03e7c2a3ec37c 100644 --- a/plugin/trino-pinot/src/test/resources/orders_realtimeSpec.json +++ b/plugin/trino-pinot/src/test/resources/orders_realtimeSpec.json @@ -21,6 +21,7 @@ "noDictionaryColumns": [], "aggregateMetrics": "false", "nullHandlingEnabled": "true", + "jsonIndexColumns": ["json"], "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", diff --git a/plugin/trino-pinot/src/test/resources/orders_schema.json b/plugin/trino-pinot/src/test/resources/orders_schema.json index e7544419b3370..f02c54b304c51 100644 --- a/plugin/trino-pinot/src/test/resources/orders_schema.json +++ b/plugin/trino-pinot/src/test/resources/orders_schema.json @@ -32,6 +32,10 @@ { "name": "comment", "dataType": "STRING" + }, + { + "name": "json", + "dataType": "JSON" } ], "dateTimeFieldSpecs": [