Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add set_update_hook/2 #260

Merged
merged 8 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion c_src/sqlite3_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
#define MAX_PATHNAME 512

static ErlNifResourceType* connection_type = NULL;
static ErlNifResourceType* statement_type = NULL;
static ErlNifResourceType* statement_type = NULL;
static sqlite3_mem_methods default_alloc_methods = {0};

typedef struct connection
{
sqlite3* db;
ErlNifMutex* mutex;
ErlNifPid update_hook_pid;
} connection_t;
Comment on lines 22 to 27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could capture the App.Repo atom and store it with the connection and pass that in the callback instead of the raw db name. This would probably change how the sqlite database is opened.


typedef struct statement
Expand Down Expand Up @@ -999,6 +1000,73 @@ exqlite_enable_load_extension(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
return make_atom(env, "ok");
}

//
// Data Change Notifications
//

void
update_callback(void* arg, int sqlite_operation_type, char const* sqlite_database, char const* sqlite_table, sqlite3_int64 sqlite_rowid)
{
connection_t* conn = (connection_t*)arg;

if (conn == NULL) {
return;
}

ErlNifEnv* msg_env = enif_alloc_env();
ERL_NIF_TERM change_type;

switch (sqlite_operation_type) {
case SQLITE_INSERT:
change_type = make_atom(msg_env, "insert");
break;
case SQLITE_DELETE:
change_type = make_atom(msg_env, "delete");
break;
case SQLITE_UPDATE:
change_type = make_atom(msg_env, "update");
break;
default:
return;
}
ERL_NIF_TERM rowid = enif_make_int64(msg_env, sqlite_rowid);
ERL_NIF_TERM database = make_binary(msg_env, sqlite_database, strlen(sqlite_database));
ERL_NIF_TERM table = make_binary(msg_env, sqlite_table, strlen(sqlite_table));
ERL_NIF_TERM msg = enif_make_tuple4(msg_env, change_type, database, table, rowid);

if (!enif_send(NULL, &conn->update_hook_pid, msg_env, msg)) {
sqlite3_update_hook(conn->db, NULL, NULL);
}

enif_free_env(msg_env);
}

static ERL_NIF_TERM
exqlite_set_update_hook(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
assert(env);
connection_t* conn = NULL;

if (argc != 2) {
return enif_make_badarg(env);
}

if (!enif_get_resource(env, argv[0], connection_type, (void**)&conn)) {
return make_error_tuple(env, "invalid_connection");
}

if (!enif_get_local_pid(env, argv[1], &conn->update_hook_pid)) {
return make_error_tuple(env, "invalid_pid");
}

// Passing the connection as the third argument causes it to be
// passed as the first argument to update_callback. This allows us
// to extract the hook pid and reset the hook if the pid is not alive.
sqlite3_update_hook(conn->db, update_callback, conn);

return make_atom(env, "ok");
}

//
// Most of our nif functions are going to be IO bounded
//
Expand All @@ -1019,6 +1087,7 @@ static ErlNifFunc nif_funcs[] = {
{"deserialize", 3, exqlite_deserialize, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"release", 2, exqlite_release, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"enable_load_extension", 2, exqlite_enable_load_extension, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"set_update_hook", 2, exqlite_set_update_hook, ERL_NIF_DIRTY_JOB_IO_BOUND},
};

ERL_NIF_INIT(Elixir.Exqlite.Sqlite3NIF, nif_funcs, on_load, NULL, NULL, on_unload)
30 changes: 30 additions & 0 deletions lib/exqlite/sqlite3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,36 @@ defmodule Exqlite.Sqlite3 do
end
end

