Skip to content

Commit

Permalink
[Feature][Paimon] paimon sink support savemode
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Mar 8, 2024
1 parent a70d696 commit 5b32488
Show file tree
Hide file tree
Showing 12 changed files with 698 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;

import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/**
 * SeaTunnel {@link Catalog} implementation that delegates every metadata operation to a Paimon
 * catalog obtained from {@link PaimonCatalogLoader}.
 *
 * <p>The underlying Paimon catalog is only loaded by {@link #open()}, so {@code open()} has to be
 * called before any other operation of this class is used.
 */
@Slf4j
public class PaimonCatalog implements Catalog, PaimonTable {
    private static final String DEFAULT_DATABASE = "default";

    private final String catalogName;
    private final ReadonlyConfig readonlyConfig;
    private final PaimonCatalogLoader paimonCatalogLoader;
    // Assigned in open(); stays null until then.
    private org.apache.paimon.catalog.Catalog catalog;

    /**
     * Creates the catalog wrapper without touching the warehouse yet.
     *
     * @param catalogName name reported by {@link #name()} and used in exception messages
     * @param readonlyConfig connector options, wrapped into a {@link PaimonSinkConfig}
     */
    public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
        this.readonlyConfig = readonlyConfig;
        this.catalogName = catalogName;
        this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonSinkConfig(readonlyConfig));
    }

    /** Loads the underlying Paimon catalog through the {@link PaimonCatalogLoader}. */
    @Override
    public void open() throws CatalogException {
        this.catalog = paimonCatalogLoader.loadCatalog();
    }

    /**
     * Closes the underlying Paimon catalog when it implements {@link Closeable}.
     *
     * @throws CatalogException if closing the underlying catalog fails with an I/O error
     */
    @Override
    public void close() throws CatalogException {
        // instanceof is false for null, so a catalog that was never opened is skipped as well.
        if (catalog instanceof Closeable) {
            try {
                ((Closeable) catalog).close();
            } catch (IOException e) {
                log.error("Error while closing PaimonCatalog.", e);
                throw new CatalogException(e);
            }
        }
    }

    /** Returns the catalog name given at construction time. */
    @Override
    public String name() {
        return this.catalogName;
    }

    /** Returns the fixed default database name, {@code "default"}. */
    @Override
    public String getDefaultDatabase() throws CatalogException {
        return DEFAULT_DATABASE;
    }

    /** Asks the underlying Paimon catalog whether the given database exists. */
    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        return catalog.databaseExists(databaseName);
    }

    /** Lists the databases known to the underlying Paimon catalog. */
    @Override
    public List<String> listDatabases() throws CatalogException {
        return catalog.listDatabases();
    }

    /**
     * Lists the tables of one database.
     *
     * @throws DatabaseNotExistException if Paimon reports that the database does not exist
     */
    @Override
    public List<String> listTables(String databaseName)
            throws CatalogException, DatabaseNotExistException {
        try {
            return catalog.listTables(databaseName);
        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
            throw new DatabaseNotExistException(this.catalogName, databaseName);
        }
    }

    /** Asks the underlying Paimon catalog whether the given table exists. */
    @Override
    public boolean tableExists(TablePath tablePath) throws CatalogException {
        return catalog.tableExists(toIdentifier(tablePath));
    }

    /**
     * Loads a Paimon table and converts its schema into a SeaTunnel {@link CatalogTable}.
     *
     * @throws TableNotExistException if the table does not exist in the Paimon catalog
     * @throws CatalogException if the table is not a {@link FileStoreTable} or its schema cannot
     *     be converted
     */
    @Override
    public CatalogTable getTable(TablePath tablePath)
            throws CatalogException, TableNotExistException {
        // A missing table surfaces here as TableNotExistException and is allowed to propagate.
        final Table paimonTable = getPaimonTable(tablePath);
        if (!(paimonTable instanceof FileStoreTable)) {
            throw new CatalogException(
                    String.format(
                            "Table %s in catalog %s is not a FileStoreTable: %s",
                            tablePath, this.catalogName, paimonTable));
        }
        try {
            return toCatalogTable((FileStoreTable) paimonTable, tablePath);
        } catch (CatalogException e) {
            throw e;
        } catch (RuntimeException e) {
            // Conversion problems are not "table not found"; keep the real cause attached.
            throw new CatalogException(
                    String.format(
                            "Failed to convert table %s in catalog %s",
                            tablePath, this.catalogName),
                    e);
        }
    }

    /**
     * Returns the native Paimon table for the given path.
     *
     * @throws TableNotExistException if Paimon reports that the table does not exist
     */
    @Override
    public Table getPaimonTable(TablePath tablePath)
            throws CatalogException, TableNotExistException {
        try {
            return catalog.getTable(toIdentifier(tablePath));
        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.catalogName, tablePath);
        }
    }

    /**
     * Creates a Paimon table from the schema of the given SeaTunnel table.
     *
     * @throws TableAlreadyExistException if Paimon reports that the table already exists
     * @throws DatabaseNotExistException if Paimon reports that the target database is missing
     */
    @Override
    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        try {
            catalog.createTable(
                    toIdentifier(tablePath),
                    SchemaUtil.toPaimonSchema(table.getTableSchema()),
                    ignoreIfExists);
        } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
            throw new TableAlreadyExistException(this.catalogName, tablePath);
        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
            throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
        }
    }

    /**
     * Drops a table.
     *
     * @throws TableNotExistException if Paimon reports that the table does not exist
     */
    @Override
    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        try {
            catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
            throw new TableNotExistException(this.catalogName, tablePath);
        }
    }

    /**
     * Creates the database named by {@code tablePath.getDatabaseName()}.
     *
     * @throws DatabaseAlreadyExistException if Paimon reports that the database already exists
     */
    @Override
    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException {
        try {
            catalog.createDatabase(tablePath.getDatabaseName(), ignoreIfExists);
        } catch (org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException e) {
            throw new DatabaseAlreadyExistException(this.catalogName, tablePath.getDatabaseName());
        }
    }

    /**
     * Drops the database named by {@code tablePath.getDatabaseName()}, passing {@code true} as
     * the third argument of the Paimon {@code dropDatabase} call.
     *
     * @throws DatabaseNotExistException if Paimon reports that the database does not exist
     * @throws CatalogException if the drop fails for any other reason
     */
    @Override
    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
            throws DatabaseNotExistException, CatalogException {
        try {
            catalog.dropDatabase(tablePath.getDatabaseName(), ignoreIfNotExists, true);
        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
            throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
        } catch (Exception e) {
            // Any other failure is not a "database missing" condition; report it with its cause.
            throw new CatalogException(
                    String.format(
                            "Failed to drop database %s in catalog %s",
                            tablePath.getDatabaseName(), this.catalogName),
                    e);
        }
    }

    /**
     * Builds a SeaTunnel {@link CatalogTable} from a Paimon table: one nullable physical column
     * per Paimon field, plus the Paimon table options and partition keys.
     */
    private CatalogTable toCatalogTable(FileStoreTable fileStoreTable, TablePath tablePath) {
        final org.apache.paimon.schema.TableSchema paimonSchema = fileStoreTable.schema();
        final TableSchema.Builder schemaBuilder = TableSchema.builder();
        for (DataField field : paimonSchema.fields()) {
            final SeaTunnelDataType<?> seaTunnelType = SchemaUtil.toSeaTunnelType(field.type());
            schemaBuilder.column(
                    PhysicalColumn.of(
                            field.name(),
                            seaTunnelType,
                            (Long) null,
                            true,
                            null,
                            field.description()));
        }

        final List<String> partitionKeys = paimonSchema.partitionKeys();

        return CatalogTable.of(
                org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
                schemaBuilder.build(),
                fileStoreTable.options(),
                partitionKeys,
                null,
                catalogName);
    }

    /** Converts a SeaTunnel table path into a Paimon identifier (database + table name). */
    private Identifier toIdentifier(TablePath tablePath) {
        return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class PaimonCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
return new PaimonCatalog(catalogName, readonlyConfig);
}

