Skip to content

Commit

Permalink
Merge branch 'main' into K8SPS-353
Browse files Browse the repository at this point in the history
  • Loading branch information
egegunes authored Aug 8, 2024
2 parents f7cae2d + 786fa5f commit 07db709
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 66 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ require (
github.com/sjmudd/stopwatch v0.1.1
go.nhat.io/grpcmock v0.26.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.16.0
golang.org/x/sync v0.8.0
golang.org/x/text v0.17.0
google.golang.org/grpc v1.65.0
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
Expand Down Expand Up @@ -71,7 +71,7 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0
github.com/iancoleman/orderedmap v0.3.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down Expand Up @@ -111,15 +111,15 @@ require (
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.23.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/protobuf v1.34.1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240723171418-e6d459c13d2a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240723171418-e6d459c13d2a // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 h1:CWyXh/jylQWp2dtiV33mY4iSSp6yf4lmn+c7/tN+ObI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0/go.mod h1:nCLIt0w3Ept2NwF8ThLmrppXsfT07oC8k0XNDxd8sVU=
github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc=
github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
Expand Down Expand Up @@ -292,15 +292,15 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -314,8 +314,8 @@ golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -340,19 +340,19 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/api v0.0.0-20240723171418-e6d459c13d2a h1:YIa/rzVqMEokBkPtydCkx1VLmv3An1Uw7w1P1m6EhOY=
google.golang.org/genproto/googleapis/api v0.0.0-20240723171418-e6d459c13d2a/go.mod h1:AHT0dDg3SoMOgZGnZk29b5xTbPHMoEC8qthmBLJCpys=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240723171418-e6d459c13d2a h1:hqK4+jJZXCU4pW7jsAdGOVFIfLHQeV7LaizZKnZ84HI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240723171418-e6d459c13d2a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
12 changes: 9 additions & 3 deletions pkg/controller/ps/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,13 +807,19 @@ func (r *PerconaServerMySQLReconciler) reconcileReplication(ctx context.Context,
}

if err := orchestrator.Discover(ctx, r.ClientCmd, pod, mysql.ServiceName(cr), mysql.DefaultPort); err != nil {
switch err.Error() {
case "Unauthorized":
switch {
case errors.Is(err, orchestrator.ErrUnauthorized):
log.Info("mysql is not ready, unauthorized orchestrator discover response. skip")
return nil
case orchestrator.ErrEmptyResponse.Error():
case errors.Is(err, orchestrator.ErrEmptyResponse):
log.Info("mysql is not ready, empty orchestrator discover response. skip")
return nil
case errors.Is(err, orchestrator.ErrBadConn):
log.Info("mysql is not ready, bad connection. skip")
return nil
case errors.Is(err, orchestrator.ErrNoSuchHost):
log.Info("mysql is not ready, host not found. skip")
return nil
}
return errors.Wrap(err, "failed to discover cluster")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/ps/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ func (r *PerconaServerMySQLReconciler) isAsyncReady(ctx context.Context, cr *api

instances, err := orchestrator.Cluster(ctx, r.ClientCmd, pod, cr.ClusterHint())
if err != nil {
if errors.Is(err, orchestrator.ErrEmptyResponse) || errors.Is(err, orchestrator.ErrUnableToGetClusterName) {
return false, errors.Wrap(err, "orchestrator").Error(), nil
}
return false, "", err
}

Expand Down
85 changes: 43 additions & 42 deletions pkg/orchestrator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package orchestrator
import (
"bytes"
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"strings"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand All @@ -18,6 +20,25 @@ type orcResponse struct {
Details interface{} `json:"Details,omitempty"`
}

func (r *orcResponse) Error() error {
if r.Code != "ERROR" {
return nil
}
if strings.Contains(r.Message, "Unable to determine cluster name") {
return ErrUnableToGetClusterName
}
if r.Message == "Unauthorized" {
return ErrUnauthorized
}
if r.Message == driver.ErrBadConn.Error() {
return ErrBadConn
}
if strings.Contains(r.Message, "no such host") {
return ErrNoSuchHost
}
return errors.New(r.Message)
}

type InstanceKey struct {
Hostname string `json:"Hostname"`
Port int32 `json:"Port"`
Expand All @@ -32,7 +53,13 @@ type Instance struct {
Problems []string `json:"Problems"`
}

var ErrEmptyResponse = errors.New("empty response")
var (
ErrEmptyResponse = errors.New("empty response")
ErrUnableToGetClusterName = errors.New("unable to determine cluster name")
ErrUnauthorized = errors.New("unauthorized")
ErrBadConn = errors.New("bad connection")
ErrNoSuchHost = errors.New("mysql host not found")
)

func exec(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, endpoint string, outb, errb *bytes.Buffer) error {
c := []string{"curl", fmt.Sprintf("localhost:%d/%s", defaultWebPort, endpoint)}
Expand Down Expand Up @@ -65,8 +92,8 @@ func ClusterPrimary(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Po
return nil, errors.Wrap(err, "json decode")
}

if orcResp.Code == "ERROR" {
return nil, errors.New(orcResp.Message)
if err := orcResp.Error(); err != nil {
return nil, err
}

return primary, nil
Expand All @@ -86,11 +113,7 @@ func StopReplication(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.P
return errors.Wrap(err, "json decode")
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}

return nil
return orcResp.Error()
}

func StartReplication(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, host string, port int32) error {
Expand All @@ -107,11 +130,7 @@ func StartReplication(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.
return errors.Wrap(err, "json decode")
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}

return nil
return orcResp.Error()
}

func AddPeer(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, peer string) error {
Expand All @@ -136,11 +155,7 @@ func AddPeer(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, peer
return errors.Wrap(err, "json decode")
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}

return nil
return orcResp.Error()
}

func RemovePeer(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, peer string) error {
Expand All @@ -165,11 +180,7 @@ func RemovePeer(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, p
return errors.Wrap(err, "json decode")
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}

return nil
return orcResp.Error()
}

func EnsureNodeIsPrimary(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, clusterHint, host string, port int) error {
Expand Down Expand Up @@ -197,11 +208,7 @@ func EnsureNodeIsPrimary(ctx context.Context, cliCmd clientcmd.Client, pod *core
return errors.Wrapf(err, "json decode \"%s\"", string(body))
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}

return nil
return orcResp.Error()
}

func Discover(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, host string, port int) error {
Expand All @@ -224,10 +231,7 @@ func Discover(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, hos
return errors.Wrapf(err, "json decode \"%s\"", string(body))
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}
return nil
return orcResp.Error()
}

func SetWriteable(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, host string, port int) error {
Expand All @@ -250,10 +254,7 @@ func SetWriteable(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod,
return errors.Wrapf(err, "json decode \"%s\"", string(body))
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}
return nil
return orcResp.Error()
}

func Cluster(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, clusterHint string) ([]*Instance, error) {
Expand All @@ -266,6 +267,9 @@ func Cluster(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, clus
}

body := res.Bytes()
if len(body) == 0 {
return nil, ErrEmptyResponse
}

instances := []*Instance{}
if err := json.Unmarshal(body, &instances); err == nil {
Expand All @@ -277,8 +281,8 @@ func Cluster(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, clus
return nil, errors.Wrap(err, "json decode")
}

if orcResp.Code == "ERROR" {
return nil, errors.New(orcResp.Message)
if err := orcResp.Error(); err != nil {
return nil, err
}

return instances, nil
Expand All @@ -304,8 +308,5 @@ func ForgetInstance(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Po
return errors.Wrapf(err, "json decode \"%s\"", string(body))
}

if orcResp.Code == "ERROR" {
return errors.New(orcResp.Message)
}
return nil
return orcResp.Error()
}

0 comments on commit 07db709

Please sign in to comment.