Skip to content

Commit

Permalink
Merge pull request #42 from nirmata/leader-election
Browse files Browse the repository at this point in the history
feat: add leader election
  • Loading branch information
realshuting authored Aug 30, 2023
2 parents 7330150 + 92a3fc3 commit 94a6c7a
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 25 deletions.
19 changes: 19 additions & 0 deletions configs/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ rules:
- create
- update
- delete
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down Expand Up @@ -177,6 +188,14 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SERVICE_NAME
value: svc
- name: DEPLOYMENT_NAME
value: kyverno-notation-aws
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
# USE IF IRSA IS NOT CONFIGURED
# - name: AWS_ACCESS_KEY_ID
# value: ${AWS_ACCESS_KEY_ID}
Expand Down
2 changes: 1 addition & 1 deletion configs/samples/kyverno-policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spec:
context:
- name: tlscerts
apiCall:
urlPath: "/api/v1/namespaces/kyverno-notation-aws/secrets/kyverno-notation-aws.kyverno-notation-aws.svc.tls-pair"
urlPath: "/api/v1/namespaces/kyverno-notation-aws/secrets/svc.kyverno-notation-aws.svc.tls-pair"
jmesPath: "base64_decode( data.\"tls.crt\" )"
- name: response
apiCall:
Expand Down
37 changes: 37 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
"sync"

"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/controllers"
)

type Controller interface {
Run(context.Context, logr.Logger, *sync.WaitGroup)
}

type controller struct {
name string
controller controllers.Controller
workers int
}

func NewController(name string, c controllers.Controller, w int) Controller {
return controller{
name: name,
controller: c,
workers: w,
}
}

func (c controller) Run(ctx context.Context, logger logr.Logger, wg *sync.WaitGroup) {
wg.Add(1)
go func(logger logr.Logger) {
logger.Info("starting controller", "workers", c.workers)
defer logger.Info("controller stopped")
defer wg.Done()
c.controller.Run(ctx, c.workers)
}(logger.WithValues("name", c.name))
}
110 changes: 86 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,40 @@ import (
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/go-logr/zapr"
"github.com/kyverno/kyverno/pkg/leaderelection"
"github.com/kyverno/pkg/certmanager"
tlsMgr "github.com/kyverno/pkg/tls"
"github.com/nirmata/kyverno-notation-verifier/kubenotation"
knvSetup "github.com/nirmata/kyverno-notation-verifier/setup"
knvVerifier "github.com/nirmata/kyverno-notation-verifier/verifier"
_ "github.com/notaryproject/notation-core-go/signature/cose"
_ "github.com/notaryproject/notation-core-go/signature/jws"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
)

var (
namespace = "kyverno-notation-aws"
Namespace = os.Getenv("POD_NAMESPACE")
PodName = os.Getenv("POD_NAME")
ServiceName = getEnvWithFallback("SERVICE_NAME", "svc")
DeploymentName = getEnvWithFallback("DEPLOYMENT_NAME", "kyverno-notation-aws")

CertRenewalInterval = 12 * time.Hour
CAValidityDuration = 365 * 24 * time.Hour
TLSValidityDuration = 150 * 24 * time.Hour
resyncPeriod = 15 * time.Minute

resyncPeriod = 15 * time.Minute
)