@Override
public String factoryIdentifier() {
return "Paimon";
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
PaimonSinkConfig.WAREHOUSE,
PaimonSinkConfig.DATABASE,
PaimonSinkConfig.TABLE)
.optional(
PaimonSinkConfig.HDFS_SITE_PATH,
PaimonSinkConfig.SCHEMA_SAVE_MODE,
PaimonSinkConfig.DATA_SAVE_MODE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;

import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;

/**
 * Serializable helper that builds a Paimon {@link Catalog} from the warehouse path and the
 * optional HDFS site file held by a {@link PaimonSinkConfig}.
 */
@Slf4j
public class PaimonCatalogLoader implements Serializable {
    private PaimonSinkConfig config;

    /** @param config sink configuration providing the warehouse path and HDFS site path */
    public PaimonCatalogLoader(PaimonSinkConfig config) {
        this.config = config;
    }

    /**
     * Creates a Paimon catalog for the configured warehouse.
     *
     * <p>Side effect: the context class loader of the calling thread is replaced by the class
     * loader of this class and is not restored afterwards.
     */
    public Catalog loadCatalog() {
        // When running on the SeaTunnel engine, use this class's loader as the context class
        // loader to prevent loading failures.
        Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
        final Options paimonOptions = buildOptions();
        final Configuration hadoopConf = buildHadoopConf();
        return CatalogFactory.createCatalog(CatalogContext.create(paimonOptions, hadoopConf));
    }

    /** Builds the Paimon options, which carry only the warehouse path. */
    private Options buildOptions() {
        final Map<String, String> entries = new HashMap<>();
        entries.put(WAREHOUSE.key(), config.getWarehouse());
        return Options.fromMap(entries);
    }

    /** Builds the Hadoop configuration, adding the HDFS site file when one is configured. */
    private Configuration buildHadoopConf() {
        final Configuration conf = new Configuration();
        final String hdfsSitePath = config.getHdfsSitePath();
        if (StringUtils.isNotBlank(hdfsSitePath)) {
            conf.addResource(new Path(hdfsSitePath));
        }
        return conf;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;

import org.apache.paimon.table.Table;

/**
 * Gives access to the native Paimon {@link Table} behind a SeaTunnel {@link TablePath}.
 */
public interface PaimonTable {
/**
 * Returns the Paimon table identified by the given path.
 *
 * @param tablePath SeaTunnel path of the table to look up
 * @return the Paimon table for that path
 * @throws CatalogException declared for catalog-level failures
 * @throws TableNotExistException declared for the case that the table does not exist
 */
Table getPaimonTable(TablePath tablePath) throws CatalogException, TableNotExistException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;

import java.io.Serializable;
import java.util.List;

/**
* Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
* SeaTunnelSink}.
*/
public class PaimonConfig {
public class PaimonConfig implements Serializable {

public static final Option<String> WAREHOUSE =
Options.key("warehouse")
.stringType()
.noDefaultValue()
.withDescription("The warehouse path of paimon");

public static final Option<String> CATALOG_NAME =
Options.key("catalog_name")
.stringType()
.defaultValue("paimon")
.withDescription(" the iceberg catalog name");

public static final Option<String> DATABASE =
Options.key("database")
.stringType()
Expand Down
Loading

0 comments on commit 5b32488

Please sign in to comment.