Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Time To Seal metric to access node to track time it takes to seal a transaction #6512

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
def498c
Added TTS metrics
AndriiDiachuk Oct 1, 2024
8fffbb6
Generated mocks
AndriiDiachuk Oct 1, 2024
90addfd
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 1, 2024
9f4feb5
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 2, 2024
4558746
Fixed failing test
AndriiDiachuk Oct 2, 2024
41b0fad
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into add-ti…
AndriiDiachuk Oct 2, 2024
6348f03
Renamed struct and id field
AndriiDiachuk Oct 3, 2024
9bd79db
Using cached data for if statement
AndriiDiachuk Oct 3, 2024
a4e8537
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into add-ti…
AndriiDiachuk Oct 14, 2024
0cb72f8
Expanded CollectionExecutedMetricImpl with map for sealed metrics
AndriiDiachuk Oct 15, 2024
82abb35
Removed ByID mocks and added filed init in constructor
AndriiDiachuk Oct 15, 2024
a2c12e7
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 16, 2024
2401406
Added mutex for sealed metrics
AndriiDiachuk Oct 16, 2024
944451f
Merge branch 'master' into add-time-to-seal-metrics
AndriiDiachuk Oct 18, 2024
2ebce19
Update module/state_synchronization/indexer/collection_executed_metri…
AndriiDiachuk Oct 18, 2024
b76f46b
Update module/state_synchronization/indexer/collection_executed_metri…
AndriiDiachuk Oct 18, 2024
b3a1c84
Update module/state_synchronization/indexer/collection_executed_metri…
AndriiDiachuk Oct 18, 2024
70b82e6
removed comment
AndriiDiachuk Oct 18, 2024
bf4d8e8
Using IdentifierMap instead of map
AndriiDiachuk Oct 18, 2024
ef8d3af
Added err handling
AndriiDiachuk Oct 18, 2024
7630bf9
Adding transactions also in Collection finilized method
AndriiDiachuk Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type AccessNodeConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
Expand Down Expand Up @@ -241,6 +242,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
pingEnabled: false,
retryEnabled: false,
rpcMetricsEnabled: false,
Expand Down Expand Up @@ -1234,6 +1236,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.pingEnabled,
"ping-enabled",
defaultConfig.pingEnabled,
Expand Down Expand Up @@ -1688,6 +1694,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
}).
Expand Down
7 changes: 7 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type ObserverServiceConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
executionDataSyncEnabled bool
executionDataIndexingEnabled bool
executionDataDBMode string
Expand Down Expand Up @@ -234,6 +235,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
executionDataSyncEnabled: false,
executionDataIndexingEnabled: false,
executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(),
Expand Down Expand Up @@ -678,6 +680,10 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.BoolVar(&builder.executionDataIndexingEnabled,
"execution-data-indexing-enabled",
Expand Down Expand Up @@ -1759,6 +1765,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
})
Expand Down
1 change: 1 addition & 0 deletions model/flow/transaction_timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type TransactionTiming struct {
Received time.Time
Finalized time.Time
Executed time.Time
Sealed time.Time
}

