Skip to content

Commit

Permalink
Allower to read cluster config directly from the k8s config-map (#484)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 20, 2024
1 parent 82bdb1c commit 6c0fc21
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 141 deletions.
99 changes: 66 additions & 33 deletions cmd/coordinator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,41 @@ package coordinator
import (
"errors"
"io"
"time"
"log/slog"
"strings"

"github.com/fsnotify/fsnotify"

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/streamnative/oxia/cmd/flag"
"github.com/streamnative/oxia/common"

"github.com/streamnative/oxia/cmd/flag"
"github.com/streamnative/oxia/coordinator"
"github.com/streamnative/oxia/coordinator/model"
)

var (
conf = coordinator.NewConfig()
configFile string
configChangeCh chan struct{}
conf = coordinator.NewConfig()
configFile string

Cmd = &cobra.Command{
Use: "coordinator",
Short: "Start a coordinator",
Long: `Start a coordinator`,
PreRunE: validate,
Run: exec,
RunE: exec,
}
)

func init() {
flag.InternalAddr(Cmd, &conf.InternalServiceAddr)
flag.MetricsAddr(Cmd, &conf.MetricsServiceAddr)
Cmd.Flags().Var(&conf.MetadataProviderImpl, "metadata", "Metadata provider implementation: file, configmap or memory")
Cmd.Flags().StringVar(&conf.K8SMetadataNamespace, "k8s-namespace", conf.K8SMetadataNamespace, "Kubernetes namespace for metadata configmap")
Cmd.Flags().StringVar(&conf.K8SMetadataConfigMapName, "k8s-configmap-name", conf.K8SMetadataConfigMapName, "ConfigMap name for metadata configmap")
Cmd.Flags().StringVar(&conf.K8SMetadataNamespace, "k8s-namespace", conf.K8SMetadataNamespace, "Kubernetes namespace for oxia config maps")
Cmd.Flags().StringVar(&conf.K8SMetadataConfigMapName, "k8s-configmap-name", conf.K8SMetadataConfigMapName, "ConfigMap name for cluster status configmap")
Cmd.Flags().StringVar(&conf.FileMetadataPath, "file-clusters-status-path", "data/cluster-status.json", "The path where the cluster status is stored when using 'file' provider")
Cmd.Flags().StringVarP(&configFile, "conf", "f", "", "Cluster config file")
Cmd.Flags().DurationVar(&conf.ClusterConfigRefreshTime, "conf-file-refresh-time", 1*time.Minute, "How frequently to check for updates for cluster configuration file")

setConfigPath()
viper.OnConfigChange(func(_ fsnotify.Event) {
configChangeCh <- struct{}{}
})
}

func validate(*cobra.Command, []string) error {
Expand All @@ -69,41 +63,80 @@ func validate(*cobra.Command, []string) error {
return errors.New("k8s-configmap-name must be set with metadata=configmap")
}
}
if _, _, err := loadClusterConfig(); err != nil {
return err
}
return nil
}

