diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md index 4ff2ee9d8dce..c5fe9fb19a6c 100644 --- a/docs/docs/en/guide/resource/configuration.md +++ b/docs/docs/en/guide/resource/configuration.md @@ -1,7 +1,7 @@ # Resource Center Configuration - You could use `Resource Center` to upload text files and other task-related files. -- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html) etc. +- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), [Tencent Cloud COS](https://cloud.tencent.com/product/cos), etc. - You could configure `Resource Center` to use local file system. If you deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use local file system for `Resource Center` without the need of an external `HDFS` system or `S3`. - Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount `OSS` to your machines and use the local file system for `Resource Center`. In this way, you could operate remote files as if on your local machines. 
@@ -96,3 +96,21 @@ resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com > * If you want to use the resource upload function, the deployment user in [installation and deployment](../installation/standalone.md) must have relevant operation authority. > * If you using a Hadoop cluster with HA, you need to enable HDFS resource upload, and you need to copy the `core-site.xml` and `hdfs-site.xml` under the Hadoop cluster to `worker-server/conf` and `api-server/conf`, otherwise skip this copy step. +## connect COS + +If you want to upload resources to a `Resource Center` backed by `COS`, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. + +Configure the following fields: + +```properties +# access key id, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.id= +# access key secret, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.secret= +# cos bucket name, required if you set resource.storage.type=COS +resource.tencent.cloud.cos.bucket.name=dolphinscheduler +# cos bucket region, required if you set resource.storage.type=COS, refer to https://cloud.tencent.com/document/product/436/6224 +resource.tencent.cloud.cos.region=ap-nanjing + +``` + diff --git a/docs/docs/zh/guide/resource/configuration.md b/docs/docs/zh/guide/resource/configuration.md index 62c8cf135e7f..018f9d8099f3 100644 --- a/docs/docs/zh/guide/resource/configuration.md +++ b/docs/docs/zh/guide/resource/configuration.md @@ -1,7 +1,7 @@ # 资源中心配置详情 - 资源中心通常用于上传文件以及任务组管理等操作。 -- 资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS S3](https://aws.amazon.com/s3/)或者[阿里云 OSS](https://www.aliyun.com/product/oss),[华为云 OBS](https://support.huaweicloud.com/obs/index.html) 等。 +- 
资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS S3](https://aws.amazon.com/s3/)或者[阿里云 OSS](https://www.aliyun.com/product/oss),[华为云 OBS](https://support.huaweicloud.com/obs/index.html),[腾讯云 COS](https://cloud.tencent.com/product/cos) 等。 - 资源中心也可以直接对接本地文件系统。在单机模式下,您无需依赖`Hadoop`或`S3`一类的外部存储系统,可以方便地对接本地文件系统进行体验。 - 除此之外,对于集群模式下的部署,您可以通过使用[S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse)将`S3`挂载到本地,或者使用[JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html)将`OSS`挂载到本地等,再用资源中心对接本地文件系统方式来操作远端对象存储中的文件。 @@ -90,3 +90,19 @@ resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com > * 如果用到资源上传的功能,那么[安装部署](../installation/standalone.md)中,部署用户需要有这部分的操作权限。 > * 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的 `core-site.xml` 和 `hdfs-site.xml` 复制到 `worker-server/conf` 以及 `api-server/conf`,非 NameNode HA 跳过此步骤。 +## 对接腾讯云 COS + +如果需要使用到资源中心的 COS 上传资源,我们需要对以下路径进行配置:`api-server/conf/common.properties` 和 `worker-server/conf/common.properties`。可参考如下: + +```properties +# access key id, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.id= +# access key secret, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.secret= +# cos bucket name, required if you set resource.storage.type=COS +resource.tencent.cloud.cos.bucket.name=dolphinscheduler +# cos bucket region, required if you set resource.storage.type=COS, refer to https://cloud.tencent.com/document/product/436/6224 +resource.tencent.cloud.cos.region=ap-nanjing + +``` + diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index b7efb4815cf1..47ddbf0d485a 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -117,6 +117,7 @@ 1.2.10 3.17.2 3.23.3
+        <qcloud-cos.version>5.6.231</qcloud-cos.version>
 1.2.1 0.10.1 1.19.3 @@ -934,6 +935,22 @@
+            <dependency>
+                <groupId>com.qcloud</groupId>
+                <artifactId>cos_api</artifactId>
+                <version>${qcloud-cos.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
 <groupId>com.github.stefanbirkner</groupId> <artifactId>system-lambda</artifactId> diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index cb0b81201107..867b99fa1164 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -124,6 +124,9 @@ public final class Constants { public static final String HUAWEI_CLOUD_OBS_BUCKET_NAME = "resource.huawei.cloud.obs.bucket.name"; public static final String HUAWEI_CLOUD_OBS_END_POINT = "resource.huawei.cloud.obs.endpoint"; + public static final String TENCENT_CLOUD_COS_BUCKET_NAME = "resource.tencent.cloud.cos.bucket.name"; + public static final String TENCENT_CLOUD_COS_REGION = "resource.tencent.cloud.cos.region"; + /** * fetch applicationId way */ diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index e0704bebe58f..d78f09b49681 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, COS. 
LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration # please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless # use shared file mount point resource.storage.type=LOCAL @@ -73,6 +73,16 @@ resource.huawei.cloud.obs.bucket.name=dolphinscheduler resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com +# tencent cloud access key id, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.id= +# access key secret, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.secret= +# cos bucket name, required if you set resource.storage.type=COS +resource.tencent.cloud.cos.bucket.name=dolphinscheduler +# cos bucket region, required if you set resource.storage.type=COS, see: https://cloud.tencent.com/document/product/436/6224 +resource.tencent.cloud.cos.region=ap-nanjing + + # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir diff --git a/dolphinscheduler-common/src/test/resources/common.properties b/dolphinscheduler-common/src/test/resources/common.properties index ce8ef3bf4f42..2e4ad4f0957a 100644 --- a/dolphinscheduler-common/src/test/resources/common.properties +++ b/dolphinscheduler-common/src/test/resources/common.properties @@ -27,7 +27,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. 
LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, COS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration # please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless # use shared file mount point resource.storage.type=LOCAL @@ -83,6 +83,17 @@ resource.azure.blob.storage.account.name= # abs connection string, required if you set resource.storage.type=ABS resource.azure.blob.storage.connection.string= + +# tencent cloud access key id, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.id= +# access key secret, required if you set resource.storage.type=COS +resource.tencent.cloud.access.key.secret= +# cos bucket name, required if you set resource.storage.type=COS +resource.tencent.cloud.cos.bucket.name=dolphinscheduler +# cos bucket region, required if you set resource.storage.type=COS, see: https://cloud.tencent.com/document/product/436/6224 +resource.tencent.cloud.cos.region=ap-nanjing + + # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index a4cdfeadabbc..ceba727734e3 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -713,6 +713,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. 
azure-core-management 1.10.1: https://mvnrepository.com/artifact/com.azure/azure-core-management/1.10.1, MIT azure-storage-blob 12.21.0: https://mvnrepository.com/artifact/com.azure/azure-storage-blob/12.21.0, MIT azure-storage-internal-avro 12.6.0: https://mvnrepository.com/artifact/com.azure/azure-storage-internal-avro/12.6.0, MIT + cos_api 5.6.231: https://mvnrepository.com/artifact/com.qcloud/cos_api/5.6.231, MIT ======================================================================== MPL 1.1 licenses diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml index 4d6edcd1cc76..8cb45a1a1ac8 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml @@ -62,6 +62,11 @@ <artifactId>dolphinscheduler-storage-obs</artifactId> <version>${project.version}</version>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-cos</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java index 7ead2c8a9488..bd561138394f 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java @@ -29,7 +29,11 @@ public enum StorageType { ABS(5, "ABS"), - OBS(6, "OBS"); + OBS(6, "OBS"), + + COS(7, "COS"), + + ; private final int code; private final String name; diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/pom.xml new file mode 100644 index 
000000000000..9145995a2690 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/pom.xml @@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-storage-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-storage-cos</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.qcloud</groupId>
+            <artifactId>cos_api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
 diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageOperator.java new file mode 100644 index 000000000000..05603ac4ec9b --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageOperator.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.storage.cos; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.plugin.storage.api.AbstractStorageOperator; +import org.apache.dolphinscheduler.plugin.storage.api.ResourceMetadata; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; + +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.ClientConfig; +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.exception.CosServiceException; +import com.qcloud.cos.http.HttpProtocol; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectSummary; +import com.qcloud.cos.model.CopyObjectRequest; +import com.qcloud.cos.model.CopyResult; +import com.qcloud.cos.model.GetObjectRequest; +import com.qcloud.cos.model.ListObjectsRequest; +import com.qcloud.cos.model.ObjectListing; +import com.qcloud.cos.model.ObjectMetadata; +import com.qcloud.cos.model.PutObjectRequest; +import com.qcloud.cos.model.UploadResult; +import com.qcloud.cos.region.Region; +import com.qcloud.cos.transfer.Copy; +import 
com.qcloud.cos.transfer.Download; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos.transfer.TransferManagerConfiguration; +import com.qcloud.cos.transfer.Upload; + +@Slf4j +public class CosStorageOperator extends AbstractStorageOperator implements Closeable, StorageOperator { + + private static final int TRANSFER_THREAD_POOL_SIZE = 16; + + private static final long MULTIPART_UPLOAD_BYTES_THRESHOLD = 5 * 1024 * 1024; + private static final long MIN_UPLOAD_PART_BYTES = 1024 * 1024; + + private final String bucketName; + + private final COSClient cosClient; + + private final TransferManager cosTransferManager; + + public CosStorageOperator(CosStorageProperties cosStorageProperties) { + super(cosStorageProperties.getResourceUploadPath()); + String secretId = cosStorageProperties.getAccessKeyId(); + String secretKey = cosStorageProperties.getAccessKeySecret(); + COSCredentials cosCredentials = new BasicCOSCredentials(secretId, secretKey); + String regionName = cosStorageProperties.getRegion(); + ClientConfig clientConfig = new ClientConfig(new Region(regionName)); + clientConfig.setHttpProtocol(HttpProtocol.https); + this.cosClient = new COSClient(cosCredentials, clientConfig); + this.bucketName = cosStorageProperties.getBucketName(); + ensureBucketSuccessfullyCreated(bucketName); + this.cosTransferManager = getCosTransferManager(); + } + + @Override + public void close() throws IOException { + this.cosTransferManager.shutdownNow(true); + } + + @Override + public String getStorageBaseDirectory() { + // All directories should end with File.separator + if (resourceBaseAbsolutePath.startsWith("/")) { + log.warn("{} -> {} should not start with / in cos", Constants.RESOURCE_UPLOAD_PATH, + resourceBaseAbsolutePath); + return resourceBaseAbsolutePath.substring(1); + } + return resourceBaseAbsolutePath; + } + + @SneakyThrows + @Override + public void createStorageDir(String directoryAbsolutePath) { + String cosKey = 
transformAbsolutePathToCOSKey(directoryAbsolutePath); + if (cosClient.doesObjectExist(bucketName, cosKey)) { + throw new FileAlreadyExistsException("directory: " + cosKey + " already exists"); + } + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(0L); + InputStream emptyContent = new ByteArrayInputStream(new byte[0]); + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, cosKey, emptyContent, metadata); + cosClient.putObject(putObjectRequest); + } + + @SneakyThrows + @Override + public void download(String srcFilePath, String dstFilePath, boolean overwrite) { + String cosKey = transformAbsolutePathToCOSKey(srcFilePath); + + File dstFile = new File(dstFilePath); + if (dstFile.isDirectory()) { + Files.delete(dstFile.toPath()); + } else { + FileUtils.createDirectoryWith755(dstFile.getParentFile().toPath()); + } + + GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, cosKey); + Download download = cosTransferManager.download(getObjectRequest, dstFile); + download.waitForCompletion(); + } + + @Override + public boolean exists(String fileName) { + String cosKey = transformAbsolutePathToCOSKey(fileName); + return cosClient.doesObjectExist(bucketName, cosKey); + } + + @Override + public void delete(String filePath, boolean recursive) { + String cosKey = transformAbsolutePathToCOSKey(filePath); + cosClient.deleteObject(bucketName, cosKey); + } + + @SneakyThrows + @Override + public void copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) { + String srcCosKey = transformAbsolutePathToCOSKey(srcPath); + String destCosKey = transformAbsolutePathToCOSKey(dstPath); + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, srcCosKey, bucketName, destCosKey); + Copy copy = cosTransferManager.copy(copyObjectRequest, cosClient, null); + + CopyResult copyResult = copy.waitForCopyResult(); + if (copyResult != null && deleteSource) { + // delete the transformed source key, not the raw absolute path + cosClient.deleteObject(bucketName, srcCosKey); + } 
+ } + + @SneakyThrows + @Override + public void upload(String srcFile, String dstPath, boolean deleteSource, boolean overwrite) { + dstPath = transformAbsolutePathToCOSKey(dstPath); + + if (cosClient.doesObjectExist(bucketName, dstPath)) { + if (!overwrite) { + throw new CosServiceException("file: " + dstPath + " already exists"); + } else { + cosClient.deleteObject(bucketName, dstPath); + } + } + + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, dstPath, new File(srcFile)); + Upload upload = cosTransferManager.upload(putObjectRequest); + UploadResult uploadResult = upload.waitForUploadResult(); + if (uploadResult != null && deleteSource) { + Files.delete(Paths.get(srcFile)); + } + } + + @SneakyThrows + @Override + public List<String> fetchFileContent(String filePath, int skipLineNums, int limit) { + String cosKey = transformAbsolutePathToCOSKey(filePath); + + COSObject cosObject = cosClient.getObject(bucketName, cosKey); + try ( + InputStreamReader inputStreamReader = new InputStreamReader(cosObject.getObjectContent()); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + return bufferedReader + .lines() + .skip(skipLineNums) + .limit(limit) + .collect(Collectors.toList()); + } + } + + @Override + public List<StorageEntity> listStorageEntity(String resourceAbsolutePath) { + resourceAbsolutePath = transformCOSKeyToAbsolutePath(resourceAbsolutePath); + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucketName); + request.setPrefix(resourceAbsolutePath); + request.setDelimiter("/"); + + ObjectListing result = cosClient.listObjects(request); + + return result.getObjectSummaries() + .stream() + .map((COSObjectSummary summary) -> { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(summary.getSize()); + metadata.setLastModified(summary.getLastModified()); + COSObject object = new COSObject(); + object.setObjectMetadata(metadata); + object.setKey(summary.getKey()); + return 
transformCOSObjectToStorageEntity(object); + }) + .collect(Collectors.toList()); + } + + @Override + public List<StorageEntity> listFileStorageEntityRecursively(String resourceAbsolutePath) { + resourceAbsolutePath = transformCOSKeyToAbsolutePath(resourceAbsolutePath); + + Set<String> visited = new HashSet<>(); + List<StorageEntity> storageEntityList = new ArrayList<>(); + LinkedList<String> foldersToFetch = new LinkedList<>(); + foldersToFetch.addLast(resourceAbsolutePath); + + while (!foldersToFetch.isEmpty()) { + String pathToExplore = foldersToFetch.pop(); + visited.add(pathToExplore); + List<StorageEntity> storageEntities = listStorageEntity(pathToExplore); + for (StorageEntity entity : storageEntities) { + if (entity.isDirectory()) { + if (visited.contains(entity.getFullName())) { + continue; + } + foldersToFetch.add(entity.getFullName()); + } + } + storageEntityList.addAll(storageEntities); + } + + return storageEntityList; + } + + @Override + public StorageEntity getStorageEntity(String resourceAbsolutePath) { + String cosKey = transformCOSKeyToAbsolutePath(resourceAbsolutePath); + + COSObject object = cosClient.getObject(bucketName, cosKey); + return transformCOSObjectToStorageEntity(object); + } + + public void ensureBucketSuccessfullyCreated(String bucketName) { + if (StringUtils.isBlank(bucketName)) { + throw new IllegalArgumentException("resource.tencent.cloud.cos.bucket.name is empty"); + } + + if (!cosClient.doesBucketExist(bucketName)) { + throw new IllegalArgumentException( + "bucketName: " + bucketName + " does not exist, you need to create it yourself"); + } + + log.info("bucketName: {} has been found", bucketName); + } + + protected StorageEntity transformCOSObjectToStorageEntity(COSObject object) { + ObjectMetadata metadata = object.getObjectMetadata(); + String fileAbsolutePath = transformCOSKeyToAbsolutePath(object.getKey()); + ResourceMetadata resourceMetaData = getResourceMetaData(fileAbsolutePath); + String fileExtension = 
com.google.common.io.Files.getFileExtension(resourceMetaData.getResourceAbsolutePath()); + + return StorageEntity.builder() + .fileName(new File(fileAbsolutePath).getName()) + .fullName(fileAbsolutePath) + .pfullName(resourceMetaData.getResourceParentAbsolutePath()) + .type(resourceMetaData.getResourceType()) + .isDirectory(StringUtils.isEmpty(fileExtension)) + .size(metadata.getContentLength()) + .createTime(metadata.getLastModified()) + .updateTime(metadata.getLastModified()) + .build(); + } + + private String transformAbsolutePathToCOSKey(String absolutePath) { + ResourceMetadata resourceMetaData = getResourceMetaData(absolutePath); + if (resourceMetaData.isDirectory()) { + return FileUtils.concatFilePath(absolutePath, "/"); + } + return absolutePath; + } + + private String transformCOSKeyToAbsolutePath(String cosKey) { + if (cosKey.endsWith("/")) { + return cosKey.substring(0, cosKey.length() - 1); + } + return cosKey; + } + + private TransferManager getCosTransferManager() { + ExecutorService threadPool = Executors.newFixedThreadPool(TRANSFER_THREAD_POOL_SIZE); + TransferManager transferManager = new TransferManager(cosClient, threadPool); + TransferManagerConfiguration transferManagerConfiguration = new TransferManagerConfiguration(); + transferManagerConfiguration.setMultipartUploadThreshold(MULTIPART_UPLOAD_BYTES_THRESHOLD); + transferManagerConfiguration.setMinimumUploadPartSize(MIN_UPLOAD_PART_BYTES); + transferManager.setConfiguration(transferManagerConfiguration); + return transferManager; + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageOperatorFactory.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageOperatorFactory.java new file mode 100644 index 000000000000..2395dd3c997f --- /dev/null +++ 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageOperatorFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.storage.cos; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperatorFactory; +import org.apache.dolphinscheduler.plugin.storage.api.StorageType; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import com.google.auto.service.AutoService; + +@AutoService(StorageOperatorFactory.class) +public class CosStorageOperatorFactory implements StorageOperatorFactory { + + @Override + public StorageOperator createStorageOperate() { + final CosStorageProperties cosStorageProperties = getCosStorageProperties(); + return new CosStorageOperator(cosStorageProperties); + } + + @Override + public StorageType getStorageOperate() { + return StorageType.COS; + } + + private CosStorageProperties getCosStorageProperties() { + return 
CosStorageProperties.builder() + .region(PropertyUtils.getString(Constants.TENCENT_CLOUD_COS_REGION)) + .accessKeyId(PropertyUtils.getString(TaskConstants.TENCENT_CLOUD_ACCESS_KEY_ID)) + .accessKeySecret(PropertyUtils.getString(TaskConstants.TENCENT_CLOUD_ACCESS_KEY_SECRET)) + .bucketName(PropertyUtils.getString(Constants.TENCENT_CLOUD_COS_BUCKET_NAME)) + .resourceUploadPath(PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler")) + .build(); + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageProperties.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageProperties.java new file mode 100644 index 000000000000..2a501d84862f --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/main/java/org/apache/dolphinscheduler/plugin/storage/cos/CosStorageProperties.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.storage.cos; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class CosStorageProperties { + + private String region; + + private String accessKeyId; + + private String accessKeySecret; + + private String bucketName; + + private String resourceUploadPath; +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/test/resources/logback.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/test/resources/logback.xml new file mode 100644 index 000000000000..6f211959c590 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-cos/src/test/resources/logback.xml @@ -0,0 +1,21 @@ + + + + + + diff --git a/dolphinscheduler-storage-plugin/pom.xml b/dolphinscheduler-storage-plugin/pom.xml index a6a86bc2a644..001b93efd32b 100644 --- a/dolphinscheduler-storage-plugin/pom.xml +++ b/dolphinscheduler-storage-plugin/pom.xml @@ -36,6 +36,7 @@ <module>dolphinscheduler-storage-gcs</module> <module>dolphinscheduler-storage-abs</module> <module>dolphinscheduler-storage-obs</module>
+        <module>dolphinscheduler-storage-cos</module>
 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 77fd8db7fa5d..f78532679a30 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -389,6 +389,12 @@ private TaskConstants() { public static final String HUAWEI_CLOUD_ACCESS_KEY_ID = "resource.huawei.cloud.access.key.id"; public static final String HUAWEI_CLOUD_ACCESS_KEY_SECRET = 
"resource.huawei.cloud.access.key.secret"; + /** + * tencent cloud config + */ + public static final String TENCENT_CLOUD_ACCESS_KEY_ID = "resource.tencent.cloud.access.key.id"; + public static final String TENCENT_CLOUD_ACCESS_KEY_SECRET = "resource.tencent.cloud.access.key.secret"; + /** * use for k8s task */ diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index c6780804d4f1..2e731541778d 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -485,6 +485,7 @@ azure-storage-blob-12.21.0.jar azure-storage-internal-avro-12.6.0.jar vertica-jdbc-12.0.4-0.jar esdk-obs-java-bundle-3.23.3.jar +cos_api-5.6.231.jar dyvmsapi20170525-2.1.4.jar alibabacloud-gateway-spi-0.0.1.jar credentials-java-0.2.2.jar
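Reading the patch as a whole, the one non-obvious convention in `CosStorageOperator` is the directory encoding: COS has no real directories, so the operator stores a directory as a zero-byte object whose key ends with `/` (`transformAbsolutePathToCOSKey`) and strips that marker again when mapping a key back to a resource path (`transformCOSKeyToAbsolutePath`). A minimal standalone sketch of that round trip, with an illustrative class name and an explicit directory flag that are not part of the patch:

```java
// Sketch of the key mapping convention in CosStorageOperator. CosKeyMapping
// and the boolean flag are illustrative; the real code derives the directory
// flag from ResourceMetadata instead.
public class CosKeyMapping {

    // Mirrors transformAbsolutePathToCOSKey: directory keys gain a trailing "/".
    static String toCosKey(String absolutePath, boolean isDirectory) {
        if (isDirectory && !absolutePath.endsWith("/")) {
            return absolutePath + "/";
        }
        return absolutePath;
    }

    // Mirrors transformCOSKeyToAbsolutePath: the trailing "/" marker is removed.
    static String toAbsolutePath(String cosKey) {
        if (cosKey.endsWith("/")) {
            return cosKey.substring(0, cosKey.length() - 1);
        }
        return cosKey;
    }

    public static void main(String[] args) {
        System.out.println(toCosKey("dolphinscheduler/default/resources", true));  // dolphinscheduler/default/resources/
        System.out.println(toAbsolutePath("dolphinscheduler/default/resources/")); // dolphinscheduler/default/resources
    }
}
```

The round-trip invariant (`toAbsolutePath(toCosKey(p, true))` returns `p`) is what `listStorageEntity` and `getStorageEntity` rely on when mapping listed object keys back to resource paths.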