diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index 6fa721a1e63..5e9d3c431f7 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -4,7 +4,7 @@
 
 ## Description
 
-Write data to Apache Paimon.
+Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.
 
 ## Key features
 
@@ -12,40 +12,76 @@ Write data to Apache Paimon.
 
 ## Options
 
-| name           | type   | required | default value |
-|----------------|--------|----------|---------------|
-| warehouse      | String | Yes      | -             |
-| database       | String | Yes      | -             |
-| table          | String | Yes      | -             |
-| hdfs_site_path | String | No       | -             |
+| name             | type   | required | default value                | Description                     |
+|------------------|--------|----------|------------------------------|---------------------------------|
+| warehouse        | String | Yes      | -                            | Paimon warehouse path           |
+| database         | String | Yes      | -                            | The database you want to access |
+| table            | String | Yes      | -                            | The table you want to access    |
+| hdfs_site_path   | String | No       | -                            | The path of hdfs-site.xml       |
+| schema_save_mode | Enum   | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode            |
+| data_save_mode   | Enum   | No       | APPEND_DATA                  | The data save mode              |
 
-### warehouse [string]
-
-Paimon warehouse path
-
-### database [string]
+## Examples
 
-The database you want to access
+### Single table
 
-### table [String]
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
 
-The table you want to access
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
 
-## Examples
+transform {
+}
 
-```hocon
 sink {
   Paimon {
-    warehouse = "/tmp/paimon"
-    database = "default"
-    table = "st_test"
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
  }
 }
 ```
 
-## Changelog
+### Multiple tables
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+  }
+}
 
-### next version
+transform {
+}
 
-- Add Paimon Sink Connector
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="${database_name}"
+    table="${table_name}"
+  }
+}
+```
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
new file mode 100644
index 00000000000..b1b4baef9b1
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -0,0 +1,87 @@
+# Paimon
+
+> Paimon 数据连接器
+
+## 描述
+
+Apache Paimon 数据连接器。支持 CDC 写入以及自动建表。
+
+## 主要特性
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+## 连接器选项
+
+| 名称             | 类型   | 是否必须 | 默认值                       | 描述                 |
+|------------------|--------|----------|------------------------------|----------------------|
+| warehouse        | String | Yes      | -                            | Paimon warehouse 路径 |
+| database         | String | Yes      | -                            | 数据库名称           |
+| table            | String | Yes      | -                            | 表名                 |
+| hdfs_site_path   | String | No       | -                            |                      |
+| schema_save_mode | Enum   | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | schema 保存模式      |
+| data_save_mode   | Enum   | No       | APPEND_DATA                  | 数据保存模式         |
+
+## 示例
+
+### 单表
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+transform { +} + +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="seatunnel" + table="role" + } +} +``` + +### 多表 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="${database_name}" + table="${table_name}" + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml index 8bcb1c35070..499165ea6fb 100644 --- a/seatunnel-connectors-v2/connector-paimon/pom.xml +++ b/seatunnel-connectors-v2/connector-paimon/pom.xml @@ -34,6 +34,11 @@ + + org.apache.seatunnel + connector-common + ${project.version} + org.apache.paimon diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java new file mode 100644 index 00000000000..7312ed28b06 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+@Slf4j
+public class PaimonCatalog implements Catalog, PaimonTable {
+    private static final String DEFAULT_DATABASE = "default";
+
+    private String catalogName;
+    private ReadonlyConfig readonlyConfig;
+    private PaimonCatalogLoader paimonCatalogLoader;
+    private org.apache.paimon.catalog.Catalog catalog;
+
+    public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogName = catalogName;
+        this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonSinkConfig(readonlyConfig));
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        this.catalog = paimonCatalogLoader.loadCatalog();
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        if (catalog != null && catalog instanceof Closeable) {
+            try {
+                ((Closeable) catalog).close();
+            } catch (IOException e) {
+                log.error("Error while closing PaimonCatalog.", e);
+                throw new CatalogException(e);
+            }
+        }
+    }
+
+    @Override
+    public String name() {
+        return this.catalogName;
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return DEFAULT_DATABASE;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return catalog.databaseExists(databaseName);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return catalog.listDatabases();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        try {
+            return catalog.listTables(databaseName);
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        return catalog.tableExists(toIdentifier(tablePath));
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        try {
+            FileStoreTable paimonFileStoreTableTable = (FileStoreTable) getPaimonTable(tablePath);
+            return toCatalogTable(paimonFileStoreTableTable, tablePath);
+        } catch (Exception e) {
+            throw new
TableNotExistException(this.catalogName, tablePath); + } + } + + @Override + public Table getPaimonTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + try { + return catalog.getTable(toIdentifier(tablePath)); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new TableNotExistException(this.catalogName, tablePath); + } + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + try { + catalog.createTable( + toIdentifier(tablePath), + SchemaUtil.toPaimonSchema(table.getTableSchema()), + ignoreIfExists); + } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) { + throw new TableAlreadyExistException(this.catalogName, tablePath); + } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) { + throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName()); + } + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new TableNotExistException(this.catalogName, tablePath); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + try { + catalog.createDatabase(tablePath.getDatabaseName(), ignoreIfExists); + } catch (org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException e) { + throw new DatabaseAlreadyExistException(this.catalogName, tablePath.getDatabaseName()); + } + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + try { + catalog.dropDatabase(tablePath.getDatabaseName(), ignoreIfNotExists, true); + } catch (Exception e) { + throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName()); + } + } + + private CatalogTable toCatalogTable( + FileStoreTable paimonFileStoreTableTable, TablePath tablePath) { + org.apache.paimon.schema.TableSchema schema = paimonFileStoreTableTable.schema(); + List dataFields = schema.fields(); + TableSchema.Builder builder = TableSchema.builder(); + dataFields.forEach( + dataField -> { + String name = dataField.name(); + SeaTunnelDataType seaTunnelType = + SchemaUtil.toSeaTunnelType(dataField.type()); + PhysicalColumn physicalColumn = + PhysicalColumn.of( + name, + seaTunnelType, + (Long) null, + true, + null, + dataField.description()); + builder.column(physicalColumn); + }); + + List partitionKeys = schema.partitionKeys(); + + return CatalogTable.of( + org.apache.seatunnel.api.table.catalog.TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()), + builder.build(), + paimonFileStoreTableTable.options(), + partitionKeys, + null, + catalogName); + } + + private Identifier toIdentifier(TablePath tablePath) { + return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java new file mode 100644 index 00000000000..4d94f385d9f 
--- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class PaimonCatalogFactory implements CatalogFactory { + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig) { + return new PaimonCatalog(catalogName, readonlyConfig); + } + + @Override + public String factoryIdentifier() { + return "Paimon"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + PaimonSinkConfig.WAREHOUSE, + PaimonSinkConfig.DATABASE, + PaimonSinkConfig.TABLE) + .optional( + PaimonSinkConfig.HDFS_SITE_PATH, + PaimonSinkConfig.SCHEMA_SAVE_MODE, + PaimonSinkConfig.DATA_SAVE_MODE) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java new file mode 100644 index 00000000000..bec66dbe3f2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
+
+@Slf4j
+public class PaimonCatalogLoader implements Serializable {
+    private PaimonSinkConfig config;
+
+    public PaimonCatalogLoader(PaimonSinkConfig config) {
+        this.config = config;
+    }
+
+    public Catalog loadCatalog() {
+        // When using the SeaTunnel engine, set the current class loader to prevent loading failures
+        Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
+        final String warehouse = config.getWarehouse();
+        final Map<String, String> optionsMap = new HashMap<>();
+        optionsMap.put(WAREHOUSE.key(), warehouse);
+        final Options options = Options.fromMap(optionsMap);
+        final Configuration hadoopConf = new Configuration();
+        String hdfsSitePathOptional = config.getHdfsSitePath();
+        if (StringUtils.isNotBlank(hdfsSitePathOptional)) {
+            hadoopConf.addResource(new Path(hdfsSitePathOptional));
+        }
+        final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
+        return CatalogFactory.createCatalog(catalogContext);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
new file mode 100644
index 00000000000..55b18f79ab3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import org.apache.paimon.table.Table;
+
+public interface PaimonTable {
+    Table getPaimonTable(TablePath tablePath) throws CatalogException, TableNotExistException;
+}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index b5299d87559..0396e6223af 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -22,13 +22,14 @@
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
  * SeaTunnelSink}.
  */
-public class PaimonConfig {
+public class PaimonConfig implements Serializable {
 
     public static final Option<String> WAREHOUSE =
             Options.key("warehouse")
@@ -36,6 +37,12 @@ public class PaimonConfig {
                     .noDefaultValue()
                     .withDescription("The warehouse path of paimon");
 
+    public static final Option<String> CATALOG_NAME =
+            Options.key("catalog_name")
+                    .stringType()
+                    .defaultValue("paimon")
+                    .withDescription("The paimon catalog name");
+
     public static final Option<String> DATABASE =
             Options.key("database")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
new file mode 100644
index 00000000000..589fd948167
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SchemaSaveMode; + +import lombok.Getter; + +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + +@Getter +public class PaimonSinkConfig extends PaimonConfig { + public static final Option SCHEMA_SAVE_MODE = + Options.key("schema_save_mode") + .enumType(SchemaSaveMode.class) + .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) + .withDescription("schema_save_mode"); + + public static final Option DATA_SAVE_MODE = + Options.key("data_save_mode") + .enumType(DataSaveMode.class) + .defaultValue(DataSaveMode.APPEND_DATA) + .withDescription("data_save_mode"); + + private String catalogName; + private String warehouse; + private String namespace; + private String table; + private String hdfsSitePath; + private SchemaSaveMode schemaSaveMode; + private DataSaveMode dataSaveMode; + + public PaimonSinkConfig(ReadonlyConfig readonlyConfig) { + this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME)); + this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE)); + this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE)); + this.table = checkArgumentNotNull(readonlyConfig.get(TABLE)); + this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH); + this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE); + this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE); + } + + protected T checkArgumentNotNull(T argument) { + checkNotNull(argument); + return argument; + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java new file mode 100644 index 00000000000..1f8b1cff32f --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.data; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink; +import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter; + +import org.apache.paimon.types.DataType; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(TypeConverter.class) +public class PaimonTypeMapper implements TypeConverter { + public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper(); + + @Override + public String identifier() { + return PaimonSink.PLUGIN_NAME; + } + + @Override + public Column convert(DataType dataType) { + return PhysicalColumn.builder().dataType(RowTypeConverter.convert(dataType)).build(); + } + + @Override + public DataType reconvert(Column column) { + return RowTypeConverter.reconvert(column.getDataType()); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java new file mode 100644 index 00000000000..b479ebf14b0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.handler; + +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; +import org.apache.seatunnel.api.sink.SchemaSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable; + +import org.apache.paimon.table.Table; + +public class PaimonSaveModeHandler extends DefaultSaveModeHandler { + + private SupportLoadTable supportLoadTable; + private Catalog catalog; + private CatalogTable catalogTable; + + public PaimonSaveModeHandler( + SupportLoadTable supportLoadTable, + SchemaSaveMode schemaSaveMode, + DataSaveMode dataSaveMode, + Catalog catalog, + CatalogTable catalogTable, + String customSql) { + super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql); + this.supportLoadTable = supportLoadTable; + this.catalog = catalog; + this.catalogTable = catalogTable; + } + + @Override + public void handleSchemaSaveMode() { + super.handleSchemaSaveMode(); + TablePath tablePath = catalogTable.getTablePath(); + Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath); + // load paimon table and set it into paimon sink + this.supportLoadTable.setLoadTable(paimonTable); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index ac1a0b97edd..cdec4b0c760 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -17,54 +17,46 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; +import org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; import org.apache.paimon.table.Table; -import com.google.auto.service.AutoService; - import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory; -@AutoService(SeaTunnelSink.class) public class PaimonSink implements SeaTunnelSink< - SeaTunnelRow, PaimonSinkState, PaimonCommitInfo, PaimonAggregatedCommitInfo> { + SeaTunnelRow, + PaimonSinkState, + PaimonCommitInfo, + PaimonAggregatedCommitInfo>, + SupportSaveMode, + SupportLoadTable
, + SupportMultiTableSink { private static final long serialVersionUID = 1L; @@ -72,79 +64,44 @@ public class PaimonSink private SeaTunnelRowType seaTunnelRowType; - private Config pluginConfig; - private Table table; - @Override - public String getPluginName() { - return PLUGIN_NAME; - } + private JobContext jobContext; - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - this.pluginConfig = pluginConfig; - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key()); - if (!result.isSuccess()) { - throw new PaimonConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - // initialize paimon table - final String warehouse = pluginConfig.getString(WAREHOUSE.key()); - final String database = pluginConfig.getString(DATABASE.key()); - final String table = pluginConfig.getString(TABLE.key()); - final Map optionsMap = new HashMap<>(); - optionsMap.put(WAREHOUSE.key(), warehouse); - final Options options = Options.fromMap(optionsMap); - final Configuration hadoopConf = new Configuration(); - if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) { - hadoopConf.addResource(new Path(pluginConfig.getString(HDFS_SITE_PATH.key()))); - } - final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf); - try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) { - Identifier identifier = Identifier.create(database, table); - this.table = catalog.getTable(identifier); - } catch (Exception e) { - String errorMsg = - String.format( - "Failed to get table [%s] from database [%s] on warehouse [%s]", - database, table, warehouse); - throw new PaimonConnectorException( - PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e); - } - } + private ReadonlyConfig readonlyConfig; - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + private PaimonSinkConfig paimonSinkConfig; + + private CatalogTable catalogTable; + + public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + this.readonlyConfig = readonlyConfig; + this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig); + this.catalogTable = catalogTable; + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); } @Override - public SeaTunnelDataType getConsumedType() { - return this.seaTunnelRowType; + public String getPluginName() { + return PLUGIN_NAME; } @Override public SinkWriter createWriter( SinkWriter.Context context) throws IOException { - return new PaimonSinkWriter(context, table, seaTunnelRowType); + return new PaimonSinkWriter(context, table, seaTunnelRowType, jobContext); } @Override public Optional> createAggregatedCommitter() throws IOException { - return Optional.of(new PaimonAggregatedCommitter(table)); + return Optional.of(new PaimonAggregatedCommitter(table, jobContext)); } @Override public SinkWriter restoreWriter( SinkWriter.Context context, List states) throws IOException { - return new PaimonSinkWriter(context, table, seaTunnelRowType, states); + return new PaimonSinkWriter(context, table, seaTunnelRowType, states, jobContext); } @Override @@ -156,4 +113,43 @@ public Optional> getAggregatedCommitInfoS public Optional> getCommitInfoSerializer() { return Optional.of(new DefaultSerializer<>()); } + + @Override + public void setJobContext(JobContext jobContext) { + this.jobContext = jobContext; + } + 
+ @Override + public Optional getSaveModeHandler() { + org.apache.seatunnel.api.table.factory.CatalogFactory catalogFactory = + discoverFactory( + Thread.currentThread().getContextClassLoader(), + org.apache.seatunnel.api.table.factory.CatalogFactory.class, + "Paimon"); + if (catalogFactory == null) { + throw new PaimonConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), + PluginType.SINK, + "Cannot find paimon catalog factory")); + } + org.apache.seatunnel.api.table.catalog.Catalog catalog = + catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), readonlyConfig); + catalog.open(); + return Optional.of( + new PaimonSaveModeHandler( + this, + paimonSinkConfig.getSchemaSaveMode(), + paimonSinkConfig.getDataSaveMode(), + catalog, + catalogTable, + null)); + } + + @Override + public void setLoadTable(Table table) { + this.table = table; + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java index dfae43c4820..c0b4d997ead 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java @@ -17,16 +17,30 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; + +import org.apache.commons.lang3.StringUtils; import com.google.auto.service.AutoService; @AutoService(Factory.class) public class PaimonSinkFactory implements TableSinkFactory { + public static final String REPLACE_TABLE_NAME_KEY = "${table_name}"; + + public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}"; + + public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}"; + @Override public String factoryIdentifier() { return "Paimon"; @@ -35,10 +49,56 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(PaimonConfig.WAREHOUSE) - .required(PaimonConfig.DATABASE) - .required(PaimonConfig.TABLE) - .optional(PaimonConfig.HDFS_SITE_PATH) + .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE) + .optional( + PaimonConfig.HDFS_SITE_PATH, + PaimonSinkConfig.SCHEMA_SAVE_MODE, + PaimonSinkConfig.DATA_SAVE_MODE) .build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = + renameCatalogTable(new PaimonSinkConfig(readonlyConfig), context.getCatalogTable()); + return () -> new PaimonSink(context.getOptions(), 
catalogTable); + } + + private CatalogTable renameCatalogTable( + PaimonSinkConfig paimonSinkConfig, CatalogTable catalogTable) { + TableIdentifier tableId = catalogTable.getTableId(); + String tableName; + String namespace; + if (StringUtils.isNotEmpty(paimonSinkConfig.getTable())) { + tableName = replaceName(paimonSinkConfig.getTable(), tableId); + } else { + tableName = tableId.getTableName(); + } + + if (StringUtils.isNotEmpty(paimonSinkConfig.getNamespace())) { + namespace = replaceName(paimonSinkConfig.getNamespace(), tableId); + } else { + namespace = tableId.getSchemaName(); + } + + TableIdentifier newTableId = + TableIdentifier.of( + tableId.getCatalogName(), namespace, tableId.getSchemaName(), tableName); + + return CatalogTable.of(newTableId, catalogTable); + } + + private String replaceName(String original, TableIdentifier tableId) { + if (tableId.getTableName() != null) { + original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName()); + } + if (tableId.getSchemaName() != null) { + original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName()); + } + if (tableId.getDatabaseName() != null) { + original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName()); + } + return original; + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java index 930f62045fd..7b2e8327a99 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java @@ -17,21 +17,28 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState; +import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil; import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter; import org.apache.paimon.data.InternalRow; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; -import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.TableCommit; +import org.apache.paimon.table.sink.TableWrite; +import org.apache.paimon.table.sink.WriteBuilder; import lombok.extern.slf4j.Slf4j; @@ -46,13 +53,14 @@ @Slf4j public class PaimonSinkWriter - implements SinkWriter { + implements SinkWriter, + SupportMultiTableSinkWriter { private String commitUser = UUID.randomUUID().toString(); - private final BatchWriteBuilder 
tableWriteBuilder; + private final WriteBuilder tableWriteBuilder; - private final BatchTableWrite tableWrite; + private final TableWrite tableWrite; private long checkpointId = 0; @@ -64,37 +72,58 @@ public class PaimonSinkWriter private final SinkWriter.Context context; - public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType seaTunnelRowType) { + private final JobContext jobContext; + + public PaimonSinkWriter( + Context context, + Table table, + SeaTunnelRowType seaTunnelRowType, + JobContext jobContext) { this.table = table; - this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite(); + this.tableWriteBuilder = + JobContextUtil.isBatchJob(jobContext) + ? this.table.newBatchWriteBuilder().withOverwrite() + : this.table.newStreamWriteBuilder(); this.tableWrite = tableWriteBuilder.newWrite(); this.seaTunnelRowType = seaTunnelRowType; this.context = context; + this.jobContext = jobContext; } public PaimonSinkWriter( Context context, Table table, SeaTunnelRowType seaTunnelRowType, - List states) { + List states, + JobContext jobContext) { this.table = table; - this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite(); + this.tableWriteBuilder = + JobContextUtil.isBatchJob(jobContext) + ? this.table.newBatchWriteBuilder().withOverwrite() + : this.table.newStreamWriteBuilder(); this.tableWrite = tableWriteBuilder.newWrite(); this.seaTunnelRowType = seaTunnelRowType; this.context = context; + this.jobContext = jobContext; if (Objects.isNull(states) || states.isEmpty()) { return; } this.commitUser = states.get(0).getCommitUser(); this.checkpointId = states.get(0).getCheckpointId(); - try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) { + try (TableCommit tableCommit = tableWriteBuilder.newCommit()) { List commitables = states.stream() .map(PaimonSinkState::getCommittables) .flatMap(List::stream) .collect(Collectors.toList()); log.info("Trying to recommit states {}", commitables); - tableCommit.commit(commitables); + if (JobContextUtil.isBatchJob(jobContext)) { + log.debug("Trying to recommit states batch mode"); + ((BatchTableCommit) tableCommit).commit(commitables); + } else { + log.debug("Trying to recommit states streaming mode"); + ((StreamTableCommit) tableCommit).commit(Objects.hash(commitables), commitables); + } } catch (Exception e) { throw new PaimonConnectorException( PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e); @@ -117,7 +146,13 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() throws IOException { try { - List fileCommittables = tableWrite.prepareCommit(); + List fileCommittables; + if (JobContextUtil.isBatchJob(jobContext)) { + fileCommittables = ((BatchTableWrite) tableWrite).prepareCommit(); + } else { + fileCommittables = + ((StreamTableWrite) tableWrite).prepareCommit(false, committables.size()); + } committables.addAll(fileCommittables); return Optional.of(new PaimonCommitInfo(fileCommittables)); } catch (Exception e) { diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java new file mode 100644 index 00000000000..734762e23ca --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.sink; + +public interface SupportLoadTable { + void setLoadTable(T table); +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java index 987d8fbb807..2c0be5d4241 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java @@ -17,14 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; +import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil; import org.apache.paimon.operation.Lock; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.TableCommit; +import org.apache.paimon.table.sink.WriteBuilder; import lombok.extern.slf4j.Slf4j; @@ -32,35 +38,49 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** Paimon connector aggregated committer class */ @Slf4j public class PaimonAggregatedCommitter - implements SinkAggregatedCommitter { + implements SinkAggregatedCommitter, + SupportMultiTableSinkAggregatedCommitter { private static final long serialVersionUID = 1L; private final Lock.Factory localFactory = Lock.emptyFactory(); - private final Table table; + private final WriteBuilder tableWriteBuilder; - public PaimonAggregatedCommitter(Table table) { - this.table = table; + private final JobContext jobContext; + + public PaimonAggregatedCommitter(Table table, JobContext jobContext) { + this.jobContext = jobContext; + this.tableWriteBuilder = + JobContextUtil.isBatchJob(jobContext) + ? 
table.newBatchWriteBuilder() + : table.newStreamWriteBuilder(); } @Override public List commit( List aggregatedCommitInfo) throws IOException { - try (BatchTableCommit tableCommit = - table.newBatchWriteBuilder().withOverwrite().newCommit()) { + try (TableCommit tableCommit = tableWriteBuilder.newCommit()) { List fileCommittables = aggregatedCommitInfo.stream() .map(PaimonAggregatedCommitInfo::getCommittables) .flatMap(List::stream) .flatMap(List::stream) .collect(Collectors.toList()); - tableCommit.commit(fileCommittables); + if (JobContextUtil.isBatchJob(jobContext)) { + log.debug("Trying to commit states batch mode"); + ((BatchTableCommit) tableCommit).commit(fileCommittables); + } else { + log.debug("Trying to commit states streaming mode"); + ((StreamTableCommit) tableCommit) + .commit(Objects.hash(fileCommittables), fileCommittables); + } } catch (Exception e) { throw new PaimonConnectorException( PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java new file mode 100644 index 00000000000..3a4d9b72d40 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.utils; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.common.constants.JobMode; + +import lombok.extern.slf4j.Slf4j; + +/** Job env util */ +@Slf4j +public class JobContextUtil { + + public static boolean isBatchJob(JobContext jobContext) { + return jobContext.getJobMode().equals(JobMode.BATCH); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index 44f8fb2624b..6b9a6bf01c5 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -333,6 +333,10 @@ public static InternalRow convert( SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) { BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields()); BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow); + // Convert SeaTunnel RowKind to Paimon RowKind + org.apache.paimon.types.RowKind rowKind = + RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind()); + binaryRow.setRowKind(rowKind); SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); for (int i = 0; i < fieldTypes.length; i++) { // judge the field is or not equals null @@ -393,8 +397,8 @@ public static InternalRow convert( MapType mapType = (MapType) seaTunnelRowType.getFieldType(i); SeaTunnelDataType keyType = mapType.getKeyType(); SeaTunnelDataType valueType = mapType.getValueType(); - DataType paimonKeyType = RowTypeConverter.convert(keyType); - DataType paimonValueType = RowTypeConverter.convert(valueType); + DataType paimonKeyType = RowTypeConverter.reconvert(keyType); + DataType paimonValueType = RowTypeConverter.reconvert(valueType); Map field = (Map) seaTunnelRow.getField(i); Object[] keys = field.keySet().toArray(new Object[0]); Object[] values = field.values().toArray(new Object[0]); @@ -411,13 +415,13 @@ public static InternalRow convert( i, paimonArray, new InternalArraySerializer( - RowTypeConverter.convert(arrayType.getElementType()))); + RowTypeConverter.reconvert(arrayType.getElementType()))); break; case ROW: SeaTunnelDataType rowType = seaTunnelRowType.getFieldType(i); Object row = seaTunnelRow.getField(i); InternalRow paimonRow = convert((SeaTunnelRow) row, (SeaTunnelRowType) rowType); - RowType paimonRowType = RowTypeConverter.convert((SeaTunnelRowType) rowType); + RowType paimonRowType = RowTypeConverter.reconvert((SeaTunnelRowType) rowType); binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType)); break; default: diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java new file mode 100644 index 00000000000..ce6a172e431 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.utils; + +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; + +import org.apache.paimon.data.InternalRow; + +public class RowKindConverter { + + /** + * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link InternalRow} + * + * @param seaTunnelRowInd + * @return + */ + public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowKind( + RowKind seaTunnelRowInd) { + switch (seaTunnelRowInd) { + case DELETE: + return org.apache.paimon.types.RowKind.DELETE; + case UPDATE_AFTER: + return org.apache.paimon.types.RowKind.UPDATE_AFTER; + case UPDATE_BEFORE: + return org.apache.paimon.types.RowKind.UPDATE_BEFORE; + case INSERT: + return org.apache.paimon.types.RowKind.INSERT; + default: + throw new PaimonConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported rowKind type " + seaTunnelRowInd.shortString()); + } + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java index 4dfd6b69fa2..16863ebff5f 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java @@ -43,6 +43,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; import org.apache.paimon.types.TimestampType; import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; @@ -70,13 +71,93 @@ public static SeaTunnelRowType convert(RowType rowType) { return new SeaTunnelRowType(fieldNames, dataTypes); } + /** + * Convert Paimon row type {@link DataType} to SeaTunnel row type {@link SeaTunnelDataType} + * + * @param dataType Paimon data type + * @return SeaTunnel data type {@link SeaTunnelDataType} + */ + public static SeaTunnelDataType convert(DataType dataType) { + SeaTunnelDataType seaTunnelDataType; + PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor = + PaimonToSeaTunnelTypeVisitor.INSTANCE; + switch (dataType.getTypeRoot()) { + case CHAR: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((CharType) dataType); + break; + case VARCHAR: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((VarCharType) dataType); + break; + case BOOLEAN: + seaTunnelDataType = 
paimonToSeaTunnelTypeVisitor.visit((BooleanType) dataType); + break; + case BINARY: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((BinaryType) dataType); + break; + case VARBINARY: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((VarBinaryType) dataType); + break; + case DECIMAL: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((DecimalType) dataType); + break; + case TINYINT: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((TinyIntType) dataType); + break; + case SMALLINT: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((SmallIntType) dataType); + break; + case INTEGER: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((IntType) dataType); + break; + case BIGINT: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((BigIntType) dataType); + break; + case FLOAT: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((FloatType) dataType); + break; + case DOUBLE: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((DoubleType) dataType); + break; + case DATE: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((DateType) dataType); + break; + case TIME_WITHOUT_TIME_ZONE: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((TimeType) dataType); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((TimestampType) dataType); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + seaTunnelDataType = + paimonToSeaTunnelTypeVisitor.visit((LocalZonedTimestampType) dataType); + break; + case ARRAY: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType); + break; + case MAP: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((MapType) dataType); + break; + case ROW: + seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit((RowType) dataType); + break; + default: + String errorMsg = + String.format( + "Paimon dataType not support this genericType [%s]", + dataType.asSQLString()); + throw new PaimonConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg); + } + return seaTunnelDataType; + } + /** * Convert SeaTunnel row type {@link SeaTunnelRowType} to Paimon row type {@link RowType} * * @param seaTunnelRowType SeaTunnel row type {@link SeaTunnelRowType} * @return Paimon row type {@link RowType} */ - public static RowType convert(SeaTunnelRowType seaTunnelRowType) { + public static RowType reconvert(SeaTunnelRowType seaTunnelRowType) { SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); DataType[] dataTypes = Arrays.stream(fieldTypes) @@ -96,7 +177,7 @@ public static RowType convert(SeaTunnelRowType seaTunnelRowType) { * @param dataType SeaTunnel data type {@link SeaTunnelDataType} * @return Paimon data type {@link DataType} */ - public static DataType convert(SeaTunnelDataType dataType) { + public static DataType reconvert(SeaTunnelDataType dataType) { return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java new file mode 100644 index 00000000000..c03a77149c9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.utils; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper; + +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import java.util.Objects; + +/** The util seatunnel schema to paimon schema */ +public class SchemaUtil { + + public static DataType toPaimonType(Column column) { + return PaimonTypeMapper.INSTANCE.reconvert(column); + } + + public static Schema toPaimonSchema(TableSchema tableSchema) { + Schema.Builder paiSchemaBuilder = Schema.newBuilder(); + for (int i = 0; i < tableSchema.getColumns().size(); i++) { + Column column = tableSchema.getColumns().get(i); + paiSchemaBuilder.column(column.getName(), toPaimonType(column)); + } + PrimaryKey primaryKey = tableSchema.getPrimaryKey(); + if (Objects.nonNull(primaryKey) && primaryKey.getColumnNames().size() > 0) { + paiSchemaBuilder.primaryKey(primaryKey.getColumnNames()); + } + return paiSchemaBuilder.build(); + } + + public static SeaTunnelDataType toSeaTunnelType(DataType dataType) { + return PaimonTypeMapper.INSTANCE.convert(dataType).getDataType(); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java index f32b87f0070..f828be06505 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java @@ -103,7 +103,7 @@ public void paimonToSeaTunnel() { @Test public void seaTunnelToPaimon() { - RowType convert = RowTypeConverter.convert(seaTunnelRowType); + RowType convert = RowTypeConverter.reconvert(seaTunnelRowType); Assertions.assertEquals(convert, rowType); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml index 4af6e8436e8..69ea9a9f74f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml @@ -30,16 +30,25 @@ org.apache.seatunnel connector-fake ${project.version} + test org.apache.seatunnel connector-paimon ${project.version} + test + + + org.apache.seatunnel + seatunnel-hadoop3-3.1.4-uber + optional + test 
org.apache.seatunnel connector-assert ${project.version} + test diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java new file mode 100644 index 00000000000..a960f7d4d37 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.given; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error") +@Slf4j +public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { + private static final String CATALOG_ROOT_DIR = "/tmp/"; + private static final String NAMESPACE = "paimon"; + private static final String NAMESPACE_TAR = "paimon.tar.gz"; + private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/"; + private static final String TARGET_TABLE = 
"st_test"; + private static final String TARGET_DATABASE = "seatunnel_namespace"; + private static final String FAKE_TABLE1 = "FakeTable1"; + private static final String FAKE_DATABASE1 = "FakeDatabase1"; + private static final String FAKE_TABLE2 = "FakeTable1"; + private static final String FAKE_DATABASE2 = "FakeDatabase2"; + + @BeforeAll + @Override + public void startUp() throws Exception {} + + @AfterAll + @Override + public void tearDown() throws Exception {} + + @TestTemplate + public void testFakeCDCSinkPaimon(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + // copy paimon to local + container.executeExtraCommands(containerExtendedFactory); + List paimonRecords = + loadPaimonData(TARGET_DATABASE, TARGET_TABLE); + Assertions.assertEquals(2, paimonRecords.size()); + paimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 1) { + Assertions.assertEquals("A_1", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 3) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); + }); + + cleanPaimonTable(container); + } + + @TestTemplate + public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + // copy paimon to local + container.executeExtraCommands(containerExtendedFactory); + // Check FakeDatabase1.FakeTable1 + List fake1PaimonRecords = + loadPaimonData(FAKE_DATABASE1, FAKE_TABLE1); + Assertions.assertEquals(2, fake1PaimonRecords.size()); + fake1PaimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 1) { + Assertions.assertEquals("A_1", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 3) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); + // Check FakeDatabase2.FakeTable1 + List fake2PaimonRecords = + loadPaimonData(FAKE_DATABASE2, FAKE_TABLE2); + Assertions.assertEquals(2, fake2PaimonRecords.size()); + fake2PaimonRecords.forEach( + paimonRecord -> { + if (paimonRecord.getPkId() == 100) { + Assertions.assertEquals( + "A_100", paimonRecord.getName()); + } + if (paimonRecord.getPkId() == 200) { + Assertions.assertEquals("C", paimonRecord.getName()); + } + }); + }); + + cleanPaimonTable(container); + } + + protected final ContainerExtendedFactory cleanContainerExtendedFactory = + genericContainer -> + genericContainer.execInContainer("sh", "-c", "rm -rf " + CATALOG_DIR + "**"); + + private void cleanPaimonTable(TestContainer container) + throws IOException, InterruptedException { + // clean table + container.executeExtraCommands(cleanContainerExtendedFactory); + } + + protected final ContainerExtendedFactory containerExtendedFactory = + container -> { + FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); + FileUtils.createNewDir(CATALOG_DIR); + container.execInContainer( + "sh", + "-c", + "cd " + + CATALOG_ROOT_DIR + + " && tar -czvf " + + NAMESPACE_TAR + + " " + + NAMESPACE); + container.copyFileFromContainer( + CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR + NAMESPACE_TAR); + extractFiles(); 
+ }; + + private void extractFiles() { + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command( + "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " + NAMESPACE_TAR); + try { + Process process = processBuilder.start(); + // wait command completed + int exitCode = process.waitFor(); + if (exitCode == 0) { + log.info("Extract files successful."); + } else { + log.error("Extract files failed with exit code " + exitCode); + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + private List loadPaimonData(String dbName, String tbName) throws Exception { + Table table = getTable(dbName, tbName); + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + TableRead tableRead = readBuilder.newRead(); + List result = new ArrayList<>(); + log.info( + "====================================Paimon data==========================================="); + log.info( + "=========================================================================================="); + log.info( + "=========================================================================================="); + try (RecordReader reader = tableRead.createReader(plan)) { + reader.forEachRemaining( + row -> { + result.add(new PaimonRecord(row.getLong(0), row.getString(1).toString())); + log.info("key_id:" + row.getLong(0) + ", name:" + row.getString(1)); + }); + } + log.info( + "=========================================================================================="); + log.info( + "=========================================================================================="); + log.info( + "=========================================================================================="); + return result; + } + + private Table getTable(String dbName, String tbName) { + try { + return getCatalog().getTable(getIdentifier(dbName, tbName)); + } catch (Catalog.TableNotExistException e) { + // do something + throw new RuntimeException("table not exist"); + } + } + + private Identifier getIdentifier(String dbName, String tbName) { + return Identifier.create(dbName, tbName); + } + + private Catalog getCatalog() { + Options options = new Options(); + options.set("warehouse", "file://" + CATALOG_DIR); + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + return catalog; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public class PaimonRecord { + private Long pkId; + private String name; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf new file mode 100644 index 00000000000..59e3a0cf727 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "seatunnel_namespace" + table = "st_test" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf new file mode 100644 index 00000000000..ddc92268710 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf @@ -0,0 +1,142 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "FakeDatabase1.FakeTable1" + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + }, + { + schema = { + table = "FakeDatabase2.FakeTable1" + fields { + pk_id = bigint + name = string + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [100, "A"] + }, + { + kind = INSERT + fields = [200, "B"] + }, + { + kind = INSERT + fields = [300, "C"] + }, + { + kind = INSERT + fields = [300, "C"] + }, + { + kind = INSERT + fields = [300, "C"] + }, + { + kind = INSERT + fields = [300, "C"] + } + { + kind = UPDATE_BEFORE + fields = [100, "A"] + }, + { + kind = UPDATE_AFTER + fields = [100, "A_100"] + }, + { + kind = DELETE + fields = [200, "B"] + } + ] + } + ] + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "${database_name}" + table = "${table_name}" + } +} diff --git a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml index 00b55265c49..be5ced9214a 100644 --- a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml +++ b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml @@ -47,6 +47,11 @@ hadoop-client ${hadoop3.version} + + org.xerial.snappy + snappy-java + 1.1.10.4 +
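For reviewers, here is a minimal sketch of the row-kind propagation this patch adds: RowConverter now stamps the SeaTunnel row kind onto the Paimon BinaryRow (via RowKindConverter) before writing the fields, so change-log records (UPDATE_BEFORE/UPDATE_AFTER/DELETE) survive conversion instead of being written as plain inserts. The class name, schema, and values below are illustrative only; the converter calls are the ones introduced in this patch.

```java
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;

import org.apache.paimon.data.InternalRow;

public class RowKindPropagationSketch {
    public static void main(String[] args) {
        // Illustrative two-column schema (pk_id BIGINT, name STRING).
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"pk_id", "name"},
                        new SeaTunnelDataType<?>[] {BasicType.LONG_TYPE, BasicType.STRING_TYPE});

        // A change-log record carrying a DELETE row kind, as a CDC source would emit it.
        SeaTunnelRow row = new SeaTunnelRow(new Object[] {2L, "B"});
        row.setRowKind(RowKind.DELETE);

        // The converter copies the row kind onto the Paimon row before writing the fields,
        // so Paimon treats the record as a retraction rather than an upsert.
        InternalRow paimonRow = RowConverter.convert(row, rowType);
        System.out.println(paimonRow.getRowKind()); // prints DELETE
    }
}
```

This is also what the fake_cdc_sink_paimon e2e cases rely on: after replaying the INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE rows, only pk_id 1 (renamed to A_1) and pk_id 3 remain in the target table, which is exactly what PaimonSinkCDCIT asserts.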