func (t TransactionTiming) ID() Identifier {
Expand Down
4 changes: 4 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,10 @@ type TransactionMetrics interface {
// works if the transaction was earlier added as received.
TransactionFinalized(txID flow.Identifier, when time.Time)

// TransactionSealed reports the time spent between the transaction being received and sealed. Reporting only
// works if the transaction was earlier added as received.
TransactionSealed(txID flow.Identifier, when time.Time)

// TransactionExecuted reports the time spent between the transaction being received and executed. Reporting only
// works if the transaction was earlier added as received.
TransactionExecuted(txID flow.Identifier, when time.Time)
Expand Down
1 change: 1 addition & 0 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (nc *NoopCollector) ScriptExecutionNotIndexed()
func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {}
func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionSealed(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {}
func (nc *NoopCollector) TransactionValidated() {}
Expand Down
53 changes: 53 additions & 0 deletions module/metrics/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type TransactionCollector struct {
logTimeToFinalized bool
logTimeToExecuted bool
logTimeToFinalizedExecuted bool
logTimeToSealed bool
timeToFinalized prometheus.Summary
timeToExecuted prometheus.Summary
timeToFinalizedExecuted prometheus.Summary
timeToSealed prometheus.Summary
transactionSubmission *prometheus.CounterVec
transactionSize prometheus.Histogram
scriptExecutedDuration *prometheus.HistogramVec
Expand All @@ -40,6 +42,7 @@ func NewTransactionCollector(
logTimeToFinalized bool,
logTimeToExecuted bool,
logTimeToFinalizedExecuted bool,
logTimeToSealed bool,
) *TransactionCollector {

tc := &TransactionCollector{
Expand All @@ -48,6 +51,7 @@ func NewTransactionCollector(
logTimeToFinalized: logTimeToFinalized,
logTimeToExecuted: logTimeToExecuted,
logTimeToFinalizedExecuted: logTimeToFinalizedExecuted,
logTimeToSealed: logTimeToSealed,
timeToFinalized: promauto.NewSummary(prometheus.SummaryOpts{
Name: "time_to_finalized_seconds",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -91,6 +95,20 @@ func NewTransactionCollector(
AgeBuckets: 5,
BufCap: 500,
}),
timeToSealed: promauto.NewSummary(prometheus.SummaryOpts{
Name: "time_to_seal_seconds",
Namespace: namespaceAccess,
Subsystem: subsystemTransactionTiming,
Help: "the duration of how long it took between the transaction was received until it was sealed",
Objectives: map[float64]float64{
0.01: 0.001,
0.5: 0.05,
0.99: 0.001,
},
MaxAge: 10 * time.Minute,
AgeBuckets: 5,
BufCap: 500,
}),
transactionSubmission: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "transaction_submission",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -269,6 +287,27 @@ func (tc *TransactionCollector) TransactionExecuted(txID flow.Identifier, when t
}
}

func (tc *TransactionCollector) TransactionSealed(txID flow.Identifier, when time.Time) {
t, updated := tc.transactionTimings.Adjust(txID, func(t *flow.TransactionTiming) *flow.TransactionTiming {
t.Sealed = when
return t
})

if !updated {
tc.log.Debug().
Str("transaction_id", txID.String()).
Msg("failed to update TransactionSealed metric")
return
}

tc.trackTTS(t, tc.logTimeToSealed)

// remove transaction timing from mempool if sealed
if !t.Sealed.IsZero() {
tc.transactionTimings.Remove(txID)
}
}

func (tc *TransactionCollector) trackTTF(t *flow.TransactionTiming, log bool) {
if t.Received.IsZero() || t.Finalized.IsZero() {
return
Expand Down Expand Up @@ -317,6 +356,20 @@ func (tc *TransactionCollector) trackTTFE(t *flow.TransactionTiming, log bool) {
}
}

func (tc *TransactionCollector) trackTTS(t *flow.TransactionTiming, log bool) {
if t.Received.IsZero() || t.Sealed.IsZero() {
return
}
duration := t.Sealed.Sub(t.Received).Seconds()

tc.timeToSealed.Observe(duration)

if log {
tc.log.Info().Str("transaction_id", t.TransactionID.String()).Float64("duration", duration).
Msg("transaction time to sealed")
}
}

func (tc *TransactionCollector) TransactionSubmissionFailed() {
tc.transactionSubmission.WithLabelValues("failed").Inc()
}
Expand Down
5 changes: 5 additions & 0 deletions module/mock/access_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions module/mock/transaction_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"errors"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -25,6 +26,9 @@ type CollectionExecutedMetricImpl struct {

collections storage.Collections
blocks storage.Blocks

blockTransactions map[flow.Identifier][]flow.Identifier // Map to track transactions for each block for sealed metrics
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
mutex sync.RWMutex
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
}

func NewCollectionExecutedMetricImpl(
Expand All @@ -44,6 +48,7 @@ func NewCollectionExecutedMetricImpl(
blocksToMarkExecuted: blocksToMarkExecuted,
collections: collections,
blocks: blocks,
blockTransactions: make(map[flow.Identifier][]flow.Identifier),
}, nil
}

Expand Down Expand Up @@ -86,9 +91,33 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) {
continue
}

c.mutex.Lock()
for _, t := range l.Transactions {
c.blockTransactions[blockID] = append(c.blockTransactions[blockID], t)
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
c.accessMetrics.TransactionFinalized(t, now)
}
c.mutex.Unlock()
}
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved

// Process block seals
for _, s := range block.Payload.Seals {
c.mutex.RLock()
transactions, found := c.blockTransactions[s.BlockID]
c.mutex.RUnlock() // release the read lock after reading
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved

if found && len(transactions) != 0 {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
// Lock for writing since we will modify the transactions list
c.mutex.Lock()
// Process all transactions
for _, t := range transactions {
c.accessMetrics.TransactionSealed(t, now)
}

// Remove the entire block of transactions once processed
delete(c.blockTransactions, s.BlockID)
// Unlock after modifications are done
c.mutex.Unlock()
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
}
}

if ti, found := c.blocksToMarkExecuted.ByID(blockID); found {
Expand Down
Loading