func main() {
Expand Down Expand Up @@ -96,39 +107,74 @@ func main() {
log.Fatalf("failed to initialize kube client: %v", err)
}

signalCtx, sdown := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer sdown()

tlsMgrConfig := &tlsMgr.Config{
ServiceName: "kyverno-notation-aws",
Namespace: namespace,
ServiceName: ServiceName,
Namespace: Namespace,
}

certRenewer := tlsMgr.NewCertRenewer(
zapr.NewLogger(logger),
kubeClient.CoreV1().Secrets(namespace),
CertRenewalInterval,
CAValidityDuration,
TLSValidityDuration,
"",
tlsMgrConfig,
)

caStopCh := make(chan struct{}, 1)
caInformer := NewSecretInformer(kubeClient, namespace, tlsMgr.GenerateRootCASecretName(tlsMgrConfig), resyncPeriod)
caInformer := NewSecretInformer(kubeClient, Namespace, tlsMgr.GenerateRootCASecretName(tlsMgrConfig), resyncPeriod)
go caInformer.Informer().Run(caStopCh)

tlsStopCh := make(chan struct{}, 1)
tlsInformer := NewSecretInformer(kubeClient, namespace, tlsMgr.GenerateTLSPairSecretName(tlsMgrConfig), resyncPeriod)
tlsInformer := NewSecretInformer(kubeClient, Namespace, tlsMgr.GenerateTLSPairSecretName(tlsMgrConfig), resyncPeriod)
go tlsInformer.Informer().Run(tlsStopCh)

certManager := certmanager.NewController(
zapr.NewLogger(logger),
caInformer,
tlsInformer,
certRenewer,
tlsMgrConfig,
le, err := leaderelection.New(
zapr.NewLogger(logger).WithName("leader-election"),
DeploymentName,
Namespace,
kubeClient,
PodName,
2*time.Second,
func(ctx context.Context) {

certRenewer := tlsMgr.NewCertRenewer(
zapr.NewLogger(logger).WithName("tls").WithValues("pod", PodName),
kubeClient.CoreV1().Secrets(Namespace),
CertRenewalInterval,
CAValidityDuration,
TLSValidityDuration,
"",
tlsMgrConfig,
)

certManager := certmanager.NewController(
zapr.NewLogger(logger).WithName("certmanager").WithValues("pod", PodName),
caInformer,
tlsInformer,
certRenewer,
tlsMgrConfig,
)

leaderControllers := []Controller{NewController("cert-manager", certManager, 1)}

// start leader controllers
var wg sync.WaitGroup
for _, controller := range leaderControllers {
controller.Run(signalCtx, zapr.NewLogger(logger).WithName("controllers"), &wg)
}
// wait all controllers shut down
wg.Wait()
},
nil,
)
if err != nil {
log.Fatalf("failed to initialize leader election: %v", err)
os.Exit(1)
}

// start leader election
go func() {
certManager.Run(context.TODO(), 1)
select {
case <-signalCtx.Done():
return
default:
le.Run(signalCtx)
}
}()

crdSetup, err := kubenotation.Setup(zapr.NewLogger(logger), metricsAddr, probeAddr, enableLeaderElection)
Expand Down Expand Up @@ -172,7 +218,23 @@ func main() {
errsTLS := make(chan error, 1)
if !flagNoTLS {
tlsConf := &tls.Config{
GetCertificate: certManager.GetCertificate,
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
secret, err := tlsInformer.Lister().Secrets(tlsMgrConfig.Namespace).Get(tlsMgr.GenerateTLSPairSecretName(tlsMgrConfig))
if err != nil {
return nil, err
} else if secret == nil {
return nil, errors.New("tls secret not found")
} else if secret.Type != corev1.SecretTypeTLS {
return nil, errors.New("secret is not a TLS secret")
}

cert, err := tls.X509KeyPair(secret.Data[corev1.TLSCertKey], secret.Data[corev1.TLSPrivateKeyKey])
if err != nil {
return nil, err
}

return &cert, nil
},
}
srv := &http.Server{
Addr: ":9443",
Expand Down
8 changes: 8 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"os"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -44,3 +45,10 @@ func (i *secretInformer) Informer() cache.SharedIndexInformer {
func (i *secretInformer) Lister() corev1listers.SecretLister {
return i.lister
}

func getEnvWithFallback(name, fallback string) string {
if value := os.Getenv(name); value != "" {
return value
}
return fallback
}

0 comments on commit 94a6c7a

Please sign in to comment.