Only allow whitelisted extra properties
sopel39 committed Oct 29, 2024
1 parent 1437d15 commit 71dd6dc
Showing 17 changed files with 80 additions and 22 deletions.
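In short: this commit gates `CREATE TABLE ... WITH (extra_properties = MAP(...))` behind a new catalog configuration property, `iceberg.allowed-extra-properties` (default: empty list; the single value `*` admits every key). A minimal standalone sketch of the whitelist semantics, with invented names for illustration (the real logic lives in IcebergMetadataFactory and IcebergUtil below):

import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of the whitelist semantics introduced by this commit;
// class and method names are invented, not part of the diff.
final class ExtraPropertiesWhitelist
{
    // The configured list becomes a predicate over property names: the single
    // entry "*" allows every key, anything else is an exact-match set.
    static Predicate<String> fromConfig(List<String> allowed)
    {
        if (allowed.equals(List.of("*"))) {
            return name -> true;
        }
        return Set.copyOf(allowed)::contains;
    }

    public static void main(String[] args)
    {
        Predicate<String> whitelist = fromConfig(List.of("propX", "propY"));
        System.out.println(whitelist.test("propX"));   // true: whitelisted
        System.out.println(whitelist.test("comment")); // false: rejected
    }
}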
docs/src/main/sphinx/connector/iceberg.md (4 additions, 0 deletions)
@@ -216,6 +216,10 @@ implementation is used:
- Enable [sorted writing](iceberg-sorted-files) to tables with a specified sort order. Equivalent
session property is `sorted_writing_enabled`.
- `true`
+* - `iceberg.allowed-extra-properties`
+- List of extra properties that are allowed to be set on Iceberg tables.
+Use `*` to allow all properties.
+- `[]`
* - `iceberg.split-manager-threads`
- Number of threads to use for generating splits.
- Double the number of processors on the coordinator node.
IcebergConfig.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
@@ -28,6 +29,7 @@
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

+import java.util.List;
import java.util.Optional;
import java.util.Set;

@@ -85,6 +87,7 @@ public class IcebergConfig
private boolean queryPartitionFilterRequired;
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2;
+private List<String> allowedExtraProperties = ImmutableList.of();
private boolean incrementalRefreshEnabled = true;
private boolean metadataCacheEnabled = true;

@@ -469,6 +472,19 @@ public IcebergConfig setSplitManagerThreads(int splitManagerThreads)
return this;
}

+public List<String> getAllowedExtraProperties()
+{
+return allowedExtraProperties;
+}

+@Config("iceberg.allowed-extra-properties")
+@ConfigDescription("List of extra properties that are allowed to be set on Iceberg tables")
+public IcebergConfig setAllowedExtraProperties(List<String> allowedExtraProperties)
+{
+this.allowedExtraProperties = ImmutableList.copyOf(allowedExtraProperties);
+return this;
+}

public boolean isIncrementalRefreshEnabled()
{
return incrementalRefreshEnabled;
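One binding detail the diff does not show: airlift's configuration framework maps a comma-separated property value to `List<String>`, so `iceberg.allowed-extra-properties=propX,propY` arrives at the setter as a two-element list (the explicit-mappings test further down exercises exactly this). A hypothetical round trip against the config class above:

// Hypothetical round trip; the setter stores an immutable defensive copy.
IcebergConfig config = new IcebergConfig()
        .setAllowedExtraProperties(ImmutableList.of("propX", "propY"));
List<String> allowed = config.getAllowedExtraProperties(); // ["propX", "propY"]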
IcebergMetadata.java
@@ -385,6 +385,7 @@ public class IcebergMetadata
private final TableStatisticsWriter tableStatisticsWriter;
private final Optional<HiveMetastoreFactory> metastoreFactory;
private final boolean addFilesProcedureEnabled;
+private final Predicate<String> allowedExtraProperties;

private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();

@@ -399,7 +400,8 @@ public IcebergMetadata(
IcebergFileSystemFactory fileSystemFactory,
TableStatisticsWriter tableStatisticsWriter,
Optional<HiveMetastoreFactory> metastoreFactory,
-boolean addFilesProcedureEnabled)
+boolean addFilesProcedureEnabled,
+Predicate<String> allowedExtraProperties)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
@@ -409,6 +411,7 @@
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.addFilesProcedureEnabled = addFilesProcedureEnabled;
+this.allowedExtraProperties = requireNonNull(allowedExtraProperties, "allowedExtraProperties is null");
}

@Override
@@ -1059,7 +1062,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
tableLocation = getTableLocation(tableMetadata.getProperties())
.orElseGet(() -> catalog.defaultTableLocation(session, tableMetadata.getTable()));
}
-transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace, tableLocation);
+transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace, tableLocation, allowedExtraProperties);
Location location = Location.of(transaction.table().location());
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), transaction.table().io().properties());
try {
IcebergMetadataFactory.java
@@ -13,6 +13,8 @@
*/
package io.trino.plugin.iceberg;

+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
@@ -23,6 +25,7 @@
import io.trino.spi.type.TypeManager;

import java.util.Optional;
+import java.util.function.Predicate;

import static java.util.Objects.requireNonNull;

@@ -36,6 +39,7 @@ public class IcebergMetadataFactory
private final TableStatisticsWriter tableStatisticsWriter;
private final Optional<HiveMetastoreFactory> metastoreFactory;
private final boolean addFilesProcedureEnabled;
+private final Predicate<String> allowedExtraProperties;

@Inject
public IcebergMetadataFactory(
@@ -56,6 +60,12 @@ public IcebergMetadataFactory(
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.addFilesProcedureEnabled = config.isAddFilesProcedureEnabled();
+if (config.getAllowedExtraProperties().equals(ImmutableList.of("*"))) {
+this.allowedExtraProperties = _ -> true;
+}
+else {
+this.allowedExtraProperties = ImmutableSet.copyOf(requireNonNull(config.getAllowedExtraProperties(), "allowedExtraProperties is null"))::contains;
+}
}

public IcebergMetadata create(ConnectorIdentity identity)
@@ -68,6 +78,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
fileSystemFactory,
tableStatisticsWriter,
metastoreFactory,
-addFilesProcedureEnabled);
+addFilesProcedureEnabled,
+allowedExtraProperties);
}
}
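Two notes on the factory above. The predicate is built once per factory instance rather than per CREATE TABLE, so the wildcard check and the defensive set copy happen only at startup. Also, `_ -> true` uses Java's unnamed variable (JEP 456, final in Java 22); on an older JDK the same wildcard branch would be spelled with a named, ignored parameter:

// Equivalent pre-JEP-456 spelling of the wildcard branch:
Predicate<String> allowAll = ignored -> true;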
IcebergUtil.java
@@ -92,6 +92,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -792,20 +793,20 @@ public static List<ViewColumn> viewColumnsFromSchema(TypeManager typeManager, Sc
.toList();
}

-public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session, boolean replace, String tableLocation)
+public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session, boolean replace, String tableLocation, Predicate<String> allowedExtraProperties)
{
SchemaTableName schemaTableName = tableMetadata.getTable();
Schema schema = schemaFromMetadata(tableMetadata.getColumns());
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties()));

if (replace) {
-return catalog.newCreateOrReplaceTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata));
+return catalog.newCreateOrReplaceTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata, allowedExtraProperties));
}
-return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata));
+return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata, allowedExtraProperties));
}

-public static Map<String, String> createTableProperties(ConnectorTableMetadata tableMetadata)
+public static Map<String, String> createTableProperties(ConnectorTableMetadata tableMetadata, Predicate<String> allowedExtraProperties)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
@@ -838,14 +839,19 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
Map<String, String> baseProperties = propertiesBuilder.buildOrThrow();
Map<String, String> extraProperties = IcebergTableProperties.getExtraProperties(tableMetadata.getProperties()).orElseGet(ImmutableMap::of);

-Set<String> illegalExtraProperties = Sets.intersection(
-ImmutableSet.<String>builder()
-.add(TABLE_COMMENT)
-.addAll(baseProperties.keySet())
-.addAll(SUPPORTED_PROPERTIES)
-.addAll(PROTECTED_ICEBERG_NATIVE_PROPERTIES)
-.build(),
-extraProperties.keySet());
+Set<String> illegalExtraProperties = ImmutableSet.<String>builder()
+.addAll(Sets.intersection(
+ImmutableSet.<String>builder()
+.add(TABLE_COMMENT)
+.addAll(baseProperties.keySet())
+.addAll(SUPPORTED_PROPERTIES)
+.addAll(PROTECTED_ICEBERG_NATIVE_PROPERTIES)
+.build(),
+extraProperties.keySet()))
+.addAll(extraProperties.keySet().stream()
+.filter(name -> !allowedExtraProperties.test(name))
+.collect(toImmutableSet()))
+.build();

if (!illegalExtraProperties.isEmpty()) {
throw new TrinoException(
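Net effect of the builder above: an extra property is illegal if it collides with a reserved key (`comment`, any base, supported, or protected Iceberg property) or if the whitelist predicate rejects it. The materialized-view paths in the next file pass `_ -> false`, so storage tables never accept extra properties at all. A hypothetical condensed equivalent of the check (invented helper, not part of the diff):

// Hypothetical condensed equivalent: a key is illegal when it is reserved
// or when the whitelist predicate rejects it.
static Set<String> illegalKeys(Set<String> reserved, Set<String> extraKeys, Predicate<String> allowed)
{
    return extraKeys.stream()
            .filter(name -> reserved.contains(name) || !allowed.test(name))
            .collect(ImmutableSet.toImmutableSet());
}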
AbstractTrinoCatalog.java
@@ -309,7 +309,7 @@ protected Location createMaterializedViewStorage(
Schema schema = schemaFromMetadata(columns);
PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(materializedViewProperties));
SortOrder sortOrder = parseSortFields(schema, getSortOrder(materializedViewProperties));
-Map<String, String> properties = createTableProperties(new ConnectorTableMetadata(storageTableName, columns, materializedViewProperties, Optional.empty()));
+Map<String, String> properties = createTableProperties(new ConnectorTableMetadata(storageTableName, columns, materializedViewProperties, Optional.empty()), _ -> false);

TableMetadata metadata = newTableMetadata(schema, partitionSpec, sortOrder, tableLocation, properties);

@@ -347,7 +347,7 @@ protected SchemaTableName createMaterializedViewStorageTable(
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, materializedViewProperties, Optional.empty());
String tableLocation = getTableLocation(tableMetadata.getProperties())
.orElseGet(() -> defaultTableLocation(session, tableMetadata.getTable()));
-Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session, false, tableLocation);
+Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session, false, tableLocation, _ -> false);
AppendFiles appendFiles = transaction.newAppend();
commit(appendFiles, session);
transaction.commitTransaction();
BaseIcebergConnectorTest.java
@@ -191,6 +191,7 @@ protected IcebergQueryRunner.Builder createQueryRunnerBuilder()
return IcebergQueryRunner.builder()
.setIcebergProperties(ImmutableMap.<String, String>builder()
.put("iceberg.file-format", format.name())
.put("iceberg.allowed-extra-properties", "extra.property.one,extra.property.two,extra.property.three,sorted_by")
// Allows testing the sorting writer flushing to the file system with smaller tables
.put("iceberg.writer-sort-buffer-size", "1MB")
.buildOrThrow())
@@ -8515,6 +8516,10 @@ public void testIllegalExtraPropertyKey()
assertQueryFails(
"CREATE TABLE test_create_table_with_as_illegal_extra_properties WITH (extra_properties = MAP(ARRAY['comment'], ARRAY['some comment'])) AS SELECT 1 as c1",
"\\QIllegal keys in extra_properties: [comment]");

+assertQueryFails(
+"CREATE TABLE test_create_table_with_as_illegal_extra_properties WITH (extra_properties = MAP(ARRAY['not_allowed_property'], ARRAY['foo'])) AS SELECT 1 as c1",
+"\\QIllegal keys in extra_properties: [not_allowed_property]");
}

@Override
TestIcebergConfig.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
@@ -71,6 +72,7 @@ public void testDefaults()
.setQueryPartitionFilterRequired(false)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
.setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2)
+.setAllowedExtraProperties(ImmutableList.of())
.setIncrementalRefreshEnabled(true)
.setMetadataCacheEnabled(true));
}
@@ -106,6 +108,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.query-partition-filter-required", "true")
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
.put("iceberg.split-manager-threads", "42")
.put("iceberg.allowed-extra-properties", "propX,propY")
.put("iceberg.incremental-refresh-enabled", "false")
.put("iceberg.metadata-cache.enabled", "false")
.buildOrThrow();
@@ -138,6 +141,7 @@
.setQueryPartitionFilterRequired(true)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
.setSplitManagerThreads(42)
.setAllowedExtraProperties(ImmutableList.of("propX", "propY"))
.setIncrementalRefreshEnabled(false)
.setMetadataCacheEnabled(false);

Expand Down
(another changed file)
@@ -74,6 +74,7 @@ protected QueryRunner createQueryRunner()
.put("s3.streaming.part-size", "5MB") // minimize memory usage
.put("s3.max-connections", "8") // verify no leaks
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.allowed-extra-properties", "extra.property.one,extra.property.two,extra.property.three")
// Allows testing the sorting writer flushing to the file system with smaller tables
.put("iceberg.writer-sort-buffer-size", "1MB")
.buildOrThrow())
(another changed file)
@@ -119,7 +119,8 @@ public void testNonLowercaseNamespace()
},
new TableStatisticsWriter(new NodeVersion("test-version")),
Optional.empty(),
-false);
+false,
+_ -> false);
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isFalse();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
(another changed file)
@@ -136,7 +136,8 @@ public void testNonLowercaseGlueDatabase()
},
new TableStatisticsWriter(new NodeVersion("test-version")),
Optional.empty(),
-false);
+false,
+_ -> false);
assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
.isFalse();
assertThat(icebergMetadata.schemaExists(SESSION, trinoSchemaName)).as("icebergMetadata.schemaExists(trinoSchemaName)")
(another changed file)
@@ -188,7 +188,8 @@ public void testNonLowercaseNamespace()
},
new TableStatisticsWriter(new NodeVersion("test-version")),
Optional.empty(),
-false);
+false,
+_ -> false);
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
(another changed file)
@@ -122,7 +122,8 @@ public void testNonLowercaseNamespace()
},
new TableStatisticsWriter(new NodeVersion("test-version")),
Optional.empty(),
-false);
+false,
+_ -> false);
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
(another changed file)
@@ -218,7 +218,8 @@ public void testNonLowercaseNamespace()
},
new TableStatisticsWriter(new NodeVersion("test-version")),
Optional.empty(),
-false);
+false,
+_ -> false);
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
(another changed file)
@@ -7,4 +7,5 @@ iceberg.jdbc-catalog.connection-password=test
iceberg.jdbc-catalog.catalog-name=iceberg_test
iceberg.jdbc-catalog.default-warehouse-dir=hdfs://hadoop-master:9000/user/hive/warehouse
hive.hdfs.socks-proxy=hadoop-master:1180
+iceberg.allowed-extra-properties=custom.table-property
fs.hadoop.enabled=true
(another changed file)
@@ -1,4 +1,5 @@
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://iceberg-with-rest:8181/
+iceberg.allowed-extra-properties=custom.table-property
fs.hadoop.enabled=true
(another changed file)
@@ -1,4 +1,5 @@
connector.name=iceberg
hive.metastore.uri=thrift://hadoop-master:9083
iceberg.register-table-procedure.enabled=true
+iceberg.allowed-extra-properties=*
fs.hadoop.enabled=true
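End to end, with a catalog configured like the environment files above (`iceberg.allowed-extra-properties=custom.table-property`), the behavior from a client looks roughly like this; the JDBC URL, credentials, and schema are placeholders, not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical client-side illustration; connection details are placeholders.
public class ExtraPropertiesExample
{
    public static void main(String[] args)
            throws Exception
    {
        try (Connection connection = DriverManager.getConnection("jdbc:trino://localhost:8080/iceberg/tpch", "admin", null);
                Statement statement = connection.createStatement()) {
            // Whitelisted key: succeeds
            statement.execute("CREATE TABLE t1 WITH (extra_properties = MAP(ARRAY['custom.table-property'], ARRAY['v'])) AS SELECT 1 AS c1");
            // Non-whitelisted key: fails with
            // "Illegal keys in extra_properties: [other.property]"
            statement.execute("CREATE TABLE t2 WITH (extra_properties = MAP(ARRAY['other.property'], ARRAY['v'])) AS SELECT 1 AS c1");
        }
    }
}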
