Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Remove legacyImplementJoin #23962

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -764,46 +764,6 @@ public Optional<PreparedQuery> implementJoin(
}
}

@Deprecated
@Override
public Optional<PreparedQuery> legacyImplementJoin(
ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
for (JdbcJoinCondition joinCondition : joinConditions) {
if (!isSupportedJoinCondition(session, joinCondition)) {
return Optional.empty();
}
}

try (Connection connection = this.connectionFactory.openConnection(session)) {
return Optional.of(queryBuilder.legacyPrepareJoinQuery(
this,
session,
connection,
joinType,
leftSource,
rightSource,
joinConditions,
leftAssignments,
rightAssignments));
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
return false;
}

protected PreparedQuery applyQueryTransformations(JdbcTableHandle tableHandle, PreparedQuery query)
{
PreparedQuery preparedQuery = query;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,6 @@ public Optional<PreparedQuery> implementJoin(
return delegate.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics);
}

@Override
public Optional<PreparedQuery> legacyImplementJoin(
ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
return delegate.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics);
}

@Override
public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List<JdbcSortItem> sortOrder)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.JoinApplicationResult;
import io.trino.spi.connector.JoinCondition;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
Expand Down Expand Up @@ -728,101 +727,6 @@ public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin(
precalculateStatisticsForPushdown));
}

@Deprecated
@Override
public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin(
ConnectorSession session,
JoinType joinType,
ConnectorTableHandle left,
ConnectorTableHandle right,
List<JoinCondition> joinConditions,
Map<String, ColumnHandle> leftAssignments,
Map<String, ColumnHandle> rightAssignments,
JoinStatistics statistics)
{
if (isTableHandleForProcedure(left) || isTableHandleForProcedure(right)) {
return Optional.empty();
}

if (!isJoinPushdownEnabled(session)) {
return Optional.empty();
}

JdbcTableHandle leftHandle = flushAttributesAsQuery(session, (JdbcTableHandle) left);
JdbcTableHandle rightHandle = flushAttributesAsQuery(session, (JdbcTableHandle) right);

if (!leftHandle.getAuthorization().equals(rightHandle.getAuthorization())) {
return Optional.empty();
}
int nextSyntheticColumnId = max(leftHandle.getNextSyntheticColumnId(), rightHandle.getNextSyntheticColumnId());

ImmutableMap.Builder<JdbcColumnHandle, JdbcColumnHandle> newLeftColumnsBuilder = ImmutableMap.builder();
OptionalInt maxColumnNameLength = jdbcClient.getMaxColumnNameLength(session);
for (JdbcColumnHandle column : jdbcClient.getColumns(session, leftHandle)) {
newLeftColumnsBuilder.put(column, createSyntheticJoinProjectionColumn(column, nextSyntheticColumnId, maxColumnNameLength));
nextSyntheticColumnId++;
}
Map<JdbcColumnHandle, JdbcColumnHandle> newLeftColumns = newLeftColumnsBuilder.buildOrThrow();

ImmutableMap.Builder<JdbcColumnHandle, JdbcColumnHandle> newRightColumnsBuilder = ImmutableMap.builder();
for (JdbcColumnHandle column : jdbcClient.getColumns(session, rightHandle)) {
newRightColumnsBuilder.put(column, createSyntheticJoinProjectionColumn(column, nextSyntheticColumnId, maxColumnNameLength));
nextSyntheticColumnId++;
}
Map<JdbcColumnHandle, JdbcColumnHandle> newRightColumns = newRightColumnsBuilder.buildOrThrow();

ImmutableList.Builder<JdbcJoinCondition> jdbcJoinConditions = ImmutableList.builder();
for (JoinCondition joinCondition : joinConditions) {
Optional<JdbcColumnHandle> leftColumn = getVariableColumnHandle(leftAssignments, joinCondition.getLeftExpression());
Optional<JdbcColumnHandle> rightColumn = getVariableColumnHandle(rightAssignments, joinCondition.getRightExpression());
if (leftColumn.isEmpty() || rightColumn.isEmpty()) {
return Optional.empty();
}
jdbcJoinConditions.add(new JdbcJoinCondition(leftColumn.get(), joinCondition.getOperator(), rightColumn.get()));
}

Optional<PreparedQuery> joinQuery = jdbcClient.legacyImplementJoin(
session,
joinType,
asPreparedQuery(leftHandle),
asPreparedQuery(rightHandle),
jdbcJoinConditions.build(),
newRightColumns.entrySet().stream()
.collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().getColumnName())),
newLeftColumns.entrySet().stream()
.collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().getColumnName())),
statistics);

if (joinQuery.isEmpty()) {
return Optional.empty();
}

