Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-16658][RPC] Implement of Netty add ssl #16659

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;

import javax.annotation.PostConstruct;
Expand All @@ -40,7 +41,8 @@
@Slf4j
@Import({CommonConfiguration.class,
DaoConfiguration.class,
RegistryConfiguration.class})
RegistryConfiguration.class,
NettySslConfig.class})
@SpringBootApplication
public class AlertServer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -29,8 +30,9 @@
@Service
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {

public AlertRpcServer(AlertConfig alertConfig) {
super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build());
public AlertRpcServer(AlertConfig alertConfig, NettySslConfig nettySslConfig) {
super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build(),
nettySslConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ registry:
metrics:
enabled: true

rpc:
ssl:
enabled: false
cert-file-path: /path/cert.crt
key-file-path: /path/private.pem

# Override by profile

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;

import org.junit.jupiter.api.Test;

class AlertRpcServerTest {

private final AlertRpcServer alertRpcServer = new AlertRpcServer(new AlertConfig());
private final AlertRpcServer alertRpcServer = new AlertRpcServer(new AlertConfig(), new NettySslConfig());

@Test
void testStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class ApiApplicationServer {
@Autowired
private PluginDao pluginDao;

@Autowired
NettySslConfig nettySslConfig;

public static void main(String[] args) {
ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Expand Down
5 changes: 5 additions & 0 deletions dolphinscheduler-api/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ casdoor:
# Doplhinscheduler login url
redirect-url: ""

rpc:
ssl:
enabled: false
cert-file-path: /path/cert.crt
key-file-path: /path/private.pem

# Override by profile

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
Expand Down Expand Up @@ -103,7 +104,8 @@ public void setUp() {
}

springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(
NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build());
NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build(),
new NettySslConfig());
springServerMethodInvokerDiscovery.start();
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new ILogService() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class Clients {
private static final JdkDynamicRpcClientProxyFactory jdkDynamicRpcClientProxyFactory =
new JdkDynamicRpcClientProxyFactory(
NettyRemotingClientFactory.buildNettyRemotingClient(
new NettyClientConfig()));
new NettyClientConfig(), null));

public static <T> JdkDynamicRpcClientProxyBuilder<T> withService(Class<T> serviceClazz) {
return new JdkDynamicRpcClientProxyBuilder<>(serviceClazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
Expand All @@ -33,6 +34,7 @@
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

import java.io.File;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,6 +43,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import javax.net.ssl.SSLException;

import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand All @@ -52,6 +56,8 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.IdleStateHandler;

@Slf4j
Expand All @@ -67,11 +73,22 @@ public class NettyRemotingClient implements AutoCloseable {
private final EventLoopGroup workerGroup;

private final NettyClientConfig clientConfig;
private final NettySslConfig nettySslConfig;
private SslContext sslContext = null;

private final NettyClientHandler clientHandler;

public NettyRemotingClient(final NettyClientConfig clientConfig) {
public NettyRemotingClient(final NettyClientConfig clientConfig, final NettySslConfig nettySslConfig) {
this.clientConfig = clientConfig;
this.nettySslConfig = nettySslConfig;
if (nettySslConfig.isEnabled()) {
try {
sslContext =
SslContextBuilder.forClient().trustManager(new File(nettySslConfig.getCertFilePath())).build();
} catch (SSLException e) {
throw new IllegalArgumentException("Initialize SslContext error, please check the cert-file", e);
}
}
ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
if (Epoll.isAvailable()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
Expand All @@ -97,6 +114,9 @@ private void start() {

@Override
public void initChannel(SocketChannel ch) {
if (nettySslConfig.isEnabled()) {
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}
ch.pipeline()
.addLast("client-idle-handler",
new IdleStateHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.extract.base.client;

import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,7 +27,8 @@
@Slf4j
public class NettyRemotingClientFactory {

public NettyRemotingClient buildNettyRemotingClient(NettyClientConfig nettyClientConfig) {
return new NettyRemotingClient(nettyClientConfig);
public NettyRemotingClient buildNettyRemotingClient(NettyClientConfig nettyClientConfig,
NettySslConfig nettySslConfig) {
return new NettyRemotingClient(nettyClientConfig, nettySslConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base.config;

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "rpc.ssl")
@Data
public class NettySslConfig {

public boolean enabled;

public String certFilePath;

public String keyFilePath;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.SSLException;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap;
Expand All @@ -40,6 +44,8 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.IdleStateHandler;

/**
Expand All @@ -66,7 +72,20 @@ class NettyRemotingServer {

private final AtomicBoolean isStarted = new AtomicBoolean(false);

NettyRemotingServer(final NettyServerConfig serverConfig) {
private SslContext sslContext = null;

private NettySslConfig nettySslConfig;

public NettyRemotingServer(final NettyServerConfig serverConfig, final NettySslConfig nettySslConfig) {
this.nettySslConfig = nettySslConfig;
if (nettySslConfig.isEnabled()) {
try {
sslContext = SslContextBuilder.forServer(new File(nettySslConfig.getCertFilePath()),
new File(nettySslConfig.getKeyFilePath())).build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
}
Comment on lines +79 to +88
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NettySslConfig should as a field in NettyServerConfig and NettyClientConfig.

this.serverConfig = serverConfig;
this.serverName = serverConfig.getServerName();
this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor(
Expand Down Expand Up @@ -130,6 +149,10 @@ protected void initChannel(SocketChannel ch) {
* @param ch socket channel
*/
private void initNettyChannel(SocketChannel ch) {
if (nettySslConfig.isEnabled()) {
ch.pipeline().addLast("ssl", sslContext.newHandler(ch.alloc()));

}
ch.pipeline()
.addLast("encoder", new TransporterEncoder())
.addLast("decoder", new TransporterDecoder())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.dolphinscheduler.extract.base.server;

import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;

import lombok.experimental.UtilityClass;

@UtilityClass
class NettyRemotingServerFactory {

NettyRemotingServer buildNettyRemotingServer(NettyServerConfig nettyServerConfig) {
return new NettyRemotingServer(nettyServerConfig);
public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig nettyServerConfig,
NettySslConfig nettySslConfig) {
return new NettyRemotingServer(nettyServerConfig, nettySslConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;

import java.lang.reflect.Method;

Expand All @@ -39,8 +40,9 @@ public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable {

private final NettyRemotingServer nettyRemotingServer;

public RpcServer(NettyServerConfig nettyServerConfig) {
this.nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig);
public RpcServer(NettyServerConfig nettyServerConfig, NettySslConfig nettySslConfig) {
this.nettyRemotingServer =
NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig, nettySslConfig);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.extract.base.server;

import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -31,8 +32,8 @@
@Slf4j
public class SpringServerMethodInvokerDiscovery extends RpcServer implements BeanPostProcessor {

public SpringServerMethodInvokerDiscovery(NettyServerConfig nettyServerConfig) {
super(nettyServerConfig);
public SpringServerMethodInvokerDiscovery(NettyServerConfig nettyServerConfig, NettySslConfig nettySslConfig) {
super(nettyServerConfig, nettySslConfig);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.config.NettySslConfig;
import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;

Expand All @@ -48,7 +49,8 @@ public void setUp() {
.listenPort(listenPort)
.build();
serverAddress = "localhost:" + listenPort;
springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyServerConfig);
springServerMethodInvokerDiscovery =
new SpringServerMethodInvokerDiscovery(nettyServerConfig, getNettySslConfig());
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new IServiceImpl());
springServerMethodInvokerDiscovery.start();
}
Expand Down Expand Up @@ -111,5 +113,9 @@ public void voidMethod() {
System.out.println("void method");
}
}

private NettySslConfig getNettySslConfig() {
NettySslConfig info = new NettySslConfig();
info.setEnabled(false);
return info;
}
}
Loading
Loading