diff --git a/build/pxc-entrypoint.sh b/build/pxc-entrypoint.sh index 9041bdd86a..5465176a73 100755 --- a/build/pxc-entrypoint.sh +++ b/build/pxc-entrypoint.sh @@ -149,6 +149,13 @@ function join { echo "${joined%?}" } +escape_special() { + echo "$1" \ + | sed 's/\\/\\\\/g' \ + | sed 's/'\''/'\\\\\''/g' \ + | sed 's/"/\\\"/g' +} + MYSQL_VERSION=$(mysqld -V | awk '{print $3}' | awk -F'.' '{print $1"."$2}') MYSQL_PATCH_VERSION=$(mysqld -V | awk '{print $3}' | awk -F'.' '{print $3}' | awk -F'-' '{print $1}') @@ -256,10 +263,10 @@ else fi -WSREP_CLUSTER_NAME=$(grep wsrep_cluster_name ${CFG}| cut -d '=' -f 2| tr -d ' ' ) +WSREP_CLUSTER_NAME=$(grep wsrep_cluster_name ${CFG} | cut -d '=' -f 2 | tr -d ' ') if [[ -z ${WSREP_CLUSTER_NAME} || ${WSREP_CLUSTER_NAME} == 'noname' ]]; then - echo "Cluster name is invalid, please check DNS" - exit 1 + echo "Cluster name is invalid, please check DNS" + exit 1 fi # if we have CLUSTER_JOIN - then we do not need to perform datadir initialize @@ -341,7 +348,7 @@ if [ -z "$CLUSTER_JOIN" ] && [ "$1" = 'mysqld' -a -z "$wantHelp" ]; then # no, we don't care if read finds a terminating character in this heredoc # https://unix.stackexchange.com/questions/265149/why-is-set-o-errexit-breaking-this-read-heredoc-expression/265151#265151 read -r -d '' rootCreate <<-EOSQL || true - CREATE USER 'root'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}' ; + CREATE USER 'root'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '$(escape_special "${MYSQL_ROOT_PASSWORD}")' ; GRANT ALL ON *.* TO 'root'@'${MYSQL_ROOT_HOST}' WITH GRANT OPTION ; EOSQL fi @@ -370,28 +377,28 @@ if [ -z "$CLUSTER_JOIN" ] && [ "$1" = 'mysqld' -a -z "$wantHelp" ]; then SET @@SESSION.SQL_LOG_BIN=0; DELETE FROM mysql.user WHERE user NOT IN ('mysql.sys', 'mysqlxsys', 'root', 'mysql.infoschema', 'mysql.pxc.internal.session', 'mysql.pxc.sst.role', 'mysql.session') OR host NOT IN ('localhost') ; - ALTER USER 'root'@'localhost' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}' ; + ALTER USER 'root'@'localhost' IDENTIFIED BY '$(escape_special "${MYSQL_ROOT_PASSWORD}")' ; GRANT ALL ON *.* TO 'root'@'localhost' WITH GRANT OPTION ; ${rootCreate} /*!80016 REVOKE SYSTEM_USER ON *.* FROM root */; - CREATE USER 'operator'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '${OPERATOR_ADMIN_PASSWORD}' ; + CREATE USER 'operator'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '$(escape_special "${OPERATOR_ADMIN_PASSWORD}")' ; GRANT ALL ON *.* TO 'operator'@'${MYSQL_ROOT_HOST}' WITH GRANT OPTION ; - CREATE USER 'xtrabackup'@'%' IDENTIFIED BY '${XTRABACKUP_PASSWORD}'; + CREATE USER 'xtrabackup'@'%' IDENTIFIED BY '$(escape_special "${XTRABACKUP_PASSWORD}")'; GRANT ALL ON *.* TO 'xtrabackup'@'%'; - CREATE USER 'monitor'@'${MONITOR_HOST}' IDENTIFIED BY '${MONITOR_PASSWORD}' WITH MAX_USER_CONNECTIONS 100; + CREATE USER 'monitor'@'${MONITOR_HOST}' IDENTIFIED BY '$(escape_special "${MONITOR_PASSWORD}")' WITH MAX_USER_CONNECTIONS 100; GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'monitor'@'${MONITOR_HOST}'; GRANT SELECT ON performance_schema.* TO 'monitor'@'${MONITOR_HOST}'; ${monitorConnectGrant} - CREATE USER 'clustercheck'@'localhost' IDENTIFIED BY '${CLUSTERCHECK_PASSWORD}'; + CREATE USER 'clustercheck'@'localhost' IDENTIFIED BY '$(escape_special "${CLUSTERCHECK_PASSWORD}")'; GRANT PROCESS ON *.* TO 'clustercheck'@'localhost'; ${systemUserGrant} - CREATE USER 'replication'@'%' IDENTIFIED BY '${REPLICATION_PASSWORD}'; + CREATE USER 'replication'@'%' IDENTIFIED BY '$(escape_special "${REPLICATION_PASSWORD}")'; GRANT REPLICATION SLAVE ON *.* to 
'replication'@'%'; DROP DATABASE IF EXISTS test; FLUSH PRIVILEGES ; diff --git a/cmd/pitr/collector/collector.go b/cmd/pitr/collector/collector.go index 20834c459c..1fa5b94e2e 100644 --- a/cmd/pitr/collector/collector.go +++ b/cmd/pitr/collector/collector.go @@ -2,6 +2,7 @@ package collector import ( "bytes" + "context" "crypto/md5" "fmt" "io" @@ -13,6 +14,7 @@ import ( "syscall" "time" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/minio/minio-go/v7" "github.com/pkg/errors" @@ -31,17 +33,31 @@ type Collector struct { } type Config struct { - PXCServiceName string `env:"PXC_SERVICE,required"` - PXCUser string `env:"PXC_USER,required"` - PXCPass string `env:"PXC_PASS,required"` - S3Endpoint string `env:"ENDPOINT" envDefault:"s3.amazonaws.com"` - S3AccessKeyID string `env:"ACCESS_KEY_ID,required"` - S3AccessKey string `env:"SECRET_ACCESS_KEY,required"` - S3BucketURL string `env:"S3_BUCKET_URL,required"` - S3Region string `env:"DEFAULT_REGION,required"` - BufferSize int64 `env:"BUFFER_SIZE"` - CollectSpanSec float64 `env:"COLLECT_SPAN_SEC" envDefault:"60"` - VerifyTLS bool `env:"VERIFY_TLS" envDefault:"true"` + PXCServiceName string `env:"PXC_SERVICE,required"` + PXCUser string `env:"PXC_USER,required"` + PXCPass string `env:"PXC_PASS,required"` + StorageType string `env:"STORAGE_TYPE,required"` + BackupStorageS3 BackupS3 + BackupStorageAzure BackupAzure + BufferSize int64 `env:"BUFFER_SIZE"` + CollectSpanSec float64 `env:"COLLECT_SPAN_SEC" envDefault:"60"` + VerifyTLS bool `env:"VERIFY_TLS" envDefault:"true"` +} + +type BackupS3 struct { + Endpoint string `env:"ENDPOINT" envDefault:"s3.amazonaws.com"` + AccessKeyID string `env:"ACCESS_KEY_ID,required"` + AccessKey string `env:"SECRET_ACCESS_KEY,required"` + BucketURL string `env:"S3_BUCKET_URL,required"` + Region string `env:"DEFAULT_REGION,required"` +} + +type BackupAzure struct { + Endpoint string `env:"AZURE_ENDPOINT,required"` + ContainerName string `env:"AZURE_CONTAINER_NAME,required"` + StorageClass string `env:"AZURE_STORAGE_CLASS"` + AccountName string `env:"AZURE_STORAGE_ACCOUNT,required"` + AccountKey string `env:"AZURE_ACCESS_KEY,required"` } const ( @@ -50,27 +66,40 @@ const ( ) func New(c Config) (*Collector, error) { - bucketArr := strings.Split(c.S3BucketURL, "/") - prefix := "" - // if c.S3BucketURL looks like "my-bucket/data/more-data" we need prefix to be "data/more-data/" - if len(bucketArr) > 1 { - prefix = strings.TrimPrefix(c.S3BucketURL, bucketArr[0]+"/") + "/" - } - s3, err := storage.NewS3(strings.TrimPrefix(strings.TrimPrefix(c.S3Endpoint, "https://"), "http://"), c.S3AccessKeyID, c.S3AccessKey, bucketArr[0], prefix, c.S3Region, strings.HasPrefix(c.S3Endpoint, "https"), c.VerifyTLS) - if err != nil { - return nil, errors.Wrap(err, "new storage manager") + var s storage.Storage + var err error + switch c.StorageType { + case "s3": + bucketArr := strings.Split(c.BackupStorageS3.BucketURL, "/") + prefix := "" + // if c.S3BucketURL looks like "my-bucket/data/more-data" we need prefix to be "data/more-data/" + if len(bucketArr) > 1 { + prefix = strings.TrimPrefix(c.BackupStorageS3.BucketURL, bucketArr[0]+"/") + "/" + } + s, err = storage.NewS3(c.BackupStorageS3.Endpoint, c.BackupStorageS3.AccessKeyID, c.BackupStorageS3.AccessKey, bucketArr[0], prefix, c.BackupStorageS3.Region, c.VerifyTLS) + if err != nil { + return nil, errors.Wrap(err, "new storage manager") + } + case "azure": + container, prefix, _ := strings.Cut(c.BackupStorageAzure.ContainerName, "/") + s, err = 
storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, container, prefix+"/") + if err != nil { + return nil, errors.Wrap(err, "new azure storage") + } + default: + return nil, errors.New("unknown STORAGE_TYPE") } return &Collector{ - storage: s3, + storage: s, pxcUser: c.PXCUser, pxcServiceName: c.PXCServiceName, verifyTLS: c.VerifyTLS, }, nil } -func (c *Collector) Run() error { - err := c.newDB() +func (c *Collector) Run(ctx context.Context) error { + err := c.newDB(ctx) if err != nil { return errors.Wrap(err, "new db connection") } @@ -80,7 +109,7 @@ func (c *Collector) Run() error { // read it from aws file c.lastSet = "" - err = c.CollectBinLogs() + err = c.CollectBinLogs(ctx) if err != nil { return errors.Wrap(err, "collect binlog files") } @@ -88,10 +117,13 @@ func (c *Collector) Run() error { return nil } -func (c *Collector) lastGTIDSet(sourceID string) (string, error) { +func (c *Collector) lastGTIDSet(ctx context.Context, sourceID string) (string, error) { // get last binlog set stored on S3 - lastSetObject, err := c.storage.GetObject(lastSetFilePrefix + sourceID) + lastSetObject, err := c.storage.GetObject(ctx, lastSetFilePrefix+sourceID) if err != nil { + if bloberror.HasCode(errors.Cause(err), bloberror.BlobNotFound) { + return "", nil + } return "", errors.Wrap(err, "get last set content") } lastSet, err := ioutil.ReadAll(lastSetObject) @@ -101,7 +133,7 @@ func (c *Collector) lastGTIDSet(sourceID string) (string, error) { return string(lastSet), nil } -func (c *Collector) newDB() error { +func (c *Collector) newDB(ctx context.Context) error { file, err := os.Open("/etc/mysql/mysql-users-secret/xtrabackup") if err != nil { return errors.Wrap(err, "open file") @@ -112,7 +144,7 @@ func (c *Collector) newDB() error { } c.pxcPass = string(pxcPass) - host, err := pxc.GetPXCOldestBinlogHost(c.pxcServiceName, c.pxcUser, c.pxcPass) + host, err := pxc.GetPXCOldestBinlogHost(ctx, c.pxcServiceName, c.pxcUser, c.pxcPass) if err != nil { return errors.Wrap(err, "get host") } @@ -131,13 +163,13 @@ func (c *Collector) close() error { return c.db.Close() } -func (c *Collector) CurrentSourceID(logs []pxc.Binlog) (string, error) { +func (c *Collector) CurrentSourceID(ctx context.Context, logs []pxc.Binlog) (string, error) { var ( gtidSet string err error ) for i := len(logs) - 1; i >= 0 && gtidSet == ""; i-- { - gtidSet, err = c.db.GetGTIDSet(logs[i].Name) + gtidSet, err = c.db.GetGTIDSet(ctx, logs[i].Name) if err != nil { return gtidSet, err } @@ -145,10 +177,10 @@ func (c *Collector) CurrentSourceID(logs []pxc.Binlog) (string, error) { return strings.Split(gtidSet, ":")[0], nil } -func (c *Collector) removeEmptyBinlogs(logs []pxc.Binlog) ([]pxc.Binlog, error) { +func (c *Collector) removeEmptyBinlogs(ctx context.Context, logs []pxc.Binlog) ([]pxc.Binlog, error) { result := make([]pxc.Binlog, 0) for _, v := range logs { - set, err := c.db.GetGTIDSet(v.Name) + set, err := c.db.GetGTIDSet(ctx, v.Name) if err != nil { return nil, errors.Wrap(err, "get GTID set") } @@ -162,9 +194,9 @@ func (c *Collector) removeEmptyBinlogs(logs []pxc.Binlog) ([]pxc.Binlog, error) return result, nil } -func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]pxc.Binlog, error) { +func (c *Collector) filterBinLogs(ctx context.Context, logs []pxc.Binlog, lastBinlogName string) ([]pxc.Binlog, error) { if lastBinlogName == "" { - return c.removeEmptyBinlogs(logs) + return c.removeEmptyBinlogs(ctx, logs) } logsLen := len(logs) @@ -178,7 
+210,7 @@ func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]p return nil, nil } - set, err := c.db.GetGTIDSet(logs[startIndex].Name) + set, err := c.db.GetGTIDSet(ctx, logs[startIndex].Name) if err != nil { return nil, errors.Wrap(err, "get gtid set of last uploaded binlog") } @@ -188,7 +220,7 @@ func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]p startIndex++ } - return c.removeEmptyBinlogs(logs[startIndex:]) + return c.removeEmptyBinlogs(ctx, logs[startIndex:]) } func createGapFile(gtidSet string) error { @@ -206,13 +238,13 @@ func createGapFile(gtidSet string) error { return nil } -func (c *Collector) CollectBinLogs() error { - list, err := c.db.GetBinLogList() +func (c *Collector) CollectBinLogs(ctx context.Context) error { + list, err := c.db.GetBinLogList(ctx) if err != nil { return errors.Wrap(err, "get binlog list") } - sourceID, err := c.CurrentSourceID(list) + sourceID, err := c.CurrentSourceID(ctx, list) if err != nil { return errors.Wrap(err, "get current source id") } @@ -222,7 +254,7 @@ func (c *Collector) CollectBinLogs() error { return nil } - c.lastSet, err = c.lastGTIDSet(sourceID) + c.lastSet, err = c.lastGTIDSet(ctx, sourceID) if err != nil { return errors.Wrap(err, "get last uploaded gtid set") } @@ -231,7 +263,7 @@ func (c *Collector) CollectBinLogs() error { if c.lastSet != "" { // get last uploaded binlog file name - lastUploadedBinlogName, err = c.db.GetBinLogName(c.lastSet) + lastUploadedBinlogName, err = c.db.GetBinLogName(ctx, c.lastSet) if err != nil { return errors.Wrap(err, "get last uploaded binlog name by gtid set") } @@ -245,7 +277,7 @@ func (c *Collector) CollectBinLogs() error { } } - list, err = c.filterBinLogs(list, lastUploadedBinlogName) + list, err = c.filterBinLogs(ctx, list, lastUploadedBinlogName) if err != nil { return errors.Wrap(err, "filter empty binlogs") } @@ -256,7 +288,7 @@ func (c *Collector) CollectBinLogs() error { } for _, binlog := range list { - err = c.manageBinlog(binlog) + err = c.manageBinlog(ctx, binlog) if err != nil { return errors.Wrap(err, "manage binlog") } @@ -275,9 +307,8 @@ func mergeErrors(a, b error) error { return b } -func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) { - - binlogTmstmp, err := c.db.GetBinLogFirstTimestamp(binlog.Name) +func (c *Collector) manageBinlog(ctx context.Context, binlog pxc.Binlog) (err error) { + binlogTmstmp, err := c.db.GetBinLogFirstTimestamp(ctx, binlog.Name) if err != nil { return errors.Wrapf(err, "get first timestamp for %s", binlog.Name) } @@ -302,7 +333,7 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) { } errBuf := &bytes.Buffer{} - cmd := exec.Command("mysqlbinlog", "-R", "--raw", "-h"+c.db.GetHost(), "-u"+c.pxcUser, binlog.Name) + cmd := exec.CommandContext(ctx, "mysqlbinlog", "-R", "--raw", "-h"+c.db.GetHost(), "-u"+c.pxcUser, binlog.Name) cmd.Env = append(cmd.Env, "MYSQL_PWD="+c.pxcPass) cmd.Dir = os.TempDir() cmd.Stderr = errBuf @@ -337,7 +368,7 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) { go readBinlog(file, pw, errBuf, binlog.Name) - err = c.storage.PutObject(binlogName, pr, -1) + err = c.storage.PutObject(ctx, binlogName, pr, -1) if err != nil { return errors.Wrapf(err, "put %s object", binlog.Name) } @@ -349,7 +380,7 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) { return errors.Wrap(err, "wait mysqlbinlog command error:"+errBuf.String()) } - err = c.storage.PutObject(binlogName+gtidPostfix, &setBuffer, int64(setBuffer.Len())) + err = 
c.storage.PutObject(ctx, binlogName+gtidPostfix, &setBuffer, int64(setBuffer.Len())) if err != nil { return errors.Wrap(err, "put gtid-set object") } @@ -357,7 +388,7 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) { // nolint:errcheck setBuffer.WriteString(binlog.GTIDSet) - err = c.storage.PutObject(lastSetFilePrefix+strings.Split(binlog.GTIDSet, ":")[0], &setBuffer, int64(setBuffer.Len())) + err = c.storage.PutObject(ctx, lastSetFilePrefix+strings.Split(binlog.GTIDSet, ":")[0], &setBuffer, int64(setBuffer.Len())) if err != nil { return errors.Wrap(err, "put last-set object") } diff --git a/cmd/pitr/main.go b/cmd/pitr/main.go index 482d63ae0b..51f19acedc 100644 --- a/cmd/pitr/main.go +++ b/cmd/pitr/main.go @@ -1,9 +1,13 @@ package main import ( + "context" + "errors" "fmt" "log" "os" + "os/signal" + "syscall" "time" "github.com/percona/percona-xtradb-cluster-operator/cmd/pitr/collector" @@ -17,18 +21,20 @@ func main() { if len(os.Args) > 1 { command = os.Args[1] } + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt) + defer stop() switch command { case "collect": - runCollector() + runCollector(ctx) case "recover": - runRecoverer() + runRecoverer(ctx) default: fmt.Fprintf(os.Stderr, "ERROR: unknown command \"%s\".\nCommands:\n collect - collect binlogs\n recover - recover from binlogs\n", command) os.Exit(1) } } -func runCollector() { +func runCollector(ctx context.Context) { config, err := getCollectorConfig() if err != nil { log.Fatalln("ERROR: get config:", err) @@ -39,26 +45,32 @@ func runCollector() { } log.Println("run binlog collector") for { - err := c.Run() + err := c.Run(ctx) if err != nil { log.Println("ERROR:", err) } - time.Sleep(time.Duration(config.CollectSpanSec) * time.Second) + t := time.NewTimer(time.Duration(config.CollectSpanSec) * time.Second) + select { + case <-ctx.Done(): + log.Fatalln("ERROR:", ctx.Err().Error()) + case <-t.C: + break + } } } -func runRecoverer() { +func runRecoverer(ctx context.Context) { config, err := getRecovererConfig() if err != nil { log.Fatalln("ERROR: get recoverer config:", err) } - c, err := recoverer.New(config) + c, err := recoverer.New(ctx, config) if err != nil { log.Fatalln("ERROR: new recoverer controller:", err) } log.Println("run recover") - err = c.Run() + err = c.Run(ctx) if err != nil { log.Fatalln("ERROR: recover:", err) } @@ -67,6 +79,18 @@ func runRecoverer() { func getCollectorConfig() (collector.Config, error) { cfg := collector.Config{} err := env.Parse(&cfg) + switch cfg.StorageType { + case "s3": + if err := env.Parse(&cfg.BackupStorageS3); err != nil { + return cfg, err + } + case "azure": + if err := env.Parse(&cfg.BackupStorageAzure); err != nil { + return cfg, err + } + default: + return cfg, errors.New("unknown STORAGE_TYPE") + } return cfg, err @@ -77,11 +101,23 @@ func getRecovererConfig() (recoverer.Config, error) { if err := env.Parse(&cfg); err != nil { return cfg, err } - if err := env.Parse(&cfg.BackupStorage); err != nil { - return cfg, err - } - if err := env.Parse(&cfg.BinlogStorage); err != nil { - return cfg, err + switch cfg.StorageType { + case "s3": + if err := env.Parse(&cfg.BackupStorageS3); err != nil { + return cfg, err + } + if err := env.Parse(&cfg.BinlogStorageS3); err != nil { + return cfg, err + } + case "azure": + if err := env.Parse(&cfg.BackupStorageAzure); err != nil { + return cfg, err + } + if err := env.Parse(&cfg.BinlogStorageAzure); err != nil { + return cfg, err + } + default: + return cfg, errors.New("unknown STORAGE_TYPE") 
} return cfg, nil diff --git a/cmd/pitr/pxc/pxc.go b/cmd/pitr/pxc/pxc.go index c8a32c8aa9..7066dc93c6 100644 --- a/cmd/pitr/pxc/pxc.go +++ b/cmd/pitr/pxc/pxc.go @@ -1,6 +1,7 @@ package pxc import ( + "context" "database/sql" "log" "os/exec" @@ -53,22 +54,22 @@ func (p *PXC) GetHost() string { } // GetGTIDSet return GTID set by binary log file name -func (p *PXC) GetGTIDSet(binlogName string) (string, error) { +func (p *PXC) GetGTIDSet(ctx context.Context, binlogName string) (string, error) { //select name from mysql.func where name='get_gtid_set_by_binlog' var existFunc string - nameRow := p.db.QueryRow("select name from mysql.func where name='get_gtid_set_by_binlog'") + nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_gtid_set_by_binlog'") err := nameRow.Scan(&existFunc) if err != nil && err != sql.ErrNoRows { return "", errors.Wrap(err, "get udf name") } if len(existFunc) == 0 { - _, err = p.db.Exec("CREATE FUNCTION get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'") + _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_gtid_set_by_binlog RETURNS STRING SONAME 'binlog_utils_udf.so'") if err != nil { return "", errors.Wrap(err, "create function") } } var binlogSet string - row := p.db.QueryRow("SELECT get_gtid_set_by_binlog(?)", binlogName) + row := p.db.QueryRowContext(ctx, "SELECT get_gtid_set_by_binlog(?)", binlogName) err = row.Scan(&binlogSet) if err != nil && !strings.Contains(err.Error(), "Binary log does not exist") { return "", errors.Wrap(err, "scan set") @@ -85,8 +86,8 @@ type Binlog struct { } // GetBinLogList return binary log files list -func (p *PXC) GetBinLogList() ([]Binlog, error) { - rows, err := p.db.Query("SHOW BINARY LOGS") +func (p *PXC) GetBinLogList(ctx context.Context) ([]Binlog, error) { + rows, err := p.db.QueryContext(ctx, "SHOW BINARY LOGS") if err != nil { return nil, errors.Wrap(err, "show binary logs") } @@ -100,7 +101,7 @@ func (p *PXC) GetBinLogList() ([]Binlog, error) { binlogs = append(binlogs, b) } - _, err = p.db.Exec("FLUSH BINARY LOGS") + _, err = p.db.ExecContext(ctx, "FLUSH BINARY LOGS") if err != nil { return nil, errors.Wrap(err, "flush binary logs") } @@ -109,8 +110,8 @@ func (p *PXC) GetBinLogList() ([]Binlog, error) { } // GetBinLogList return binary log files list -func (p *PXC) GetBinLogNamesList() ([]string, error) { - rows, err := p.db.Query("SHOW BINARY LOGS") +func (p *PXC) GetBinLogNamesList(ctx context.Context) ([]string, error) { + rows, err := p.db.QueryContext(ctx, "SHOW BINARY LOGS") if err != nil { return nil, errors.Wrap(err, "show binary logs") } @@ -129,24 +130,24 @@ func (p *PXC) GetBinLogNamesList() ([]string, error) { } // GetBinLogName returns name of the binary log file by given GTID set -func (p *PXC) GetBinLogName(gtidSet string) (string, error) { +func (p *PXC) GetBinLogName(ctx context.Context, gtidSet string) (string, error) { if len(gtidSet) == 0 { return "", nil } var existFunc string - nameRow := p.db.QueryRow("select name from mysql.func where name='get_binlog_by_gtid_set'") + nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_binlog_by_gtid_set'") err := nameRow.Scan(&existFunc) if err != nil && err != sql.ErrNoRows { return "", errors.Wrap(err, "get udf name") } if len(existFunc) == 0 { - _, err = p.db.Exec("CREATE FUNCTION get_binlog_by_gtid_set RETURNS STRING SONAME 'binlog_utils_udf.so'") + _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_binlog_by_gtid_set RETURNS STRING SONAME 'binlog_utils_udf.so'") if err != nil { return
"", errors.Wrap(err, "create function") } } var binlog sql.NullString - row := p.db.QueryRow("SELECT get_binlog_by_gtid_set(?)", gtidSet) + row := p.db.QueryRowContext(ctx, "SELECT get_binlog_by_gtid_set(?)", gtidSet) err = row.Scan(&binlog) if err != nil { @@ -157,21 +158,21 @@ func (p *PXC) GetBinLogName(gtidSet string) (string, error) { } // GetBinLogFirstTimestamp return binary log file first timestamp -func (p *PXC) GetBinLogFirstTimestamp(binlog string) (string, error) { +func (p *PXC) GetBinLogFirstTimestamp(ctx context.Context, binlog string) (string, error) { var existFunc string - nameRow := p.db.QueryRow("select name from mysql.func where name='get_first_record_timestamp_by_binlog'") + nameRow := p.db.QueryRowContext(ctx, "select name from mysql.func where name='get_first_record_timestamp_by_binlog'") err := nameRow.Scan(&existFunc) if err != nil && err != sql.ErrNoRows { return "", errors.Wrap(err, "get udf name") } if len(existFunc) == 0 { - _, err = p.db.Exec("CREATE FUNCTION get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'") + _, err = p.db.ExecContext(ctx, "CREATE FUNCTION get_first_record_timestamp_by_binlog RETURNS INTEGER SONAME 'binlog_utils_udf.so'") if err != nil { return "", errors.Wrap(err, "create function") } } var timestamp string - row := p.db.QueryRow("SELECT get_first_record_timestamp_by_binlog(?) DIV 1000000", binlog) + row := p.db.QueryRowContext(ctx, "SELECT get_first_record_timestamp_by_binlog(?) DIV 1000000", binlog) err = row.Scan(&timestamp) if err != nil { @@ -181,9 +182,9 @@ func (p *PXC) GetBinLogFirstTimestamp(binlog string) (string, error) { return timestamp, nil } -func (p *PXC) SubtractGTIDSet(set, subSet string) (string, error) { +func (p *PXC) SubtractGTIDSet(ctx context.Context, set, subSet string) (string, error) { var result string - row := p.db.QueryRow("SELECT GTID_SUBTRACT(?,?)", set, subSet) + row := p.db.QueryRowContext(ctx, "SELECT GTID_SUBTRACT(?,?)", set, subSet) err := row.Scan(&result) if err != nil { return "", errors.Wrap(err, "scan gtid subtract result") @@ -192,8 +193,8 @@ func (p *PXC) SubtractGTIDSet(set, subSet string) (string, error) { return result, nil } -func getNodesByServiceName(pxcServiceName string) ([]string, error) { - cmd := exec.Command("peer-list", "-on-start=/usr/bin/get-pxc-state", "-service="+pxcServiceName) +func getNodesByServiceName(ctx context.Context, pxcServiceName string) ([]string, error) { + cmd := exec.CommandContext(ctx, "peer-list", "-on-start=/usr/bin/get-pxc-state", "-service="+pxcServiceName) out, err := cmd.CombinedOutput() if err != nil { return nil, errors.Wrap(err, "get peer-list output") @@ -201,8 +202,8 @@ func getNodesByServiceName(pxcServiceName string) ([]string, error) { return strings.Split(string(out), "node:"), nil } -func GetPXCFirstHost(pxcServiceName string) (string, error) { - nodes, err := getNodesByServiceName(pxcServiceName) +func GetPXCFirstHost(ctx context.Context, pxcServiceName string) (string, error) { + nodes, err := getNodesByServiceName(ctx, pxcServiceName) if err != nil { return "", errors.Wrap(err, "get nodes by service name") } @@ -222,8 +223,8 @@ func GetPXCFirstHost(pxcServiceName string) (string, error) { return lastHost, nil } -func GetPXCOldestBinlogHost(pxcServiceName, user, pass string) (string, error) { - nodes, err := getNodesByServiceName(pxcServiceName) +func GetPXCOldestBinlogHost(ctx context.Context, pxcServiceName, user, pass string) (string, error) { + nodes, err := getNodesByServiceName(ctx, pxcServiceName) if err != nil
{ return "", errors.Wrap(err, "get nodes by service name") } @@ -233,7 +234,7 @@ func GetPXCOldestBinlogHost(pxcServiceName, user, pass string) (string, error) { for _, node := range nodes { if strings.Contains(node, "wsrep_ready:ON:wsrep_connected:ON:wsrep_local_state_comment:Synced:wsrep_cluster_status:Primary") { nodeArr := strings.Split(node, ":") - binlogTime, err := getBinlogTime(nodeArr[0], user, pass) + binlogTime, err := getBinlogTime(ctx, nodeArr[0], user, pass) if err != nil { log.Printf("ERROR: get binlog time %v", err) continue @@ -253,13 +254,13 @@ func GetPXCOldestBinlogHost(pxcServiceName, user, pass string) (string, error) { return oldestHost, nil } -func getBinlogTime(host, user, pass string) (int64, error) { +func getBinlogTime(ctx context.Context, host, user, pass string) (int64, error) { db, err := NewPXC(host, user, pass) if err != nil { return 0, errors.Errorf("creating connection for host %s: %v", host, err) } defer db.Close() - list, err := db.GetBinLogNamesList() + list, err := db.GetBinLogNamesList(ctx) if err != nil { return 0, errors.Errorf("get binlog list for host %s: %v", host, err) } @@ -268,7 +269,7 @@ func getBinlogTime(host, user, pass string) (int64, error) { } var binlogTime int64 for _, binlogName := range list { - binlogTime, err = getBinlogTimeByName(db, binlogName) + binlogTime, err = getBinlogTimeByName(ctx, db, binlogName) if err != nil { log.Printf("ERROR: get binlog timestamp for binlog %s host %s: %v", binlogName, host, err) continue @@ -284,8 +285,8 @@ func getBinlogTime(host, user, pass string) (int64, error) { return binlogTime, nil } -func getBinlogTimeByName(db *PXC, binlogName string) (int64, error) { - ts, err := db.GetBinLogFirstTimestamp(binlogName) +func getBinlogTimeByName(ctx context.Context, db *PXC, binlogName string) (int64, error) { + ts, err := db.GetBinLogFirstTimestamp(ctx, binlogName) if err != nil { return 0, errors.Wrap(err, "get binlog first timestamp") } @@ -297,17 +298,17 @@ func getBinlogTimeByName(db *PXC, binlogName string) (int64, error) { return binlogTime, nil } -func (p *PXC) DropCollectorFunctions() error { - _, err := p.db.Exec("DROP FUNCTION IF EXISTS get_first_record_timestamp_by_binlog") +func (p *PXC) DropCollectorFunctions(ctx context.Context) error { + _, err := p.db.ExecContext(ctx, "DROP FUNCTION IF EXISTS get_first_record_timestamp_by_binlog") if err != nil { return errors.Wrap(err, "drop get_first_record_timestamp_by_binlog function") } - _, err = p.db.Exec("DROP FUNCTION IF EXISTS get_binlog_by_gtid_set") + _, err = p.db.ExecContext(ctx, "DROP FUNCTION IF EXISTS get_binlog_by_gtid_set") if err != nil { return errors.Wrap(err, "drop get_binlog_by_gtid_set function") } - _, err = p.db.Exec("DROP FUNCTION IF EXISTS get_gtid_set_by_binlog") + _, err = p.db.ExecContext(ctx, "DROP FUNCTION IF EXISTS get_gtid_set_by_binlog") if err != nil { return errors.Wrap(err, "drop get_gtid_set_by_binlog function") } diff --git a/cmd/pitr/recoverer/recoverer.go b/cmd/pitr/recoverer/recoverer.go index dcedef662b..b34a0d2178 100644 --- a/cmd/pitr/recoverer/recoverer.go +++ b/cmd/pitr/recoverer/recoverer.go @@ -2,6 +2,7 @@ package recoverer import ( "bytes" + "context" "io" "io/ioutil" "log" @@ -37,15 +38,57 @@ type Recoverer struct { } type Config struct { - PXCServiceName string `env:"PXC_SERVICE,required"` - PXCUser string `env:"PXC_USER,required"` - PXCPass string `env:"PXC_PASS,required"` - BackupStorage BackupS3 - RecoverTime string `env:"PITR_DATE"` - RecoverType string `env:"PITR_RECOVERY_TYPE,required"` - GTID 
string `env:"PITR_GTID"` - VerifyTLS bool `env:"VERIFY_TLS" envDefault:"true"` - BinlogStorage BinlogS3 + PXCServiceName string `env:"PXC_SERVICE,required"` + PXCUser string `env:"PXC_USER,required"` + PXCPass string `env:"PXC_PASS,required"` + BackupStorageS3 BackupS3 + BackupStorageAzure BackupAzure + RecoverTime string `env:"PITR_DATE"` + RecoverType string `env:"PITR_RECOVERY_TYPE,required"` + GTID string `env:"PITR_GTID"` + VerifyTLS bool `env:"VERIFY_TLS" envDefault:"true"` + StorageType string `env:"STORAGE_TYPE,required"` + BinlogStorageS3 BinlogS3 + BinlogStorageAzure BinlogAzure +} + +func (c Config) storages() (storage.Storage, storage.Storage, error) { + var binlogStorage, defaultStorage storage.Storage + switch c.StorageType { + case "s3": + bucket, prefix, err := getBucketAndPrefix(c.BinlogStorageS3.BucketURL) + if err != nil { + return nil, nil, errors.Wrap(err, "get bucket and prefix") + } + binlogStorage, err = storage.NewS3(c.BinlogStorageS3.Endpoint, c.BinlogStorageS3.AccessKeyID, c.BinlogStorageS3.AccessKey, bucket, prefix, c.BinlogStorageS3.Region, c.VerifyTLS) + if err != nil { + return nil, nil, errors.Wrap(err, "new s3 storage") + } + + bucket, prefix, err = getBucketAndPrefix(c.BackupStorageS3.BackupDest) + if err != nil { + return nil, nil, errors.Wrap(err, "get bucket and prefix") + } + prefix = prefix[:len(prefix)-1] + defaultStorage, err = storage.NewS3(c.BackupStorageS3.Endpoint, c.BackupStorageS3.AccessKeyID, c.BackupStorageS3.AccessKey, bucket, prefix+".sst_info/", c.BackupStorageS3.Region, c.VerifyTLS) + if err != nil { + return nil, nil, errors.Wrap(err, "new storage manager") + } + case "azure": + var err error + container, prefix := getContainerAndPrefix(c.BinlogStorageAzure.ContainerPath) + binlogStorage, err = storage.NewAzure(c.BinlogStorageAzure.AccountName, c.BinlogStorageAzure.AccountKey, c.BinlogStorageAzure.Endpoint, container, prefix+"/") + if err != nil { + return nil, nil, errors.Wrap(err, "new azure storage") + } + defaultStorage, err = storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, c.BackupStorageAzure.ContainerName, c.BackupStorageAzure.BackupDest+".sst_info/") + if err != nil { + return nil, nil, errors.Wrap(err, "new azure storage") + } + default: + return nil, nil, errors.New("unknown STORAGE_TYPE") + } + return binlogStorage, defaultStorage, nil } type BackupS3 struct { @@ -56,6 +99,15 @@ type BackupS3 struct { BackupDest string `env:"S3_BUCKET_URL,required"` } +type BackupAzure struct { + Endpoint string `env:"AZURE_ENDPOINT,required"` + ContainerName string `env:"AZURE_CONTAINER_NAME,required"` + StorageClass string `env:"AZURE_STORAGE_CLASS"` + AccountName string `env:"AZURE_STORAGE_ACCOUNT,required"` + AccountKey string `env:"AZURE_ACCESS_KEY,required"` + BackupDest string `env:"BACKUP_PATH,required"` +} + type BinlogS3 struct { Endpoint string `env:"BINLOG_S3_ENDPOINT" envDefault:"s3.amazonaws.com"` AccessKeyID string `env:"BINLOG_ACCESS_KEY_ID,required"` @@ -64,30 +116,34 @@ type BinlogS3 struct { BucketURL string `env:"BINLOG_S3_BUCKET_URL,required"` } +type BinlogAzure struct { + Endpoint string `env:"BINLOG_AZURE_ENDPOINT,required"` + ContainerPath string `env:"BINLOG_AZURE_CONTAINER_NAME,required"` + StorageClass string `env:"BINLOG_AZURE_STORAGE_CLASS"` + AccountName string `env:"BINLOG_AZURE_STORAGE_ACCOUNT,required"` + AccountKey string `env:"BINLOG_AZURE_ACCESS_KEY,required"` +} + func (c *Config) Verify() { - if len(c.BackupStorage.Endpoint) == 0 { - 
c.BackupStorage.Endpoint = "s3.amazonaws.com" + if len(c.BackupStorageS3.Endpoint) == 0 { + c.BackupStorageS3.Endpoint = "s3.amazonaws.com" } - if len(c.BinlogStorage.Endpoint) == 0 { - c.BinlogStorage.Endpoint = "s3.amazonaws.com" + if len(c.BinlogStorageS3.Endpoint) == 0 { + c.BinlogStorageS3.Endpoint = "s3.amazonaws.com" } } type RecoverType string -func New(c Config) (*Recoverer, error) { +func New(ctx context.Context, c Config) (*Recoverer, error) { c.Verify() - bucket, prefix, err := getBucketAndPrefix(c.BinlogStorage.BucketURL) - if err != nil { - return nil, errors.Wrap(err, "get bucket and prefix") - } - s3, err := storage.NewS3(strings.TrimPrefix(strings.TrimPrefix(c.BinlogStorage.Endpoint, "https://"), "http://"), c.BinlogStorage.AccessKeyID, c.BinlogStorage.AccessKey, bucket, prefix, c.BinlogStorage.Region, strings.HasPrefix(c.BinlogStorage.Endpoint, "https"), c.VerifyTLS) + binlogStorage, storage, err := c.storages() if err != nil { - return nil, errors.Wrap(err, "new storage manager") + return nil, errors.Wrap(err, "new binlog storage manager") } - startGTID, err := getStartGTIDSet(c.BackupStorage, c.VerifyTLS) + startGTID, err := getStartGTIDSet(ctx, storage) if err != nil { return nil, errors.Wrap(err, "get start GTID") } @@ -117,7 +173,7 @@ func New(c Config) (*Recoverer, error) { } return &Recoverer{ - storage: s3, + storage: binlogStorage, recoverTime: c.RecoverTime, pxcUser: c.PXCUser, pxcPass: c.PXCPass, @@ -129,6 +185,11 @@ func New(c Config) (*Recoverer, error) { }, nil } +func getContainerAndPrefix(s string) (string, string) { + container, prefix, _ := strings.Cut(s, "/") + return container, prefix +} + func getBucketAndPrefix(bucketURL string) (bucket string, prefix string, err error) { u, err := url.Parse(bucketURL) if err != nil { @@ -155,51 +216,38 @@ func getBucketAndPrefix(bucketURL string) (bucket string, prefix string, err err return bucket, prefix, err } -func getStartGTIDSet(c BackupS3, verifyTLS bool) (string, error) { - bucketArr := strings.Split(c.BackupDest, "/") - if len(bucketArr) < 2 { - return "", errors.New("parsing bucket") - } - - prefix := strings.TrimPrefix(c.BackupDest, bucketArr[0]+"/") - backupPrefix := prefix + "/" - sstPrefix := prefix + ".sst_info/" - - s3, err := storage.NewS3(strings.TrimPrefix(strings.TrimPrefix(c.Endpoint, "https://"), "http://"), c.AccessKeyID, c.AccessKey, bucketArr[0], sstPrefix, c.Region, strings.HasPrefix(c.Endpoint, "https"), verifyTLS) +func getStartGTIDSet(ctx context.Context, s storage.Storage) (string, error) { + sstInfo, err := s.ListObjects(ctx, "sst_info") if err != nil { - return "", errors.Wrap(err, "new storage manager") - } - sstInfo, err := s3.ListObjects("sst_info") - if err != nil { - return "", errors.Wrapf(err, "list %s info fies", prefix) + return "", errors.Wrapf(err, "list objects") } if len(sstInfo) == 0 { return "", errors.New("no info files in sst dir") } sort.Strings(sstInfo) - sstInfoObj, err := s3.GetObject(sstInfo[0]) + sstInfoObj, err := s.GetObject(ctx, sstInfo[0]) if err != nil { - return "", errors.Wrapf(err, "get %s info", prefix) + return "", errors.Wrapf(err, "get object") } + defer sstInfoObj.Close() - s3.SetPrefix(backupPrefix) - - xtrabackupInfo, err := s3.ListObjects("xtrabackup_info") + s.SetPrefix(strings.TrimSuffix(s.GetPrefix(), ".sst_info/") + "/") + xtrabackupInfo, err := s.ListObjects(ctx, "xtrabackup_info") if err != nil { - return "", errors.Wrapf(err, "list %s info fies", prefix) + return "", errors.Wrapf(err, "list objects") } if len(xtrabackupInfo) == 0 { return 
"", errors.New("no info files in backup") } sort.Strings(xtrabackupInfo) - xtrabackupInfoObj, err := s3.GetObject(xtrabackupInfo[0]) + xtrabackupInfoObj, err := s.GetObject(ctx, xtrabackupInfo[0]) if err != nil { - return "", errors.Wrapf(err, "get %s info", prefix) + return "", errors.Wrapf(err, "get object") } - lastGTID, err := getLastBackupGTID(sstInfoObj, xtrabackupInfoObj) + lastGTID, err := getLastBackupGTID(ctx, sstInfoObj, xtrabackupInfoObj) if err != nil { return "", errors.Wrap(err, "get last backup gtid") } @@ -214,8 +262,8 @@ const ( Skip RecoverType = "skip" // skip transactions ) -func (r *Recoverer) Run() error { - host, err := pxc.GetPXCFirstHost(r.pxcServiceName) +func (r *Recoverer) Run(ctx context.Context) error { + host, err := pxc.GetPXCFirstHost(ctx, r.pxcServiceName) if err != nil { return errors.Wrap(err, "get host") } @@ -224,7 +272,7 @@ func (r *Recoverer) Run() error { return errors.Wrapf(err, "new manager with host %s", host) } - err = r.setBinlogs() + err = r.setBinlogs(ctx) if err != nil { return errors.Wrap(err, "get binlog list") } @@ -248,7 +296,7 @@ func (r *Recoverer) Run() error { return errors.New("wrong recover type") } - err = r.recover() + err = r.recover(ctx) if err != nil { return errors.Wrap(err, "recover") } @@ -256,8 +304,8 @@ func (r *Recoverer) Run() error { return nil } -func (r *Recoverer) recover() (err error) { - err = r.db.DropCollectorFunctions() +func (r *Recoverer) recover(ctx context.Context) (err error) { + err = r.db.DropCollectorFunctions(ctx) if err != nil { return errors.Wrap(err, "drop collector funcs") } @@ -278,7 +326,7 @@ func (r *Recoverer) recover() (err error) { } } - binlogObj, err := r.storage.GetObject(binlog) + binlogObj, err := r.storage.GetObject(ctx, binlog) if err != nil { return errors.Wrap(err, "get obj") } @@ -289,7 +337,7 @@ func (r *Recoverer) recover() (err error) { } cmdString := "mysqlbinlog --disable-log-bin" + r.recoverFlag + " - | mysql -h" + r.db.GetHost() + " -u" + r.pxcUser - cmd := exec.Command("sh", "-c", cmdString) + cmd := exec.CommandContext(ctx, "sh", "-c", cmdString) cmd.Stdin = binlogObj var outb, errb bytes.Buffer @@ -304,13 +352,13 @@ func (r *Recoverer) recover() (err error) { return nil } -func getLastBackupGTID(sstInfo, xtrabackupInfo io.Reader) (string, error) { - sstContent, err := getDecompressedContent(sstInfo, "sst_info") +func getLastBackupGTID(ctx context.Context, sstInfo, xtrabackupInfo io.Reader) (string, error) { + sstContent, err := getDecompressedContent(ctx, sstInfo, "sst_info") if err != nil { return "", errors.Wrap(err, "get sst_info content") } - xtrabackupContent, err := getDecompressedContent(xtrabackupInfo, "xtrabackup_info") + xtrabackupContent, err := getDecompressedContent(ctx, xtrabackupInfo, "xtrabackup_info") if err != nil { return "", errors.Wrap(err, "get xtrabackup info content") } @@ -375,10 +423,10 @@ func getGTIDFromSSTInfo(content []byte) (string, error) { return string(newOut[:e]), nil } -func getDecompressedContent(infoObj io.Reader, filename string) ([]byte, error) { +func getDecompressedContent(ctx context.Context, infoObj io.Reader, filename string) ([]byte, error) { tmpDir := os.TempDir() - cmd := exec.Command("xbstream", "-x", "--decompress") + cmd := exec.CommandContext(ctx, "xbstream", "-x", "--decompress") cmd.Dir = tmpDir cmd.Stdin = infoObj var outb, errb bytes.Buffer @@ -386,7 +434,7 @@ func getDecompressedContent(infoObj io.Reader, filename string) ([]byte, error) cmd.Stderr = &errb err := cmd.Run() if err != nil { - return nil, 
errors.Wrapf(err, "xbsream cmd run. stderr: %s, stdout: %s", &errb, &outb) + return nil, errors.Wrapf(err, "xbstream cmd run. stderr: %s, stdout: %s", &errb, &outb) } if errb.Len() > 0 { return nil, errors.Errorf("run xbstream error: %s", &errb) @@ -400,8 +448,8 @@ func getDecompressedContent(infoObj io.Reader, filename string) ([]byte, error) return decContent, nil } -func (r *Recoverer) setBinlogs() error { - list, err := r.storage.ListObjects("binlog_") +func (r *Recoverer) setBinlogs(ctx context.Context) error { + list, err := r.storage.ListObjects(ctx, "binlog_") if err != nil { return errors.Wrap(err, "list objects with prefix 'binlog_'") } @@ -413,7 +461,7 @@ func (r *Recoverer) setBinlogs() error { if strings.Contains(binlog, "-gtid-set") { continue } - infoObj, err := r.storage.GetObject(binlog + "-gtid-set") + infoObj, err := r.storage.GetObject(ctx, binlog+"-gtid-set") if err != nil { log.Println("Can't get binlog object with gtid set. Name:", binlog, "error", err) continue @@ -426,7 +474,7 @@ func (r *Recoverer) setBinlogs() error { log.Println("checking current file", " name ", binlog, " gtid ", binlogGTIDSet) if len(r.gtid) > 0 && r.recoverType == Transaction { - subResult, err := r.db.SubtractGTIDSet(binlogGTIDSet, r.gtid) + subResult, err := r.db.SubtractGTIDSet(ctx, binlogGTIDSet, r.gtid) if err != nil { return errors.Wrapf(err, "check if '%s' is a subset of '%s", binlogGTIDSet, r.gtid) } @@ -443,7 +491,7 @@ func (r *Recoverer) setBinlogs() error { } binlogs = append(binlogs, binlog) - subResult, err := r.db.SubtractGTIDSet(r.startGTID, binlogGTIDSet) + subResult, err := r.db.SubtractGTIDSet(ctx, r.startGTID, binlogGTIDSet) log.Println("Checking sub result", " binlog gtid ", binlogGTIDSet, " sub result ", subResult) if err != nil { return errors.Wrapf(err, "check if '%s' is a subset of '%s", r.startGTID, binlogGTIDSet) diff --git a/cmd/pitr/storage/storage.go b/cmd/pitr/storage/storage.go index 7fdf1684a8..b30388c51d 100644 --- a/cmd/pitr/storage/storage.go +++ b/cmd/pitr/storage/storage.go @@ -3,31 +3,37 @@ package storage import ( "context" "crypto/tls" + "fmt" "io" "net/http" "strings" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" ) type Storage interface { - GetObject(objectName string) (io.Reader, error) - PutObject(name string, data io.Reader, size int64) error - ListObjects(prefix string) ([]string, error) + GetObject(ctx context.Context, objectName string) (io.ReadCloser, error) + PutObject(ctx context.Context, name string, data io.Reader, size int64) error + ListObjects(ctx context.Context, prefix string) ([]string, error) + SetPrefix(prefix string) + GetPrefix() string } // S3 is a type for working with S3 storages type S3 struct { - minioClient *minio.Client // minio client for work with storage - ctx context.Context // context for client operations - bucketName string // S3 bucket name where binlogs will be stored - prefix string // prefix for S3 requests + client *minio.Client // minio client for work with storage + bucketName string // S3 bucket name where binlogs will be stored + prefix string // prefix for S3 requests } // NewS3 return new Manager, useSSL using ssl for connection with storage -func NewS3(endpoint, accessKeyID, secretAccessKey, bucketName, prefix, region string, useSSL, verifyTLS bool) (*S3, error) { +func NewS3(endpoint, accessKeyID, secretAccessKey, bucketName, 
prefix, region string, verifyTLS bool) (*S3, error) { + useSSL := strings.Contains(endpoint, "https") + endpoint = strings.TrimPrefix(strings.TrimPrefix(endpoint, "https://"), "http://") transport := http.DefaultTransport transport.(*http.Transport).TLSClientConfig = &tls.Config{ InsecureSkipVerify: !verifyTLS, @@ -43,45 +49,40 @@ func NewS3(endpoint, accessKeyID, secretAccessKey, bucketName, prefix, region st } return &S3{ - minioClient: minioClient, - ctx: context.TODO(), - bucketName: bucketName, - prefix: prefix, + client: minioClient, + bucketName: bucketName, + prefix: prefix, }, nil } -func (s *S3) SetPrefix(prefix string) { - s.prefix = prefix -} - // GetObject return content by given object name -func (s *S3) GetObject(objectName string) (io.Reader, error) { - oldObj, err := s.minioClient.GetObject(s.ctx, s.bucketName, s.prefix+objectName, minio.GetObjectOptions{}) +func (s *S3) GetObject(ctx context.Context, objectName string) (io.ReadCloser, error) { + oldObj, err := s.client.GetObject(ctx, s.bucketName, s.prefix+objectName, minio.GetObjectOptions{}) if err != nil { - return nil, errors.Wrap(err, "get object") + return nil, errors.Wrapf(err, "get object %s", s.prefix+objectName) } return oldObj, nil } // PutObject puts new object to storage with given name and content -func (s *S3) PutObject(name string, data io.Reader, size int64) error { - _, err := s.minioClient.PutObject(s.ctx, s.bucketName, s.prefix+name, data, size, minio.PutObjectOptions{}) +func (s *S3) PutObject(ctx context.Context, name string, data io.Reader, size int64) error { + _, err := s.client.PutObject(ctx, s.bucketName, s.prefix+name, data, size, minio.PutObjectOptions{}) if err != nil { - return errors.Wrap(err, "put object") + return errors.Wrapf(err, "put object %s", s.prefix+name) } return nil } -func (s *S3) ListObjects(prefix string) ([]string, error) { +func (s *S3) ListObjects(ctx context.Context, prefix string) ([]string, error) { opts := minio.ListObjectsOptions{ UseV1: true, Prefix: s.prefix + prefix, } list := []string{} - for object := range s.minioClient.ListObjects(s.ctx, s.bucketName, opts) { + for object := range s.client.ListObjects(ctx, s.bucketName, opts) { if object.Err != nil { return nil, errors.Wrapf(object.Err, "list object %s", object.Key) } @@ -90,3 +91,85 @@ func (s *S3) ListObjects(prefix string) ([]string, error) { return list, nil } + +func (s *S3) SetPrefix(prefix string) { + s.prefix = prefix +} + +func (s *S3) GetPrefix() string { + return s.prefix +} + +// Azure is a type for working with Azure Blob storages +type Azure struct { + client *azblob.Client // azure client for work with storage + container string + prefix string +} + +func NewAzure(storageAccount, accessKey, endpoint, container, prefix string) (*Azure, error) { + credential, err := azblob.NewSharedKeyCredential(storageAccount, accessKey) + if err != nil { + return nil, errors.Wrap(err, "new credentials") + } + if endpoint == "" { + endpoint = fmt.Sprintf("https://%s.blob.core.windows.net/", storageAccount) + } + cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, credential, nil) + if err != nil { + return nil, errors.Wrap(err, "new client") + } + + return &Azure{ + client: cli, + container: container, + prefix: prefix, + }, nil +} + +func (a *Azure) GetObject(ctx context.Context, name string) (io.ReadCloser, error) { + resp, err := a.client.DownloadStream(ctx, a.container, a.prefix+name, &azblob.DownloadStreamOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "download stream: %s", 
a.prefix+name) + } + return resp.Body, nil +} + +func (a *Azure) PutObject(ctx context.Context, name string, data io.Reader, _ int64) error { + _, err := a.client.UploadStream(ctx, a.container, a.prefix+name, data, nil) + if err != nil { + return errors.Wrapf(err, "upload stream: %s", a.prefix+name) + } + return nil +} + +func (a *Azure) ListObjects(ctx context.Context, prefix string) ([]string, error) { + listPrefix := a.prefix + prefix + pg := a.client.NewListBlobsFlatPager(a.container, &container.ListBlobsFlatOptions{ + Prefix: &listPrefix, + }) + var blobs []string + for pg.More() { + resp, err := pg.NextPage(ctx) + if err != nil { + return nil, errors.Wrapf(err, "next page: %s", prefix) + } + if resp.Segment != nil { + for _, item := range resp.Segment.BlobItems { + if item != nil && item.Name != nil { + name := strings.TrimPrefix(*item.Name, a.prefix) + blobs = append(blobs, name) + } + } + } + } + return blobs, nil +} + +func (a *Azure) SetPrefix(prefix string) { + a.prefix = prefix +} + +func (a *Azure) GetPrefix() string { + return a.prefix +} diff --git a/e2e-tests/pitr/conf/pitr.yml b/e2e-tests/pitr/conf/pitr.yml index cd39c7f97b..066676940d 100755 --- a/e2e-tests/pitr/conf/pitr.yml +++ b/e2e-tests/pitr/conf/pitr.yml @@ -67,7 +67,7 @@ spec: type: filesystem volume: persistentVolumeClaim: - accessModes: [ "ReadWriteOnce" ] + accessModes: ["ReadWriteOnce"] resources: requests: storage: 1Gi @@ -105,3 +105,13 @@ spec: region: us-east-1 bucket: operator-testing/binlog-cluster1 endpointUrl: https://storage.googleapis.com + azure-blob: + type: azure + azure: + credentialsSecret: azure-secret + container: operator-testing + azure-blob-binlogs: + type: azure + azure: + credentialsSecret: azure-secret + container: operator-testing/binlogs diff --git a/e2e-tests/users/run b/e2e-tests/users/run index 9b332990f8..073c999715 100755 --- a/e2e-tests/users/run +++ b/e2e-tests/users/run @@ -19,7 +19,7 @@ spinup_pxc "$cluster" "$conf_dir/$cluster.yml" desc 'test root' patch_secret "my-cluster-secrets" "root" "$newpassencrypted" sleep 15 -compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uroot -p$newpass" +compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uroot -p'$newpass'" desc 'test proxyadmin' kubectl_bin patch pxc some-name --type=merge -p="{\"spec\":{\"proxysql\":{\"size\":3}}}" @@ -28,34 +28,34 @@ wait_cluster_consistency "$cluster" 3 3 patch_secret "my-cluster-secrets" "proxyadmin" "$newpassencrypted" sleep 15 wait_cluster_consistency "$cluster" 3 3 -compare_mysql_cmd_local "select-2" "SHOW TABLES;" "-h127.0.0.1 -P6032 -uproxyadmin -p$newpass" "$cluster-proxysql-0" "" 'proxysql' -compare_mysql_cmd_local "select-2" "SHOW TABLES;" "-h127.0.0.1 -P6032 -uproxyadmin -p$newpass" "$cluster-proxysql-1" "" 'proxysql' -compare_mysql_cmd_local "select-2" "SHOW TABLES;" "-h127.0.0.1 -P6032 -uproxyadmin -p$newpass" "$cluster-proxysql-2" "" 'proxysql' +compare_mysql_cmd_local "select-2" "SHOW TABLES;" "-h127.0.0.1 -P6032 -uproxyadmin -p'$newpass'" "$cluster-proxysql-0" "" 'proxysql' +compare_mysql_cmd_local "select-2" "SHOW TABLES;" "-h127.0.0.1 -P6032 -uproxyadmin -p'$newpass'" "$cluster-proxysql-1" "" 'proxysql' +compare_mysql_cmd_local "select-2" "SHOW TABLES;" "-h127.0.0.1 -P6032 -uproxyadmin -p'$newpass'" "$cluster-proxysql-2" "" 'proxysql' desc 'test xtrabackup' kubectl_bin patch pxc some-name --type=merge -p="{\"spec\":{\"proxysql\":{\"size\":1}}}" patch_secret "my-cluster-secrets" "xtrabackup" "$newpassencrypted" sleep 30 wait_cluster_consistency "$cluster" 3 2 
-compare_mysql_cmd_local "select-3" "SHOW DATABASES;" "-h 127.0.0.1 -uxtrabackup -p$newpass" "$cluster-pxc-0" "" 'pxc' +compare_mysql_cmd_local "select-3" "SHOW DATABASES;" "-h 127.0.0.1 -uxtrabackup -p'$newpass'" "$cluster-pxc-0" "" 'pxc' desc 'test clustercheck' patch_secret "my-cluster-secrets" "clustercheck" "$newpassencrypted" sleep 30 wait_cluster_consistency "$cluster" 3 2 -compare_mysql_cmd_local "select-5" "SHOW DATABASES;" "-h 127.0.0.1 -uclustercheck -p$newpass" "$cluster-pxc-0" "" 'pxc' +compare_mysql_cmd_local "select-5" "SHOW DATABASES;" "-h 127.0.0.1 -uclustercheck -p'$newpass'" "$cluster-pxc-0" "" 'pxc' desc 'test monitor' patch_secret "my-cluster-secrets" "monitor" "$newpassencrypted" sleep 30 wait_cluster_consistency "$cluster" 3 2 -compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -umonitor -p$newpass" +compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -umonitor -p'$newpass'" desc 'test operator' patch_secret "my-cluster-secrets" "operator" "$newpassencrypted" sleep 30 wait_cluster_consistency "$cluster" 3 2 -compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uoperator -p$newpass" +compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uoperator -p'$newpass'" desc 'change secret name' kubectl_bin patch pxc $cluster --type merge --patch '{"spec": {"secretsName":"my-cluster-secrets-2"}}' @@ -68,22 +68,22 @@ desc 'test new operator' patch_secret "my-cluster-secrets-2" "operator" "$newpassencrypted" sleep 30 wait_cluster_consistency "$cluster" 3 2 -compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uoperator -p$newpass" +compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uoperator -p'$newpass'" newpass=$(getSecretData "my-cluster-secrets-2" "root") desc 'test new users sync' run_mysql \ "CREATE USER 'testsync'@'%' IDENTIFIED BY '$newpass';" \ - "-h $cluster-pxc -uroot -p$newpass" + "-h $cluster-pxc -uroot -p'$newpass'" sleep 30 -compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -utestsync -p$newpass" +compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -utestsync -p'$newpass'" pass=$(getSecretData "internal-some-name" "operator") desc 'check secret without operator' kubectl_bin apply \ -f "$test_dir/conf/secrets.yml" sleep 15 -compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uoperator -p$pass" +compare_mysql_cmd "select-4" "SHOW TABLES;" "-h $cluster-proxysql -uoperator -p'$pass'" newpass="test-password2" newpassencrypted=$(echo -n "$newpass" | base64) @@ -93,6 +93,6 @@ wait_cluster_consistency "$cluster" 3 3 patch_secret "my-cluster-secrets" "monitor" "$newpassencrypted" sleep 15 wait_cluster_consistency "$cluster" 3 3 -compare_mysql_cmd "select-3" "SHOW DATABASES;" "-h $cluster-haproxy -umonitor -p$newpass" +compare_mysql_cmd "select-3" "SHOW DATABASES;" "-h $cluster-haproxy -umonitor -p'$newpass'" destroy "${namespace}" diff --git a/pkg/apis/pxc/v1/pxc_prestore_types.go b/pkg/apis/pxc/v1/pxc_prestore_types.go index 2f825dd63f..1e80e09967 100644 --- a/pkg/apis/pxc/v1/pxc_prestore_types.go +++ b/pkg/apis/pxc/v1/pxc_prestore_types.go @@ -75,8 +75,8 @@ func (cr *PerconaXtraDBClusterRestore) CheckNsetDefaults() error { if cr.Spec.PXCCluster == "" { return errors.New("pxcCluster can't be empty") } - if cr.Spec.PITR != nil && cr.Spec.PITR.BackupSource != nil && cr.Spec.PITR.BackupSource.StorageName == "" && cr.Spec.PITR.BackupSource.S3 == nil { - return errors.New("PITR.BackupSource.StorageName and PITR.BackupSource.S3 can't be empty 
simultaneously") + if cr.Spec.PITR != nil && cr.Spec.PITR.BackupSource != nil && cr.Spec.PITR.BackupSource.StorageName == "" && cr.Spec.PITR.BackupSource.S3 == nil && cr.Spec.PITR.BackupSource.Azure == nil { + return errors.New("PITR.BackupSource.StorageName, PITR.BackupSource.S3 and PITR.BackupSource.Azure can't be empty simultaneously") } if cr.Spec.BackupName == "" && cr.Spec.BackupSource == nil { return errors.New("backupName and BackupSource can't be empty simultaneously") diff --git a/pkg/controller/pxc/secrets.go b/pkg/controller/pxc/secrets.go index a2337c13ee..99aec4cd57 100644 --- a/pkg/controller/pxc/secrets.go +++ b/pkg/controller/pxc/secrets.go @@ -6,14 +6,17 @@ import ( "fmt" "math/big" mrand "math/rand" + "strings" "time" - api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" k8serror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + + api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" + "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/users" ) const internalSecretsPrefix = "internal-" @@ -30,6 +33,9 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileUsersSecret(cr *api.PerconaXtra secretObj, ) if err == nil { + if err := validatePasswords(secretObj); err != nil { + return errors.Wrap(err, "validate passwords") + } isChanged, err := setUserSecretDefaults(secretObj) if err != nil { return errors.Wrap(err, "set user secret defaults") @@ -90,10 +96,11 @@ const ( passwordMinLen = 16 passSymbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + - "0123456789" + "0123456789" + + "!#$%&()*+,-.<=>?@[]^_{}~" ) -// generatePass generate random password +// generatePass generates a random password func generatePass() ([]byte, error) { mrand.Seed(time.Now().UnixNano()) ln := mrand.Intn(passwordMaxLen-passwordMinLen) + passwordMinLen @@ -108,3 +115,18 @@ func generatePass() ([]byte, error) { return b, nil } + +func validatePasswords(secret *corev1.Secret) error { + for user, pass := range secret.Data { + switch user { + case users.ProxyAdmin: + if strings.ContainsAny(string(pass), ";:") { + return errors.New("invalid proxyadmin password, don't use ';' or ':'") + } + default: + continue + } + } + + return nil +} diff --git a/pkg/controller/pxcrestore/restore.go b/pkg/controller/pxcrestore/restore.go index 1e27a9280d..0a02ef284c 100644 --- a/pkg/controller/pxcrestore/restore.go +++ b/pkg/controller/pxcrestore/restore.go @@ -27,14 +27,21 @@ func (r *ReconcilePerconaXtraDBClusterRestore) restore(cr *api.PerconaXtraDBClus case strings.HasPrefix(bcp.Status.Destination, "s3://"): return errors.Wrap(r.restoreS3(cr, bcp, strings.TrimPrefix(destination, "s3://"), cluster, false), "s3") case bcp.Status.Azure != nil: - return errors.Wrap(r.restoreAzure(cr, bcp, bcp.Status.Destination, cluster.Spec), "azure") + return errors.Wrap(r.restoreAzure(cr, bcp, bcp.Status.Destination, cluster.Spec, false), "azure") default: return errors.Errorf("unknown backup storage type") } } func (r *ReconcilePerconaXtraDBClusterRestore) pitr(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster *api.PerconaXtraDBCluster) error { - return errors.Wrap(r.restoreS3(cr, bcp, bcp.Status.Destination[5:], cluster, true), "PITR restore") + dest := bcp.Status.Destination + switch { + case strings.HasPrefix(dest, "s3://"): + return errors.Wrap(r.restoreS3(cr, bcp, strings.TrimPrefix(dest, "s3://"), cluster, true), "PITR 
restore s3") + case bcp.Status.Azure != nil: + return errors.Wrap(r.restoreAzure(cr, bcp, bcp.Status.Destination, cluster.Spec, true), "PITR restore azure") + } + return errors.Errorf("unknown storage type") } func (r *ReconcilePerconaXtraDBClusterRestore) restorePVC(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, pvcName string, cluster api.PerconaXtraDBClusterSpec) error { @@ -83,8 +90,8 @@ func (r *ReconcilePerconaXtraDBClusterRestore) restorePVC(cr *api.PerconaXtraDBC return r.createJob(job) } -func (r *ReconcilePerconaXtraDBClusterRestore) restoreAzure(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, dest string, cluster api.PerconaXtraDBClusterSpec) error { - job, err := backup.AzureRestoreJob(cr, bcp, cluster, dest) +func (r *ReconcilePerconaXtraDBClusterRestore) restoreAzure(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, dest string, cluster api.PerconaXtraDBClusterSpec, pitr bool) error { + job, err := backup.AzureRestoreJob(cr, bcp, cluster, dest, pitr) if err != nil { return err } diff --git a/pkg/pxc/app/deployment/binlog-collector.go b/pkg/pxc/app/deployment/binlog-collector.go index de58ffa63a..1d68535433 100644 --- a/pkg/pxc/app/deployment/binlog-collector.go +++ b/pkg/pxc/app/deployment/binlog-collector.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -17,11 +18,9 @@ import ( "github.com/percona/percona-xtradb-cluster-operator/clientcmd" api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1" "github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app" - "github.com/pkg/errors" ) func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployment, error) { - storage := cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName] binlogCollectorName := GetBinlogCollectorDeploymentName(cr) pxcUser := "xtrabackup" sleepTime := fmt.Sprintf("%.2f", cr.Spec.Backup.PITR.TimeBetweenUploads) @@ -41,26 +40,11 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme for key, value := range cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName].Labels { labels[key] = value } - if storage.S3 == nil { - return appsv1.Deployment{}, errors.New("s3 storage is not specified") + envs, err := getStorageEnvs(cr) + if err != nil { + return appsv1.Deployment{}, errors.Wrap(err, "get storage envs") } - envs := []corev1.EnvVar{ - { - Name: "SECRET_ACCESS_KEY", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: app.SecretKeySelector(storage.S3.CredentialsSecret, "AWS_SECRET_ACCESS_KEY"), - }, - }, - { - Name: "ACCESS_KEY_ID", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: app.SecretKeySelector(storage.S3.CredentialsSecret, "AWS_ACCESS_KEY_ID"), - }, - }, - { - Name: "S3_BUCKET_URL", - Value: storage.S3.Bucket, - }, + envs = append(envs, []corev1.EnvVar{ { Name: "PXC_SERVICE", Value: cr.Name + "-pxc", @@ -75,10 +59,6 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme SecretKeyRef: app.SecretKeySelector(cr.Spec.SecretsName, pxcUser), }, }, - { - Name: "DEFAULT_REGION", - Value: cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName].S3.Region, - }, { Name: "COLLECT_SPAN_SEC", Value: sleepTime, @@ -87,13 +67,7 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme Name: "BUFFER_SIZE", Value: strconv.FormatInt(bufferSize, 10), }, - } - if len(storage.S3.EndpointURL) > 0 { - envs = 
diff --git a/pkg/pxc/app/deployment/binlog-collector.go b/pkg/pxc/app/deployment/binlog-collector.go
index de58ffa63a..1d68535433 100644
--- a/pkg/pxc/app/deployment/binlog-collector.go
+++ b/pkg/pxc/app/deployment/binlog-collector.go
@@ -7,6 +7,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -17,11 +18,9 @@ import (
 	"github.com/percona/percona-xtradb-cluster-operator/clientcmd"
 	api "github.com/percona/percona-xtradb-cluster-operator/pkg/apis/pxc/v1"
 	"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/app"
-	"github.com/pkg/errors"
 )
 
 func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployment, error) {
-	storage := cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName]
 	binlogCollectorName := GetBinlogCollectorDeploymentName(cr)
 	pxcUser := "xtrabackup"
 	sleepTime := fmt.Sprintf("%.2f", cr.Spec.Backup.PITR.TimeBetweenUploads)
@@ -41,26 +40,11 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme
 	for key, value := range cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName].Labels {
 		labels[key] = value
 	}
-	if storage.S3 == nil {
-		return appsv1.Deployment{}, errors.New("s3 storage is not specified")
+	envs, err := getStorageEnvs(cr)
+	if err != nil {
+		return appsv1.Deployment{}, errors.Wrap(err, "get storage envs")
 	}
-	envs := []corev1.EnvVar{
-		{
-			Name: "SECRET_ACCESS_KEY",
-			ValueFrom: &corev1.EnvVarSource{
-				SecretKeyRef: app.SecretKeySelector(storage.S3.CredentialsSecret, "AWS_SECRET_ACCESS_KEY"),
-			},
-		},
-		{
-			Name: "ACCESS_KEY_ID",
-			ValueFrom: &corev1.EnvVarSource{
-				SecretKeyRef: app.SecretKeySelector(storage.S3.CredentialsSecret, "AWS_ACCESS_KEY_ID"),
-			},
-		},
-		{
-			Name:  "S3_BUCKET_URL",
-			Value: storage.S3.Bucket,
-		},
+	envs = append(envs, []corev1.EnvVar{
 		{
 			Name:  "PXC_SERVICE",
 			Value: cr.Name + "-pxc",
@@ -75,10 +59,6 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme
 				SecretKeyRef: app.SecretKeySelector(cr.Spec.SecretsName, pxcUser),
 			},
 		},
-		{
-			Name:  "DEFAULT_REGION",
-			Value: cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName].S3.Region,
-		},
 		{
 			Name:  "COLLECT_SPAN_SEC",
 			Value: sleepTime,
@@ -87,13 +67,7 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme
 			Name:  "BUFFER_SIZE",
 			Value: strconv.FormatInt(bufferSize, 10),
 		},
-	}
-	if len(storage.S3.EndpointURL) > 0 {
-		envs = append(envs, corev1.EnvVar{
-			Name:  "ENDPOINT",
-			Value: storage.S3.EndpointURL,
-		})
-	}
+	}...)
 	container := corev1.Container{
 		Name:  "pitr",
 		Image: cr.Spec.Backup.Image,
@@ -152,6 +126,85 @@ func GetBinlogCollectorDeployment(cr *api.PerconaXtraDBCluster) (appsv1.Deployme
 	}, nil
 }
 
+func getStorageEnvs(cr *api.PerconaXtraDBCluster) ([]corev1.EnvVar, error) {
+	storage := cr.Spec.Backup.Storages[cr.Spec.Backup.PITR.StorageName]
+	switch storage.Type {
+	case api.BackupStorageS3:
+		if storage.S3 == nil {
+			return nil, errors.New("s3 storage is not specified")
+		}
+		envs := []corev1.EnvVar{
+			{
+				Name: "SECRET_ACCESS_KEY",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: app.SecretKeySelector(storage.S3.CredentialsSecret, "AWS_SECRET_ACCESS_KEY"),
+				},
+			},
+			{
+				Name: "ACCESS_KEY_ID",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: app.SecretKeySelector(storage.S3.CredentialsSecret, "AWS_ACCESS_KEY_ID"),
+				},
+			},
+			{
+				Name:  "S3_BUCKET_URL",
+				Value: storage.S3.Bucket,
+			},
+			{
+				Name:  "DEFAULT_REGION",
+				Value: storage.S3.Region,
+			},
+			{
+				Name:  "STORAGE_TYPE",
+				Value: "s3",
+			},
+		}
+		if len(storage.S3.EndpointURL) > 0 {
+			envs = append(envs, corev1.EnvVar{
+				Name:  "ENDPOINT",
+				Value: storage.S3.EndpointURL,
+			})
+		}
+		return envs, nil
+	case api.BackupStorageAzure:
+		if storage.Azure == nil {
+			return nil, errors.New("azure storage is not specified")
+		}
+		return []corev1.EnvVar{
+			{
+				Name: "AZURE_STORAGE_ACCOUNT",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: app.SecretKeySelector(storage.Azure.CredentialsSecret, "AZURE_STORAGE_ACCOUNT_NAME"),
+				},
+			},
+			{
+				Name: "AZURE_ACCESS_KEY",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: app.SecretKeySelector(storage.Azure.CredentialsSecret, "AZURE_STORAGE_ACCOUNT_KEY"),
+				},
+			},
+			{
+				Name:  "AZURE_STORAGE_CLASS",
+				Value: storage.Azure.StorageClass,
+			},
+			{
+				Name:  "AZURE_CONTAINER_NAME",
+				Value: storage.Azure.ContainerName,
+			},
+			{
+				Name:  "AZURE_ENDPOINT",
+				Value: storage.Azure.Endpoint,
+			},
+			{
+				Name:  "STORAGE_TYPE",
+				Value: "azure",
+			},
+		}, nil
+	default:
+		return nil, errors.Errorf("%s storage has unsupported type %s", cr.Spec.Backup.PITR.StorageName, storage.Type)
+	}
+}
+
 func GetBinlogCollectorDeploymentName(cr *api.PerconaXtraDBCluster) string {
 	return cr.Name + "-pitr"
 }
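The refactor above makes GetBinlogCollectorDeployment storage-agnostic: each backend contributes its own credential variables plus a STORAGE_TYPE discriminator through getStorageEnvs, and the deployment only appends the common PXC_* and tuning variables. A minimal test sketch of the azure branch follows (hypothetical, not part of this PR; the api.PXCScheduledBackup / api.PITRSpec / api.BackupStorageSpec shapes are assumed from how the diff uses them):

// Hypothetical unit test for getStorageEnvs; CR wiring assumes the api types
// are shaped as their usage in the diff above implies.
func TestGetStorageEnvsAzure(t *testing.T) {
	cr := &api.PerconaXtraDBCluster{
		Spec: api.PerconaXtraDBClusterSpec{
			Backup: &api.PXCScheduledBackup{
				PITR: api.PITRSpec{StorageName: "azure-blob"},
				Storages: map[string]*api.BackupStorageSpec{
					"azure-blob": {
						Type: api.BackupStorageAzure,
						Azure: &api.BackupStorageAzureSpec{
							CredentialsSecret: "my-azure-secret", // hypothetical secret name
							ContainerName:     "pitr-binlogs",    // hypothetical container
						},
					},
				},
			},
		},
	}

	envs, err := getStorageEnvs(cr)
	if err != nil {
		t.Fatal(err)
	}

	// The azure branch must always emit the STORAGE_TYPE discriminator,
	// since the collector binary selects its storage client from it.
	for _, e := range envs {
		if e.Name == "STORAGE_TYPE" && e.Value == "azure" {
			return
		}
	}
	t.Fatal("expected STORAGE_TYPE=azure in collector envs")
}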
diff --git a/pkg/pxc/backup/restore.go b/pkg/pxc/backup/restore.go
index a3480fcab3..6b2a62c039 100644
--- a/pkg/pxc/backup/restore.go
+++ b/pkg/pxc/backup/restore.go
@@ -233,7 +233,7 @@ func PVCRestoreJob(cr *api.PerconaXtraDBClusterRestore, cluster api.PerconaXtraD
 	return job, nil
 }
 
-func AzureRestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster api.PerconaXtraDBClusterSpec, destination string) (*batchv1.Job, error) {
+func AzureRestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClusterBackup, cluster api.PerconaXtraDBClusterSpec, destination string, pitr bool) (*batchv1.Job, error) {
 	if bcp.Status.Azure == nil {
 		return nil, errors.New("nil azure storage backup status")
 	}
@@ -325,6 +325,73 @@ func AzureRestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDB
 		},
 	}
 
+	if pitr {
+		if cluster.Backup == nil || len(cluster.Backup.Storages) == 0 {
+			return nil, errors.New("no storage section")
+		}
+		storageAzure := new(api.BackupStorageAzureSpec)
+
+		if cr.Spec.PITR.BackupSource != nil && len(cr.Spec.PITR.BackupSource.StorageName) > 0 {
+			storage, ok := cluster.Backup.Storages[cr.Spec.PITR.BackupSource.StorageName]
+			if ok {
+				storageAzure = storage.Azure
+			}
+		}
+		if cr.Spec.PITR.BackupSource != nil && cr.Spec.PITR.BackupSource.Azure != nil {
+			storageAzure = cr.Spec.PITR.BackupSource.Azure
+		}
+
+		if storageAzure == nil || len(storageAzure.ContainerName) == 0 {
+			return nil, errors.New("container name is not specified in storage")
+		}
+
+		command = []string{"pitr", "recover"}
+		envs = append(envs, []corev1.EnvVar{
+			{
+				Name: "BINLOG_AZURE_STORAGE_ACCOUNT",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: app.SecretKeySelector(storageAzure.CredentialsSecret, "AZURE_STORAGE_ACCOUNT_NAME"),
+				},
+			},
+			{
+				Name: "BINLOG_AZURE_ACCESS_KEY",
+				ValueFrom: &corev1.EnvVarSource{
+					SecretKeyRef: app.SecretKeySelector(storageAzure.CredentialsSecret, "AZURE_STORAGE_ACCOUNT_KEY"),
+				},
+			},
+			{
+				Name:  "BINLOG_AZURE_STORAGE_CLASS",
+				Value: storageAzure.StorageClass,
+			},
+			{
+				Name:  "BINLOG_AZURE_CONTAINER_NAME",
+				Value: storageAzure.ContainerName,
+			},
+			{
+				Name:  "BINLOG_AZURE_ENDPOINT",
+				Value: storageAzure.Endpoint,
+			},
+			{
+				Name:  "PITR_RECOVERY_TYPE",
+				Value: cr.Spec.PITR.Type,
+			},
+			{
+				Name:  "PITR_GTID",
+				Value: cr.Spec.PITR.GTID,
+			},
+			{
+				Name:  "PITR_DATE",
+				Value: cr.Spec.PITR.Date,
+			},
+			{
+				Name:  "STORAGE_TYPE",
+				Value: "azure",
+			},
+		}...)
+		jobName = "pitr-job-" + cr.Name + "-" + cr.Spec.PXCCluster
+		volumeMounts = []corev1.VolumeMount{}
+		jobPVCs = []corev1.Volume{}
+	}
 	job := &batchv1.Job{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: "batch/v1",
@@ -558,6 +625,10 @@ func S3RestoreJob(cr *api.PerconaXtraDBClusterRestore, bcp *api.PerconaXtraDBClu
 			Name:  "PITR_DATE",
 			Value: cr.Spec.PITR.Date,
 		})
+		envs = append(envs, corev1.EnvVar{
+			Name:  "STORAGE_TYPE",
+			Value: "s3",
+		})
 		jobName = "pitr-job-" + cr.Name + "-" + cr.Spec.PXCCluster
 		volumeMounts = []corev1.VolumeMount{}
 		jobPVCs = []corev1.Volume{}
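With pitr=true, AzureRestoreJob turns the restore Job into a binlog-recovery Job: the command becomes pitr recover, the data volume mounts and PVCs are dropped (the recoverer streams binlogs from object storage rather than a local volume), and the BINLOG_AZURE_* variables may point at a different storage than the one holding the base backup, resolved from the PITR BackupSource or the cluster's storages map. An illustrative call site, mirroring the controller's restoreAzure helper at the top of this diff:

// Illustrative only — mirrors restoreAzure above; clusterSpec is the
// api.PerconaXtraDBClusterSpec passed through by the controller.
job, err := backup.AzureRestoreJob(cr, bcp, clusterSpec, bcp.Status.Destination, true)
if err != nil {
	return errors.Wrap(err, "create PITR recovery job")
}
return r.createJob(job)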