Commit
[Feature-16396][storage-plugin] Add Tencent Cloud COS Storage Plugin (#…
Mighten authored Sep 27, 2024
1 parent 4e85302 commit 9024dca
Showing 32 changed files with 1,695 additions and 118 deletions.
31 changes: 30 additions & 1 deletion docs/docs/en/guide/resource/configuration.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Resource Center Configuration

- You could use `Resource Center` to upload text files and other task-related files.
- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html) etc.
- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), [Tencent Cloud COS](https://cloud.tencent.com/product/cos), etc.
- You could configure `Resource Center` to use local file system. If you deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use local file system for `Resource Center` without the need of an external `HDFS` system or `S3`.
- Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount `OSS` to your machines and use the local file system for `Resource Center`. In this way, you could operate remote files as if on your local machines.

@@ -96,3 +96,32 @@ resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
> * If you want to use the resource upload function, the deployment user in [installation and deployment](../installation/standalone.md) must have relevant operation authority.
> * If you are using a Hadoop cluster with HA, you need to enable HDFS resource upload and copy the `core-site.xml` and `hdfs-site.xml` from the Hadoop cluster to `worker-server/conf` and `api-server/conf`; otherwise, skip this copy step.
## connect COS

If you want to upload resources to a `Resource Center` backed by `COS`, you need to configure `api-server/conf/resource-center.yaml` and `worker-server/conf/resource-center.yaml`. You can refer to the following:

Configure the following fields:

```yaml
resource:
  # Tencent Cloud Storage (COS) setup, required if you set resource.storage.type=COS
  tencent:
    cloud:
      access:
        key:
          id: <your-access-key-id>
          secret: <your-access-key-secret>
      cos:
        # predefined region code: https://cloud.tencent.com/document/product/436/6224
        region: ap-nanjing
        bucket:
          name: dolphinscheduler
```

To enable COS, you also need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. You can refer to the following:

```properties
resource.storage.type=COS
```
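
If you want to verify the credentials, region, and bucket before restarting the servers, a minimal standalone check can reuse the same `cos_api` calls this commit depends on (`COSClient`, `BasicCOSCredentials`, `doesBucketExist`). The class below is only an illustrative sketch; the class name and placeholder values are not part of this change:

```java
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.region.Region;

public class CosConnectivityCheck {

    public static void main(String[] args) {
        // Placeholder credentials, region, and bucket; replace with real values before running.
        COSCredentials credentials = new BasicCOSCredentials("<your-access-key-id>", "<your-access-key-secret>");
        ClientConfig clientConfig = new ClientConfig(new Region("ap-nanjing"));
        clientConfig.setHttpProtocol(HttpProtocol.https);
        COSClient cosClient = new COSClient(credentials, clientConfig);
        try {
            // doesBucketExist returns false when the bucket is missing or not visible to these credentials.
            boolean exists = cosClient.doesBucketExist("dolphinscheduler");
            System.out.println("bucket reachable: " + exists);
        } finally {
            cosClient.shutdown();
        }
    }
}
```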

29 changes: 28 additions & 1 deletion docs/docs/zh/guide/resource/configuration.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Resource Center Configuration

- The Resource Center is typically used for operations such as uploading files and managing task groups.
- The Resource Center can connect to distributed file systems such as [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+) or a [MinIO](https://github.com/minio/minio) cluster, and it can also connect to remote object storage such as [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), etc.
- The Resource Center can connect to distributed file systems such as [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+) or a [MinIO](https://github.com/minio/minio) cluster, and it can also connect to remote object storage such as [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), [Tencent Cloud COS](https://cloud.tencent.com/product/cos), etc.
- The Resource Center can also use the local file system directly. In `Standalone` mode, you can conveniently use the local file system without relying on an external storage system such as `Hadoop` or `S3`.
- In addition, for `Cluster` mode deployments, you can mount `S3` locally with [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) or mount `OSS` locally with [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html), and then use the Resource Center with the local file system to operate on files in remote object storage.

@@ -90,3 +90,30 @@ resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
> * If you use the resource upload function, the deployment user in [installation and deployment](../installation/standalone.md) must have the relevant operation permissions.
> * If the NameNode of the Hadoop cluster is configured with HA, you need to enable HDFS resource upload and copy `core-site.xml` and `hdfs-site.xml` from the Hadoop cluster to `worker-server/conf` and `api-server/conf`; skip this step if NameNode HA is not enabled.
## Connect Tencent Cloud COS

If you want to upload resources to the Resource Center using COS, you need to configure the following files: `api-server/conf/resource-center.yaml` and `worker-server/conf/resource-center.yaml`. You can refer to the following:

```yaml
resource:
  # Tencent Cloud COS setup
  tencent:
    cloud:
      access:
        key:
          id: <your-access-key-id>
          secret: <your-access-key-secret>
      cos:
        # predefined region codes: https://cloud.tencent.com/document/product/436/6224
        region: ap-nanjing
        bucket:
          name: dolphinscheduler
```

To enable Tencent Cloud COS, you also need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. You can refer to the following:

```properties
resource.storage.type=COS
```

17 changes: 17 additions & 0 deletions dolphinscheduler-bom/pom.xml
Original file line number Diff line number Diff line change
@@ -116,6 +116,7 @@
<azure-sdk-bom.version>1.2.10</azure-sdk-bom.version>
<protobuf.version>3.17.2</protobuf.version>
<esdk-obs.version>3.23.3</esdk-obs.version>
<qcloud-cos.version>5.6.231</qcloud-cos.version>
<system-lambda.version>1.2.1</system-lambda.version>
<zeppelin-client.version>0.10.1</zeppelin-client.version>
<testcontainer.version>1.19.3</testcontainer.version>
@@ -927,6 +928,22 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
<version>${qcloud-cos.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
5 changes: 5 additions & 0 deletions dolphinscheduler-common/pom.xml
Original file line number Diff line number Diff line change
@@ -106,6 +106,11 @@
<artifactId>esdk-obs-java-bundle</artifactId>
</dependency>

<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ public final class Constants {

public static final String REMOTE_LOGGING_YAML_PATH = "/remote-logging.yaml";
public static final String AWS_YAML_PATH = "/aws.yaml";
public static final String RESOURCE_CENTER_YAML_PATH = "/resource-center.yaml";

public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
@@ -681,6 +682,17 @@ public final class Constants {
public static final String REMOTE_LOGGING_ABS_ACCOUNT_KEY = "remote.logging.abs.account.key";
public static final String REMOTE_LOGGING_ABS_CONTAINER_NAME = "remote.logging.abs.container.name";

/**
* remote logging for COS
*/
public static final String REMOTE_LOGGING_COS_ACCESS_KEY_ID = "remote.logging.cos.access.key.id";

public static final String REMOTE_LOGGING_COS_ACCESS_KEY_SECRET = "remote.logging.cos.access.key.secret";

public static final String REMOTE_LOGGING_COS_BUCKET_NAME = "remote.logging.cos.bucket.name";

public static final String REMOTE_LOGGING_COS_REGION = "remote.logging.cos.region";

/**
* data quality
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import org.apache.commons.lang3.StringUtils;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.model.GetObjectRequest;
import com.qcloud.cos.model.PutObjectRequest;
import com.qcloud.cos.region.Region;

@Slf4j
public class CosRemoteLogHandler implements RemoteLogHandler, Closeable {

    private final COSClient cosClient;

    private final String bucketName;

    private static CosRemoteLogHandler instance;

    private CosRemoteLogHandler() {
        String secretId = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_ACCESS_KEY_ID);
        String secretKey = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_ACCESS_KEY_SECRET);
        COSCredentials cosCredentials = new BasicCOSCredentials(secretId, secretKey);
        String regionName = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_REGION);
        ClientConfig clientConfig = new ClientConfig(new Region(regionName));
        clientConfig.setHttpProtocol(HttpProtocol.https);
        this.cosClient = new COSClient(cosCredentials, clientConfig);
        this.bucketName = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_BUCKET_NAME);
        checkBucketNameExists(this.bucketName);
    }

    public static synchronized CosRemoteLogHandler getInstance() {
        if (instance == null) {
            instance = new CosRemoteLogHandler();
        }
        return instance;
    }

    @Override
    public void sendRemoteLog(String logPath) {
        String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

        try {
            log.info("send remote log from {} to tencent cos {}", logPath, objectName);
            PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, new File(logPath));
            cosClient.putObject(putObjectRequest);
        } catch (Exception e) {
            log.error("error while sending remote log from {} to tencent cos {}, reason:", logPath, objectName, e);
        }
    }

    @Override
    public void getRemoteLog(String logPath) {
        String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

        try {
            log.info("get remote log from tencent cos {} to {}", objectName, logPath);
            cosClient.getObject(new GetObjectRequest(bucketName, objectName), new File(logPath));
        } catch (Exception e) {
            log.error("error while getting remote log from tencent cos {} to {}, reason:", objectName, logPath, e);
        }
    }

    @Override
    public void close() throws IOException {
        if (cosClient != null) {
            cosClient.shutdown();
        }
    }

    private void checkBucketNameExists(String bucketName) {
        if (StringUtils.isBlank(bucketName)) {
            throw new IllegalArgumentException(Constants.REMOTE_LOGGING_COS_BUCKET_NAME + " is empty");
        }
        boolean existsBucket = cosClient.doesBucketExist(bucketName);
        if (!existsBucket) {
            throw new IllegalArgumentException(
                    "bucketName: " + bucketName + " does not exist, you should create it first");
        }

        log.debug("tencent cos bucket {} has been found for remote logging", bucketName);
    }
}
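
Because the handler resolves its settings through `PropertyUtils`, COS-backed remote logging can presumably be switched on with the keys defined in `Constants` above. A hedged sketch of the corresponding `common.properties` entries (values are placeholders; `remote.logging.enable` and `remote.logging.target` already exist in `common.properties`):

```properties
# route remote logging to Tencent Cloud COS
remote.logging.enable=true
remote.logging.target=COS
# COS-specific keys introduced by this commit in Constants (placeholder values)
remote.logging.cos.access.key.id=<your-access-key-id>
remote.logging.cos.access.key.secret=<your-access-key-secret>
remote.logging.cos.bucket.name=<your-bucket-name>
remote.logging.cos.region=ap-nanjing
```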
Original file line number Diff line number Diff line change
@@ -41,6 +41,8 @@ public RemoteLogHandler getRemoteLogHandler() {
return GcsRemoteLogHandler.getInstance();
} else if ("ABS".equals(target)) {
return AbsRemoteLogHandler.getInstance();
} else if ("COS".equals(target)) {
return CosRemoteLogHandler.getInstance();
}

log.error("No suitable remote logging target for {}", target);
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import static org.apache.dolphinscheduler.common.constants.Constants.AWS_YAML_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.REMOTE_LOGGING_YAML_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_CENTER_YAML_PATH;

import org.apache.dolphinscheduler.common.config.ImmutablePriorityPropertyDelegate;
import org.apache.dolphinscheduler.common.config.ImmutablePropertyDelegate;
@@ -43,7 +44,7 @@ public class PropertyUtils {
private final ImmutablePriorityPropertyDelegate propertyDelegate =
new ImmutablePriorityPropertyDelegate(
new ImmutablePropertyDelegate(COMMON_PROPERTIES_PATH),
new ImmutableYamlDelegate(REMOTE_LOGGING_YAML_PATH, AWS_YAML_PATH));
new ImmutableYamlDelegate(REMOTE_LOGGING_YAML_PATH, AWS_YAML_PATH, RESOURCE_CENTER_YAML_PATH));

public static String getString(String key) {
return propertyDelegate.get(key.trim());
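
With `resource-center.yaml` added to the delegate chain, the nested YAML from the docs above should be readable through the same flat lookups used for the other storage types. A small illustrative sketch; note the dotted key name is inferred from the YAML nesting and is an assumption, not taken from this diff:

```java
// Assumed flattening of the docs example: resource -> tencent -> cloud -> cos -> region.
// The exact key name is an assumption; check the COS storage operator for the authoritative constant.
String cosRegion = PropertyUtils.getString("resource.tencent.cloud.cos.region");
```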
70 changes: 1 addition & 69 deletions dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
@@ -21,58 +21,15 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js

# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, COS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless
# use shared file mount point
resource.storage.type=LOCAL
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/tmp/dolphinscheduler

# The Azure client ID (Azure Application (client) ID)
resource.azure.client.id=minioadmin
# The Azure client secret in the Azure application
resource.azure.client.secret=minioadmin
# The Azure data factory subscription ID
resource.azure.subId=minioadmin
# The Azure tenant id in the Azure Active Directory
resource.azure.tenant.id=minioadmin
# The query interval
resource.query.interval=10000

# alibaba cloud access key id, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.id=<your-access-key-id>
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.secret=<your-access-key-secret>
# alibaba cloud region, required if you set resource.storage.type=OSS
resource.alibaba.cloud.region=cn-hangzhou
# oss bucket name, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com

# the location of the google cloud credential, required if you set resource.storage.type=GCS
resource.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set resource.storage.type=GCS
resource.google.cloud.storage.bucket.name=<your-bucket>

# abs container name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.container.name=<your-container>
# abs account name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.account.name=<your-account-name>
# abs connection string, required if you set resource.storage.type=ABS
resource.azure.blob.storage.connection.string=<your-connection-string>


# huawei cloud access key id, required if you set resource.storage.type=OBS
resource.huawei.cloud.access.key.id=<your-access-key-id>
# huawei cloud access key secret, required if you set resource.storage.type=OBS
resource.huawei.cloud.access.key.secret=<your-access-key-secret>
# oss bucket name, required if you set resource.storage.type=OBS
resource.huawei.cloud.obs.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OBS
resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com


# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
@@ -163,28 +120,3 @@ shell.interceptor.type=bash

# Whether to enable remote logging
remote.logging.enable=false
# if remote.logging.enable = true, set the target of remote logging
remote.logging.target=OSS
# if remote.logging.enable = true, set the log base directory
remote.logging.base.dir=logs
# if remote.logging.enable = true, set the number of threads to send logs to remote storage
remote.logging.thread.pool.size=10
# oss access key id, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.id=<access.key.id>
# oss access key secret, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.secret=<access.key.secret>
# oss bucket name, required if you set remote.logging.target=OSS
remote.logging.oss.bucket.name=<bucket.name>
# oss endpoint, required if you set remote.logging.target=OSS
remote.logging.oss.endpoint=<endpoint>

# the location of the google cloud credential, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>
# abs account name, required if you set resource.storage.type=ABS
remote.logging.abs.account.name=<your-account-name>
# abs account key, required if you set resource.storage.type=ABS
remote.logging.abs.account.key=<your-account-key>
# abs container name, required if you set resource.storage.type=ABS
remote.logging.abs.container.name=<your-container-name>
