feat(MessageDb): Replace logging with complete OT metrics #407

Open
wants to merge 13 commits into
base: master

Conversation

@nordfjord (Contributor) commented Jul 20, 2023

No description provided.

src/Equinox/Tracing.fs (Outdated)
member act.AddAppendBytes(bytes: int) = act.SetTag("eqx.append_bytes", bytes)

member act.IncMetric(count: int, bytes: int) =
    let currentCount = act.GetTagItem("eqx.count") |> ValueOption.ofObj |> ValueOption.map unbox<int> |> ValueOption.defaultValue 0
Collaborator

match GetTagItem() with :? int as x -> x | _ -> 0 might be more legible and more efficient?
(and can be a helper off to the side - or is there a GetOrDefault somewhere in System.Diagnostics?)
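
A minimal sketch of the helper suggested above. The `ActivityTags.getIntOrZero` name is hypothetical (it is not part of Equinox or System.Diagnostics); the sketch relies only on `Activity.GetTagItem` returning either `null` or the boxed tag value:

```fsharp
open System.Diagnostics

module ActivityTags =
    /// Hypothetical helper: reads an int tag, yielding 0 when the tag is absent or holds another type
    let getIntOrZero (key: string) (act: Activity) : int =
        match act.GetTagItem key with
        | :? int as x -> x
        | _ -> 0

// Usage: a missing tag reads as 0; once set, the tag reads back its value
let act = new Activity("demo")
let before = ActivityTags.getIntOrZero "eqx.count" act
act.SetTag("eqx.count", 3) |> ignore
let after = ActivityTags.getIntOrZero "eqx.count" act
```

The type test avoids the intermediate `ValueOption` pipeline, and a `null` tag simply falls through to the default case.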

Contributor Author

Removed IncMetric

Collaborator

How do consecutive append attempts (and rereads, if the store does not do it for free as part of a resync op) work then?

|> Arb.fromGen
static member RequestId = Arb.generate<Guid> |> Gen.map (fun x -> RequestId.parse %x) |> Arb.fromGen


Collaborator

can this line move to L10 and put a copy before L11

Contributor Author

You want two spaces between the Arb and FsCheckGenerators module?

Collaborator

No, a blank after the module line, and one between modules

if act <> null then act.IncMetric(count, bytes).AddLastVersion(slice.LastVersion) |> ignore
let log = if not (log.IsEnabled Events.LogEventLevel.Debug) then log else log |> Log.propResolvedEvents "Json" slice.Messages
Collaborator

what would someone do if they wanted to see the raw json in a debugger?

Contributor Author

@nordfjord Jul 23, 2023

Not sure how this is being used presently. I copied it in from SqlStreamStore, but have never actually run with debug logs even in dev. When I want to see raw stuff I just run a query against the DB:

select * from get_stream_messages('MyCategory-streamId')

Collaborator

sometimes selectively upping logging level can be useful for hard to diagnose things
or in messy test scenarios with asynchrony that don't play well with a debugger
or if you want to show the workings in interactive mode
talking about being able to double check encodings and/or escaping etc
For when your json lib converts strings to dates and the like ;)
The MemoryStoreLogger (some is in Propulsion.MemoryStore; some is in the templates) kinda covers this but from a different angle (I'll often turn up the logging for a property test failure where I have the seed)

if not slice.IsEnd then
    batchCount <- batchCount + 1
    pos <- slice.LastVersion + 1L
    return! loop () }
task {
    do! loop ()
    let act = Activity.Current
Collaborator

move this out to L125; as it does the top, it might as well do the tail too
at that point removing the loggedReadSlice layer in L70 becomes questionable - maybe it should also be inline into the consume loop (if backward loading ever became possible, the lack of DRYness can probably be handled by the Tracing.fs)

match result with
| MdbSyncResult.Written x ->
    if act <> null then
        act.SetStatus(ActivityStatusCode.Ok).AddTag("eqx.new_version", x) |> ignore
Collaborator

maybe have a RecordSuccess? also move out that tag literal

        act.SetStatus(ActivityStatusCode.Ok).AddTag("eqx.new_version", x) |> ignore
| MdbSyncResult.ConflictUnknown ->
    let eventTypes = [| for x in events -> x.EventType |]
    if act <> null then act.RecordConflict().AddTag("eqx.event_types", eventTypes) |> ignore
Collaborator

extract to an extension?

"Write", count, match evt with Log.WriteConflict _ -> true | _ -> false)
let bytes = eventDataBytes events
if act <> null then act.AddExpectedVersion(version).AddAppendBytes(bytes) |> ignore
let! result = writeEventsAsync writer streamName version events ct
Collaborator

I guess there is only one roundtrip and npgsql has a span for it, hm

Collaborator

if I'm moving from Cosmos V2 to Cosmos V3 and they change the underlying metric, I'd still like to know my RTT metrics
And if I then move from Cosmos to Dynamo and the SDK does not cover it?
I guess I just need to let go (see also L72)

let snapshotStream = Snapshot.streamName category streamId
let category = Snapshot.snapshotCategory category
let act = Activity.Current
if act <> null then act.SetTag("eqx.snapshot_written", true) |> ignore
Collaborator

is there an operation success indicator / is one required for the snapshot write completing (and might it prematurely set the other success indicator) - I guess a good reason to have a named extension method making it obvious

Contributor Author

> is one required for the snapshot write completing

I've been thinking about the semantics of snapshots. They're an optimisation, so if we fail to write a snapshot, should we just swallow it?

Collaborator

@bartelink Jul 23, 2023

Nah, nobody wants silent failures. If someone asked for it, maybe. But you'd probably want metrics to be able to extract the failure rate.
Also for people with streams that take seriously long to walk and/or fold, one might even implement proactive snapshotting (or traverse all streams to update snapshots because you deem it that critical to reasonable perf).
Having the snapshots there can be critical to meeting an SLA
And if it turns out that this is the very thing that's making your system unstable or actually ruining perf instead of improving it, you want to know

Contributor Author

> Also for people with streams that take seriously long to walk and/or fold, one might implement proactive snapshotting.

Yeah, I added a frequency parameter to the JS version which I think will see a lot of use, will contribute here when I get the time!

Collaborator

also if a nullable varchar max was made available to abuse to store snapshots inside the main stream, you wouldn't suddenly want different semantics. Keep it direct and surprise free I say.

src/Equinox/Core.fs (Outdated)
let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds
let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp)
match lock x fetchStateConsistently with
| _, ValueSome cachedValue, true ->
if act <> null then act.AddCacheHit(true)
Collaborator

this cache hit is different to the other one, which is only using the cache entry as a base for the load (and this one has a max age associated, see the .AddStale comment)

Collaborator

Also on L55, could record whether we waited for someone else to shoot the bear, or we were the instigator (not bothered as much about that when compared to the "used cached value directly" vs "used cache entry to do an incremental load" fact)

@bartelink changed the title from "Drop serilog from message-db" to "feat(MessageDb): remove logging" on Jul 23, 2023
@@ -293,7 +293,7 @@ let dumpStats log = function
| Store.Context.Dynamo _ -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump log
| Store.Context.Es _ -> Equinox.EventStoreDb.Log.InternalMetrics.dump log
| Store.Context.Sql _ -> Equinox.SqlStreamStore.Log.InternalMetrics.dump log
- | Store.Context.Mdb _ -> Equinox.MessageDb.Log.InternalMetrics.dump log
+ | Store.Context.Mdb _ -> ()
Collaborator

comment please!

test <@ spanDict = Map.ofList m @>

type GeneralTests() =

Collaborator

MUST IGNORE DOUBLE BLANK LINES

let retries = "eqx.retries"
/// The number of events we appended
[<Literal>]
let append_count = "eqx.append_count"
Collaborator

what's the rule about _ vs . ?

Contributor Author

I don't have one 😓. dots indicate a grouping?

Collaborator

well, we have bytes and count of the same thing, so I say . then!

/// The new version of the stream after appending events
[<Literal>]
let new_version = "eqx.new_version"
/// In case of conflict, which event types did we try to append
Collaborator

Suggested change
- /// In case of conflict, which event types did we try to append
+ /// (iff conflict) first 3 distinct event types the append included

also should it be append.types ?

[<Literal>]
let append_types = "eqx.append_types"

module Load =
Collaborator

if you do split above tags into sections with Shared / Query / Transact, think it makes sense to keep these aligned with the actual operations, and it will show the mapping neatly

.SetTag(Tags.stream_id, streamId)
.SetTag(Tags.stream_name, streamName)
.SetTag(Tags.requires_leader, requiresLeader)
.SetTag(Tags.any_cached_value, System.TimeSpan.Zero = maxAge)
Collaborator

if keeping for back compat, use the literal inline here

let append_types = "eqx.append_types"

module Load =
let setTags (category, streamId, streamName, requiresLeader, maxAge) =
Collaborator

Suggested change
- let setTags (category, streamId, streamName, requiresLeader, maxAge) =
+ let set (category, streamId, streamName, requiresLeader, maxAge) =

@@ -293,7 +293,7 @@ let dumpStats log = function
| Store.Context.Dynamo _ -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump log
| Store.Context.Es _ -> Equinox.EventStoreDb.Log.InternalMetrics.dump log
| Store.Context.Sql _ -> Equinox.SqlStreamStore.Log.InternalMetrics.dump log
- | Store.Context.Mdb _ -> Equinox.MessageDb.Log.InternalMetrics.dump log
+ | Store.Context.Mdb _ -> () // MessageDB does not expose InternalMetrics. Use an ActivityListener instead
Collaborator

Suggested change
- | Store.Context.Mdb _ -> () // MessageDB does not expose InternalMetrics. Use an ActivityListener instead
+ | Store.Context.Mdb _ -> () // MessageDB does not expose InternalMetrics. Use an ActivityListener instead

Did you just tell me to ...

@@ -0,0 +1,479 @@
module Equinox.MessageDb.Integration.MessageDbIntegration

open System.Threading
Collaborator

sort

let span (m: (string * obj) list) (span: Activity) =
let spanDict = Map.ofList [
for key, _ in m do
key, span.GetTagItem(key)
Collaborator

indent

test <@ value = result @>
}

let loadCached hit batches count (span: Activity) =
Collaborator

unused?

yield System.Collections.Generic.KeyValuePair<_, _>(x.EventType, bytesToString x.Data) })
let withLoggedRetries<'t> retryPolicy (contextLabel: string) (f: ILogger -> CancellationToken -> Task<'t>) log ct: Task<'t> =
module Activity =
let setTags (tags: (string * obj)[]) =
Collaborator

@bartelink Jul 25, 2023

inline? maybe just call it set as the call sites are littered with Tag (or call it tags ?)

let logPeriodicRate name count = log.Information("rp{name} {count:n0}", name, count)
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64)
let withRetryTag count =
Activity.setTags [|Tags.retries, count|]
Collaborator

Suggested change
- Activity.setTags [|Tags.retries, count|]
+ Activity.setTags [| Tags.retries, count |]

| Some retryPolicy ->
let withLoggingContextWrapping count =
let log = if count = 1 then log else log |> prop contextLabel count
Collaborator

count was an attempt number, has it changed? if so, maybe count should become retryCount?

(resultLog |> Log.event evt).Information("Mdb{action:l} count={count} conflict={conflict}",
"Write", count, match evt with Log.WriteConflict _ -> true | _ -> false)
let private writeEventsLogged (writer: MessageDbWriter) streamName version events ct: Task<MdbSyncResult> = task {
Activity.setTags [|Tags.append_bytes, eventDataBytes events|]
Collaborator

spaces inside all [|

let eventTypes =
    if events.Length <= 3
    then [| for x in events -> x.EventType |]
    else [| for x in Seq.take 3 events -> x.EventType |]
Collaborator

if not doing a Seq.distinct, then use an unconditional Seq.truncate instead of the condition
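
A minimal sketch of what this could look like, assuming a hypothetical `firstDistinctTypes` helper (the name and signature are illustrative only). `Seq.truncate` yields at most the requested number of items and does not fail on shorter inputs, unlike `Seq.take`, so the length check becomes unnecessary:

```fsharp
/// Hypothetical helper: up to `limit` distinct event types, in order of first appearance
let firstDistinctTypes (limit: int) (eventTypes: seq<string>) : string[] =
    eventTypes |> Seq.distinct |> Seq.truncate limit |> Seq.toArray

// Fewer distinct items than the limit: no exception, just a shorter array
let few = firstDistinctTypes 3 [ "Added"; "Added" ]
// More distinct items than the limit: only the first three are kept
let many = firstDistinctTypes 3 [ "Added"; "Removed"; "Added"; "Cleared"; "Renamed" ]
```

Dropping the `Seq.distinct` stage leaves the unconditional `Seq.truncate` variant mentioned in the comment.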


module Read =

module LoadMethod =
[<Literal>]
let last = "Last"
Collaborator

could make a ReadLast module and call this methodName ?


- let batchLog = log |> Log.prop "batchIndex" batchCount
- let! slice = readSlice pos batchCount batchLog ct
+ let! slice = readSlice pos ct
version <- max version slice.LastVersion
Collaborator

max is only relevant for reading backwards?


- let batchLog = log |> Log.prop "batchIndex" batchCount
- let! slice = readSlice pos batchCount batchLog ct
+ let! slice = readSlice pos ct
version <- max version slice.LastVersion
state <- slice.Messages |> Seq.chooseV tryDecode |> fold state
batchCount <- batchCount + 1
eventCount <- eventCount + slice.Messages.Length
pos <- slice.LastVersion + 1L
Collaborator

@bartelink Jul 25, 2023

potentially off by 1?
need comment on this, or L80 or some header comment on version vs pos vs message_db version numbering
If it's not 0 based, then let tags.fs call out which stores are being archaic ;)

Contributor Author

it's inclusive. so if I load from v1 I'll get event 1,2,3. If I load from v2 I'll get 2,3 etc.

Collaborator

(this was more aimed at L85)

if LastVersion is a StreamVersion (-1 based) and version is a version (0 based)
I've lost track of it all
For me
a) I'd try to keep as many layers 0 based as possible (and refer to those as version)
b) depending on how relevant the internal store versioning scheme is, I'd try to have a layer that deals with that (and use a separate name, e.g. streamVersion, but ideally something that sticks out and makes you think)

If a pos is a version, and reading at pos 1 is reading events to apply on top of the snapshot version 1 which is based on one event, it's beautiful for me

let read _ = readLastEventAsync reader streamName requiresLeader eventType

let! t, page = Log.withLoggedRetries retryPolicy "readAttempt" read log |> Stopwatch.time ct
Activity.setTags [|Tags.batches, batchCount; Tags.loaded_count, eventCount; Tags.read_version, version|]
Collaborator

loaded_bytes?
maybe a common helper to share with loadLastEvent?

@@ -323,38 +175,39 @@ type MessageDbContext(client: MessageDbClient, batchOptions: BatchOptions) =
member val BatchOptions = batchOptions

member _.TokenEmpty = Token.create -1L
Collaborator

@bartelink Jul 25, 2023

did I mention that I hate -1 ? maybe the store interface can use the term streamVersion (-1 based) and the rest uses 0 based version/index?

Hm maybe the answer is to consistently use the term streamVersion in names in here if not 0 based - then there'll be a clear transition point whenever an act tag needs a 0 based one

Contributor Author

thinking I might move the conversion into the client and just deal with 0-based versions everywhere else. Making the token "data" unit

Collaborator

yes, that was the objective of my whining, thanks!
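
A minimal sketch of the single conversion point discussed in this thread. The `StreamVersion` module and its function names are hypothetical; it assumes, per the comments above, that the store's streamVersion is the index of the last event (-1 for an empty stream) while version is 0 based (the number of events folded so far):

```fsharp
module StreamVersion =
    /// Store-facing value for a stream with no events (assumed -1 based scheme)
    let empty = -1L
    /// -1 based streamVersion (index of the last event) -> 0 based version (count of events)
    let toVersion (streamVersion: int64) : int64 = streamVersion + 1L
    /// 0 based version -> -1 based streamVersion, for calls into the store client
    let ofVersion (version: int64) : int64 = version - 1L

// An empty stream maps to version 0; a stream whose last event has index 2 maps to version 3
let emptyVersion = StreamVersion.toVersion StreamVersion.empty
let afterThree = StreamVersion.toVersion 2L
```

With the conversion confined to the client boundary, everything else (including the Activity tags) can deal in 0 based versions only.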

2 participants