Skip to content

Commit

Permalink
Add RPC metrics (#16121)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Jun 11, 2024
1 parent d6714bb commit fd5a182
Show file tree
Hide file tree
Showing 17 changed files with 421 additions and 40 deletions.
5 changes: 5 additions & 0 deletions docs/docs/en/guide/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
- stop: the number of stopped workflow instances
- failover: the number of workflow instance fail-overs

### RPC Related Metrics

- ds.rpc.client.sync.request.exception.count: (counter) the number of exceptions occurred in sync rpc requests
- ds.rpc.client.sync.request.duration.time: (histogram) the time cost of sync rpc requests

### Master Server Metrics

- ds.master.overload.count: (counter) the number of times the master overloaded
Expand Down
5 changes: 5 additions & 0 deletions docs/docs/zh/guide/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: `
- stop:停止的工作流实例数量
- failover:容错的工作流实例数量

### RPC相关指标

- ds.rpc.client.sync.request.exception.count: (counter) 同步rpc请求异常数
- ds.rpc.client.sync.request.duration.time: (histogram) 同步rpc请求耗时

### Master Server指标

- ds.master.overload.count: (counter) master过载次数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@
@Documented
public @interface RpcMethod {

long timeout() default 3000L;
long timeout() default -1;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base;

import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.utils.Host;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SyncRequestDto {

private Host serverHost;
private Transporter transporter;
private long timeoutMillis;

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
.writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
if (log.isDebugEnabled()) {
log.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(ctx.channel()));
log.info("Client send heartbeat to: {}", ctx.channel().remoteAddress());
}
} else {
super.userEventTriggered(ctx, evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
import org.apache.dolphinscheduler.extract.base.metrics.RpcMetrics;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

Expand Down Expand Up @@ -97,8 +100,8 @@ public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("client-idle-handler",
new IdleStateHandler(
Constants.NETTY_CLIENT_HEART_BEAT_TIME,
0,
clientConfig.getHeartBeatIntervalMillis(),
0,
TimeUnit.MILLISECONDS))
.addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder());
Expand All @@ -107,38 +110,60 @@ public void initChannel(SocketChannel ch) {
isStarted.compareAndSet(false, true);
}

public IRpcResponse sendSync(final Host host,
final Transporter transporter,
final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws RemotingException {
long start = System.currentTimeMillis();

final Host host = syncRequestDto.getServerHost();
final Transporter transporter = syncRequestDto.getTransporter();
final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? clientConfig.getConnectTimeoutMillis()
: syncRequestDto.getTimeoutMillis();
final long opaque = transporter.getHeader().getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);

try {
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
});
/*
* sync wait for result
*/
IRpcResponse iRpcResponse = responseFuture.waitResponse();
if (iRpcResponse == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
});
/*
* sync wait for result
*/
IRpcResponse iRpcResponse = responseFuture.waitResponse();
if (iRpcResponse == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
}
}
return iRpcResponse;
} catch (Exception ex) {
ClientSyncExceptionMetrics clientSyncExceptionMetrics = ClientSyncExceptionMetrics
.of(syncRequestDto)
.withThrowable(ex);
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
if (ex instanceof RemotingException) {
throw (RemotingException) ex;
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
throw new RemotingException(ex);
}
} finally {
ClientSyncDurationMetrics clientSyncDurationMetrics = ClientSyncDurationMetrics
.of(syncRequestDto)
.withMilliseconds(System.currentTimeMillis() - start);
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
}
return iRpcResponse;
}

Channel getOrCreateChannel(Host host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
Expand All @@ -41,8 +42,12 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
transporter.setHeader(TransporterHeader.of(methodIdentifier));

IRpcResponse iRpcResponse =
nettyRemotingClient.sendSync(serverHost, transporter, sync.timeout());
SyncRequestDto syncRequestDto = SyncRequestDto.builder()
.timeoutMillis(sync.timeout())
.transporter(transporter)
.serverHost(serverHost)
.build();
IRpcResponse iRpcResponse = nettyRemotingClient.sendSync(syncRequestDto);
if (!iRpcResponse.isSuccess()) {
throw MethodInvocationException.of(iRpcResponse.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.extract.base.config;

import java.time.Duration;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -64,4 +66,14 @@ public class NettyClientConfig {
@Builder.Default
private int connectTimeoutMillis = 3000;

/**
* Will send {@link org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter} to netty server every
* heartBeatIntervalMillis, used to keep the {@link io.netty.channel.Channel} active.
*/
@Builder.Default
private long heartBeatIntervalMillis = Duration.ofSeconds(10).toMillis();

@Builder.Default
private int defaultRpcTimeoutMillis = 10_000;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.extract.base.config;

import java.time.Duration;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -66,6 +68,12 @@ public class NettyServerConfig {
@Builder.Default
private int workerThread = Runtime.getRuntime().availableProcessors() * 2;

/**
* If done's receive any data from a {@link io.netty.channel.Channel} during 180s then will close it.
*/
@Builder.Default
private long connectionIdleTime = Duration.ofSeconds(60).toMillis();

/**
* listen port
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base.metrics;

import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClientSyncDurationMetrics {

private Transporter transporter;

private long milliseconds;

@Builder.Default
private String clientHost = NetUtils.getHost();

private String serverHost;

public static ClientSyncDurationMetrics of(SyncRequestDto syncRequestDto) {
return ClientSyncDurationMetrics.builder()
.transporter(syncRequestDto.getTransporter())
.serverHost(syncRequestDto.getServerHost().getIp())
.build();
}

public ClientSyncDurationMetrics withMilliseconds(long milliseconds) {
this.milliseconds = milliseconds;
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base.metrics;

import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClientSyncExceptionMetrics {

private Transporter transporter;

private String clientHost;

@Builder.Default
private String serverHost = NetUtils.getHost();

private Throwable throwable;

public static ClientSyncExceptionMetrics of(SyncRequestDto syncRequestDto) {
return ClientSyncExceptionMetrics.builder()
.transporter(syncRequestDto.getTransporter())
.build();

}

public ClientSyncExceptionMetrics withThrowable(Throwable throwable) {
this.throwable = throwable;
return this;
}

}
Loading

0 comments on commit fd5a182

Please sign in to comment.