From fa9cb36e7653a6a2fd2b702bc82f7d4eb34573a8 Mon Sep 17 00:00:00 2001 From: John Bell Date: Wed, 20 Mar 2024 22:29:55 +0000 Subject: [PATCH 1/4] Add support for overriding created_at timestamps for copy transform type workflows --- lib/event_store.ex | 11 ++++++-- lib/event_store/sql/statements.ex | 4 +-- .../sql/statements/insert_events.sql.eex | 9 ++++-- .../insert_events_any_version.sql.eex | 9 ++++-- lib/event_store/storage/appender.ex | 17 +++++++++-- lib/event_store/streams/stream.ex | 6 ++-- test/event_store_test.exs | 28 +++++++++++++++++++ 7 files changed, 70 insertions(+), 14 deletions(-) diff --git a/lib/event_store.ex b/lib/event_store.ex index a7b63b5f..5ad4207c 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -527,9 +527,16 @@ defmodule EventStore do Snapshotter.delete_snapshot(conn, source_uuid, opts) end + @accepted_overrides [:created_at] defp parse_opts(opts) do - name = name(opts) - config = Config.lookup(name) + overrides = Keyword.take(opts, @accepted_overrides) + + config = + opts + |> name() + |> Config.lookup() + |> Keyword.merge(overrides) + conn = Keyword.get(opts, :conn) || Keyword.fetch!(config, :conn) timeout = timeout(opts, config) diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index f1528b47..c9801731 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -11,8 +11,8 @@ defmodule EventStore.Sql.Statements do for {fun, args} <- [ {:count_streams, [:schema]}, {:create_stream, [:schema]}, - {:insert_events, [:schema, :stream_id, :number_of_events]}, - {:insert_events_any_version, [:schema, :stream_id, :number_of_events]}, + {:insert_events, [:schema, :stream_id, :number_of_events, :created_at]}, + {:insert_events_any_version, [:schema, :stream_id, :number_of_events, :created_at]}, {:insert_link_events, [:schema, :number_of_events]}, {:soft_delete_stream, [:schema]}, {:hard_delete_stream, [:schema]}, diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex index 49bcae15..b460112c 100644 --- a/lib/event_store/sql/statements/insert_events.sql.eex +++ b/lib/event_store/sql/statements/insert_events.sql.eex @@ -24,12 +24,17 @@ WITH <% end %> ), stream AS ( - <%= if stream_id do %> + <%= cond do %> + <% stream_id -> %> UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = $1::bigint returning stream_id - <% else %> + <% created_at -> %> + INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at) + VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>) + returning stream_id + <% true -> %> INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version) VALUES ($1, $2::bigint) returning stream_id diff --git a/lib/event_store/sql/statements/insert_events_any_version.sql.eex b/lib/event_store/sql/statements/insert_events_any_version.sql.eex index 7be918e9..123e1509 100644 --- a/lib/event_store/sql/statements/insert_events_any_version.sql.eex +++ b/lib/event_store/sql/statements/insert_events_any_version.sql.eex @@ -1,11 +1,16 @@ WITH stream AS ( - <%= if stream_id do %> + <%= cond do %> + <% stream_id -> %> UPDATE "<%= schema %>".streams SET stream_version = stream_version + $2::bigint WHERE stream_id = $1::bigint returning stream_id, stream_version - $2::bigint as initial_stream_version - <% else %> + <% created_at -> %> + INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version, created_at) + VALUES ($1, $2::bigint, $<%= number_of_events*9 + 3 %>) + returning stream_id, stream_version - $2::bigint as initial_stream_version + <% true -> %> INSERT INTO "<%= schema %>".streams (stream_uuid, stream_version) VALUES ($1, $2::bigint) returning stream_id, stream_version - $2::bigint as initial_stream_version diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index ffd6b5c2..27b192ed 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -99,15 +99,23 @@ defmodule EventStore.Storage.Appender do defp insert_event_batch(conn, stream_id, stream_uuid, events, event_count, opts) do {schema, opts} = Keyword.pop(opts, :schema) {expected_version, opts} = Keyword.pop(opts, :expected_version) + {created_at, opts} = Keyword.pop(opts, :created_at) statement = case expected_version do - :any_version -> Statements.insert_events_any_version(schema, stream_id, event_count) - _expected_version -> Statements.insert_events(schema, stream_id, event_count) + :any_version -> + Statements.insert_events_any_version(schema, stream_id, event_count, created_at) + + _expected_version -> + Statements.insert_events(schema, stream_id, event_count, created_at) end stream_id_or_uuid = stream_id || stream_uuid - params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events) + + params = + [stream_id_or_uuid, event_count] + |> Enum.concat(build_insert_parameters(events)) + |> maybe_append(created_at, stream_id) case Postgrex.query(conn, statement, params, opts) do {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found} @@ -116,6 +124,9 @@ defmodule EventStore.Storage.Appender do end end + defp maybe_append(params, value, nil) when not is_nil(value), do: params ++ [value] + defp maybe_append(params, _, _), do: params + defp build_insert_parameters(events) do events |> Enum.with_index(1) diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index f82d89b5..2cd1a998 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -145,16 +145,16 @@ defmodule EventStore.Streams.Stream do serializer, opts ) do - prepared_events = prepare_events(events, stream, serializer) + prepared_events = prepare_events(events, stream, serializer, opts) write_to_stream(conn, prepared_events, stream, expected_version, opts) end - defp prepare_events(events, %StreamInfo{} = stream, serializer) do + defp prepare_events(events, %StreamInfo{} = stream, serializer, opts) do %StreamInfo{stream_uuid: stream_uuid, stream_version: stream_version} = stream events - |> Enum.map(&map_to_recorded_event(&1, utc_now(), serializer)) + |> Enum.map(&map_to_recorded_event(&1, opts[:created_at] || utc_now(), serializer)) |> Enum.with_index(1) |> Enum.map(fn {recorded_event, index} -> %RecordedEvent{ diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 794c3bc9..3bb4169b 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -203,6 +203,34 @@ defmodule EventStore.EventStoreTest do assert recorded_event.data.event == unicode_text end + test "override created_at" do + created_at = DateTime.utc_now() |> DateTime.add(-1, :day) + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(1) + + :ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at: created_at) + + [recorded_event] = EventStore.stream_all_forward() |> Enum.to_list() + {:ok, stream_info} = EventStore.stream_info(stream_uuid) + + assert recorded_event.created_at == created_at + assert stream_info.created_at == created_at + end + + test "override created_at any_version" do + created_at = DateTime.utc_now() |> DateTime.add(-1, :day) + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(1) + + :ok = EventStore.append_to_stream(stream_uuid, :any_version, events, created_at: created_at) + + [recorded_event] = EventStore.stream_all_forward() |> Enum.to_list() + {:ok, stream_info} = EventStore.stream_info(stream_uuid) + + assert recorded_event.created_at == created_at + assert stream_info.created_at == created_at + end + describe "transient subscription" do test "should notify subscribers after event persisted to stream" do stream_uuid = UUID.uuid4() From 5f9c526e1797a364d1083ab6711b58358ff97a75 Mon Sep 17 00:00:00 2001 From: John Bell Date: Thu, 21 Mar 2024 00:31:09 +0000 Subject: [PATCH 2/4] Add additional test for checking streams with existing events --- test/event_store_test.exs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 3bb4169b..32b943cd 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -217,6 +217,24 @@ defmodule EventStore.EventStoreTest do assert stream_info.created_at == created_at end + test "override created_at existing stream" do + created_at = DateTime.utc_now() |> DateTime.add(-1, :day) + created_at2 = DateTime.utc_now() |> DateTime.add(-1, :hour) + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(1) + events2 = EventFactory.create_events(1) + + :ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at: created_at) + :ok = EventStore.append_to_stream(stream_uuid, :any_version, events2, created_at: created_at2) + + [event1, event2] = EventStore.stream_all_forward() |> Enum.to_list() + {:ok, stream_info} = EventStore.stream_info(stream_uuid) + + assert event1.created_at == created_at + assert stream_info.created_at == created_at + assert event2.created_at == created_at2 + end + test "override created_at any_version" do created_at = DateTime.utc_now() |> DateTime.add(-1, :day) stream_uuid = UUID.uuid4() @@ -231,6 +249,24 @@ defmodule EventStore.EventStoreTest do assert stream_info.created_at == created_at end + test "override created_at any_version existing stream" do + created_at = DateTime.utc_now() |> DateTime.add(-1, :day) + created_at2 = DateTime.utc_now() |> DateTime.add(-1, :hour) + stream_uuid = UUID.uuid4() + events = EventFactory.create_events(1) + events2 = EventFactory.create_events(1) + + :ok = EventStore.append_to_stream(stream_uuid, :any_version, events, created_at: created_at) + :ok = EventStore.append_to_stream(stream_uuid, :any_version, events2, created_at: created_at2) + + [event1, event2] = EventStore.stream_all_forward() |> Enum.to_list() + {:ok, stream_info} = EventStore.stream_info(stream_uuid) + + assert event1.created_at == created_at + assert stream_info.created_at == created_at + assert event2.created_at == created_at2 + end + describe "transient subscription" do test "should notify subscribers after event persisted to stream" do stream_uuid = UUID.uuid4() From 462d321f76f4a7e3867ff38cc2006d63e9579968 Mon Sep 17 00:00:00 2001 From: John Bell Date: Thu, 21 Mar 2024 00:43:24 +0000 Subject: [PATCH 3/4] Minor refactor for readability --- lib/event_store/storage/appender.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index 27b192ed..46d1d0b1 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -115,7 +115,7 @@ defmodule EventStore.Storage.Appender do params = [stream_id_or_uuid, event_count] |> Enum.concat(build_insert_parameters(events)) - |> maybe_append(created_at, stream_id) + |> append_if(!stream_id, created_at) case Postgrex.query(conn, statement, params, opts) do {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found} @@ -124,8 +124,8 @@ defmodule EventStore.Storage.Appender do end end - defp maybe_append(params, value, nil) when not is_nil(value), do: params ++ [value] - defp maybe_append(params, _, _), do: params + defp append_if(params, true, value) when not is_nil(value), do: params ++ [value] + defp append_if(params, _, _), do: params defp build_insert_parameters(events) do events From 705a8e24ef83fed9c855f214d2bfcddcd300174a Mon Sep 17 00:00:00 2001 From: John Bell Date: Thu, 19 Sep 2024 20:26:16 +0100 Subject: [PATCH 4/4] Code review: rename override and move extraction of keywords to apend_to_stream function --- lib/event_store.ex | 15 ++++++--------- lib/event_store/storage/appender.ex | 2 +- lib/event_store/streams/stream.ex | 2 +- test/event_store_test.exs | 26 ++++++++++++++++++++------ 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/lib/event_store.ex b/lib/event_store.ex index 5ad4207c..a02560a8 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -294,13 +294,17 @@ defmodule EventStore do Supervisor.stop(supervisor, :normal, timeout) end + @accepted_overrides_append_to_stream [:created_at_override] + def append_to_stream(stream_uuid, expected_version, events, opts \\ []) def append_to_stream(@all_stream, _expected_version, _events, _opts), do: {:error, :cannot_append_to_all_stream} def append_to_stream(stream_uuid, expected_version, events, opts) do + overrides = Keyword.take(opts, @accepted_overrides_append_to_stream) {conn, opts} = parse_opts(opts) + opts = Keyword.merge(opts, overrides) Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) end @@ -527,16 +531,9 @@ defmodule EventStore do Snapshotter.delete_snapshot(conn, source_uuid, opts) end - @accepted_overrides [:created_at] defp parse_opts(opts) do - overrides = Keyword.take(opts, @accepted_overrides) - - config = - opts - |> name() - |> Config.lookup() - |> Keyword.merge(overrides) - + name = name(opts) + config = Config.lookup(name) conn = Keyword.get(opts, :conn) || Keyword.fetch!(config, :conn) timeout = timeout(opts, config) diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index 46d1d0b1..0b71d3e4 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -99,7 +99,7 @@ defmodule EventStore.Storage.Appender do defp insert_event_batch(conn, stream_id, stream_uuid, events, event_count, opts) do {schema, opts} = Keyword.pop(opts, :schema) {expected_version, opts} = Keyword.pop(opts, :expected_version) - {created_at, opts} = Keyword.pop(opts, :created_at) + {created_at, opts} = Keyword.pop(opts, :created_at_override) statement = case expected_version do diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 2cd1a998..2221b615 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -154,7 +154,7 @@ defmodule EventStore.Streams.Stream do %StreamInfo{stream_uuid: stream_uuid, stream_version: stream_version} = stream events - |> Enum.map(&map_to_recorded_event(&1, opts[:created_at] || utc_now(), serializer)) + |> Enum.map(&map_to_recorded_event(&1, opts[:created_at_override] || utc_now(), serializer)) |> Enum.with_index(1) |> Enum.map(fn {recorded_event, index} -> %RecordedEvent{ diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 32b943cd..66aa64e5 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -208,7 +208,7 @@ defmodule EventStore.EventStoreTest do stream_uuid = UUID.uuid4() events = EventFactory.create_events(1) - :ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at: created_at) + :ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at_override: created_at) [recorded_event] = EventStore.stream_all_forward() |> Enum.to_list() {:ok, stream_info} = EventStore.stream_info(stream_uuid) @@ -224,8 +224,12 @@ defmodule EventStore.EventStoreTest do events = EventFactory.create_events(1) events2 = EventFactory.create_events(1) - :ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at: created_at) - :ok = EventStore.append_to_stream(stream_uuid, :any_version, events2, created_at: created_at2) + :ok = EventStore.append_to_stream(stream_uuid, 0, events, created_at_override: created_at) + + :ok = + EventStore.append_to_stream(stream_uuid, :any_version, events2, + created_at_override: created_at2 + ) [event1, event2] = EventStore.stream_all_forward() |> Enum.to_list() {:ok, stream_info} = EventStore.stream_info(stream_uuid) @@ -240,7 +244,10 @@ defmodule EventStore.EventStoreTest do stream_uuid = UUID.uuid4() events = EventFactory.create_events(1) - :ok = EventStore.append_to_stream(stream_uuid, :any_version, events, created_at: created_at) + :ok = + EventStore.append_to_stream(stream_uuid, :any_version, events, + created_at_override: created_at + ) [recorded_event] = EventStore.stream_all_forward() |> Enum.to_list() {:ok, stream_info} = EventStore.stream_info(stream_uuid) @@ -256,8 +263,15 @@ defmodule EventStore.EventStoreTest do events = EventFactory.create_events(1) events2 = EventFactory.create_events(1) - :ok = EventStore.append_to_stream(stream_uuid, :any_version, events, created_at: created_at) - :ok = EventStore.append_to_stream(stream_uuid, :any_version, events2, created_at: created_at2) + :ok = + EventStore.append_to_stream(stream_uuid, :any_version, events, + created_at_override: created_at + ) + + :ok = + EventStore.append_to_stream(stream_uuid, :any_version, events2, + created_at_override: created_at2 + ) [event1, event2] = EventStore.stream_all_forward() |> Enum.to_list() {:ok, stream_info} = EventStore.stream_info(stream_uuid)