Skip to content

Commit

Permalink
Merge branch 'main' into dev/K8SPXC-1067
Browse files Browse the repository at this point in the history
  • Loading branch information
pooknull authored Nov 22, 2022
2 parents 36250f9 + 415adbe commit 49ae069
Show file tree
Hide file tree
Showing 13 changed files with 623 additions and 254 deletions.
27 changes: 17 additions & 10 deletions build/pxc-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ function join {
echo "${joined%?}"
}

# escape_special VALUE
# Escapes characters that would break a value embedded inside a
# single-quoted MySQL string literal (backslash, single quote, double
# quote), writing the escaped value to stdout.
#   \  ->  \\      (must run first so later escapes are not doubled)
#   '  ->  \'
#   "  ->  \"
# Uses printf instead of echo: echo mangles values that start with "-"
# (treated as echo options) and may interpret backslash sequences,
# which is unacceptable for arbitrary passwords.
escape_special() {
	printf '%s\n' "$1" \
		| sed 's/\\/\\\\/g' \
		| sed 's/'\''/'\\\\\''/g' \
		| sed 's/"/\\\"/g'
}

MYSQL_VERSION=$(mysqld -V | awk '{print $3}' | awk -F'.' '{print $1"."$2}')
MYSQL_PATCH_VERSION=$(mysqld -V | awk '{print $3}' | awk -F'.' '{print $3}' | awk -F'-' '{print $1}')

Expand Down Expand Up @@ -256,10 +263,10 @@ else

fi

WSREP_CLUSTER_NAME=$(grep wsrep_cluster_name ${CFG}| cut -d '=' -f 2| tr -d ' ' )
WSREP_CLUSTER_NAME=$(grep wsrep_cluster_name ${CFG} | cut -d '=' -f 2 | tr -d ' ')
if [[ -z ${WSREP_CLUSTER_NAME} || ${WSREP_CLUSTER_NAME} == 'noname' ]]; then
echo "Cluster name is invalid, please check DNS"
exit 1
echo "Cluster name is invalid, please check DNS"
exit 1
fi

# if we have CLUSTER_JOIN - then we do not need to perform datadir initialize
Expand Down Expand Up @@ -341,7 +348,7 @@ if [ -z "$CLUSTER_JOIN" ] && [ "$1" = 'mysqld' -a -z "$wantHelp" ]; then
# no, we don't care if read finds a terminating character in this heredoc
# https://unix.stackexchange.com/questions/265149/why-is-set-o-errexit-breaking-this-read-heredoc-expression/265151#265151
read -r -d '' rootCreate <<-EOSQL || true
CREATE USER 'root'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}' ;
CREATE USER 'root'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '$(escape_special "${MYSQL_ROOT_PASSWORD}")' ;
GRANT ALL ON *.* TO 'root'@'${MYSQL_ROOT_HOST}' WITH GRANT OPTION ;
EOSQL
fi
Expand Down Expand Up @@ -370,28 +377,28 @@ if [ -z "$CLUSTER_JOIN" ] && [ "$1" = 'mysqld' -a -z "$wantHelp" ]; then
SET @@SESSION.SQL_LOG_BIN=0;
DELETE FROM mysql.user WHERE user NOT IN ('mysql.sys', 'mysqlxsys', 'root', 'mysql.infoschema', 'mysql.pxc.internal.session', 'mysql.pxc.sst.role', 'mysql.session') OR host NOT IN ('localhost') ;
ALTER USER 'root'@'localhost' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}' ;
ALTER USER 'root'@'localhost' IDENTIFIED BY '$(escape_special "${MYSQL_ROOT_PASSWORD}")' ;
GRANT ALL ON *.* TO 'root'@'localhost' WITH GRANT OPTION ;
${rootCreate}
/*!80016 REVOKE SYSTEM_USER ON *.* FROM root */;
CREATE USER 'operator'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '${OPERATOR_ADMIN_PASSWORD}' ;
CREATE USER 'operator'@'${MYSQL_ROOT_HOST}' IDENTIFIED BY '$(escape_special "${OPERATOR_ADMIN_PASSWORD}")' ;
GRANT ALL ON *.* TO 'operator'@'${MYSQL_ROOT_HOST}' WITH GRANT OPTION ;
CREATE USER 'xtrabackup'@'%' IDENTIFIED BY '${XTRABACKUP_PASSWORD}';
CREATE USER 'xtrabackup'@'%' IDENTIFIED BY '$(escape_special "${XTRABACKUP_PASSWORD}")';
GRANT ALL ON *.* TO 'xtrabackup'@'%';
CREATE USER 'monitor'@'${MONITOR_HOST}' IDENTIFIED BY '${MONITOR_PASSWORD}' WITH MAX_USER_CONNECTIONS 100;
CREATE USER 'monitor'@'${MONITOR_HOST}' IDENTIFIED BY '$(escape_special "${MONITOR_PASSWORD}")' WITH MAX_USER_CONNECTIONS 100;
GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'monitor'@'${MONITOR_HOST}';
GRANT SELECT ON performance_schema.* TO 'monitor'@'${MONITOR_HOST}';
${monitorConnectGrant}
CREATE USER 'clustercheck'@'localhost' IDENTIFIED BY '${CLUSTERCHECK_PASSWORD}';
CREATE USER 'clustercheck'@'localhost' IDENTIFIED BY '$(escape_special "${CLUSTERCHECK_PASSWORD}")';
GRANT PROCESS ON *.* TO 'clustercheck'@'localhost';
${systemUserGrant}
CREATE USER 'replication'@'%' IDENTIFIED BY '${REPLICATION_PASSWORD}';
CREATE USER 'replication'@'%' IDENTIFIED BY '$(escape_special "${REPLICATION_PASSWORD}")';
GRANT REPLICATION SLAVE ON *.* to 'replication'@'%';
DROP DATABASE IF EXISTS test;
FLUSH PRIVILEGES ;
Expand Down
131 changes: 81 additions & 50 deletions cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collector

