Skip to content

Commit

Permalink
chore(pkg/replication): add replication lag metric
Browse files Browse the repository at this point in the history
  • Loading branch information
ostafen committed Sep 19, 2024
1 parent 86e956f commit f8c1041
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 16 deletions.
6 changes: 4 additions & 2 deletions pkg/database/lazy_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package database
import (
"context"
"crypto/sha256"
"io"
"errors"
"path/filepath"
"time"

Expand All @@ -29,6 +29,8 @@ import (
"github.com/codenotary/immudb/pkg/api/schema"
)

var ErrNoNewTransactions = errors.New("no new transactions")

type lazyDB struct {
m *DBManager

Expand Down Expand Up @@ -440,7 +442,7 @@ func (db *lazyDB) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest)

if !req.AllowPreCommitted {
if req.Tx > state.TxId {
return nil, 0, [sha256.Size]byte{}, io.EOF
return nil, 0, [sha256.Size]byte{}, ErrNoNewTransactions
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/replication/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ var (
Name: "immudb_replication_allow_commit_up_to_tx_id",
Help: "most recently received confirmation up to which commit id the replica is allowed to durably commit",
}, []string{"db"})

_metricsReplicationLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "immudb_replication_lag",
Help: "The difference between the last transaction committed by the primary and replicated by the replica",
}, []string{"db"})
)

type metrics struct {
Expand All @@ -76,6 +81,7 @@ type metrics struct {
replicatorsInRetryDelay prometheus.Gauge
primaryCommittedTxID prometheus.Gauge
allowCommitUpToTxID prometheus.Gauge
replicationLag prometheus.Gauge
}

// metricsForDb returns metrics object for particular database name
Expand All @@ -89,6 +95,7 @@ func metricsForDb(dbName string) metrics {
replicatorsInRetryDelay: _metricsReplicatorsInRetryDelay.WithLabelValues(dbName),
primaryCommittedTxID: _metricsReplicationPrimaryCommittedTxID.WithLabelValues(dbName),
allowCommitUpToTxID: _metricsAllowCommitUpToTxID.WithLabelValues(dbName),
replicationLag: _metricsReplicationLag.WithLabelValues(dbName),
}
}

Expand All @@ -99,6 +106,7 @@ func (m *metrics) reset() {
m.replicatorsInRetryDelay.Set(0)
m.primaryCommittedTxID.Set(0)
m.allowCommitUpToTxID.Set(0)
m.replicationLag.Set(0)
}

// replicationTimeHistogramTimer returns prometheus timer for replicationTimeHistogram
Expand Down
17 changes: 17 additions & 0 deletions pkg/replication/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,14 @@ func (txr *TxReplicator) fetchNextTx() error {
defer txr.disconnect()
}

txr.maybeUpdateReplicationLag(commitState.TxId, emd)

if err != nil && !errors.Is(err, io.EOF) {
if strings.Contains(err.Error(), database.ErrNoNewTransactions.Error()) {
txr.metrics.replicationLag.Set(0)
return err
}

if strings.Contains(err.Error(), "replica commit state diverged from primary") {
txr.logger.Errorf("replica commit state at '%s' diverged from primary's", txr.db.GetName())
return ErrReplicaDivergedFromPrimary
Expand Down Expand Up @@ -497,3 +504,13 @@ func (txr *TxReplicator) Error() error {

return txr.err
}

func (txr *TxReplicator) maybeUpdateReplicationLag(lastCommittedTxID uint64, metadata map[string][]byte) {
primaryLastCommittedTxIDBin, ok := metadata["committed-txid-bin"]
if !ok {
return
}

lag := binary.BigEndian.Uint64(primaryLastCommittedTxIDBin) - lastCommittedTxID
txr.metrics.replicationLag.Set(float64(lag))
}
13 changes: 11 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,18 @@ func (s *ImmuServer) Start() (err error) {
startedAt = time.Now()

if s.Options.MetricsServer {
s.metricsServer = StartMetrics(1*time.Minute, s.Options.MetricsBind(), s.Options.TLSConfig, s.Logger, s.metricFuncServerUptimeCounter,
s.metricFuncComputeDBSizes, s.metricFuncComputeDBEntries, s.metricFuncComputeLoadedDBSize, s.metricFuncComputeSessionCount,
s.metricsServer = StartMetrics(
1*time.Minute,
s.Options.MetricsBind(),
s.Options.TLSConfig,
s.Logger,
s.metricFuncServerUptimeCounter,
s.metricFuncComputeDBSizes,
s.metricFuncComputeDBEntries,
s.metricFuncComputeLoadedDBSize,
s.metricFuncComputeSessionCount,
s.Options.PProf)

defer func() {
if err := s.metricsServer.Close(); err != nil {
s.Logger.Errorf("failed to shutdown metric server: %s", err)
Expand Down
25 changes: 13 additions & 12 deletions pkg/server/stream_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,24 @@ func (s *ImmuServer) exportTx(req *schema.ExportTxRequest, txsServer schema.Immu
return err
}

var streamMetadata map[string][]byte
var bCommittedTxID [8]byte
state, err := db.CurrentState()
if err == nil {
binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
}

// In asynchronous replication, the last committed transaction value is sent to the replica
// to enable updating its replication lag.
streamMetadata := map[string][]byte{
"committed-txid-bin": bCommittedTxID[:],
}

if req.ReplicaState != nil {
var bMayCommitUpToTxID [8]byte
binary.BigEndian.PutUint64(bMayCommitUpToTxID[:], mayCommitUpToTxID)

var bCommittedTxID [8]byte
state, err := db.CurrentState()
if err == nil {
binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
}

streamMetadata = map[string][]byte{
"may-commit-up-to-txid-bin": bMayCommitUpToTxID[:],
"may-commit-up-to-alh-bin": mayCommitUpToAlh[:],
"committed-txid-bin": bCommittedTxID[:],
}
streamMetadata["may-commit-up-to-txid-bin"] = bMayCommitUpToTxID[:]
streamMetadata["may-commit-up-to-alh-bin"] = mayCommitUpToAlh[:]

if setTrailer {
// trailer metadata is kept for backward compatibility
Expand Down

0 comments on commit f8c1041

Please sign in to comment.