Skip to content

Commit

Permalink
Add interceptor support for GRPC APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
sgayangi committed Oct 10, 2024
1 parent c09a29e commit 84e1c4f
Showing 1 changed file with 71 additions and 4 deletions.
75 changes: 71 additions & 4 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
// handle gRPC APIs
if len(prodRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == constants.GRPC {
if apiState.ProdGRPCRoute, err = apiReconciler.resolveGRPCRouteRefs(ctx, prodRouteRefs,
namespace, api); err != nil {
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving production grpcRouteref %s in namespace :%s was not found. %s",
prodRouteRefs, namespace, err.Error())
}
Expand All @@ -483,7 +483,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1

if len(sandRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == constants.GRPC {
if apiState.SandGRPCRoute, err = apiReconciler.resolveGRPCRouteRefs(ctx, sandRouteRefs,
namespace, api); err != nil {
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving sandbox grpcRouteref %s in namespace :%s was not found. %s",
sandRouteRefs, namespace, err.Error())
}
Expand Down Expand Up @@ -603,11 +603,15 @@ func (apiReconciler *APIReconciler) resolveHTTPRouteRefs(ctx context.Context, ht
}

func (apiReconciler *APIReconciler) resolveGRPCRouteRefs(ctx context.Context, grpcRouteRefs []string,
namespace string, api dpv1alpha3.API) (*synchronizer.GRPCRouteState, error) {
namespace string, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService, api dpv1alpha3.API) (*synchronizer.GRPCRouteState, error) {
grpcRouteState, err := apiReconciler.concatGRPCRoutes(ctx, grpcRouteRefs, namespace, api)
if err != nil {
return nil, err
}
grpcRouteState.BackendMapping, err = apiReconciler.getResolvedBackendsMappingForGRPC(ctx, &grpcRouteState, interceptorServiceMapping, api)
if err != nil {
return nil, err
}
grpcRouteState.Scopes, err = apiReconciler.getScopesForGRPCRoute(ctx, grpcRouteState.GRPCRouteCombined, api)
return &grpcRouteState, err
}
Expand All @@ -633,7 +637,6 @@ func (apiReconciler *APIReconciler) concatGRPCRoutes(ctx context.Context, grpcRo
}
grpcRouteState.GRPCRoutePartitions = grpcRoutePartitions
backendNamespacedName := types.NamespacedName{
//TODO check if this is correct
Name: string(grpcRouteState.GRPCRouteCombined.Spec.Rules[0].BackendRefs[0].BackendRef.Name),
Namespace: namespace,
}
Expand Down Expand Up @@ -1065,6 +1068,70 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
return backendMapping, airl, nil
}

func (apiReconciler *APIReconciler) getResolvedBackendsMappingForGRPC(ctx context.Context,
grpcRouteState *synchronizer.GRPCRouteState, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService,
api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, error) {
backendMapping := make(map[string]*dpv1alpha2.ResolvedBackend)
grpcRoute := grpcRouteState.GRPCRouteCombined

for _, rule := range grpcRoute.Spec.Rules {
for _, backend := range rule.BackendRefs {
backendNamespacedName := types.NamespacedName{
Name: string(backend.Name),
Namespace: utils.GetNamespace(backend.Namespace, grpcRoute.Namespace),
}
if _, exists := backendMapping[backendNamespacedName.String()]; !exists {
resolvedBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, backendNamespacedName, &api)
if resolvedBackend != nil {
backendMapping[backendNamespacedName.String()] = resolvedBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String())
}
}
}

for _, filter := range rule.Filters {
if filter.RequestMirror != nil {
mirrorBackend := filter.RequestMirror.BackendRef
mirrorBackendNamespacedName := types.NamespacedName{
Name: string(mirrorBackend.Name),
Namespace: utils.GetNamespace(mirrorBackend.Namespace, grpcRoute.Namespace),
}
if string(*mirrorBackend.Kind) == constants.KindBackend {
if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists {
resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api)
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
}
}
} else if string(*mirrorBackend.Kind) == constants.KindService {
var err error
service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, grpcRoute.Namespace), string(mirrorBackend.Name))
if err != nil {
return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
}
backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port))
if err != nil {
return nil, fmt.Errorf("error in getting service information %s", service)
}
}
}
}
}

// Resolve backends in InterceptorServices
interceptorServices := maps.Values(interceptorServiceMapping)
for _, interceptorService := range interceptorServices {
utils.ResolveAndAddBackendToMapping(ctx, apiReconciler.client, backendMapping,
interceptorService.Spec.BackendRef, interceptorService.Namespace, &api)
}

loggers.LoggerAPKOperator.Debugf("Generated backendMapping: %v", backendMapping)
return backendMapping, nil
}

// These proxy methods are designed as intermediaries for the getAPIsFor<CR objects> methods.
// Their purpose is to encapsulate the process of updating owner references within the reconciliation watch methods.
// By employing these proxies, we prevent redundant owner reference updates for the same object due to the hierarchical structure of these functions.
Expand Down

0 comments on commit 84e1c4f

Please sign in to comment.