import (
"bytes"
"context"
"crypto/md5"
"fmt"
"io"
Expand All @@ -13,6 +14,7 @@ import (
"syscall"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/minio/minio-go/v7"
"github.com/pkg/errors"

Expand All @@ -31,17 +33,31 @@ type Collector struct {
}

type Config struct {
PXCServiceName string `env:"PXC_SERVICE,required"`
PXCUser string `env:"PXC_USER,required"`
PXCPass string `env:"PXC_PASS,required"`
S3Endpoint string `env:"ENDPOINT" envDefault:"s3.amazonaws.com"`
S3AccessKeyID string `env:"ACCESS_KEY_ID,required"`
S3AccessKey string `env:"SECRET_ACCESS_KEY,required"`
S3BucketURL string `env:"S3_BUCKET_URL,required"`
S3Region string `env:"DEFAULT_REGION,required"`
BufferSize int64 `env:"BUFFER_SIZE"`
CollectSpanSec float64 `env:"COLLECT_SPAN_SEC" envDefault:"60"`
VerifyTLS bool `env:"VERIFY_TLS" envDefault:"true"`
PXCServiceName string `env:"PXC_SERVICE,required"`
PXCUser string `env:"PXC_USER,required"`
PXCPass string `env:"PXC_PASS,required"`
StorageType string `env:"STORAGE_TYPE,required"`
BackupStorageS3 BackupS3
BackupStorageAzure BackupAzure
BufferSize int64 `env:"BUFFER_SIZE"`
CollectSpanSec float64 `env:"COLLECT_SPAN_SEC" envDefault:"60"`
VerifyTLS bool `env:"VERIFY_TLS" envDefault:"true"`
}

// BackupS3 holds connection settings for an S3-compatible binlog storage,
// populated from environment variables via the `env` struct tags.
type BackupS3 struct {
// Endpoint is the S3 API host (ENDPOINT); defaults to AWS S3.
Endpoint string `env:"ENDPOINT" envDefault:"s3.amazonaws.com"`
// AccessKeyID is the S3 access key ID (ACCESS_KEY_ID). Required.
AccessKeyID string `env:"ACCESS_KEY_ID,required"`
// AccessKey is the S3 secret access key (SECRET_ACCESS_KEY). Required.
AccessKey string `env:"SECRET_ACCESS_KEY,required"`
// BucketURL is the bucket name, optionally followed by a "/"-separated
// key prefix, e.g. "my-bucket/data/more-data" (S3_BUCKET_URL). Required.
BucketURL string `env:"S3_BUCKET_URL,required"`
// Region is the S3 region (DEFAULT_REGION). Required.
Region string `env:"DEFAULT_REGION,required"`
}

// BackupAzure holds connection settings for Azure Blob binlog storage,
// populated from environment variables via the `env` struct tags.
type BackupAzure struct {
// Endpoint is the Azure Blob service endpoint (AZURE_ENDPOINT). Required.
Endpoint string `env:"AZURE_ENDPOINT,required"`
// ContainerName is the container name, optionally followed by a
// "/"-separated blob prefix (AZURE_CONTAINER_NAME). Required.
ContainerName string `env:"AZURE_CONTAINER_NAME,required"`
// StorageClass is the optional Azure storage tier (AZURE_STORAGE_CLASS).
StorageClass string `env:"AZURE_STORAGE_CLASS"`
// AccountName is the storage account name (AZURE_STORAGE_ACCOUNT). Required.
AccountName string `env:"AZURE_STORAGE_ACCOUNT,required"`
// AccountKey is the storage account access key (AZURE_ACCESS_KEY). Required.
AccountKey string `env:"AZURE_ACCESS_KEY,required"`
}

const (
Expand All @@ -50,27 +66,40 @@ const (
)

func New(c Config) (*Collector, error) {
bucketArr := strings.Split(c.S3BucketURL, "/")
prefix := ""
// if c.S3BucketURL looks like "my-bucket/data/more-data" we need prefix to be "data/more-data/"
if len(bucketArr) > 1 {
prefix = strings.TrimPrefix(c.S3BucketURL, bucketArr[0]+"/") + "/"
}
s3, err := storage.NewS3(strings.TrimPrefix(strings.TrimPrefix(c.S3Endpoint, "https://"), "http://"), c.S3AccessKeyID, c.S3AccessKey, bucketArr[0], prefix, c.S3Region, strings.HasPrefix(c.S3Endpoint, "https"), c.VerifyTLS)
if err != nil {
return nil, errors.Wrap(err, "new storage manager")
var s storage.Storage
var err error
switch c.StorageType {
case "s3":
bucketArr := strings.Split(c.BackupStorageS3.BucketURL, "/")
prefix := ""
// if c.BackupStorageS3.BucketURL looks like "my-bucket/data/more-data" we need prefix to be "data/more-data/"
if len(bucketArr) > 1 {
prefix = strings.TrimPrefix(c.BackupStorageS3.BucketURL, bucketArr[0]+"/") + "/"
}
s, err = storage.NewS3(c.BackupStorageS3.Endpoint, c.BackupStorageS3.AccessKeyID, c.BackupStorageS3.AccessKey, bucketArr[0], prefix, c.BackupStorageS3.Region, c.VerifyTLS)
if err != nil {
return nil, errors.Wrap(err, "new storage manager")
}
case "azure":
container, prefix, _ := strings.Cut(c.BackupStorageAzure.ContainerName, "/")
s, err = storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, container, prefix+"/")
if err != nil {
return nil, errors.Wrap(err, "new azure storage")
}
default:
return nil, errors.New("unknown STORAGE_TYPE")
}

return &Collector{
storage: s3,
storage: s,
pxcUser: c.PXCUser,
pxcServiceName: c.PXCServiceName,
verifyTLS: c.VerifyTLS,
}, nil
}

func (c *Collector) Run() error {
err := c.newDB()
func (c *Collector) Run(ctx context.Context) error {
err := c.newDB(ctx)
if err != nil {
return errors.Wrap(err, "new db connection")
}
Expand All @@ -80,18 +109,21 @@ func (c *Collector) Run() error {
// read it from aws file
c.lastSet = ""

err = c.CollectBinLogs()
err = c.CollectBinLogs(ctx)
if err != nil {
return errors.Wrap(err, "collect binlog files")
}

return nil
}

func (c *Collector) lastGTIDSet(sourceID string) (string, error) {
func (c *Collector) lastGTIDSet(ctx context.Context, sourceID string) (string, error) {
// get last binlog set stored on S3
lastSetObject, err := c.storage.GetObject(lastSetFilePrefix + sourceID)
lastSetObject, err := c.storage.GetObject(ctx, lastSetFilePrefix+sourceID)
if err != nil {
if bloberror.HasCode(errors.Cause(err), bloberror.BlobNotFound) {
return "", nil
}
return "", errors.Wrap(err, "get last set content")
}
lastSet, err := ioutil.ReadAll(lastSetObject)
Expand All @@ -101,7 +133,7 @@ func (c *Collector) lastGTIDSet(sourceID string) (string, error) {
return string(lastSet), nil
}

func (c *Collector) newDB() error {
func (c *Collector) newDB(ctx context.Context) error {
file, err := os.Open("/etc/mysql/mysql-users-secret/xtrabackup")
if err != nil {
return errors.Wrap(err, "open file")
Expand All @@ -112,7 +144,7 @@ func (c *Collector) newDB() error {
}
c.pxcPass = string(pxcPass)

host, err := pxc.GetPXCOldestBinlogHost(c.pxcServiceName, c.pxcUser, c.pxcPass)
host, err := pxc.GetPXCOldestBinlogHost(ctx, c.pxcServiceName, c.pxcUser, c.pxcPass)
if err != nil {
return errors.Wrap(err, "get host")
}
Expand All @@ -131,24 +163,24 @@ func (c *Collector) close() error {
return c.db.Close()
}

func (c *Collector) CurrentSourceID(logs []pxc.Binlog) (string, error) {
func (c *Collector) CurrentSourceID(ctx context.Context, logs []pxc.Binlog) (string, error) {
var (
gtidSet string
err error
)
for i := len(logs) - 1; i >= 0 && gtidSet == ""; i-- {
gtidSet, err = c.db.GetGTIDSet(logs[i].Name)
gtidSet, err = c.db.GetGTIDSet(ctx, logs[i].Name)
if err != nil {
return gtidSet, err
}
}
return strings.Split(gtidSet, ":")[0], nil
}

func (c *Collector) removeEmptyBinlogs(logs []pxc.Binlog) ([]pxc.Binlog, error) {
func (c *Collector) removeEmptyBinlogs(ctx context.Context, logs []pxc.Binlog) ([]pxc.Binlog, error) {
result := make([]pxc.Binlog, 0)
for _, v := range logs {
set, err := c.db.GetGTIDSet(v.Name)
set, err := c.db.GetGTIDSet(ctx, v.Name)
if err != nil {
return nil, errors.Wrap(err, "get GTID set")
}
Expand All @@ -162,9 +194,9 @@ func (c *Collector) removeEmptyBinlogs(logs []pxc.Binlog) ([]pxc.Binlog, error)
return result, nil
}

func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]pxc.Binlog, error) {
func (c *Collector) filterBinLogs(ctx context.Context, logs []pxc.Binlog, lastBinlogName string) ([]pxc.Binlog, error) {
if lastBinlogName == "" {
return c.removeEmptyBinlogs(logs)
return c.removeEmptyBinlogs(ctx, logs)
}

logsLen := len(logs)
Expand All @@ -178,7 +210,7 @@ func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]p
return nil, nil
}

set, err := c.db.GetGTIDSet(logs[startIndex].Name)
set, err := c.db.GetGTIDSet(ctx, logs[startIndex].Name)
if err != nil {
return nil, errors.Wrap(err, "get gtid set of last uploaded binlog")
}
Expand All @@ -188,7 +220,7 @@ func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]p
startIndex++
}

return c.removeEmptyBinlogs(logs[startIndex:])
return c.removeEmptyBinlogs(ctx, logs[startIndex:])
}

func createGapFile(gtidSet string) error {
Expand All @@ -206,13 +238,13 @@ func createGapFile(gtidSet string) error {
return nil
}

func (c *Collector) CollectBinLogs() error {
list, err := c.db.GetBinLogList()
func (c *Collector) CollectBinLogs(ctx context.Context) error {
list, err := c.db.GetBinLogList(ctx)
if err != nil {
return errors.Wrap(err, "get binlog list")
}

sourceID, err := c.CurrentSourceID(list)
sourceID, err := c.CurrentSourceID(ctx, list)
if err != nil {
return errors.Wrap(err, "get current source id")
}
Expand All @@ -222,7 +254,7 @@ func (c *Collector) CollectBinLogs() error {
return nil
}

c.lastSet, err = c.lastGTIDSet(sourceID)
c.lastSet, err = c.lastGTIDSet(ctx, sourceID)
if err != nil {
return errors.Wrap(err, "get last uploaded gtid set")
}
Expand All @@ -231,7 +263,7 @@ func (c *Collector) CollectBinLogs() error {

if c.lastSet != "" {
// get last uploaded binlog file name
lastUploadedBinlogName, err = c.db.GetBinLogName(c.lastSet)
lastUploadedBinlogName, err = c.db.GetBinLogName(ctx, c.lastSet)
if err != nil {
return errors.Wrap(err, "get last uploaded binlog name by gtid set")
}
Expand All @@ -245,7 +277,7 @@ func (c *Collector) CollectBinLogs() error {
}
}

list, err = c.filterBinLogs(list, lastUploadedBinlogName)
list, err = c.filterBinLogs(ctx, list, lastUploadedBinlogName)
if err != nil {
return errors.Wrap(err, "filter empty binlogs")
}
Expand All @@ -256,7 +288,7 @@ func (c *Collector) CollectBinLogs() error {
}

for _, binlog := range list {
err = c.manageBinlog(binlog)
err = c.manageBinlog(ctx, binlog)
if err != nil {
return errors.Wrap(err, "manage binlog")
}
Expand All @@ -275,9 +307,8 @@ func mergeErrors(a, b error) error {
return b
}

func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) {

binlogTmstmp, err := c.db.GetBinLogFirstTimestamp(binlog.Name)
func (c *Collector) manageBinlog(ctx context.Context, binlog pxc.Binlog) (err error) {
binlogTmstmp, err := c.db.GetBinLogFirstTimestamp(ctx, binlog.Name)
if err != nil {
return errors.Wrapf(err, "get first timestamp for %s", binlog.Name)
}
Expand All @@ -302,7 +333,7 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) {
}

errBuf := &bytes.Buffer{}
cmd := exec.Command("mysqlbinlog", "-R", "--raw", "-h"+c.db.GetHost(), "-u"+c.pxcUser, binlog.Name)
cmd := exec.CommandContext(ctx, "mysqlbinlog", "-R", "--raw", "-h"+c.db.GetHost(), "-u"+c.pxcUser, binlog.Name)
cmd.Env = append(cmd.Env, "MYSQL_PWD="+c.pxcPass)
cmd.Dir = os.TempDir()
cmd.Stderr = errBuf
Expand Down Expand Up @@ -337,7 +368,7 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) {

go readBinlog(file, pw, errBuf, binlog.Name)

err = c.storage.PutObject(binlogName, pr, -1)
err = c.storage.PutObject(ctx, binlogName, pr, -1)
if err != nil {
return errors.Wrapf(err, "put %s object", binlog.Name)
}
Expand All @@ -349,15 +380,15 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) {
return errors.Wrap(err, "wait mysqlbinlog command error:"+errBuf.String())
}

err = c.storage.PutObject(binlogName+gtidPostfix, &setBuffer, int64(setBuffer.Len()))
err = c.storage.PutObject(ctx, binlogName+gtidPostfix, &setBuffer, int64(setBuffer.Len()))
if err != nil {
return errors.Wrap(err, "put gtid-set object")
}
// no error handling because WriteString() always returns a nil error
// nolint:errcheck
setBuffer.WriteString(binlog.GTIDSet)

err = c.storage.PutObject(lastSetFilePrefix+strings.Split(binlog.GTIDSet, ":")[0], &setBuffer, int64(setBuffer.Len()))
err = c.storage.PutObject(ctx, lastSetFilePrefix+strings.Split(binlog.GTIDSet, ":")[0], &setBuffer, int64(setBuffer.Len()))
if err != nil {
return errors.Wrap(err, "put last-set object")
}
Expand Down
Loading

0 comments on commit 49ae069

Please sign in to comment.