Skip to content

Commit

Permalink
[Enhancement] Respect the component conf definition in configmaps (#568)
Browse files Browse the repository at this point in the history
Signed-off-by: yandongxiao <yandongxiao@starrocks.com>
  • Loading branch information
yandongxiao authored Jul 24, 2024
1 parent 3e89206 commit a8a8b87
Show file tree
Hide file tree
Showing 15 changed files with 546 additions and 131 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
sigs.k8s.io/controller-runtime v0.14.0
sigs.k8s.io/yaml v1.3.0
)

require (
Expand Down Expand Up @@ -82,5 +83,4 @@ require (
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
22 changes: 0 additions & 22 deletions pkg/common/resource_utils/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ limitations under the License.
package resource_utils

import (
"bytes"
"strconv"

"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
)

// the fe ports key
Expand Down Expand Up @@ -70,24 +66,6 @@ var DefMap = map[string]int32{
BRPC_PORT: 8060,
}

func ResolveConfigMap(configMap *corev1.ConfigMap, key string) (map[string]interface{}, error) {
res := make(map[string]interface{})
data := configMap.Data
if _, ok := data[key]; !ok {
return res, nil
}
value := data[key]

// We use a new viper instance, not the global one, in order to avoid concurrency problems: concurrent map iteration
// and map write,
v := viper.New()
v.SetConfigType("properties")
if err := v.ReadConfig(bytes.NewBuffer([]byte(value))); err != nil {
return nil, err
}
return v.AllSettings(), nil
}

// GetPort get ports from config file.
func GetPort(config map[string]interface{}, key string) int32 {
if v, ok := config[key]; ok {
Expand Down
20 changes: 0 additions & 20 deletions pkg/common/resource_utils/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,3 @@ limitations under the License.
*/

package resource_utils

import (
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
)

func TestResolveConfigMap(t *testing.T) {
configMap := corev1.ConfigMap{
Data: map[string]string{
"fe.conf": "http_port = 8030",
},
}
res, err := ResolveConfigMap(&configMap, "fe.conf")
require.NoError(t, err)

_, ok := res["http_port"]
require.Equal(t, true, ok)
}
83 changes: 83 additions & 0 deletions pkg/k8sutils/k8sutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ limitations under the License.
package k8sutils

import (
"bytes"
"context"
"fmt"
"path/filepath"
"strings"
"unicode"

"github.com/go-logr/logr"
"github.com/spf13/viper"

appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -469,3 +471,84 @@ func CheckVolumes(volumes []corev1.Volume, mounts []corev1.VolumeMount) error {
}
return nil
}

// GetConfig get the config of component.
// First, It tries to read the config from the ConfigMapInfo, which has the configMap name and key.
// Second, if the ConfigMapInfo is empty, it will try to read the config from the ConfigMaps.
// Last, If the fe ConfigMapInfo is empty and the configMaps is nil, it will return an empty map.
func GetConfig(ctx context.Context, k8sClient client.Client,
configMapInfo srapi.ConfigMapInfo,
configMaps []srapi.ConfigMapReference, expectMountPath, expectKey string,
namespace string) (map[string]interface{}, error) {
if configMapInfo.ConfigMapName != "" || configMapInfo.ResolveKey != "" {
if configMapInfo.ConfigMapName == "" || configMapInfo.ResolveKey == "" {
return make(map[string]interface{}), nil
}
configMap, err := GetConfigMap(ctx, k8sClient, namespace, configMapInfo.ConfigMapName)
if err != nil {
if apierrors.IsNotFound(err) {
return make(map[string]interface{}), nil
}
return nil, err
}

res, err := ResolveConfigMap(configMap, configMapInfo.ResolveKey)
return res, err
}
return getConfigFromConfigMaps(ctx, k8sClient, configMaps, expectMountPath, expectKey, namespace)
}

func ResolveConfigMap(configMap *corev1.ConfigMap, key string) (map[string]interface{}, error) {
res := make(map[string]interface{})
data := configMap.Data
if _, ok := data[key]; !ok {
return res, nil
}
value := data[key]

// We use a new viper instance, not the global one, in order to avoid concurrency problems: concurrent map iteration
// and map write,
v := viper.New()
v.SetConfigType("properties")
if err := v.ReadConfig(bytes.NewBuffer([]byte(value))); err != nil {
return nil, err
}
return v.AllSettings(), nil
}

// getConfigFromConfigMaps try to read the config from the configMaps. The strategy is to match
// the mountPath with expectMountPath.
// - if subpath is empty, the mount path should equal to expectMountPath. And it will use expectKey as the key.
// - if subpath is not empty, it should equal to expectKey, and the mount path should be expectMountPath/expectKey.
func getConfigFromConfigMaps(ctx context.Context, k8sClient client.Client,
configMaps []srapi.ConfigMapReference, expectMountPath, expectKey string,
namespace string) (map[string]interface{}, error) {
configMapName := ""
for i := range configMaps {
subPath := configMaps[i].SubPath
if subPath == "" {
if configMaps[i].MountPath == expectMountPath {
configMapName = configMaps[i].Name
// don't break here, we need to use the ConfigMapReference with the subPath first.
}
} else {
if configMaps[i].MountPath == filepath.Join(expectMountPath, expectKey) && expectKey == subPath {
configMapName = configMaps[i].Name
break
}
}
}
if configMapName == "" {
return make(map[string]interface{}), nil
}

configMap, err := GetConfigMap(ctx, k8sClient, namespace, configMapName)
if err != nil {
if apierrors.IsNotFound(err) {
return make(map[string]interface{}), nil
}
return nil, err
}
res, err := ResolveConfigMap(configMap, expectKey)
return res, err
}
14 changes: 14 additions & 0 deletions pkg/k8sutils/k8sutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -879,3 +880,16 @@ func TestHasVolume(t *testing.T) {
})
}
}

func TestResolveConfigMap(t *testing.T) {
configMap := corev1.ConfigMap{
Data: map[string]string{
"fe.conf": "http_port = 8030",
},
}
res, err := k8sutils.ResolveConfigMap(&configMap, "fe.conf")
require.NoError(t, err)

_, ok := res["http_port"]
require.Equal(t, true, ok)
}
32 changes: 7 additions & 25 deletions pkg/subcontrollers/be/be_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func (be *BeController) SyncCluster(ctx context.Context, src *srapi.StarRocksClu
}

logger.V(log.DebugLevel).Info("get be/fe config to resolve ports", "ConfigMapInfo", beSpec.ConfigMapInfo)
config, err := be.GetConfig(ctx, &beSpec.ConfigMapInfo, src.Namespace)
config, err := be.GetBeConfig(ctx, beSpec, src.Namespace)
if err != nil {
logger.Error(err, "get be config failed", "ConfigMapInfo", beSpec.ConfigMapInfo)
return err
}

feconfig, _ := be.getFeConfig(ctx, &src.Spec.StarRocksFeSpec.ConfigMapInfo, src.Namespace)
feconfig, _ := fe.GetFEConfig(ctx, be.Client, src.Spec.StarRocksFeSpec, src.Namespace)
// add query port from fe config.
config[rutils.QUERY_PORT] = strconv.FormatInt(int64(rutils.GetPort(feconfig, rutils.QUERY_PORT)), 10)
// generate new be external service.
Expand Down Expand Up @@ -207,29 +207,11 @@ func (be *BeController) generateInternalService(ctx context.Context,
return searchSvc
}

func (be *BeController) GetConfig(ctx context.Context,
configMapInfo *srapi.ConfigMapInfo, namespace string) (map[string]interface{}, error) {
configMap, err := k8sutils.GetConfigMap(ctx, be.Client, namespace, configMapInfo.ConfigMapName)
if err != nil && apierrors.IsNotFound(err) {
return make(map[string]interface{}), nil
} else if err != nil {
return make(map[string]interface{}), err
}

res, err := rutils.ResolveConfigMap(configMap, configMapInfo.ResolveKey)
return res, err
}

func (be *BeController) getFeConfig(ctx context.Context,
feconfigMapInfo *srapi.ConfigMapInfo, namespace string) (map[string]interface{}, error) {
feconfigMap, err := k8sutils.GetConfigMap(ctx, be.Client, namespace, feconfigMapInfo.ConfigMapName)
if err != nil && apierrors.IsNotFound(err) {
return make(map[string]interface{}), nil
} else if err != nil {
return make(map[string]interface{}), err
}
res, err := rutils.ResolveConfigMap(feconfigMap, feconfigMapInfo.ResolveKey)
return res, err
// GetBeConfig get the config of BE from configmap.
func (be *BeController) GetBeConfig(ctx context.Context,
beSpec *srapi.StarRocksBeSpec, namespace string) (map[string]interface{}, error) {
return k8sutils.GetConfig(ctx, be.Client, beSpec.ConfigMapInfo,
beSpec.ConfigMaps, _beConfDirPath, _beConfigKey, namespace)
}

func (be *BeController) ClearResources(ctx context.Context, src *srapi.StarRocksCluster) error {
Expand Down
Loading

0 comments on commit a8a8b87

Please sign in to comment.