func setConfigPath() {
func configIsRemote() bool {
return strings.HasPrefix(configFile, "configmap:")
}

func setConfigPath(v *viper.Viper) error {
v.SetConfigType("yaml")

if configIsRemote() {
err := v.AddRemoteProvider("configmap", "endpoint", configFile)
if err != nil {
slog.Error("Failed to add remote provider", slog.Any("error", err))
return err
}

return v.WatchRemoteConfigOnChannel()
}

if configFile == "" {
viper.AddConfigPath("/oxia/conf")
viper.AddConfigPath(".")
} else {
viper.SetConfigFile(configFile)
v.AddConfigPath("/oxia/conf")
v.AddConfigPath(".")
}

v.SetConfigFile(configFile)
v.WatchConfig()
return nil
}

func loadClusterConfig() (model.ClusterConfig, chan struct{}, error) {
setConfigPath()
func loadClusterConfig(v *viper.Viper) (model.ClusterConfig, error) {
cc := model.ClusterConfig{}

if err := viper.ReadInConfig(); err != nil {
return cc, configChangeCh, err
var err error

if configIsRemote() {
err = v.ReadRemoteConfig()
} else {
err = v.ReadInConfig()
}

if err != nil {
return cc, err
}

if err := viper.Unmarshal(&cc); err != nil {
return cc, configChangeCh, err
if err := v.Unmarshal(&cc); err != nil {
return cc, err
}

return cc, configChangeCh, nil
return cc, nil
}

func exec(*cobra.Command, []string) {
viper.WatchConfig()
conf.ClusterConfigProvider = loadClusterConfig
func exec(*cobra.Command, []string) error {
v := viper.New()

conf.ClusterConfigChangeNotifications = make(chan any)
conf.ClusterConfigProvider = func() (model.ClusterConfig, error) {
return loadClusterConfig(v)
}

v.OnConfigChange(func(e fsnotify.Event) {
conf.ClusterConfigChangeNotifications <- nil
})

if err := setConfigPath(v); err != nil {
return err
}

if _, err := loadClusterConfig(v); err != nil {
return err
}

common.RunProcess(func() (io.Closer, error) {
return coordinator.New(conf)
})
return nil
}
33 changes: 19 additions & 14 deletions cmd/coordinator/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
"strings"
"testing"

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -62,10 +63,9 @@ func TestCmd(t *testing.T) {
isErr bool
}{
{[]string{}, coordinator.Config{
InternalServiceAddr: "localhost:6649",
MetricsServiceAddr: "localhost:8080",
MetadataProviderImpl: coordinator.File,
ClusterConfigRefreshTime: 0,
InternalServiceAddr: "localhost:6649",
MetricsServiceAddr: "localhost:8080",
MetadataProviderImpl: coordinator.File,
}, model.ClusterConfig{
Namespaces: []model.NamespaceConfig{{
Name: common.DefaultNamespace,
Expand Down Expand Up @@ -143,22 +143,27 @@ func TestCmd(t *testing.T) {
},
}, false},
{[]string{"-f=invalid.yaml"}, coordinator.Config{
InternalServiceAddr: "localhost:6649",
MetricsServiceAddr: "localhost:8080",
InternalServiceAddr: "localhost:6649",
MetricsServiceAddr: "localhost:8080",
MetadataProviderImpl: coordinator.File,
}, model.ClusterConfig{}, true},
} {
t.Run(strings.Join(test.args, "_"), func(t *testing.T) {
conf = coordinator.NewConfig()
configFile = ""
viper.Reset()
Cmd.SetArgs(test.args)
Cmd.Run = func(cmd *cobra.Command, args []string) {
assert.Equal(t, test.expectedConf, conf)
Cmd.RunE = func(cmd *cobra.Command, args []string) error {
v := viper.New()
assert.NoError(t, setConfigPath(v))

conf.ClusterConfigProvider = loadClusterConfig
clusterConf, _, err := conf.ClusterConfigProvider()
assert.NoError(t, err)
assert.Equal(t, test.expectedConf, conf)
conf.ClusterConfigProvider = func() (model.ClusterConfig, error) {
return loadClusterConfig(v)
}
clusterConf, err := conf.ClusterConfigProvider()
assert.Equal(t, test.isErr, err != nil)
assert.Equal(t, test.expectedClusterConf, clusterConf)
return err
}
err = Cmd.Execute()
assert.Equal(t, test.isErr, err != nil)
Expand All @@ -182,7 +187,7 @@ func TestCmd(t *testing.T) {
configFile = ""
viper.Reset()
Cmd.SetArgs(test.args)
Cmd.Run = func(cmd *cobra.Command, args []string) {}
Cmd.RunE = func(cmd *cobra.Command, args []string) error { return nil }
err = Cmd.Execute()
assert.Equal(t, test.isErr, err != nil)
})
Expand Down
132 changes: 132 additions & 0 deletions cmd/coordinator/viper_configmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package coordinator

import (
"bytes"
"context"
"io"
"log/slog"
"strings"

"github.com/pkg/errors"
"github.com/spf13/viper"
v1 "k8s.io/api/core/v1" //nolint:revive
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/streamnative/oxia/common"
"github.com/streamnative/oxia/coordinator/impl"
)

type cmConfigProvider struct {
}

const filePath = "config.yaml"

func getNamespaceAndCmName(rp viper.RemoteProvider) (namespace, cmName string, err error) {
p := strings.Split(strings.TrimPrefix(rp.Path(), "configmap:"), "/")
if len(p) != 2 {
return "", "", errors.New("Invalid configmap configuration")
}

return p[0], p[1], nil
}

func (*cmConfigProvider) Get(rp viper.RemoteProvider) (io.Reader, error) {
kubernetes := impl.NewK8SClientset(impl.NewK8SClientConfig())
namespace, configmap, err := getNamespaceAndCmName(rp)
if err != nil {
return nil, err
}
cmValue, err := impl.K8SConfigMaps(kubernetes).Get(namespace, configmap)
if err != nil {
return nil, err
}

data, ok := cmValue.Data[filePath]
if !ok {
return nil, errors.Errorf("path not found in config map: %s", rp.Path())
}
return bytes.NewReader([]byte(data)), nil
}

func (c *cmConfigProvider) Watch(rp viper.RemoteProvider) (io.Reader, error) {
return c.Get(rp)
}

func (*cmConfigProvider) WatchChannel(rp viper.RemoteProvider) (<-chan *viper.RemoteResponse, chan bool) {
kubernetes := impl.NewK8SClientset(impl.NewK8SClientConfig())
namespace, configmap, _ := getNamespaceAndCmName(rp)

ch := make(chan *viper.RemoteResponse)
w, err := kubernetes.CoreV1().ConfigMaps(namespace).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
slog.Error("Failed to setup watch on config map",
slog.String("k8s-namespace", namespace),
slog.String("k8s-config-map", configmap),
slog.Any("error", err))
ch <- &viper.RemoteResponse{Error: err}
close(ch)
return ch, nil
}

go common.DoWithLabels(context.Background(), map[string]string{
"component": "k8s-configmap-watch",
}, func() {
for res := range w.ResultChan() {
cm, ok := res.Object.(*v1.ConfigMap)
if !ok {
slog.Warn("Got wrong type of object notification",
slog.String("k8s-namespace", namespace),
slog.String("k8s-config-map", configmap),
slog.Any("object", res),
)
}
if cm.Name != configmap {
continue
}

slog.Info("Got watch event from K8S",
slog.String("k8s-namespace", namespace),
slog.String("k8s-config-map", configmap),
slog.Any("event-type", res.Type),
)

switch res.Type {
case watch.Added, watch.Modified:
ch <- &viper.RemoteResponse{
Value: []byte(cm.Data[filePath]),
Error: nil,
}

// Also notifies directly the oxia coordinator
conf.ClusterConfigChangeNotifications <- nil
default:
ch <- &viper.RemoteResponse{
Value: nil,
Error: errors.Errorf("unexpected event on config map: %v", res.Type),
}
}
}
})

return ch, nil
}

func init() {
viper.RemoteConfig = &cmConfigProvider{}
viper.SupportedRemoteProviders = []string{"configmap"}
}
Loading

0 comments on commit 6c0fc21

Please sign in to comment.