Skip to content

Commit

Permalink
AV-214603 Changes for accepting Gateway with some valid and some inva…
Browse files Browse the repository at this point in the history
…lid listener
  • Loading branch information
pkoshtavmware committed Oct 8, 2024
1 parent 1c442df commit aa1cb49
Show file tree
Hide file tree
Showing 12 changed files with 1,305 additions and 44 deletions.
2 changes: 1 addition & 1 deletion ako-gateway-api/k8s/ako_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (c *GatewayController) FullSyncK8s(sync bool) error {
resVer := meta.GetResourceVersion()
objects.SharedResourceVerInstanceLister().Save(key, resVer)
}
if IsValidGateway(key, gatewayObj) {
if valid, _ := IsValidGateway(key, gatewayObj); valid {
filteredGateways = append(filteredGateways, gatewayObj)
}
}
Expand Down
55 changes: 53 additions & 2 deletions ako-gateway-api/k8s/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package k8s
import (
"fmt"
"reflect"
"sort"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -500,13 +503,23 @@ func (c *GatewayController) SetupGatewayApiEventHandlers(numWorkers uint32) {
utils.AviLog.Debugf("key: %s, msg: same resource version returning", key)
return
}
if !IsValidGateway(key, gw) {
valid, allowedRoutesAll := IsValidGateway(key, gw)
if !valid {
return
}
listRoutes, err := validateReferredHTTPRoute(key, allowedRoutesAll, gw)
if err != nil {
utils.AviLog.Errorf("Validation of Referred HTTPRoutes Failed due to error : %s", err.Error())
}
namespace, _, _ := cache.SplitMetaNamespaceKey(utils.ObjKey(gw))
bkt := utils.Bkt(namespace, numWorkers)
c.workqueue[bkt].AddRateLimited(key)
utils.AviLog.Debugf("key: %s, msg: ADD", key)
for _, route := range listRoutes {
key := lib.HTTPRoute + "/" + utils.ObjKey(route)
c.workqueue[bkt].AddRateLimited(key)
utils.AviLog.Debugf("key: %s, msg: UPDATE", key)
}
},
DeleteFunc: func(obj interface{}) {
if c.DisableSync {
Expand Down Expand Up @@ -542,13 +555,23 @@ func (c *GatewayController) SetupGatewayApiEventHandlers(numWorkers uint32) {
gw := obj.(*gatewayv1.Gateway)
if IsGatewayUpdated(oldGw, gw) {
key := lib.Gateway + "/" + utils.ObjKey(gw)
if !IsValidGateway(key, gw) {
valid, allowedRoutesAll := IsValidGateway(key, gw)
if !valid {
return
}
listRoutes, err := validateReferredHTTPRoute(key, allowedRoutesAll, gw)
if err != nil {
utils.AviLog.Errorf("Validation of Referred HTTPRoutes Failed due to error : %s", err.Error())
}
namespace, _, _ := cache.SplitMetaNamespaceKey(utils.ObjKey(gw))
bkt := utils.Bkt(namespace, numWorkers)
c.workqueue[bkt].AddRateLimited(key)
utils.AviLog.Debugf("key: %s, msg: UPDATE", key)
for _, route := range listRoutes {
key := lib.HTTPRoute + "/" + utils.ObjKey(route)
c.workqueue[bkt].AddRateLimited(key)
utils.AviLog.Debugf("key: %s, msg: UPDATE", key)
}
}
},
}
Expand Down Expand Up @@ -714,3 +737,31 @@ func validateAviConfigMap(obj interface{}) (*corev1.ConfigMap, bool) {
}
return nil, false
}
func validateReferredHTTPRoute(key string, allowedRoutesAll bool, gateway *gatewayv1.Gateway) ([]*gatewayv1.HTTPRoute, error) {
namespace := gateway.Namespace
if allowedRoutesAll {
namespace = metav1.NamespaceAll
}
hrObjs, err := akogatewayapilib.AKOControlConfig().GatewayApiInformers().HTTPRouteInformer.Lister().HTTPRoutes(namespace).List(labels.Set(nil).AsSelector())
httpRoutes := make([]*gatewayv1.HTTPRoute, 0)
if err != nil {
return nil, err
}
for _, httpRoute := range hrObjs {
for _, parentRef := range httpRoute.Spec.ParentRefs {
if parentRef.Name == gatewayv1.ObjectName(gateway.Name) {
if IsHTTPRouteConfigValid(key, httpRoute) {
httpRoutes = append(httpRoutes, httpRoute)
}
break
}
}
}
sort.Slice(httpRoutes, func(i, j int) bool {
if httpRoutes[i].GetCreationTimestamp().Unix() == httpRoutes[j].GetCreationTimestamp().Unix() {
return httpRoutes[i].Namespace+"/"+httpRoutes[i].Name < httpRoutes[j].Namespace+"/"+httpRoutes[j].Name
}
return httpRoutes[i].GetCreationTimestamp().Unix() < httpRoutes[j].GetCreationTimestamp().Unix()
})
return httpRoutes, nil
}
32 changes: 17 additions & 15 deletions ako-gateway-api/k8s/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func IsGatewayClassValid(key string, gatewayClass *gatewayv1.GatewayClass) bool
return true
}

func IsValidGateway(key string, gateway *gatewayv1.Gateway) bool {
func IsValidGateway(key string, gateway *gatewayv1.Gateway) (bool, bool) {
spec := gateway.Spec

allowedRoutesAll := false
defaultCondition := akogatewayapistatus.NewCondition().
Type(string(gatewayv1.GatewayConditionAccepted)).
Reason(string(gatewayv1.GatewayReasonInvalid)).
Expand All @@ -76,7 +76,7 @@ func IsValidGateway(key string, gateway *gatewayv1.Gateway) bool {
programmedCondition.
SetIn(&gatewayStatus.Conditions)
akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
return false
return false, allowedRoutesAll
}

// has 1 or none addresses
Expand All @@ -89,7 +89,7 @@ func IsValidGateway(key string, gateway *gatewayv1.Gateway) bool {
Reason(string(gatewayv1.GatewayReasonAddressNotUsable)).
SetIn(&gatewayStatus.Conditions)
akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
return false
return false, allowedRoutesAll
}

if len(spec.Addresses) == 1 && *spec.Addresses[0].Type != "IPAddress" {
Expand All @@ -102,14 +102,21 @@ func IsValidGateway(key string, gateway *gatewayv1.Gateway) bool {
Reason(string(gatewayv1.GatewayReasonAddressNotUsable)).
SetIn(&gatewayStatus.Conditions)
akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
return false
return false, allowedRoutesAll
}

gatewayStatus.Listeners = make([]gatewayv1.ListenerStatus, len(gateway.Spec.Listeners))

var validListenerCount int
for index := range spec.Listeners {
if isValidListener(key, gateway, gatewayStatus, index) {
if !allowedRoutesAll {
if spec.Listeners[index].AllowedRoutes != nil && spec.Listeners[index].AllowedRoutes.Namespaces != nil && spec.Listeners[index].AllowedRoutes.Namespaces.From != nil {
if string(*spec.Listeners[index].AllowedRoutes.Namespaces.From) == akogatewayapilib.AllowedRoutesNamespaceFromAll {
allowedRoutesAll = true
}
}
}
validListenerCount++
}
}
Expand All @@ -124,31 +131,26 @@ func IsValidGateway(key string, gateway *gatewayv1.Gateway) bool {
programmedCondition.
SetIn(&gatewayStatus.Conditions)
akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
gateway.Status = *gatewayStatus.DeepCopy()
return false
return false, allowedRoutesAll
} else if validListenerCount < len(spec.Listeners) {
defaultCondition.
Reason(string(gatewayv1.GatewayReasonListenersNotValid)).
Status(metav1.ConditionTrue).
Message("Gateway contains atleast one valid listener").
SetIn(&gatewayStatus.Conditions)
akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
utils.AviLog.Infof("key: %s, msg: Gateway %s contains atleast one valid listener", key, gateway.Name)
return false
return false, allowedRoutesAll
}

defaultCondition.
Reason(string(gatewayv1.GatewayReasonAccepted)).
Status(metav1.ConditionTrue).
Message("Gateway configuration is valid").
SetIn(&gatewayStatus.Conditions)
_, err := akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
akogatewayapistatus.Record(key, gateway, &status.Status{GatewayStatus: gatewayStatus})
utils.AviLog.Infof("key: %s, msg: Gateway %s is valid", key, gateway.Name)
if err == nil {
return true
} else {
utils.AviLog.Errorf("key: %s, msg: Gateway status patch was not successful :%s ", key, err.Error())
return false
}
return true, allowedRoutesAll
}

func isValidListener(key string, gateway *gatewayv1.Gateway, gatewayStatus *gatewayv1.GatewayStatus, index int) bool {
Expand Down
26 changes: 21 additions & 5 deletions ako-gateway-api/nodes/gateway_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

akogatewayapilib "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/lib"
akogatewayapiobjects "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/objects"
"github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/internal/lib"
"github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/internal/nodes"
"github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/pkg/utils"
Expand Down Expand Up @@ -77,8 +78,11 @@ func (o *AviObjectGraph) BuildGatewayParent(gateway *gatewayv1.Gateway, key stri

func BuildPortProtocols(gateway *gatewayv1.Gateway, key string) []nodes.AviPortHostProtocol {
var portProtocols []nodes.AviPortHostProtocol
for _, listener := range gateway.Spec.Listeners {

gwStatus := akogatewayapiobjects.GatewayApiLister().GetGatewayToGatewayStatusMapping(gateway.Namespace + "/" + gateway.Name)
for i, listener := range gateway.Spec.Listeners {
if gwStatus.Listeners[i].Conditions[0].Type == string(gatewayv1.ListenerConditionAccepted) && gwStatus.Listeners[i].Conditions[0].Status == "False" {
continue
}
pp := nodes.AviPortHostProtocol{Port: int32(listener.Port), Protocol: string(listener.Protocol)}
//TLS config on listener is present
if listener.TLS != nil && len(listener.TLS.CertificateRefs) > 0 {
Expand All @@ -96,7 +100,11 @@ func BuildTLSNodesForGateway(gateway *gatewayv1.Gateway, key string) []*nodes.Av
var tlsNodes []*nodes.AviTLSKeyCertNode
var ns, name string
cs := utils.GetInformers().ClientSet
for _, listener := range gateway.Spec.Listeners {
gwStatus := akogatewayapiobjects.GatewayApiLister().GetGatewayToGatewayStatusMapping(gateway.Namespace + "/" + gateway.Name)
for i, listener := range gateway.Spec.Listeners {
if gwStatus.Listeners[i].Conditions[0].Type == string(gatewayv1.ListenerConditionAccepted) && gwStatus.Listeners[i].Conditions[0].Status == "False" {
continue
}
if listener.TLS != nil {
for _, certRef := range listener.TLS.CertificateRefs {
//kind is validated at ingestion
Expand Down Expand Up @@ -161,7 +169,11 @@ func DeleteTLSNode(key string, object *AviObjectGraph, gateway *gatewayv1.Gatewa
var tlsNodes []*nodes.AviTLSKeyCertNode
_, _, secretName := lib.ExtractTypeNameNamespace(key)
evhVsCertRefs := object.GetAviEvhVS()[0].SSLKeyCertRefs
for _, listener := range gateway.Spec.Listeners {
gwStatus := akogatewayapiobjects.GatewayApiLister().GetGatewayToGatewayStatusMapping(gateway.Namespace + "/" + gateway.Name)
for i, listener := range gateway.Spec.Listeners {
if gwStatus.Listeners[i].Conditions[0].Type == string(gatewayv1.ListenerConditionAccepted) && gwStatus.Listeners[i].Conditions[0].Status == "False" {
continue
}
if listener.TLS != nil {
for _, certRef := range listener.TLS.CertificateRefs {
name := string(certRef.Name)
Expand All @@ -188,7 +200,11 @@ func AddTLSNode(key string, object *AviObjectGraph, gateway *gatewayv1.Gateway,
var tlsNodes []*nodes.AviTLSKeyCertNode
_, _, secretName := lib.ExtractTypeNameNamespace(key)
evhVsCertRefs := object.GetAviEvhVS()[0].SSLKeyCertRefs
for _, listener := range gateway.Spec.Listeners {
gwStatus := akogatewayapiobjects.GatewayApiLister().GetGatewayToGatewayStatusMapping(gateway.Namespace + "/" + gateway.Name)
for i, listener := range gateway.Spec.Listeners {
if gwStatus.Listeners[i].Conditions[0].Type == string(gatewayv1.ListenerConditionAccepted) && gwStatus.Listeners[i].Conditions[0].Status == "False" {
continue
}
if listener.TLS != nil {
for _, certRef := range listener.TLS.CertificateRefs {
name := string(certRef.Name)
Expand Down
22 changes: 21 additions & 1 deletion ako-gateway-api/nodes/gateway_model_rel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

akogatewayapilib "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/lib"
akogatewayapiobjects "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/objects"
Expand Down Expand Up @@ -126,8 +128,12 @@ func GatewayGetGw(namespace, name, key string) ([]string, bool) {
hostnames := make(map[string]string, 0)
var gwHostnames []string
//var hostnames map[string]string
gwStatus := akogatewayapiobjects.GatewayApiLister().GetGatewayToGatewayStatusMapping(gwNsName)

for _, listenerObj := range gwObj.Spec.Listeners {
for i, listenerObj := range gwObj.Spec.Listeners {
if gwStatus.Listeners[i].Conditions[0].Type == string(gatewayv1.ListenerConditionAccepted) && gwStatus.Listeners[i].Conditions[0].Status != "True" {
continue
}
gwListener := akogatewayapiobjects.GatewayListenerStore{}
gwListener.Name = string(listenerObj.Name)
gwListener.Gateway = gwNsName
Expand Down Expand Up @@ -258,7 +264,20 @@ func HTTPRouteToGateway(namespace, name, key string) ([]string, bool) {
var gatewayList []string
var gwNsNameList []string
parentNameToHostnameMap := make(map[string][]string)
statusIndex := 0
for _, parentRef := range hrObj.Spec.ParentRefs {
if statusIndex >= len(hrObj.Status.Parents) {
break
}
if hrObj.Status.RouteStatus.Parents[statusIndex].ParentRef.Name != parentRef.Name {
continue
}
for statusIndex < len(hrObj.Status.Parents) && (parentRef.SectionName != nil && *hrObj.Status.RouteStatus.Parents[statusIndex].ParentRef.SectionName != *parentRef.SectionName) {
statusIndex += 1
}
if hrObj.Status.Parents[statusIndex].Conditions[0].Type == string(gatewayv1.RouteConditionAccepted) && hrObj.Status.Parents[statusIndex].Conditions[0].Status == metav1.ConditionFalse {
continue
}
hostnameIntersection, _ := parentNameToHostnameMap[string(parentRef.Name)]
ns := namespace
if parentRef.Namespace != nil {
Expand Down Expand Up @@ -332,6 +351,7 @@ func HTTPRouteToGateway(namespace, name, key string) ([]string, bool) {
gwNsNameList = append(gwNsNameList, gwNsName)
}
parentNameToHostnameMap[string(parentRef.Name)] = hostnameIntersection
statusIndex += 1
}

utils.AviLog.Debugf("key: %s, msg: Gateways retrieved %s", key, gwNsNameList)
Expand Down
11 changes: 11 additions & 0 deletions ako-gateway-api/nodes/route_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ func validateParentReference(key string, httpRoute *gatewayv1.HTTPRoute, httpRou
*parentRefIndexInHttpRouteStatus = *parentRefIndexInHttpRouteStatus + 1
return err
}
if gwStatus.Listeners[i].Conditions[0].Type == string(gatewayv1.GatewayConditionAccepted) && gwStatus.Listeners[i].Conditions[0].Status == metav1.ConditionFalse {
// listener is present in gateway but is in invalid state
utils.AviLog.Errorf("key: %s, msg: Matching gateway listener %s in Parent Reference is in invalid state", key, listenerName)
err := fmt.Errorf("Matching gateway listener is in Invalid state")
defaultCondition.
Reason(string(gatewayv1.RouteReasonPending)).
Message(err.Error()).
SetIn(&httpRouteStatus.Parents[*parentRefIndexInHttpRouteStatus].Conditions)
*parentRefIndexInHttpRouteStatus = *parentRefIndexInHttpRouteStatus + 1
return err
}
listenersForRoute = append(listenersForRoute, gateway.Spec.Listeners[i])
} else {
listenersForRoute = append(listenersForRoute, gateway.Spec.Listeners...)
Expand Down
32 changes: 21 additions & 11 deletions ako-gateway-api/status/gateway_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"errors"
"errors"
"reflect"
"strings"

Expand All @@ -30,6 +29,7 @@ import (
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

akogatewayapilib "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/lib"
akogatewayapiobjects "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/ako-gateway-api/objects"
"github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/internal/status"
"github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/pkg/utils"
)
Expand Down Expand Up @@ -145,7 +145,7 @@ func (o *gateway) Update(key string, option status.StatusOptions) {
return
}

gatewaystatus := gw.Status.DeepCopy()
gatewaystatus := akogatewayapiobjects.GatewayApiLister().GetGatewayToGatewayStatusMapping(gw.Namespace + "/" + gw.Name)
addressType := gatewayv1.IPAddressType
ipAddrs := []gatewayv1.GatewayStatusAddress{}
for _, vip := range option.Options.Vip {
Expand Down Expand Up @@ -178,15 +178,25 @@ func (o *gateway) Update(key string, option status.StatusOptions) {
Message(message).
SetIn(&gatewaystatus.Conditions)

for i := range gatewaystatus.Listeners {
for i, listener := range gatewaystatus.Listeners {
listenerCondition := NewCondition()
listenerCondition.
Type(string(gatewayv1.ListenerConditionProgrammed)).
Status(conditionStatus).
Reason(string(gatewayv1.ListenerReasonProgrammed)).
ObservedGeneration(gw.ObjectMeta.Generation).
Message(message).
SetIn(&gatewaystatus.Listeners[i].Conditions)
if listener.Conditions[0].Type == string(gatewayv1.ListenerConditionAccepted) && listener.Conditions[0].Status == metav1.ConditionTrue && listener.Conditions[1].Type == string(gatewayv1.ListenerConditionResolvedRefs) && listener.Conditions[1].Status == metav1.ConditionTrue {
listenerCondition.
Type(string(gatewayv1.ListenerConditionProgrammed)).
Status(conditionStatus).
Reason(string(gatewayv1.ListenerReasonProgrammed)).
ObservedGeneration(gw.ObjectMeta.Generation).
Message(message).
SetIn(&gatewaystatus.Listeners[i].Conditions)
} else {
listenerCondition.
Type(string(gatewayv1.ListenerConditionProgrammed)).
Status(metav1.ConditionFalse).
Reason(string(gatewayv1.ListenerReasonInvalid)).
ObservedGeneration(gw.ObjectMeta.Generation).
Message("Virtual service not configured/updated for this listener").
SetIn(&gatewaystatus.Listeners[i].Conditions)
}
}
o.Patch(key, gw, &status.Status{GatewayStatus: gatewaystatus})

Expand Down Expand Up @@ -238,7 +248,7 @@ func (o *gateway) Patch(key string, obj runtime.Object, status *status.Status, r
patchPayload, _ := json.Marshal(map[string]interface{}{
"status": status.GatewayStatus,
})
updatedGateway, err := akogatewayapilib.AKOControlConfig().GatewayAPIClientset().GatewayV1().Gateways(gw.Namespace).Patch(context.TODO(), gw.Name, types.MergePatchType, patchPayload, metav1.PatchOptions{}, "status")
_, err := akogatewayapilib.AKOControlConfig().GatewayAPIClientset().GatewayV1().Gateways(gw.Namespace).Patch(context.TODO(), gw.Name, types.MergePatchType, patchPayload, metav1.PatchOptions{}, "status")
if err != nil {
utils.AviLog.Warnf("key: %s, msg: there was an error in updating the gateway status. err: %+v, retry: %d", key, err, retry)
updatedGW, err := akogatewayapilib.AKOControlConfig().GatewayApiInformers().GatewayInformer.Lister().Gateways(gw.Namespace).Get(gw.Name)
Expand Down
Loading

0 comments on commit aa1cb49

Please sign in to comment.