From ebc48f7996e8e4f604c84e840ab79fa36c613e00 Mon Sep 17 00:00:00 2001 From: v_dylanxu <136539068@qq.com> Date: Mon, 26 Oct 2020 17:38:36 +0800 Subject: [PATCH 1/2] Add low version compatibility --- .../federatedml/PipelineModelProcessor.java | 2 +- .../proxy/rpc/grpc/ProxyRequestHandler.java | 5 ++- .../proxy/rpc/router/ZkServingRouter.java | 4 +-- .../proxy/utils/FederatedModelUtils.java | 34 +++++++++++++++---- .../grpc/service/HostInferenceService.java | 2 +- .../interceptors/HostModelInterceptor.java | 2 +- .../interceptors/HostParamInterceptor.java | 2 +- 7 files changed, 36 insertions(+), 15 deletions(-) diff --git a/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/PipelineModelProcessor.java b/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/PipelineModelProcessor.java index 70214a83..02985a4a 100644 --- a/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/PipelineModelProcessor.java +++ b/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/PipelineModelProcessor.java @@ -342,7 +342,7 @@ public Map singleLocalPredict(Context context, Map responseObs public InboundPackage buildInboundPackage(Context context, Proxy.Packet req) { context.setCaseId(Long.toString(System.currentTimeMillis())); - context.setVersion(req.getAuth().getVersion()); - if (StringUtils.isEmpty(context.getVersion())) { - context.setVersion(Dict.DEFAULT_VERSION); + if (StringUtils.isNotBlank(req.getHeader().getOperator())) { + context.setVersion(req.getHeader().getOperator()); } context.setGuestAppId(req.getHeader().getSrc().getPartyId()); context.setHostAppid(req.getHeader().getDst().getPartyId()); diff --git a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java index b19cb6c7..901ab4ca 100644 --- a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java +++ b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java @@ -91,9 +91,9 @@ private String getEnvironment(Context context, InboundPackage inboundPackage) { return null; } else { Proxy.Packet sourcePacket = (Proxy.Packet) inboundPackage.getBody(); - if (MetaInfo.PROPERTY_COORDINATOR.equals(sourcePacket.getHeader().getDst().getPartyId())) { + if (String.valueOf(MetaInfo.PROPERTY_COORDINATOR).equals(sourcePacket.getHeader().getDst().getPartyId())) { // host, proxy -> serving - return FederatedModelUtils.getModelRouteKey(sourcePacket); + return FederatedModelUtils.getModelRouteKey(context, sourcePacket); } else { // exchange, proxy -> proxy // return Dict.ONLINE_ENVIRONMENT; diff --git a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/utils/FederatedModelUtils.java b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/utils/FederatedModelUtils.java index 42de5885..3b571b7d 100644 --- a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/utils/FederatedModelUtils.java +++ b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/utils/FederatedModelUtils.java @@ -17,26 +17,48 @@ package com.webank.ai.fate.serving.proxy.utils; import com.webank.ai.fate.api.networking.proxy.Proxy; +import com.webank.ai.fate.serving.core.bean.Context; import com.webank.ai.fate.serving.core.bean.EncryptMethod; import com.webank.ai.fate.serving.core.utils.EncryptUtils; +import com.webank.ai.fate.serving.core.utils.JsonUtil; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.Map; public class FederatedModelUtils { + private static final Logger logger = LoggerFactory.getLogger(FederatedModelUtils.class); + private static final String MODEL_KEY_SEPARATOR = "&"; public static String genModelKey(String tableName, String namespace) { return StringUtils.join(Arrays.asList(tableName, namespace), MODEL_KEY_SEPARATOR); } - public static String getModelRouteKey(Proxy.Packet packet) { - String data = packet.getBody().getValue().toStringUtf8(); - Proxy.Model model = packet.getHeader().getTask().getModel(); - String key = genModelKey(model.getTableName(), model.getNamespace()); - String md5Key = EncryptUtils.encrypt(key, EncryptMethod.MD5); - return md5Key; + public static String getModelRouteKey(Context context, Proxy.Packet packet) { + String namespace; + String tableName; + if (StringUtils.isBlank(context.getVersion()) || Double.parseDouble(context.getVersion()) < 200) { + // version 1.x + String data = packet.getBody().getValue().toStringUtf8(); + Map hostFederatedParams = JsonUtil.json2Object(data, Map.class); + Map partnerModelInfo = (Map) hostFederatedParams.get("partnerModelInfo"); + namespace = partnerModelInfo.get("namespace").toString(); + tableName = partnerModelInfo.get("name").toString(); + } else { + // version 2.0.0+ + Proxy.Model model = packet.getHeader().getTask().getModel(); + namespace = model.getNamespace(); + tableName = model.getTableName(); + } + + String key = genModelKey(tableName, namespace); + logger.info("get model route key by version: {} namespace: {} tablename: {}, key: {}", context.getVersion(), namespace, tableName, key); + + return EncryptUtils.encrypt(key, EncryptMethod.MD5); } } diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/grpc/service/HostInferenceService.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/grpc/service/HostInferenceService.java index 2b5269cf..d3f68db0 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/grpc/service/HostInferenceService.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/grpc/service/HostInferenceService.java @@ -59,7 +59,7 @@ public void unaryCall(Proxy.Packet req, StreamObserver responseObs String tableName = req.getHeader().getTask().getModel().getTableName(); context.setActionType(actionType); context.setVersion(req.getHeader().getOperator()); - if (StringUtils.isBlank(context.getVersion()) || Long.parseLong(context.getVersion()) < 200) { + if (StringUtils.isBlank(context.getVersion()) || Double.parseDouble(context.getVersion()) < 200) { // 1.x Map hostFederatedParams = JsonUtil.json2Object(req.getBody().getValue().toStringUtf8(), Map.class); Map partnerModelInfo = (Map) hostFederatedParams.get("partnerModelInfo"); diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostModelInterceptor.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostModelInterceptor.java index 8e490c41..59387951 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostModelInterceptor.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostModelInterceptor.java @@ -49,7 +49,7 @@ public void doPreProcess(Context context, InboundPackage inboundPackage, Outboun String tableName = servingServerContext.getModelTableName(); String nameSpace = servingServerContext.getModelNamesapce(); Model model; - if (StringUtils.isBlank(context.getVersion()) || Long.parseLong(context.getVersion()) < 200) { + if (StringUtils.isBlank(context.getVersion()) || Double.parseDouble(context.getVersion()) < 200) { model = modelManager.getPartnerModel(tableName, nameSpace); } else { model = modelManager.getModelByTableNameAndNamespace(tableName, nameSpace); diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostParamInterceptor.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostParamInterceptor.java index a5901dd9..417755c2 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostParamInterceptor.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/host/interceptors/HostParamInterceptor.java @@ -45,7 +45,7 @@ public void doPreProcess(Context context, InboundPackage inboundPackage, Outboun inboundPackage.setBody(params); } else { InferenceRequest inferenceRequest = JsonUtil.json2Object(reqBody, InferenceRequest.class); - if (StringUtils.isBlank(context.getVersion()) || Long.parseLong(context.getVersion()) < 200) { + if (StringUtils.isBlank(context.getVersion()) || Double.parseDouble(context.getVersion()) < 200) { Map hostParams = JsonUtil.json2Object(reqBody, Map.class); Preconditions.checkArgument(hostParams != null, "parse inference params error"); Preconditions.checkArgument(hostParams.get("featureIdMap") != null, "parse inference params featureIdMap error"); From 249744a895e05a3515f1c8a3678502888866870e Mon Sep 17 00:00:00 2001 From: v_dylanxu <136539068@qq.com> Date: Mon, 26 Oct 2020 17:47:09 +0800 Subject: [PATCH 2/2] simplify routing --- .../proxy/rpc/router/ZkServingRouter.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java index 901ab4ca..79b6c5e1 100644 --- a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java +++ b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/rpc/router/ZkServingRouter.java @@ -84,22 +84,15 @@ private String getEnvironment(Context context, InboundPackage inboundPackage) { // guest, proxy -> serving return (String) inboundPackage.getHead().get(Dict.SERVICE_ID); } - // default unaryCall - if (GrpcType.INTRA_GRPC == context.getGrpcType()) { - // guest, serving -> proxy -// return Dict.ONLINE_ENVIRONMENT; - return null; - } else { + + if (Dict.UNARYCALL.equals(context.getServiceName()) && context.getGrpcType() == GrpcType.INTER_GRPC) { + // host, proxy -> serving Proxy.Packet sourcePacket = (Proxy.Packet) inboundPackage.getBody(); - if (String.valueOf(MetaInfo.PROPERTY_COORDINATOR).equals(sourcePacket.getHeader().getDst().getPartyId())) { - // host, proxy -> serving - return FederatedModelUtils.getModelRouteKey(context, sourcePacket); - } else { - // exchange, proxy -> proxy -// return Dict.ONLINE_ENVIRONMENT; - return null; - } + return FederatedModelUtils.getModelRouteKey(context, sourcePacket); } + + // default unaryCall proxy -> proxy + return null; } @Override