@doc """
Send data change notifications to a process.

Each time an insert, update, or delete is performed on the connection provided
as the first argument, a message will be sent to the pid provided as the second argument.

The message is of the form: `{action, db_name, table, row_id}`, where:

* `action` is one of `:insert`, `:update` or `:delete`
* `db_name` is a string representing the database name where the change took place
* `table` is a string representing the table name where the change took place
* `row_id` is an integer representing the unique row id assigned by SQLite

## Restrictions

* There are some conditions where the update hook will not be invoked by SQLite.
See the documentation for [more details](https://www.sqlite.org/c3ref/update_hook.html)
* Only one pid can listen to the changes on a given database connection at a time.
If this function is called multiple times for the same connection, only the last pid will
receive the notifications
* Updates only happen for the connection that is opened. For example, there
are two connections A and B. When an update happens on connection B, the
hook set for connection A will not receive the update, but the hook for
connection B will receive the update.
"""
greg-rychlewski marked this conversation as resolved.
Show resolved Hide resolved
@spec set_update_hook(db(), pid()) :: :ok | {:error, reason()}
def set_update_hook(conn, pid) do
Sqlite3NIF.set_update_hook(conn, pid)
end

defp convert(%Date{} = val), do: Date.to_iso8601(val)
defp convert(%Time{} = val), do: Time.to_iso8601(val)
defp convert(%NaiveDateTime{} = val), do: NaiveDateTime.to_iso8601(val)
Expand Down
3 changes: 3 additions & 0 deletions lib/exqlite/sqlite3_nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@ defmodule Exqlite.Sqlite3NIF do
@spec enable_load_extension(db(), integer()) :: :ok | {:error, reason()}
def enable_load_extension(_conn, _flag), do: :erlang.nif_error(:not_loaded)

@spec set_update_hook(db(), pid()) :: :ok | {:error, reason()}
def set_update_hook(_conn, _pid), do: :erlang.nif_error(:not_loaded)

# add statement inspection tooling https://sqlite.org/c3ref/expanded_sql.html
end
68 changes: 68 additions & 0 deletions test/exqlite/sqlite3_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -411,4 +411,72 @@ defmodule Exqlite.Sqlite3Test do
assert {:row, [1, "hello"]} = Sqlite3.step(conn, statement)
end
end

describe "set_update_hook/2" do
defmodule ChangeListener do
use GenServer

def start_link({parent, name}),
do: GenServer.start_link(__MODULE__, {parent, name})

def init({parent, name}), do: {:ok, {parent, name}}

def handle_info({_action, _db, _table, _row_id} = change, {parent, name}) do
send(parent, {change, name})
{:noreply, {parent, name}}
end
end

setup do
{:ok, path} = Temp.path()
{:ok, conn} = Sqlite3.open(path)
:ok = Sqlite3.execute(conn, "create table test(num integer)")

on_exit(fn ->
Sqlite3.close(conn)
File.rm(path)
end)

[conn: conn, path: path]
end

test "can listen to data change notifications", context do
{:ok, listener_pid} = ChangeListener.start_link({self(), :listener})
Sqlite3.set_update_hook(context.conn, listener_pid)

:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (11)")
:ok = Sqlite3.execute(context.conn, "update test set num = 1000")
:ok = Sqlite3.execute(context.conn, "delete from test where num = 1000")

assert_receive {{:insert, "main", "test", 1}, _}, 1000
assert_receive {{:insert, "main", "test", 2}, _}, 1000
assert_receive {{:update, "main", "test", 1}, _}, 1000
assert_receive {{:update, "main", "test", 2}, _}, 1000
assert_receive {{:delete, "main", "test", 1}, _}, 1000
assert_receive {{:delete, "main", "test", 2}, _}, 1000
end

test "only one pid can listen at a time", context do
{:ok, listener1_pid} = ChangeListener.start_link({self(), :listener1})
{:ok, listener2_pid} = ChangeListener.start_link({self(), :listener2})

Sqlite3.set_update_hook(context.conn, listener1_pid)
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
assert_receive {{:insert, "main", "test", 1}, :listener1}, 1000

Sqlite3.set_update_hook(context.conn, listener2_pid)
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
assert_receive {{:insert, "main", "test", 2}, :listener2}, 1000
refute_receive {{:insert, "main", "test", 2}, :listener1}, 1000
end

test "notifications don't cross connections", context do
{:ok, listener_pid} = ChangeListener.start_link({self(), :listener})
{:ok, new_conn} = Sqlite3.open(context.path)
Sqlite3.set_update_hook(new_conn, listener_pid)
:ok = Sqlite3.execute(context.conn, "insert into test(num) values (10)")
refute_receive {{:insert, "main", "test", 1}, _}, 1000
end
end
end
Loading