return Optional.of(new JoinApplicationResult<>(
new JdbcTableHandle(
new JdbcQueryRelationHandle(joinQuery.get()),
TupleDomain.all(),
ImmutableList.of(),
Optional.empty(),
OptionalLong.empty(),
Optional.of(
ImmutableList.<JdbcColumnHandle>builder()
.addAll(newLeftColumns.values())
.addAll(newRightColumns.values())
.build()),
leftHandle.getAllReferencedTables().flatMap(leftReferencedTables ->
rightHandle.getAllReferencedTables().map(rightReferencedTables ->
ImmutableSet.<SchemaTableName>builder()
.addAll(leftReferencedTables)
.addAll(rightReferencedTables)
.build())),
nextSyntheticColumnId,
leftHandle.getAuthorization(),
leftHandle.getUpdateAssignments()),
ImmutableMap.copyOf(newLeftColumns),
ImmutableMap.copyOf(newRightColumns),
precalculateStatisticsForPushdown));
}

@VisibleForTesting
static JdbcColumnHandle createSyntheticJoinProjectionColumn(JdbcColumnHandle column, int nextSyntheticColumnId, OptionalInt optionalMaxColumnNameLength)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,6 @@ public Optional<PreparedQuery> implementJoin(
return delegate().implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics);
}

@Override
public Optional<PreparedQuery> legacyImplementJoin(
ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
return delegate().legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics);
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,6 @@ Optional<PreparedQuery> implementJoin(
List<ParameterizedExpression> joinConditions,
JoinStatistics statistics);

@Deprecated
Optional<PreparedQuery> legacyImplementJoin(
ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics);

boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List<JdbcSortItem> sortOrder);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,6 @@ public Optional<PreparedQuery> implementJoin(ConnectorSession session, JoinType
return delegate.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics);
}

@Override
public Optional<PreparedQuery> legacyImplementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, PreparedQuery rightSource, List<JdbcJoinCondition> joinConditions, Map<JdbcColumnHandle, String> rightAssignments, Map<JdbcColumnHandle, String> leftAssignments, JoinStatistics statistics)
{
// there should be no remote database interaction
return delegate.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics);
}

@Override
public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List<JdbcSortItem> sortOrder)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcExpression;
import io.trino.plugin.jdbc.JdbcJoinCondition;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcProcedureHandle;
import io.trino.plugin.jdbc.JdbcProcedureHandle.ProcedureQuery;
Expand Down Expand Up @@ -261,19 +260,6 @@ public Optional<PreparedQuery> implementJoin(ConnectorSession session,
return stats.getImplementJoin().wrap(() -> delegate().implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics));
}

@Override
public Optional<PreparedQuery> legacyImplementJoin(ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
return stats.getImplementJoin().wrap(() -> delegate().legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics));
}

@Override
public Optional<String> getTableComment(ResultSet resultSet)
throws SQLException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcExpression;
import io.trino.plugin.jdbc.JdbcJoinCondition;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
Expand Down Expand Up @@ -184,13 +183,6 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri
throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming tables");
}

@Override
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
// Deactivated because test 'testJoinPushdown()' requires write access which is not implemented for Exasol
return false;
}

@Override
public Optional<JdbcExpression> implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcExpression;
import io.trino.plugin.jdbc.JdbcJoinCondition;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcSortItem;
import io.trino.plugin.jdbc.JdbcTableHandle;
Expand Down Expand Up @@ -592,31 +591,6 @@ public Optional<PreparedQuery> implementJoin(
return super.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics);
}

@Override
public Optional<PreparedQuery> legacyImplementJoin(
ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
// Ignite does not support FULL JOIN
if (joinType == JoinType.FULL_OUTER) {
return Optional.empty();
}

return super.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics);
}

@Override
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
return true;
}

@Override
public void createSchema(ConnectorSession session, String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcExpression;
import io.trino.plugin.jdbc.JdbcJoinCondition;
import io.trino.plugin.jdbc.JdbcSortItem;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.JdbcTableHandle;
Expand Down Expand Up @@ -56,7 +55,6 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinCondition;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -662,38 +660,6 @@ public Optional<PreparedQuery> implementJoin(
return super.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics);
}

@Override
public Optional<PreparedQuery> legacyImplementJoin(
ConnectorSession session,
JoinType joinType,
PreparedQuery leftSource,
PreparedQuery rightSource,
List<JdbcJoinCondition> joinConditions,
Map<JdbcColumnHandle, String> rightAssignments,
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
if (joinType == JoinType.FULL_OUTER) {
// Not supported in MariaDB
return Optional.empty();
}
return super.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics);
}

@Override
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
if (joinCondition.getOperator() == JoinCondition.Operator.IDENTICAL) {
// Not supported in MariaDB
return false;
}

// Remote database can be case insensitive.
return Stream.of(joinCondition.getLeftColumn(), joinCondition.getRightColumn())
.map(JdbcColumnHandle::getColumnType)
.noneMatch(type -> type instanceof CharType || type instanceof VarcharType);
}

@Override
public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle)
{
Expand Down
Loading