diff --git a/lib/exqlite.ex b/lib/exqlite.ex index af42a7f5..63c0590c 100644 --- a/lib/exqlite.ex +++ b/lib/exqlite.ex @@ -1,104 +1,104 @@ -defmodule Exqlite do - @moduledoc """ - SQLite3 driver for Elixir. - """ - - alias Exqlite.Connection - alias Exqlite.Error - alias Exqlite.Query - alias Exqlite.Result - - @doc "See `Exqlite.Connection.connect/1`" - @spec start_link([Connection.connection_opt()]) :: {:ok, pid()} | {:error, Error.t()} - def start_link(opts) do - DBConnection.start_link(Connection, opts) - end - - @spec query(DBConnection.conn(), iodata(), list(), list()) :: - {:ok, Result.t()} | {:error, Exception.t()} - def query(conn, statement, params \\ [], opts \\ []) do - query = %Query{name: "", statement: IO.iodata_to_binary(statement)} - - case DBConnection.prepare_execute(conn, query, params, opts) do - {:ok, _query, result} -> - {:ok, result} - - otherwise -> - otherwise - end - end - - @spec query!(DBConnection.conn(), iodata(), list(), list()) :: Result.t() - def query!(conn, statement, params \\ [], opts \\ []) do - case query(conn, statement, params, opts) do - {:ok, result} -> result - {:error, err} -> raise err - end - end - - @spec prepare(DBConnection.conn(), iodata(), list()) :: - {:ok, Query.t()} | {:error, Exception.t()} - def prepare(conn, name, statement, opts \\ []) do - query = %Query{name: name, statement: statement} - DBConnection.prepare(conn, query, opts) - end - - @spec prepare!(DBConnection.conn(), iodata(), iodata(), list()) :: Query.t() - def prepare!(conn, name, statement, opts \\ []) do - query = %Query{name: name, statement: statement} - DBConnection.prepare!(conn, query, opts) - end - - @spec prepare_execute(DBConnection.conn(), iodata(), iodata(), list(), list()) :: - {:ok, Query.t(), Result.t()} | {:error, Error.t()} - def prepare_execute(conn, name, statement, params, opts \\ []) do - query = %Query{name: name, statement: statement} - DBConnection.prepare_execute(conn, query, params, opts) - end - - @spec prepare_execute!(DBConnection.conn(), iodata(), iodata(), list(), list()) :: - {Query.t(), Result.t()} - def prepare_execute!(conn, name, statement, params, opts \\ []) do - query = %Query{name: name, statement: statement} - DBConnection.prepare_execute!(conn, query, params, opts) - end - - @spec execute(DBConnection.conn(), Query.t(), list(), list()) :: - {:ok, Result.t()} | {:error, Error.t()} - def execute(conn, query, params, opts \\ []) do - DBConnection.execute(conn, query, params, opts) - end - - @spec execute!(DBConnection.conn(), Query.t(), list(), list()) :: Result.t() - def execute!(conn, query, params, opts \\ []) do - DBConnection.execute!(conn, query, params, opts) - end - - @spec close(DBConnection.conn(), Query.t(), list()) :: :ok | {:error, Exception.t()} - def close(conn, query, opts \\ []) do - with {:ok, _} <- DBConnection.close(conn, query, opts) do - :ok - end - end - - @spec close!(DBConnection.conn(), Query.t(), list()) :: :ok - def close!(conn, query, opts \\ []) do - DBConnection.close!(conn, query, opts) - :ok - end - - @spec transaction(DBConnection.conn(), (DBConnection.t() -> result), list()) :: - {:ok, result} | {:error, any} - when result: var - def transaction(conn, fun, opts \\ []) do - DBConnection.transaction(conn, fun, opts) - end - - @spec rollback(DBConnection.t(), term()) :: no_return() - def rollback(conn, reason), do: DBConnection.rollback(conn, reason) - - @spec child_spec([Connection.connection_opt()]) :: :supervisor.child_spec() - def child_spec(opts) do - DBConnection.child_spec(Connection, opts) - end -end +# defmodule Exqlite do +# @moduledoc """ +# SQLite3 driver for Elixir. +# """ + +# alias Exqlite.Connection +# alias Exqlite.Error +# alias Exqlite.Query +# alias Exqlite.Result + +# @doc "See `Exqlite.Connection.connect/1`" +# @spec start_link([Connection.connection_opt()]) :: {:ok, pid()} | {:error, Error.t()} +# def start_link(opts) do +# DBConnection.start_link(Connection, opts) +# end + +# @spec query(DBConnection.conn(), iodata(), list(), list()) :: +# {:ok, Result.t()} | {:error, Exception.t()} +# def query(conn, statement, params \\ [], opts \\ []) do +# query = %Query{name: "", statement: IO.iodata_to_binary(statement)} + +# case DBConnection.prepare_execute(conn, query, params, opts) do +# {:ok, _query, result} -> +# {:ok, result} + +# otherwise -> +# otherwise +# end +# end + +# @spec query!(DBConnection.conn(), iodata(), list(), list()) :: Result.t() +# def query!(conn, statement, params \\ [], opts \\ []) do +# case query(conn, statement, params, opts) do +# {:ok, result} -> result +# {:error, err} -> raise err +# end +# end + +# @spec prepare(DBConnection.conn(), iodata(), list()) :: +# {:ok, Query.t()} | {:error, Exception.t()} +# def prepare(conn, name, statement, opts \\ []) do +# query = %Query{name: name, statement: statement} +# DBConnection.prepare(conn, query, opts) +# end + +# @spec prepare!(DBConnection.conn(), iodata(), iodata(), list()) :: Query.t() +# def prepare!(conn, name, statement, opts \\ []) do +# query = %Query{name: name, statement: statement} +# DBConnection.prepare!(conn, query, opts) +# end + +# @spec prepare_execute(DBConnection.conn(), iodata(), iodata(), list(), list()) :: +# {:ok, Query.t(), Result.t()} | {:error, Error.t()} +# def prepare_execute(conn, name, statement, params, opts \\ []) do +# query = %Query{name: name, statement: statement} +# DBConnection.prepare_execute(conn, query, params, opts) +# end + +# @spec prepare_execute!(DBConnection.conn(), iodata(), iodata(), list(), list()) :: +# {Query.t(), Result.t()} +# def prepare_execute!(conn, name, statement, params, opts \\ []) do +# query = %Query{name: name, statement: statement} +# DBConnection.prepare_execute!(conn, query, params, opts) +# end + +# @spec execute(DBConnection.conn(), Query.t(), list(), list()) :: +# {:ok, Result.t()} | {:error, Error.t()} +# def execute(conn, query, params, opts \\ []) do +# DBConnection.execute(conn, query, params, opts) +# end + +# @spec execute!(DBConnection.conn(), Query.t(), list(), list()) :: Result.t() +# def execute!(conn, query, params, opts \\ []) do +# DBConnection.execute!(conn, query, params, opts) +# end + +# @spec close(DBConnection.conn(), Query.t(), list()) :: :ok | {:error, Exception.t()} +# def close(conn, query, opts \\ []) do +# with {:ok, _} <- DBConnection.close(conn, query, opts) do +# :ok +# end +# end + +# @spec close!(DBConnection.conn(), Query.t(), list()) :: :ok +# def close!(conn, query, opts \\ []) do +# DBConnection.close!(conn, query, opts) +# :ok +# end + +# @spec transaction(DBConnection.conn(), (DBConnection.t() -> result), list()) :: +# {:ok, result} | {:error, any} +# when result: var +# def transaction(conn, fun, opts \\ []) do +# DBConnection.transaction(conn, fun, opts) +# end + +# @spec rollback(DBConnection.t(), term()) :: no_return() +# def rollback(conn, reason), do: DBConnection.rollback(conn, reason) + +# @spec child_spec([Connection.connection_opt()]) :: :supervisor.child_spec() +# def child_spec(opts) do +# DBConnection.child_spec(Connection, opts) +# end +# end diff --git a/lib/exqlite/basic.ex b/lib/exqlite/basic.ex index 73a33e0d..bee76374 100644 --- a/lib/exqlite/basic.ex +++ b/lib/exqlite/basic.ex @@ -3,46 +3,40 @@ defmodule Exqlite.Basic do A very basis API without lots of options to allow simpler usage for basic needs. """ - alias Exqlite.Connection - alias Exqlite.Query alias Exqlite.Sqlite3 alias Exqlite.Error - alias Exqlite.Result def open(path) do - Connection.connect(database: path) + Sqlite3.open(path) end - def close(%Connection{} = conn) do - case Sqlite3.close(conn.db) do + def close(db) do + case Sqlite3.close(db) do :ok -> :ok - {:error, reason} -> {:error, %Error{message: to_string(reason)}} + {:error, reason} -> {:error, Error.exception(message: to_string(reason))} end end - def exec(%Connection{} = conn, stmt, args \\ []) do - %Query{statement: stmt} |> Connection.handle_execute(args, [], conn) + def exec(db, stmt, args \\ []) do + with {:ok, stmt} <- Sqlite3.prepare(db, stmt), + :ok <- maybe_bind(db, stmt, args), + {:ok, columns} <- Sqlite3.columns(db, stmt), + {:ok, rows} <- Sqlite3.fetch_all(db, stmt), + do: {:ok, rows, columns} end - def rows(exec_result) do - case exec_result do - {:ok, %Query{}, %Result{rows: rows, columns: columns}, %Connection{}} -> - {:ok, rows, columns} - - {:error, %Error{message: message}, %Connection{}} -> - {:error, to_string(message)} - end + def load_extension(db, path) do + exec(db, "select load_extension(?)", [path]) end - def load_extension(conn, path) do - exec(conn, "select load_extension(?)", [path]) + def enable_load_extension(db) do + Sqlite3.enable_load_extension(db, true) end - def enable_load_extension(conn) do - Sqlite3.enable_load_extension(conn.db, true) + def disable_load_extension(db) do + Sqlite3.enable_load_extension(db, false) end - def disable_load_extension(conn) do - Sqlite3.enable_load_extension(conn.db, false) - end + defp maybe_bind(_db, _stmt, []), do: :ok + defp maybe_bind(db, stmt, params), do: Sqlite3.bind(db, stmt, params) end diff --git a/lib/exqlite/connection.ex b/lib/exqlite/connection.ex index 09624821..acd0a2f9 100644 --- a/lib/exqlite/connection.ex +++ b/lib/exqlite/connection.ex @@ -1,706 +1,706 @@ -defmodule Exqlite.Connection do - @moduledoc """ - This module implements connection details as defined in DBProtocol. - - ## Attributes - - - `db` - The sqlite3 database reference. - - `path` - The path that was used to open. - - `transaction_status` - The status of the connection. Can be `:idle` or `:transaction`. - - ## Unknowns - - - How are pooled connections going to work? Since sqlite3 doesn't allow for - simultaneous access. We would need to check if the write ahead log is - enabled on the database. We can't assume and set the WAL pragma because the - database may be stored on a network volume which would cause potential - issues. - - Notes: - - we try to closely follow structure and naming convention of myxql. - - sqlite thrives when there are many small conventions, so we may not implement - some strategies employed by other adapters. See https://sqlite.org/np1queryprob.html - """ - - use DBConnection - alias Exqlite.Error - alias Exqlite.Pragma - alias Exqlite.Query - alias Exqlite.Result - alias Exqlite.Sqlite3 - require Logger - - defstruct [ - :db, - :directory, - :path, - :transaction_status, - :status, - :chunk_size - ] - - @type t() :: %__MODULE__{ - db: Sqlite3.db(), - directory: String.t() | nil, - path: String.t(), - transaction_status: :idle | :transaction, - status: :idle | :busy - } - - @type journal_mode() :: :delete | :truncate | :persist | :memory | :wal | :off - @type temp_store() :: :default | :file | :memory - @type synchronous() :: :extra | :full | :normal | :off - @type auto_vacuum() :: :none | :full | :incremental - @type locking_mode() :: :normal | :exclusive - - @type connection_opt() :: - {:database, String.t()} - | {:journal_mode, journal_mode()} - | {:temp_store, temp_store()} - | {:synchronous, synchronous()} - | {:foreign_keys, :on | :off} - | {:cache_size, integer()} - | {:cache_spill, :on | :off} - | {:case_sensitive_like, boolean()} - | {:auto_vacuum, auto_vacuum()} - | {:locking_mode, locking_mode()} - | {:secure_delete, :on | :off} - | {:wal_auto_check_point, integer()} - | {:busy_timeout, integer()} - | {:chunk_size, integer()} - | {:journal_size_limit, integer()} - | {:soft_heap_limit, integer()} - | {:hard_heap_limit, integer()} - | {:key, String.t()} - | {:custom_pragmas, [{keyword(), integer() | boolean() | String.t()}]} - - @impl true - @doc """ - Initializes the Ecto Exqlite adapter. - - For connection configurations we use the defaults that come with SQLite3, but - we recommend which options to choose. We do not default to the recommended - because we don't know what your environment is like. - - Allowed options: - - * `:database` - The path to the database. In memory is allowed. You can use - `:memory` or `":memory:"` to designate that. - * `:mode` - use `:readwrite` to open the database for reading and writing - or `:readonly` to open it in read-only mode. `:readwrite` will also create - the database if it doesn't already exist. Defaults to `:readwrite`. - * `:journal_mode` - Sets the journal mode for the sqlite connection. Can be - one of the following `:delete`, `:truncate`, `:persist`, `:memory`, - `:wal`, or `:off`. Defaults to `:delete`. It is recommended that you use - `:wal` due to support for concurrent reads. Note: `:wal` does not mean - concurrent writes. - * `:temp_store` - Sets the storage used for temporary tables. Default is - `:default`. Allowed values are `:default`, `:file`, `:memory`. It is - recommended that you use `:memory` for storage. - * `:synchronous` - Can be `:extra`, `:full`, `:normal`, or `:off`. Defaults - to `:normal`. - * `:foreign_keys` - Sets if foreign key checks should be enforced or not. - Can be `:on` or `:off`. Default is `:on`. - * `:cache_size` - Sets the cache size to be used for the connection. This is - an odd setting as a positive value is the number of pages in memory to use - and a negative value is the size in kilobytes to use. Default is `-2000`. - It is recommended that you use `-64000`. - * `:cache_spill` - The cache_spill pragma enables or disables the ability of - the pager to spill dirty cache pages to the database file in the middle of - a transaction. By default it is `:on`, and for most applications, it - should remain so. - * `:case_sensitive_like` - * `:auto_vacuum` - Defaults to `:none`. Can be `:none`, `:full` or - `:incremental`. Depending on the database size, `:incremental` may be - beneficial. - * `:locking_mode` - Defaults to `:normal`. Allowed values are `:normal` or - `:exclusive`. See [sqlite documentation][1] for more information. - * `:secure_delete` - Defaults to `:off`. If enabled, it will cause SQLite3 - to overwrite records that were deleted with zeros. - * `:wal_auto_check_point` - Sets the write-ahead log auto-checkpoint - interval. Default is `1000`. Setting the auto-checkpoint size to zero or a - negative value turns auto-checkpointing off. - * `:busy_timeout` - Sets the busy timeout in milliseconds for a connection. - Default is `2000`. - * `:chunk_size` - The chunk size for bulk fetching. Defaults to `50`. - * `:key` - Optional key to set during database initialization. This PRAGMA - is often used to set up database level encryption. - * `:journal_size_limit` - The size limit in bytes of the journal. - * `:soft_heap_limit` - The size limit in bytes for the heap limit. - * `:hard_heap_limit` - The size limit in bytes for the heap. - * `:custom_pragmas` - A list of custom pragmas to set on the connection, for example to configure extensions. - * `:load_extensions` - A list of paths identifying extensions to load. Defaults to `[]`. - The provided list will be merged with the global extensions list, set on `:exqlite, :load_extensions`. - Be aware that the path should handle pointing to a library compiled for the current architecture. - Example configuration: - - ``` - arch_dir = - System.cmd("uname", ["-sm"]) - |> elem(0) - |> String.trim() - |> String.replace(" ", "-") - |> String.downcase() # => "darwin-arm64" - - config :myapp, arch_dir: arch_dir - - # global - config :exqlite, load_extensions: [ "./priv/sqlite/\#{arch_dir}/rotate" ] - - # per connection in a Phoenix app - config :myapp, Myapp.Repo, - database: "path/to/db", - load_extensions: [ - "./priv/sqlite/\#{arch_dir}/vector0", - "./priv/sqlite/\#{arch_dir}/vss0" - ] - ``` - - For more information about the options above, see [sqlite documentation][1] - - [1]: https://www.sqlite.org/pragma.html - """ - @spec connect([connection_opt()]) :: {:ok, t()} | {:error, Exception.t()} - def connect(options) do - database = Keyword.get(options, :database) - - options = - Keyword.put_new( - options, - :chunk_size, - Application.get_env(:exqlite, :default_chunk_size, 50) - ) - - case database do - nil -> - {:error, - %Error{ - message: """ - You must provide a :database to the database. \ - Example: connect(database: "./") or connect(database: :memory)\ - """ - }} - - :memory -> - do_connect(":memory:", options) - - _ -> - do_connect(database, options) - end - end - - @impl true - def disconnect(_err, %__MODULE__{db: db}) do - case Sqlite3.close(db) do - :ok -> :ok - {:error, reason} -> {:error, %Error{message: to_string(reason)}} - end - end - - @impl true - def checkout(%__MODULE__{status: :idle} = state) do - {:ok, %{state | status: :busy}} - end - - def checkout(%__MODULE__{status: :busy} = state) do - {:disconnect, %Error{message: "Database is busy"}, state} - end - - @impl true - def ping(state), do: {:ok, state} - - ## - ## Handlers - ## - - @impl true - def handle_prepare(%Query{} = query, options, state) do - with {:ok, query} <- prepare(query, options, state) do - {:ok, query, state} - end - end - - @impl true - def handle_execute(%Query{} = query, params, options, state) do - with {:ok, query} <- prepare(query, options, state) do - execute(:execute, query, params, state) - end - end - - @doc """ - Begin a transaction. - - For full info refer to sqlite docs: https://sqlite.org/lang_transaction.html - - Note: default transaction mode is DEFERRED. - """ - @impl true - def handle_begin(options, %{transaction_status: transaction_status} = state) do - # This doesn't handle more than 2 levels of transactions. - # - # One possible solution would be to just track the number of open - # transactions and use that for driving the transaction status being idle or - # in a transaction. - # - # I do not know why the other official adapters do not track this and just - # append level on the savepoint. Instead the rollbacks would just completely - # revert the issues when it may be desirable to fix something while in the - # transaction and then commit. - case Keyword.get(options, :mode, :deferred) do - :deferred when transaction_status == :idle -> - handle_transaction(:begin, "BEGIN TRANSACTION", state) - - :transaction when transaction_status == :idle -> - handle_transaction(:begin, "BEGIN TRANSACTION", state) - - :immediate when transaction_status == :idle -> - handle_transaction(:begin, "BEGIN IMMEDIATE TRANSACTION", state) - - :exclusive when transaction_status == :idle -> - handle_transaction(:begin, "BEGIN EXCLUSIVE TRANSACTION", state) - - mode - when mode in [:deferred, :immediate, :exclusive, :savepoint] and - transaction_status == :transaction -> - handle_transaction(:begin, "SAVEPOINT exqlite_savepoint", state) - end - end - - @impl true - def handle_commit(options, %{transaction_status: transaction_status} = state) do - case Keyword.get(options, :mode, :deferred) do - :savepoint when transaction_status == :transaction -> - handle_transaction( - :commit_savepoint, - "RELEASE SAVEPOINT exqlite_savepoint", - state - ) - - mode - when mode in [:deferred, :immediate, :exclusive, :transaction] and - transaction_status == :transaction -> - handle_transaction(:commit, "COMMIT", state) - end - end - - @impl true - def handle_rollback(options, %{transaction_status: transaction_status} = state) do - case Keyword.get(options, :mode, :deferred) do - :savepoint when transaction_status == :transaction -> - with {:ok, _result, state} <- - handle_transaction( - :rollback_savepoint, - "ROLLBACK TO SAVEPOINT exqlite_savepoint", - state - ) do - handle_transaction( - :rollback_savepoint, - "RELEASE SAVEPOINT exqlite_savepoint", - state - ) - end - - mode - when mode in [:deferred, :immediate, :exclusive, :transaction] -> - handle_transaction(:rollback, "ROLLBACK TRANSACTION", state) - end - end - - @doc """ - Close a query prepared by `handle_prepare/3` with the database. Return - `{:ok, result, state}` on success and to continue, - `{:error, exception, state}` to return an error and continue, or - `{:disconnect, exception, state}` to return an error and disconnect. - - This callback is called in the client process. - """ - @impl true - def handle_close(query, _opts, state) do - Sqlite3.release(state.db, query.ref) - {:ok, nil, state} - end - - @impl true - def handle_declare(%Query{} = query, params, opts, state) do - # We emulate cursor functionality by just using a prepared statement and - # step through it. Thus we just return the query ref as the cursor. - with {:ok, query} <- prepare_no_cache(query, opts, state), - {:ok, query} <- bind_params(query, params, state) do - {:ok, query, query.ref, state} - end - end - - @impl true - def handle_deallocate(%Query{} = query, _cursor, _opts, state) do - Sqlite3.release(state.db, query.ref) - {:ok, nil, state} - end - - @impl true - def handle_fetch(%Query{statement: statement}, cursor, opts, state) do - chunk_size = opts[:chunk_size] || opts[:max_rows] || state.chunk_size - - case Sqlite3.multi_step(state.db, cursor, chunk_size) do - {:done, rows} -> - {:halt, %Result{rows: rows, command: :fetch, num_rows: length(rows)}, state} - - {:rows, rows} -> - {:cont, %Result{rows: rows, command: :fetch, num_rows: chunk_size}, state} - - {:error, reason} -> - {:error, %Error{message: to_string(reason), statement: statement}, state} - - :busy -> - {:error, %Error{message: "Database is busy", statement: statement}, state} - end - end - - @impl true - def handle_status(_opts, state) do - {state.transaction_status, state} - end - - ### ---------------------------------- - # Internal functions and helpers - ### ---------------------------------- - - defp set_pragma(db, pragma_name, value) do - Sqlite3.execute(db, "PRAGMA #{pragma_name} = #{value}") - end - - defp get_pragma(db, pragma_name) do - {:ok, statement} = Sqlite3.prepare(db, "PRAGMA #{pragma_name}") - - case Sqlite3.fetch_all(db, statement) do - {:ok, [[value]]} -> {:ok, value} - _ -> :error - end - end - - defp maybe_set_pragma(db, pragma_name, value) do - case get_pragma(db, pragma_name) do - {:ok, current} -> - if current == value do - :ok - else - set_pragma(db, pragma_name, value) - end - - _ -> - set_pragma(db, pragma_name, value) - end - end - - defp set_key(db, options) do - # we can't use maybe_set_pragma here since - # the only thing that will work on an encrypted - # database without error is setting the key. - case Keyword.fetch(options, :key) do - {:ok, key} -> set_pragma(db, "key", key) - _ -> :ok - end - end - - defp set_custom_pragmas(db, options) do - # we can't use maybe_set_pragma because some pragmas - # are required to be set before the database is e.g. decrypted. - case Keyword.fetch(options, :custom_pragmas) do - {:ok, list} -> do_set_custom_pragmas(db, list) - _ -> :ok - end - end - - defp do_set_custom_pragmas(db, list) do - list - |> Enum.reduce_while(:ok, fn {key, value}, :ok -> - case set_pragma(db, key, value) do - :ok -> {:cont, :ok} - {:error, _reason} -> {:halt, :error} - end - end) - end - - defp set_pragma_if_present(_db, _pragma, nil), do: :ok - defp set_pragma_if_present(db, pragma, value), do: set_pragma(db, pragma, value) - - defp set_journal_size_limit(db, options) do - set_pragma_if_present( - db, - "journal_size_limit", - Keyword.get(options, :journal_size_limit) - ) - end - - defp set_soft_heap_limit(db, options) do - set_pragma_if_present(db, "soft_heap_limit", Keyword.get(options, :soft_heap_limit)) - end - - defp set_hard_heap_limit(db, options) do - set_pragma_if_present(db, "hard_heap_limit", Keyword.get(options, :hard_heap_limit)) - end - - defp set_journal_mode(db, options) do - maybe_set_pragma(db, "journal_mode", Pragma.journal_mode(options)) - end - - defp set_temp_store(db, options) do - set_pragma(db, "temp_store", Pragma.temp_store(options)) - end - - defp set_synchronous(db, options) do - set_pragma(db, "synchronous", Pragma.synchronous(options)) - end - - defp set_foreign_keys(db, options) do - set_pragma(db, "foreign_keys", Pragma.foreign_keys(options)) - end - - defp set_cache_size(db, options) do - maybe_set_pragma(db, "cache_size", Pragma.cache_size(options)) - end - - defp set_cache_spill(db, options) do - set_pragma(db, "cache_spill", Pragma.cache_spill(options)) - end - - defp set_case_sensitive_like(db, options) do - set_pragma(db, "case_sensitive_like", Pragma.case_sensitive_like(options)) - end - - defp set_auto_vacuum(db, options) do - set_pragma(db, "auto_vacuum", Pragma.auto_vacuum(options)) - end - - defp set_locking_mode(db, options) do - set_pragma(db, "locking_mode", Pragma.locking_mode(options)) - end - - defp set_secure_delete(db, options) do - set_pragma(db, "secure_delete", Pragma.secure_delete(options)) - end - - defp set_wal_auto_check_point(db, options) do - set_pragma(db, "wal_autocheckpoint", Pragma.wal_auto_check_point(options)) - end - - defp set_busy_timeout(db, options) do - set_pragma(db, "busy_timeout", Pragma.busy_timeout(options)) - end - - defp load_extensions(db, options) do - global_extensions = Application.get_env(:exqlite, :load_extensions, []) - - extensions = - Keyword.get(options, :load_extensions, []) - |> Enum.concat(global_extensions) - |> Enum.uniq() - - do_load_extensions(db, extensions) - end - - defp do_load_extensions(_db, []), do: :ok - - defp do_load_extensions(db, extensions) do - Sqlite3.enable_load_extension(db, true) - - Enum.each(extensions, fn extension -> - Logger.debug(fn -> "Exqlite: loading extension `#{extension}`" end) - Sqlite3.execute(db, "SELECT load_extension('#{extension}')") - end) - - Sqlite3.enable_load_extension(db, false) - end - - defp do_connect(database, options) do - with {:ok, directory} <- resolve_directory(database), - :ok <- mkdir_p(directory), - {:ok, db} <- Sqlite3.open(database, options), - :ok <- set_key(db, options), - :ok <- set_custom_pragmas(db, options), - :ok <- set_journal_mode(db, options), - :ok <- set_temp_store(db, options), - :ok <- set_synchronous(db, options), - :ok <- set_foreign_keys(db, options), - :ok <- set_cache_size(db, options), - :ok <- set_cache_spill(db, options), - :ok <- set_auto_vacuum(db, options), - :ok <- set_locking_mode(db, options), - :ok <- set_secure_delete(db, options), - :ok <- set_wal_auto_check_point(db, options), - :ok <- set_case_sensitive_like(db, options), - :ok <- set_busy_timeout(db, options), - :ok <- set_journal_size_limit(db, options), - :ok <- set_soft_heap_limit(db, options), - :ok <- set_hard_heap_limit(db, options), - :ok <- load_extensions(db, options) do - state = %__MODULE__{ - db: db, - directory: directory, - path: database, - transaction_status: :idle, - status: :idle, - chunk_size: Keyword.get(options, :chunk_size) - } - - {:ok, state} - else - {:error, reason} -> - {:error, %Exqlite.Error{message: to_string(reason)}} - end - end - - def maybe_put_command(query, options) do - case Keyword.get(options, :command) do - nil -> query - command -> %{query | command: command} - end - end - - # Attempt to retrieve the cached query, if it doesn't exist, we'll prepare one - # and cache it for later. - defp prepare(%Query{statement: statement} = query, options, state) do - query = maybe_put_command(query, options) - - with {:ok, ref} <- Sqlite3.prepare(state.db, IO.iodata_to_binary(statement)), - query <- %{query | ref: ref} do - {:ok, query} - else - {:error, reason} -> - {:error, %Error{message: to_string(reason), statement: statement}, state} - end - end - - # Prepare a query and do not cache it. - defp prepare_no_cache(%Query{statement: statement} = query, options, state) do - query = maybe_put_command(query, options) - - case Sqlite3.prepare(state.db, statement) do - {:ok, ref} -> - {:ok, %{query | ref: ref}} - - {:error, reason} -> - {:error, %Error{message: to_string(reason), statement: statement}, state} - end - end - - @spec maybe_changes(Sqlite3.db(), Query.t()) :: integer() | nil - defp maybe_changes(db, %Query{command: command}) - when command in [:update, :insert, :delete] do - case Sqlite3.changes(db) do - {:ok, total} -> total - _ -> nil - end - end - - defp maybe_changes(_, _), do: nil - - # when we have an empty list of columns, that signifies that - # there was no possible return tuple (e.g., update statement without RETURNING) - # and in that case, we return nil to signify no possible result. - defp maybe_rows([], []), do: nil - defp maybe_rows(rows, _cols), do: rows - - defp execute(call, %Query{} = query, params, state) do - with {:ok, query} <- bind_params(query, params, state), - {:ok, columns} <- get_columns(query, state), - {:ok, rows} <- get_rows(query, state), - {:ok, transaction_status} <- Sqlite3.transaction_status(state.db), - changes <- maybe_changes(state.db, query) do - case query.command do - command when command in [:delete, :insert, :update] -> - { - :ok, - query, - Result.new( - command: call, - num_rows: changes, - rows: maybe_rows(rows, columns) - ), - %{state | transaction_status: transaction_status} - } - - _ -> - { - :ok, - query, - Result.new( - command: call, - columns: columns, - rows: rows, - num_rows: Enum.count(rows) - ), - %{state | transaction_status: transaction_status} - } - end - end - end - - defp bind_params(%Query{ref: ref, statement: statement} = query, params, state) - when ref != nil do - case Sqlite3.bind(state.db, ref, params) do - :ok -> - {:ok, query} - - {:error, reason} -> - {:error, %Error{message: to_string(reason), statement: statement}, state} - end - end - - defp get_columns(%Query{ref: ref, statement: statement}, state) do - case Sqlite3.columns(state.db, ref) do - {:ok, columns} -> - {:ok, columns} - - {:error, reason} -> - {:error, %Error{message: to_string(reason), statement: statement}, state} - end - end - - defp get_rows(%Query{ref: ref, statement: statement}, state) do - case Sqlite3.fetch_all(state.db, ref, state.chunk_size) do - {:ok, rows} -> - {:ok, rows} - - {:error, reason} -> - {:error, %Error{message: to_string(reason), statement: statement}, state} - end - end - - defp handle_transaction(call, statement, state) do - with :ok <- Sqlite3.execute(state.db, statement), - {:ok, transaction_status} <- Sqlite3.transaction_status(state.db) do - result = %Result{ - command: call, - rows: [], - columns: [], - num_rows: 0 - } - - {:ok, result, %{state | transaction_status: transaction_status}} - else - {:error, reason} -> - {:disconnect, %Error{message: to_string(reason), statement: statement}, state} - end - end - - defp resolve_directory(":memory:"), do: {:ok, nil} - - defp resolve_directory("file:" <> _ = uri) do - case URI.parse(uri) do - %{path: path} when is_binary(path) -> - {:ok, Path.dirname(path)} - - _ -> - {:error, "No path in #{inspect(uri)}"} - end - end - - defp resolve_directory(path), do: {:ok, Path.dirname(path)} - - # SQLITE_OPEN_CREATE will create the DB file if not existing, but - # will not create intermediary directories if they are missing. - # So let's preemptively create the intermediate directories here - # before trying to open the DB file. - defp mkdir_p(nil), do: :ok - defp mkdir_p(directory), do: File.mkdir_p(directory) -end +# defmodule Exqlite.Connection do +# @moduledoc """ +# This module implements connection details as defined in DBProtocol. + +# ## Attributes + +# - `db` - The sqlite3 database reference. +# - `path` - The path that was used to open. +# - `transaction_status` - The status of the connection. Can be `:idle` or `:transaction`. + +# ## Unknowns + +# - How are pooled connections going to work? Since sqlite3 doesn't allow for +# simultaneous access. We would need to check if the write ahead log is +# enabled on the database. We can't assume and set the WAL pragma because the +# database may be stored on a network volume which would cause potential +# issues. + +# Notes: +# - we try to closely follow structure and naming convention of myxql. +# - sqlite thrives when there are many small conventions, so we may not implement +# some strategies employed by other adapters. See https://sqlite.org/np1queryprob.html +# """ + +# use DBConnection +# alias Exqlite.Error +# alias Exqlite.Pragma +# alias Exqlite.Query +# alias Exqlite.Result +# alias Exqlite.Sqlite3 +# require Logger + +# defstruct [ +# :db, +# :directory, +# :path, +# :transaction_status, +# :status, +# :chunk_size +# ] + +# @type t() :: %__MODULE__{ +# db: Sqlite3.db(), +# directory: String.t() | nil, +# path: String.t(), +# transaction_status: :idle | :transaction, +# status: :idle | :busy +# } + +# @type journal_mode() :: :delete | :truncate | :persist | :memory | :wal | :off +# @type temp_store() :: :default | :file | :memory +# @type synchronous() :: :extra | :full | :normal | :off +# @type auto_vacuum() :: :none | :full | :incremental +# @type locking_mode() :: :normal | :exclusive + +# @type connection_opt() :: +# {:database, String.t()} +# | {:journal_mode, journal_mode()} +# | {:temp_store, temp_store()} +# | {:synchronous, synchronous()} +# | {:foreign_keys, :on | :off} +# | {:cache_size, integer()} +# | {:cache_spill, :on | :off} +# | {:case_sensitive_like, boolean()} +# | {:auto_vacuum, auto_vacuum()} +# | {:locking_mode, locking_mode()} +# | {:secure_delete, :on | :off} +# | {:wal_auto_check_point, integer()} +# | {:busy_timeout, integer()} +# | {:chunk_size, integer()} +# | {:journal_size_limit, integer()} +# | {:soft_heap_limit, integer()} +# | {:hard_heap_limit, integer()} +# | {:key, String.t()} +# | {:custom_pragmas, [{keyword(), integer() | boolean() | String.t()}]} + +# @impl true +# @doc """ +# Initializes the Ecto Exqlite adapter. + +# For connection configurations we use the defaults that come with SQLite3, but +# we recommend which options to choose. We do not default to the recommended +# because we don't know what your environment is like. + +# Allowed options: + +# * `:database` - The path to the database. In memory is allowed. You can use +# `:memory` or `":memory:"` to designate that. +# * `:mode` - use `:readwrite` to open the database for reading and writing +# or `:readonly` to open it in read-only mode. `:readwrite` will also create +# the database if it doesn't already exist. Defaults to `:readwrite`. +# * `:journal_mode` - Sets the journal mode for the sqlite connection. Can be +# one of the following `:delete`, `:truncate`, `:persist`, `:memory`, +# `:wal`, or `:off`. Defaults to `:delete`. It is recommended that you use +# `:wal` due to support for concurrent reads. Note: `:wal` does not mean +# concurrent writes. +# * `:temp_store` - Sets the storage used for temporary tables. Default is +# `:default`. Allowed values are `:default`, `:file`, `:memory`. It is +# recommended that you use `:memory` for storage. +# * `:synchronous` - Can be `:extra`, `:full`, `:normal`, or `:off`. Defaults +# to `:normal`. +# * `:foreign_keys` - Sets if foreign key checks should be enforced or not. +# Can be `:on` or `:off`. Default is `:on`. +# * `:cache_size` - Sets the cache size to be used for the connection. This is +# an odd setting as a positive value is the number of pages in memory to use +# and a negative value is the size in kilobytes to use. Default is `-2000`. +# It is recommended that you use `-64000`. +# * `:cache_spill` - The cache_spill pragma enables or disables the ability of +# the pager to spill dirty cache pages to the database file in the middle of +# a transaction. By default it is `:on`, and for most applications, it +# should remain so. +# * `:case_sensitive_like` +# * `:auto_vacuum` - Defaults to `:none`. Can be `:none`, `:full` or +# `:incremental`. Depending on the database size, `:incremental` may be +# beneficial. +# * `:locking_mode` - Defaults to `:normal`. Allowed values are `:normal` or +# `:exclusive`. See [sqlite documentation][1] for more information. +# * `:secure_delete` - Defaults to `:off`. If enabled, it will cause SQLite3 +# to overwrite records that were deleted with zeros. +# * `:wal_auto_check_point` - Sets the write-ahead log auto-checkpoint +# interval. Default is `1000`. Setting the auto-checkpoint size to zero or a +# negative value turns auto-checkpointing off. +# * `:busy_timeout` - Sets the busy timeout in milliseconds for a connection. +# Default is `2000`. +# * `:chunk_size` - The chunk size for bulk fetching. Defaults to `50`. +# * `:key` - Optional key to set during database initialization. This PRAGMA +# is often used to set up database level encryption. +# * `:journal_size_limit` - The size limit in bytes of the journal. +# * `:soft_heap_limit` - The size limit in bytes for the heap limit. +# * `:hard_heap_limit` - The size limit in bytes for the heap. +# * `:custom_pragmas` - A list of custom pragmas to set on the connection, for example to configure extensions. +# * `:load_extensions` - A list of paths identifying extensions to load. Defaults to `[]`. +# The provided list will be merged with the global extensions list, set on `:exqlite, :load_extensions`. +# Be aware that the path should handle pointing to a library compiled for the current architecture. +# Example configuration: + +# ``` +# arch_dir = +# System.cmd("uname", ["-sm"]) +# |> elem(0) +# |> String.trim() +# |> String.replace(" ", "-") +# |> String.downcase() # => "darwin-arm64" + +# config :myapp, arch_dir: arch_dir + +# # global +# config :exqlite, load_extensions: [ "./priv/sqlite/\#{arch_dir}/rotate" ] + +# # per connection in a Phoenix app +# config :myapp, Myapp.Repo, +# database: "path/to/db", +# load_extensions: [ +# "./priv/sqlite/\#{arch_dir}/vector0", +# "./priv/sqlite/\#{arch_dir}/vss0" +# ] +# ``` + +# For more information about the options above, see [sqlite documentation][1] + +# [1]: https://www.sqlite.org/pragma.html +# """ +# @spec connect([connection_opt()]) :: {:ok, t()} | {:error, Exception.t()} +# def connect(options) do +# database = Keyword.get(options, :database) + +# options = +# Keyword.put_new( +# options, +# :chunk_size, +# Application.get_env(:exqlite, :default_chunk_size, 50) +# ) + +# case database do +# nil -> +# {:error, +# %Error{ +# message: """ +# You must provide a :database to the database. \ +# Example: connect(database: "./") or connect(database: :memory)\ +# """ +# }} + +# :memory -> +# do_connect(":memory:", options) + +# _ -> +# do_connect(database, options) +# end +# end + +# @impl true +# def disconnect(_err, %__MODULE__{db: db}) do +# case Sqlite3.close(db) do +# :ok -> :ok +# {:error, reason} -> {:error, %Error{message: to_string(reason)}} +# end +# end + +# @impl true +# def checkout(%__MODULE__{status: :idle} = state) do +# {:ok, %{state | status: :busy}} +# end + +# def checkout(%__MODULE__{status: :busy} = state) do +# {:disconnect, %Error{message: "Database is busy"}, state} +# end + +# @impl true +# def ping(state), do: {:ok, state} + +# ## +# ## Handlers +# ## + +# @impl true +# def handle_prepare(%Query{} = query, options, state) do +# with {:ok, query} <- prepare(query, options, state) do +# {:ok, query, state} +# end +# end + +# @impl true +# def handle_execute(%Query{} = query, params, options, state) do +# with {:ok, query} <- prepare(query, options, state) do +# execute(:execute, query, params, state) +# end +# end + +# @doc """ +# Begin a transaction. + +# For full info refer to sqlite docs: https://sqlite.org/lang_transaction.html + +# Note: default transaction mode is DEFERRED. +# """ +# @impl true +# def handle_begin(options, %{transaction_status: transaction_status} = state) do +# # This doesn't handle more than 2 levels of transactions. +# # +# # One possible solution would be to just track the number of open +# # transactions and use that for driving the transaction status being idle or +# # in a transaction. +# # +# # I do not know why the other official adapters do not track this and just +# # append level on the savepoint. Instead the rollbacks would just completely +# # revert the issues when it may be desirable to fix something while in the +# # transaction and then commit. +# case Keyword.get(options, :mode, :deferred) do +# :deferred when transaction_status == :idle -> +# handle_transaction(:begin, "BEGIN TRANSACTION", state) + +# :transaction when transaction_status == :idle -> +# handle_transaction(:begin, "BEGIN TRANSACTION", state) + +# :immediate when transaction_status == :idle -> +# handle_transaction(:begin, "BEGIN IMMEDIATE TRANSACTION", state) + +# :exclusive when transaction_status == :idle -> +# handle_transaction(:begin, "BEGIN EXCLUSIVE TRANSACTION", state) + +# mode +# when mode in [:deferred, :immediate, :exclusive, :savepoint] and +# transaction_status == :transaction -> +# handle_transaction(:begin, "SAVEPOINT exqlite_savepoint", state) +# end +# end + +# @impl true +# def handle_commit(options, %{transaction_status: transaction_status} = state) do +# case Keyword.get(options, :mode, :deferred) do +# :savepoint when transaction_status == :transaction -> +# handle_transaction( +# :commit_savepoint, +# "RELEASE SAVEPOINT exqlite_savepoint", +# state +# ) + +# mode +# when mode in [:deferred, :immediate, :exclusive, :transaction] and +# transaction_status == :transaction -> +# handle_transaction(:commit, "COMMIT", state) +# end +# end + +# @impl true +# def handle_rollback(options, %{transaction_status: transaction_status} = state) do +# case Keyword.get(options, :mode, :deferred) do +# :savepoint when transaction_status == :transaction -> +# with {:ok, _result, state} <- +# handle_transaction( +# :rollback_savepoint, +# "ROLLBACK TO SAVEPOINT exqlite_savepoint", +# state +# ) do +# handle_transaction( +# :rollback_savepoint, +# "RELEASE SAVEPOINT exqlite_savepoint", +# state +# ) +# end + +# mode +# when mode in [:deferred, :immediate, :exclusive, :transaction] -> +# handle_transaction(:rollback, "ROLLBACK TRANSACTION", state) +# end +# end + +# @doc """ +# Close a query prepared by `handle_prepare/3` with the database. Return +# `{:ok, result, state}` on success and to continue, +# `{:error, exception, state}` to return an error and continue, or +# `{:disconnect, exception, state}` to return an error and disconnect. + +# This callback is called in the client process. +# """ +# @impl true +# def handle_close(query, _opts, state) do +# Sqlite3.release(state.db, query.ref) +# {:ok, nil, state} +# end + +# @impl true +# def handle_declare(%Query{} = query, params, opts, state) do +# # We emulate cursor functionality by just using a prepared statement and +# # step through it. Thus we just return the query ref as the cursor. +# with {:ok, query} <- prepare_no_cache(query, opts, state), +# {:ok, query} <- bind_params(query, params, state) do +# {:ok, query, query.ref, state} +# end +# end + +# @impl true +# def handle_deallocate(%Query{} = query, _cursor, _opts, state) do +# Sqlite3.release(state.db, query.ref) +# {:ok, nil, state} +# end + +# @impl true +# def handle_fetch(%Query{statement: statement}, cursor, opts, state) do +# chunk_size = opts[:chunk_size] || opts[:max_rows] || state.chunk_size + +# case Sqlite3.multi_step(state.db, cursor, chunk_size) do +# {:done, rows} -> +# {:halt, %Result{rows: rows, command: :fetch, num_rows: length(rows)}, state} + +# {:rows, rows} -> +# {:cont, %Result{rows: rows, command: :fetch, num_rows: chunk_size}, state} + +# {:error, reason} -> +# {:error, %Error{message: to_string(reason), statement: statement}, state} + +# :busy -> +# {:error, %Error{message: "Database is busy", statement: statement}, state} +# end +# end + +# @impl true +# def handle_status(_opts, state) do +# {state.transaction_status, state} +# end + +# ### ---------------------------------- +# # Internal functions and helpers +# ### ---------------------------------- + +# defp set_pragma(db, pragma_name, value) do +# Sqlite3.execute(db, "PRAGMA #{pragma_name} = #{value}") +# end + +# defp get_pragma(db, pragma_name) do +# {:ok, statement} = Sqlite3.prepare(db, "PRAGMA #{pragma_name}") + +# case Sqlite3.fetch_all(db, statement) do +# {:ok, [[value]]} -> {:ok, value} +# _ -> :error +# end +# end + +# defp maybe_set_pragma(db, pragma_name, value) do +# case get_pragma(db, pragma_name) do +# {:ok, current} -> +# if current == value do +# :ok +# else +# set_pragma(db, pragma_name, value) +# end + +# _ -> +# set_pragma(db, pragma_name, value) +# end +# end + +# defp set_key(db, options) do +# # we can't use maybe_set_pragma here since +# # the only thing that will work on an encrypted +# # database without error is setting the key. +# case Keyword.fetch(options, :key) do +# {:ok, key} -> set_pragma(db, "key", key) +# _ -> :ok +# end +# end + +# defp set_custom_pragmas(db, options) do +# # we can't use maybe_set_pragma because some pragmas +# # are required to be set before the database is e.g. decrypted. +# case Keyword.fetch(options, :custom_pragmas) do +# {:ok, list} -> do_set_custom_pragmas(db, list) +# _ -> :ok +# end +# end + +# defp do_set_custom_pragmas(db, list) do +# list +# |> Enum.reduce_while(:ok, fn {key, value}, :ok -> +# case set_pragma(db, key, value) do +# :ok -> {:cont, :ok} +# {:error, _reason} -> {:halt, :error} +# end +# end) +# end + +# defp set_pragma_if_present(_db, _pragma, nil), do: :ok +# defp set_pragma_if_present(db, pragma, value), do: set_pragma(db, pragma, value) + +# defp set_journal_size_limit(db, options) do +# set_pragma_if_present( +# db, +# "journal_size_limit", +# Keyword.get(options, :journal_size_limit) +# ) +# end + +# defp set_soft_heap_limit(db, options) do +# set_pragma_if_present(db, "soft_heap_limit", Keyword.get(options, :soft_heap_limit)) +# end + +# defp set_hard_heap_limit(db, options) do +# set_pragma_if_present(db, "hard_heap_limit", Keyword.get(options, :hard_heap_limit)) +# end + +# defp set_journal_mode(db, options) do +# maybe_set_pragma(db, "journal_mode", Pragma.journal_mode(options)) +# end + +# defp set_temp_store(db, options) do +# set_pragma(db, "temp_store", Pragma.temp_store(options)) +# end + +# defp set_synchronous(db, options) do +# set_pragma(db, "synchronous", Pragma.synchronous(options)) +# end + +# defp set_foreign_keys(db, options) do +# set_pragma(db, "foreign_keys", Pragma.foreign_keys(options)) +# end + +# defp set_cache_size(db, options) do +# maybe_set_pragma(db, "cache_size", Pragma.cache_size(options)) +# end + +# defp set_cache_spill(db, options) do +# set_pragma(db, "cache_spill", Pragma.cache_spill(options)) +# end + +# defp set_case_sensitive_like(db, options) do +# set_pragma(db, "case_sensitive_like", Pragma.case_sensitive_like(options)) +# end + +# defp set_auto_vacuum(db, options) do +# set_pragma(db, "auto_vacuum", Pragma.auto_vacuum(options)) +# end + +# defp set_locking_mode(db, options) do +# set_pragma(db, "locking_mode", Pragma.locking_mode(options)) +# end + +# defp set_secure_delete(db, options) do +# set_pragma(db, "secure_delete", Pragma.secure_delete(options)) +# end + +# defp set_wal_auto_check_point(db, options) do +# set_pragma(db, "wal_autocheckpoint", Pragma.wal_auto_check_point(options)) +# end + +# defp set_busy_timeout(db, options) do +# set_pragma(db, "busy_timeout", Pragma.busy_timeout(options)) +# end + +# defp load_extensions(db, options) do +# global_extensions = Application.get_env(:exqlite, :load_extensions, []) + +# extensions = +# Keyword.get(options, :load_extensions, []) +# |> Enum.concat(global_extensions) +# |> Enum.uniq() + +# do_load_extensions(db, extensions) +# end + +# defp do_load_extensions(_db, []), do: :ok + +# defp do_load_extensions(db, extensions) do +# Sqlite3.enable_load_extension(db, true) + +# Enum.each(extensions, fn extension -> +# Logger.debug(fn -> "Exqlite: loading extension `#{extension}`" end) +# Sqlite3.execute(db, "SELECT load_extension('#{extension}')") +# end) + +# Sqlite3.enable_load_extension(db, false) +# end + +# defp do_connect(database, options) do +# with {:ok, directory} <- resolve_directory(database), +# :ok <- mkdir_p(directory), +# {:ok, db} <- Sqlite3.open(database, options), +# :ok <- set_key(db, options), +# :ok <- set_custom_pragmas(db, options), +# :ok <- set_journal_mode(db, options), +# :ok <- set_temp_store(db, options), +# :ok <- set_synchronous(db, options), +# :ok <- set_foreign_keys(db, options), +# :ok <- set_cache_size(db, options), +# :ok <- set_cache_spill(db, options), +# :ok <- set_auto_vacuum(db, options), +# :ok <- set_locking_mode(db, options), +# :ok <- set_secure_delete(db, options), +# :ok <- set_wal_auto_check_point(db, options), +# :ok <- set_case_sensitive_like(db, options), +# :ok <- set_busy_timeout(db, options), +# :ok <- set_journal_size_limit(db, options), +# :ok <- set_soft_heap_limit(db, options), +# :ok <- set_hard_heap_limit(db, options), +# :ok <- load_extensions(db, options) do +# state = %__MODULE__{ +# db: db, +# directory: directory, +# path: database, +# transaction_status: :idle, +# status: :idle, +# chunk_size: Keyword.get(options, :chunk_size) +# } + +# {:ok, state} +# else +# {:error, reason} -> +# {:error, %Exqlite.Error{message: to_string(reason)}} +# end +# end + +# def maybe_put_command(query, options) do +# case Keyword.get(options, :command) do +# nil -> query +# command -> %{query | command: command} +# end +# end + +# # Attempt to retrieve the cached query, if it doesn't exist, we'll prepare one +# # and cache it for later. +# defp prepare(%Query{statement: statement} = query, options, state) do +# query = maybe_put_command(query, options) + +# with {:ok, ref} <- Sqlite3.prepare(state.db, IO.iodata_to_binary(statement)), +# query <- %{query | ref: ref} do +# {:ok, query} +# else +# {:error, reason} -> +# {:error, %Error{message: to_string(reason), statement: statement}, state} +# end +# end + +# # Prepare a query and do not cache it. +# defp prepare_no_cache(%Query{statement: statement} = query, options, state) do +# query = maybe_put_command(query, options) + +# case Sqlite3.prepare(state.db, statement) do +# {:ok, ref} -> +# {:ok, %{query | ref: ref}} + +# {:error, reason} -> +# {:error, %Error{message: to_string(reason), statement: statement}, state} +# end +# end + +# @spec maybe_changes(Sqlite3.db(), Query.t()) :: integer() | nil +# defp maybe_changes(db, %Query{command: command}) +# when command in [:update, :insert, :delete] do +# case Sqlite3.changes(db) do +# {:ok, total} -> total +# _ -> nil +# end +# end + +# defp maybe_changes(_, _), do: nil + +# # when we have an empty list of columns, that signifies that +# # there was no possible return tuple (e.g., update statement without RETURNING) +# # and in that case, we return nil to signify no possible result. +# defp maybe_rows([], []), do: nil +# defp maybe_rows(rows, _cols), do: rows + +# defp execute(call, %Query{} = query, params, state) do +# with {:ok, query} <- bind_params(query, params, state), +# {:ok, columns} <- get_columns(query, state), +# {:ok, rows} <- get_rows(query, state), +# {:ok, transaction_status} <- Sqlite3.transaction_status(state.db), +# changes <- maybe_changes(state.db, query) do +# case query.command do +# command when command in [:delete, :insert, :update] -> +# { +# :ok, +# query, +# Result.new( +# command: call, +# num_rows: changes, +# rows: maybe_rows(rows, columns) +# ), +# %{state | transaction_status: transaction_status} +# } + +# _ -> +# { +# :ok, +# query, +# Result.new( +# command: call, +# columns: columns, +# rows: rows, +# num_rows: Enum.count(rows) +# ), +# %{state | transaction_status: transaction_status} +# } +# end +# end +# end + +# defp bind_params(%Query{ref: ref, statement: statement} = query, params, state) +# when ref != nil do +# case Sqlite3.bind(state.db, ref, params) do +# :ok -> +# {:ok, query} + +# {:error, reason} -> +# {:error, %Error{message: to_string(reason), statement: statement}, state} +# end +# end + +# defp get_columns(%Query{ref: ref, statement: statement}, state) do +# case Sqlite3.columns(state.db, ref) do +# {:ok, columns} -> +# {:ok, columns} + +# {:error, reason} -> +# {:error, %Error{message: to_string(reason), statement: statement}, state} +# end +# end + +# defp get_rows(%Query{ref: ref, statement: statement}, state) do +# case Sqlite3.fetch_all(state.db, ref, state.chunk_size) do +# {:ok, rows} -> +# {:ok, rows} + +# {:error, reason} -> +# {:error, %Error{message: to_string(reason), statement: statement}, state} +# end +# end + +# defp handle_transaction(call, statement, state) do +# with :ok <- Sqlite3.execute(state.db, statement), +# {:ok, transaction_status} <- Sqlite3.transaction_status(state.db) do +# result = %Result{ +# command: call, +# rows: [], +# columns: [], +# num_rows: 0 +# } + +# {:ok, result, %{state | transaction_status: transaction_status}} +# else +# {:error, reason} -> +# {:disconnect, %Error{message: to_string(reason), statement: statement}, state} +# end +# end + +# defp resolve_directory(":memory:"), do: {:ok, nil} + +# defp resolve_directory("file:" <> _ = uri) do +# case URI.parse(uri) do +# %{path: path} when is_binary(path) -> +# {:ok, Path.dirname(path)} + +# _ -> +# {:error, "No path in #{inspect(uri)}"} +# end +# end + +# defp resolve_directory(path), do: {:ok, Path.dirname(path)} + +# # SQLITE_OPEN_CREATE will create the DB file if not existing, but +# # will not create intermediary directories if they are missing. +# # So let's preemptively create the intermediate directories here +# # before trying to open the DB file. +# defp mkdir_p(nil), do: :ok +# defp mkdir_p(directory), do: File.mkdir_p(directory) +# end diff --git a/lib/exqlite/query.ex b/lib/exqlite/query.ex index 5b12e14d..b083ae0a 100644 --- a/lib/exqlite/query.ex +++ b/lib/exqlite/query.ex @@ -44,23 +44,23 @@ defmodule Exqlite.Query do end end - defimpl DBConnection.Query do - def parse(query, _opts) do - query - end + # defimpl DBConnection.Query do + # def parse(query, _opts) do + # query + # end - def describe(query, _opts) do - query - end + # def describe(query, _opts) do + # query + # end - def encode(_query, params, _opts) do - params - end + # def encode(_query, params, _opts) do + # params + # end - def decode(_query, result, _opts) do - result - end - end + # def decode(_query, result, _opts) do + # result + # end + # end defimpl String.Chars do def to_string(%{statement: statement}) do diff --git a/lib/exqlite/rw_connection.ex b/lib/exqlite/rw_connection.ex new file mode 100644 index 00000000..559f2d92 --- /dev/null +++ b/lib/exqlite/rw_connection.ex @@ -0,0 +1,234 @@ +defmodule Exqlite.RWConnection do + @moduledoc """ + Connections are modelled as processes. + """ + + use GenServer + alias Exqlite.Sqlite3 + + @type t :: GenServer.server() + + @doc """ + Starts a connection process. + + ## Options + + * `:database` (required) - the database process to connect to + + * `:process_options` - the options to be given to the underlying + process. See `GenServer.start_link/3` for all options + + ## Examples + + #{__MODULE__}.start_link( + database: ":memory:", + process_options: [name: MyApp.Conn] + ) + + In your supervision tree it would be started like this: + + children = [ + {#{__MODULE__}, + database: ":memory:", + process_options: [name: MyApp.Conn]} + ] + + """ + def start_link(options) do + {process_options, options} = Keyword.pop(options, :process_options, []) + GenServer.start_link(__MODULE__, options, process_options) + end + + def stop(conn) do + GenServer.stop(conn) + end + + @doc """ + Runs the given `query` with `params` while exclusively locking the connection. + + ## Examples + + query(MyApp.Conn, "create table users(name)") + query(conn, "insert into users(names) values (?), (?)", ["bim", "bom"]) + + """ + def query(conn, query, params \\ []) do + command(conn, :lock, {:query, query, params}, &result/2) + end + + @doc """ + Runs the given `query` with `params` without locking the connection. + Should only be used for reading. + + ## Examples + + read_query(MyApp.Conn, "select * from users") + read_query(conn, "select * from users where name = ?", ["bimbom"]) + + """ + def read_query(conn, query, params \\ []) do + command(conn, :read, {:query, query, params}, &result/2) + end + + defp command(conn, type, command, fun) do + case GenServer.call(conn, {type, command}, :infinity) do + {:ok, db, undo_ref, statement_ref} -> + try do + fun.(db, statement_ref) + after + GenServer.cast(conn, {:undo, type, undo_ref}) + end + + {:error, reason} -> + {:error, error_to_exception(reason)} + end + end + + defp result(db, statement_ref) do + with {:ok, columns} <- Sqlite3.columns(db, statement_ref), + {:ok, rows} <- Sqlite3.fetch_all(db, statement_ref, _chunk_size = 100), + do: {:ok, %Exqlite.Result{columns: columns, rows: rows}} + end + + @impl true + def init(options) do + database = Keyword.fetch!(options, :database) + + with {:ok, db} <- Sqlite3.open(database, options), + :ok <- Sqlite3.execute(db, "pragma journal_mode=wal"), + :ok <- Sqlite3.execute(db, "pragma foreign_keys=on") do + state = %{db: db, lock: :none, queue: :queue.new(), reads: %{}} + {:ok, state} + else + {:error, reason} -> + {:error, error_to_exception(reason)} + end + end + + defp error_to_exception(reason) do + Exqlite.Error.exception(message: to_string(reason)) + end + + @impl true + def handle_call({:read, command}, from, %{lock: :none} = state) do + case handle_command(command, state.db) do + {:ok, statement_ref} -> + {pid, _} = from + unregister_ref = Process.monitor(pid) + reply = {:ok, state.db, unregister_ref, statement_ref} + state = %{state | reads: Map.put(state.reads, unregister_ref, statement_ref)} + {:reply, reply, state} + + {:error, _reason} = error -> + {:reply, error, state} + end + end + + def handle_call({:lock, command}, from, state) do + state = update_in(state.queue, &:queue.in({:lock, command, from}, &1)) + + if map_size(state.reads) > 0 do + {:noreply, %{state | lock: :drain}} + else + {:noreply, maybe_dequeue(state)} + end + end + + def handle_call({:read, command}, from, state) do + state = update_in(state.queue, &:queue.in({:read, command, from}, &1)) + {:noreply, state} + end + + @impl true + def handle_cast({:undo, :lock, ref}, %{lock: {ref, statement_ref}} = state) do + Process.demonitor(ref, [:flush]) + {:noreply, unlock(state, statement_ref)} + end + + def handle_cast({:undo, :read, ref}, state) do + Process.demonitor(ref, [:flush]) + {:noreply, unregister(state, ref)} + end + + @impl true + def handle_info({:DOWN, ref, _, _, _}, %{lock: {ref, statement_ref}} = state) do + {:noreply, unlock(state, statement_ref)} + end + + def handle_info({:DOWN, ref, _, _, _}, state) do + {:noreply, unregister(state, ref)} + end + + def handle_info(_msg, state), do: {:noreply, state} + + defp unlock(state, statement_ref) do + :ok = Sqlite3.release(state.db, statement_ref) + maybe_dequeue(%{state | lock: :none}) + end + + defp unregister(state, ref) do + {statement_ref, reads} = Map.pop!(state.reads, ref) + if statement_ref, do: :ok = Sqlite3.release(state.db, statement_ref) + state = %{state | reads: reads} + + if state.lock == :drain and map_size(reads) == 0 do + maybe_dequeue(%{state | lock: :none}) + else + state + end + end + + defp maybe_dequeue(%{lock: :none, queue: queue} = state) do + case :queue.out(queue) do + {:empty, queue} -> + %{state | queue: queue} + + {{:value, value}, queue} -> + case value do + {:lock, command, from} -> + {pid, _} = from + + case handle_command(command, state.db) do + {:ok, statement_ref} when is_reference(statement_ref) -> + unlock_ref = Process.monitor(pid) + GenServer.reply(from, {:ok, state.db, unlock_ref, statement_ref}) + %{state | lock: {unlock_ref, statement_ref}, queue: queue} + + {:error, _reason} = error -> + GenServer.reply(from, error) + maybe_dequeue(%{state | queue: queue}) + end + + {:read, command, from} -> + {pid, _} = from + + case handle_command(command, state.db) do + {:ok, statement_ref} when is_reference(statement_ref) -> + unregister_ref = Process.monitor(pid) + GenServer.reply(from, {:ok, state.db, unregister_ref, statement_ref}) + + %{ + state + | reads: Map.put(state.reads, unregister_ref, statement_ref), + queue: queue + } + + {:error, _reason} = error -> + GenServer.reply(from, error) + maybe_dequeue(%{state | queue: queue}) + end + end + end + end + + defp maybe_dequeue(state), do: state + + defp handle_command({:query, query, params}, db) do + with {:ok, stmt} = ok <- Sqlite3.prepare(db, query), + :ok <- maybe_bind(db, stmt, params), + do: ok + end + + defp maybe_bind(_db, _stmt, []), do: :ok + defp maybe_bind(db, stmt, params), do: Sqlite3.bind(db, stmt, params) +end diff --git a/lib/exqlite/stream.ex b/lib/exqlite/stream.ex deleted file mode 100644 index 79ff5d86..00000000 --- a/lib/exqlite/stream.ex +++ /dev/null @@ -1,42 +0,0 @@ -defmodule Exqlite.Stream do - @moduledoc false - defstruct [:conn, :query, :params, :options] - @type t :: %Exqlite.Stream{} - - defimpl Enumerable do - def reduce(%Exqlite.Stream{query: %Exqlite.Query{} = query} = stream, acc, fun) do - # Possibly need to pass a chunk size option along so that we can let - # the NIF chunk it. - %Exqlite.Stream{conn: conn, params: params, options: opts} = stream - - stream = %DBConnection.Stream{ - conn: conn, - query: query, - params: params, - opts: opts - } - - DBConnection.reduce(stream, acc, fun) - end - - def reduce(%Exqlite.Stream{query: statement} = stream, acc, fun) do - %Exqlite.Stream{conn: conn, params: params, options: opts} = stream - query = %Exqlite.Query{name: "", statement: statement} - - stream = %DBConnection.PrepareStream{ - conn: conn, - query: query, - params: params, - opts: opts - } - - DBConnection.reduce(stream, acc, fun) - end - - def member?(_, _), do: {:error, __MODULE__} - - def count(_), do: {:error, __MODULE__} - - def slice(_), do: {:error, __MODULE__} - end -end diff --git a/mix.exs b/mix.exs index 65626467..1cf247c9 100644 --- a/mix.exs +++ b/mix.exs @@ -51,7 +51,6 @@ defmodule Exqlite.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:db_connection, "~> 2.1"}, {:ex_sqlean, "~> 0.8.5", only: [:dev, :test]}, {:elixir_make, "~> 0.7", runtime: false}, {:cc_precompiler, "~> 0.1", runtime: false}, diff --git a/mix.lock b/mix.lock index 08953685..77882ca1 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,6 @@ "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.7", "77de20ac77f0e53f20ca82c563520af0237c301a1ec3ab3bc598e8a96c7ee5d9", [:mix], [{:elixir_make, "~> 0.7.3", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "2768b28bf3c2b4f788c995576b39b8cb5d47eb788526d93bd52206c1d8bf4b75"}, "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, - "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, "dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"}, "earmark_parser": {:hex, :earmark_parser, "1.4.32", "fa739a0ecfa34493de19426681b23f6814573faee95dfd4b4aafe15a7b5b32c6", [:mix], [], "hexpm", "b8b0dd77d60373e77a3d7e8afa598f325e49e8663a51bcc2b88ef41838cca755"}, "elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"}, @@ -16,6 +15,5 @@ "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "table": {:hex, :table, "0.1.2", "87ad1125f5b70c5dea0307aa633194083eb5182ec537efc94e96af08937e14a8", [:mix], [], "hexpm", "7e99bc7efef806315c7e65640724bf165c3061cdc5d854060f74468367065029"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "temp": {:hex, :temp, "0.4.7", "2c78482cc2294020a4bc0c95950b907ff386523367d4e63308a252feffbea9f2", [:mix], [], "hexpm", "6af19e7d6a85a427478be1021574d1ae2a1e1b90882586f06bde76c63cd03e0d"}, } diff --git a/test/exqlite/connection_test.exs b/test/exqlite/connection_test.exs index 288cf022..3699c761 100644 --- a/test/exqlite/connection_test.exs +++ b/test/exqlite/connection_test.exs @@ -1,364 +1,364 @@ -defmodule Exqlite.ConnectionTest do - use ExUnit.Case +# defmodule Exqlite.ConnectionTest do +# use ExUnit.Case - alias Exqlite.Connection - alias Exqlite.Query - alias Exqlite.Sqlite3 +# alias Exqlite.Connection +# alias Exqlite.Query +# alias Exqlite.Sqlite3 - describe ".connect/1" do - test "returns error when path is missing from options" do - {:error, error} = Connection.connect([]) +# describe ".connect/1" do +# test "returns error when path is missing from options" do +# {:error, error} = Connection.connect([]) - assert error.message == - ~s{You must provide a :database to the database. Example: connect(database: "./") or connect(database: :memory)} - end +# assert error.message == +# ~s{You must provide a :database to the database. Example: connect(database: "./") or connect(database: :memory)} +# end - test "connects to an in memory database" do - {:ok, state} = Connection.connect(database: ":memory:") +# test "connects to an in memory database" do +# {:ok, state} = Connection.connect(database: ":memory:") - assert state.path == ":memory:" - assert state.db - end +# assert state.path == ":memory:" +# assert state.db +# end - test "connects to in memory when the memory atom is passed" do - {:ok, state} = Connection.connect(database: :memory) +# test "connects to in memory when the memory atom is passed" do +# {:ok, state} = Connection.connect(database: :memory) - assert state.path == ":memory:" - assert state.db - end +# assert state.path == ":memory:" +# assert state.db +# end - test "connects to a file" do - path = Temp.path!() - {:ok, state} = Connection.connect(database: path) +# test "connects to a file" do +# path = Temp.path!() +# {:ok, state} = Connection.connect(database: path) - assert state.path == path - assert state.db +# assert state.path == path +# assert state.db - File.rm(path) - end +# File.rm(path) +# end - test "connects to a file from URL" do - path = Temp.path!() +# test "connects to a file from URL" do +# path = Temp.path!() - {:ok, state} = Connection.connect(database: "file:#{path}?mode=rwc") +# {:ok, state} = Connection.connect(database: "file:#{path}?mode=rwc") - assert state.directory == Path.dirname(path) - assert state.db - end +# assert state.directory == Path.dirname(path) +# assert state.db +# end - test "fails to write a file from URL with mode=ro" do - path = Temp.path!() +# test "fails to write a file from URL with mode=ro" do +# path = Temp.path!() - {:ok, db} = Sqlite3.open(path) +# {:ok, db} = Sqlite3.open(path) - :ok = - Sqlite3.execute(db, "create table test (id ingeger primary key, stuff text)") +# :ok = +# Sqlite3.execute(db, "create table test (id ingeger primary key, stuff text)") - :ok = - Sqlite3.execute(db, "insert into test (id, stuff) values (999, 'Some stuff')") +# :ok = +# Sqlite3.execute(db, "insert into test (id, stuff) values (999, 'Some stuff')") - :ok = Sqlite3.close(db) +# :ok = Sqlite3.close(db) - {:ok, conn} = Connection.connect(database: "file:#{path}?mode=ro") +# {:ok, conn} = Connection.connect(database: "file:#{path}?mode=ro") - assert conn.directory == Path.dirname(path) - assert conn.db +# assert conn.directory == Path.dirname(path) +# assert conn.db - assert match?( - {:ok, _, %{rows: [[1]]}, _}, - %Query{statement: "select count(*) from test"} - |> Connection.handle_execute([], [], conn) - ) +# assert match?( +# {:ok, _, %{rows: [[1]]}, _}, +# %Query{statement: "select count(*) from test"} +# |> Connection.handle_execute([], [], conn) +# ) - {:error, %{message: message}, _} = - %Query{ - statement: "insert into test (id, stuff) values (888, 'some more stuff')" - } - |> Connection.handle_execute([], [], conn) +# {:error, %{message: message}, _} = +# %Query{ +# statement: "insert into test (id, stuff) values (888, 'some more stuff')" +# } +# |> Connection.handle_execute([], [], conn) - # In most of the test matrix the message is "attempt to write a readonly database", - # but in Elixir 1.13, OTP 23, OS windows-2019 it is "not an error". - assert message in ["attempt to write a readonly database", "not an error"] +# # In most of the test matrix the message is "attempt to write a readonly database", +# # but in Elixir 1.13, OTP 23, OS windows-2019 it is "not an error". +# assert message in ["attempt to write a readonly database", "not an error"] - File.rm(path) - end +# File.rm(path) +# end - test "setting custom_pragmas" do - path = Temp.path!() +# test "setting custom_pragmas" do +# path = Temp.path!() - {:ok, state} = - Connection.connect( - database: path, - custom_pragmas: [ - checkpoint_fullfsync: 0 - ] - ) +# {:ok, state} = +# Connection.connect( +# database: path, +# custom_pragmas: [ +# checkpoint_fullfsync: 0 +# ] +# ) - assert state.db +# assert state.db - assert {:ok, 0} = get_pragma(state.db, :checkpoint_fullfsync) +# assert {:ok, 0} = get_pragma(state.db, :checkpoint_fullfsync) - File.rm(path) - end +# File.rm(path) +# end - test "setting journal_size_limit" do - path = Temp.path!() - size_limit = 20 * 1024 * 1024 - {:ok, state} = Connection.connect(database: path, journal_size_limit: size_limit) +# test "setting journal_size_limit" do +# path = Temp.path!() +# size_limit = 20 * 1024 * 1024 +# {:ok, state} = Connection.connect(database: path, journal_size_limit: size_limit) - assert state.db +# assert state.db - assert {:ok, ^size_limit} = get_pragma(state.db, :journal_size_limit) +# assert {:ok, ^size_limit} = get_pragma(state.db, :journal_size_limit) - File.rm(path) - end +# File.rm(path) +# end - test "setting soft_heap_limit" do - path = Temp.path!() - size_limit = 20 * 1024 * 1024 - {:ok, state} = Connection.connect(database: path, soft_heap_limit: size_limit) +# test "setting soft_heap_limit" do +# path = Temp.path!() +# size_limit = 20 * 1024 * 1024 +# {:ok, state} = Connection.connect(database: path, soft_heap_limit: size_limit) - assert state.db +# assert state.db - assert {:ok, ^size_limit} = get_pragma(state.db, :soft_heap_limit) +# assert {:ok, ^size_limit} = get_pragma(state.db, :soft_heap_limit) - File.rm(path) - end +# File.rm(path) +# end - test "setting hard_heap_limit" do - path = Temp.path!() - size_limit = 20 * 1024 * 1024 - {:ok, state} = Connection.connect(database: path, hard_heap_limit: size_limit) +# test "setting hard_heap_limit" do +# path = Temp.path!() +# size_limit = 20 * 1024 * 1024 +# {:ok, state} = Connection.connect(database: path, hard_heap_limit: size_limit) - assert state.db +# assert state.db - assert {:ok, ^size_limit} = get_pragma(state.db, :hard_heap_limit) +# assert {:ok, ^size_limit} = get_pragma(state.db, :hard_heap_limit) - File.rm(path) - end +# File.rm(path) +# end - test "setting connection mode" do - path = Temp.path!() +# test "setting connection mode" do +# path = Temp.path!() - # Create readwrite connection - {:ok, rw_state} = Connection.connect(database: path) - create_table_query = "create table test (id integer primary key, stuff text)" - :ok = Sqlite3.execute(rw_state.db, create_table_query) +# # Create readwrite connection +# {:ok, rw_state} = Connection.connect(database: path) +# create_table_query = "create table test (id integer primary key, stuff text)" +# :ok = Sqlite3.execute(rw_state.db, create_table_query) - insert_value_query = "insert into test (stuff) values ('This is a test')" - :ok = Sqlite3.execute(rw_state.db, insert_value_query) +# insert_value_query = "insert into test (stuff) values ('This is a test')" +# :ok = Sqlite3.execute(rw_state.db, insert_value_query) - # Read from database with a readonly connection - {:ok, ro_state} = Connection.connect(database: path, mode: :readonly) +# # Read from database with a readonly connection +# {:ok, ro_state} = Connection.connect(database: path, mode: :readonly) - select_query = "select id, stuff from test order by id asc" - {:ok, statement} = Sqlite3.prepare(ro_state.db, select_query) - {:row, columns} = Sqlite3.step(ro_state.db, statement) +# select_query = "select id, stuff from test order by id asc" +# {:ok, statement} = Sqlite3.prepare(ro_state.db, select_query) +# {:row, columns} = Sqlite3.step(ro_state.db, statement) - assert [1, "This is a test"] == columns +# assert [1, "This is a test"] == columns - # Readonly connection cannot insert - assert {:error, "attempt to write a readonly database"} == - Sqlite3.execute(ro_state.db, insert_value_query) - end - end +# # Readonly connection cannot insert +# assert {:error, "attempt to write a readonly database"} == +# Sqlite3.execute(ro_state.db, insert_value_query) +# end +# end - defp get_pragma(db, pragma_name) do - {:ok, statement} = Sqlite3.prepare(db, "PRAGMA #{pragma_name}") +# defp get_pragma(db, pragma_name) do +# {:ok, statement} = Sqlite3.prepare(db, "PRAGMA #{pragma_name}") - case Sqlite3.fetch_all(db, statement) do - {:ok, [[value]]} -> {:ok, value} - _ -> :error - end - end +# case Sqlite3.fetch_all(db, statement) do +# {:ok, [[value]]} -> {:ok, value} +# _ -> :error +# end +# end - describe ".disconnect/2" do - test "disconnects a database that was never connected" do - conn = %Connection{db: nil, path: nil} +# describe ".disconnect/2" do +# test "disconnects a database that was never connected" do +# conn = %Connection{db: nil, path: nil} - assert :ok == Connection.disconnect(nil, conn) - end +# assert :ok == Connection.disconnect(nil, conn) +# end - test "disconnects a connected database" do - {:ok, conn} = Connection.connect(database: :memory) +# test "disconnects a connected database" do +# {:ok, conn} = Connection.connect(database: :memory) - assert :ok == Connection.disconnect(nil, conn) - end - end +# assert :ok == Connection.disconnect(nil, conn) +# end +# end - describe ".handle_execute/4" do - test "returns records" do - path = Temp.path!() +# describe ".handle_execute/4" do +# test "returns records" do +# path = Temp.path!() - {:ok, db} = Sqlite3.open(path) +# {:ok, db} = Sqlite3.open(path) - :ok = - Sqlite3.execute(db, "create table users (id integer primary key, name text)") +# :ok = +# Sqlite3.execute(db, "create table users (id integer primary key, name text)") - :ok = Sqlite3.execute(db, "insert into users (id, name) values (1, 'Jim')") - :ok = Sqlite3.execute(db, "insert into users (id, name) values (2, 'Bob')") - :ok = Sqlite3.execute(db, "insert into users (id, name) values (3, 'Dave')") - :ok = Sqlite3.execute(db, "insert into users (id, name) values (4, 'Steve')") - Sqlite3.close(db) +# :ok = Sqlite3.execute(db, "insert into users (id, name) values (1, 'Jim')") +# :ok = Sqlite3.execute(db, "insert into users (id, name) values (2, 'Bob')") +# :ok = Sqlite3.execute(db, "insert into users (id, name) values (3, 'Dave')") +# :ok = Sqlite3.execute(db, "insert into users (id, name) values (4, 'Steve')") +# Sqlite3.close(db) - {:ok, conn} = Connection.connect(database: path) +# {:ok, conn} = Connection.connect(database: path) - {:ok, _query, result, _conn} = - %Query{statement: "select * from users where id < ?"} - |> Connection.handle_execute([4], [], conn) +# {:ok, _query, result, _conn} = +# %Query{statement: "select * from users where id < ?"} +# |> Connection.handle_execute([4], [], conn) - assert result.command == :execute - assert result.columns == ["id", "name"] - assert result.rows == [[1, "Jim"], [2, "Bob"], [3, "Dave"]] +# assert result.command == :execute +# assert result.columns == ["id", "name"] +# assert result.rows == [[1, "Jim"], [2, "Bob"], [3, "Dave"]] - File.rm(path) - end +# File.rm(path) +# end - test "returns correctly for empty result" do - path = Temp.path!() +# test "returns correctly for empty result" do +# path = Temp.path!() - {:ok, db} = Sqlite3.open(path) +# {:ok, db} = Sqlite3.open(path) - :ok = - Sqlite3.execute(db, "create table users (id integer primary key, name text)") +# :ok = +# Sqlite3.execute(db, "create table users (id integer primary key, name text)") - Sqlite3.close(db) +# Sqlite3.close(db) - {:ok, conn} = Connection.connect(database: path) +# {:ok, conn} = Connection.connect(database: path) - {:ok, _query, result, _conn} = - %Query{ - statement: "UPDATE users set name = 'wow' where id = 1", - command: :update - } - |> Connection.handle_execute([], [], conn) +# {:ok, _query, result, _conn} = +# %Query{ +# statement: "UPDATE users set name = 'wow' where id = 1", +# command: :update +# } +# |> Connection.handle_execute([], [], conn) - assert result.rows == nil +# assert result.rows == nil - {:ok, _query, result, _conn} = - %Query{ - statement: "UPDATE users set name = 'wow' where id = 5 returning *", - command: :update - } - |> Connection.handle_execute([], [], conn) +# {:ok, _query, result, _conn} = +# %Query{ +# statement: "UPDATE users set name = 'wow' where id = 5 returning *", +# command: :update +# } +# |> Connection.handle_execute([], [], conn) - assert result.rows == [] +# assert result.rows == [] - File.rm(path) - end +# File.rm(path) +# end - test "returns timely and in order for big data sets" do - path = Temp.path!() +# test "returns timely and in order for big data sets" do +# path = Temp.path!() - {:ok, db} = Sqlite3.open(path) +# {:ok, db} = Sqlite3.open(path) - :ok = - Sqlite3.execute(db, "create table users (id integer primary key, name text)") +# :ok = +# Sqlite3.execute(db, "create table users (id integer primary key, name text)") - users = - Enum.map(1..10_000, fn i -> - [i, "User-#{i}"] - end) +# users = +# Enum.map(1..10_000, fn i -> +# [i, "User-#{i}"] +# end) - users - |> Enum.chunk_every(20) - |> Enum.each(fn chunk -> - values = Enum.map_join(chunk, ", ", fn [id, name] -> "(#{id}, '#{name}')" end) - Sqlite3.execute(db, "insert into users (id, name) values #{values}") - end) +# users +# |> Enum.chunk_every(20) +# |> Enum.each(fn chunk -> +# values = Enum.map_join(chunk, ", ", fn [id, name] -> "(#{id}, '#{name}')" end) +# Sqlite3.execute(db, "insert into users (id, name) values #{values}") +# end) - :ok = Exqlite.Sqlite3.close(db) +# :ok = Exqlite.Sqlite3.close(db) - {:ok, conn} = Connection.connect(database: path) +# {:ok, conn} = Connection.connect(database: path) - {:ok, _query, result, _conn} = - Connection.handle_execute( - %Exqlite.Query{ - statement: "SELECT * FROM users" - }, - [], - [timeout: 1], - conn - ) +# {:ok, _query, result, _conn} = +# Connection.handle_execute( +# %Exqlite.Query{ +# statement: "SELECT * FROM users" +# }, +# [], +# [timeout: 1], +# conn +# ) - assert result.command == :execute - assert length(result.rows) == 10_000 - assert users == result.rows +# assert result.command == :execute +# assert length(result.rows) == 10_000 +# assert users == result.rows - File.rm(path) - end - end +# File.rm(path) +# end +# end - describe ".handle_prepare/3" do - test "returns a prepared query" do - {:ok, conn} = Connection.connect(database: :memory) +# describe ".handle_prepare/3" do +# test "returns a prepared query" do +# {:ok, conn} = Connection.connect(database: :memory) - {:ok, _query, _result, conn} = - %Query{statement: "create table users (id integer primary key, name text)"} - |> Connection.handle_execute([], [], conn) +# {:ok, _query, _result, conn} = +# %Query{statement: "create table users (id integer primary key, name text)"} +# |> Connection.handle_execute([], [], conn) - {:ok, query, conn} = - %Query{statement: "select * from users where id < ?"} - |> Connection.handle_prepare([], conn) +# {:ok, query, conn} = +# %Query{statement: "select * from users where id < ?"} +# |> Connection.handle_prepare([], conn) - assert conn - assert query - assert query.ref - assert query.statement - end +# assert conn +# assert query +# assert query.ref +# assert query.statement +# end - test "users table does not exist" do - {:ok, conn} = Connection.connect(database: :memory) +# test "users table does not exist" do +# {:ok, conn} = Connection.connect(database: :memory) - {:error, error, _state} = - %Query{statement: "select * from users where id < ?"} - |> Connection.handle_prepare([], conn) +# {:error, error, _state} = +# %Query{statement: "select * from users where id < ?"} +# |> Connection.handle_prepare([], conn) - assert error.message == "no such table: users" - end - end +# assert error.message == "no such table: users" +# end +# end - describe ".checkout/1" do - test "checking out an idle connection" do - {:ok, conn} = Connection.connect(database: :memory) +# describe ".checkout/1" do +# test "checking out an idle connection" do +# {:ok, conn} = Connection.connect(database: :memory) - {:ok, conn} = Connection.checkout(conn) - assert conn.status == :busy - end +# {:ok, conn} = Connection.checkout(conn) +# assert conn.status == :busy +# end - test "checking out a busy connection" do - {:ok, conn} = Connection.connect(database: :memory) - conn = %{conn | status: :busy} +# test "checking out a busy connection" do +# {:ok, conn} = Connection.connect(database: :memory) +# conn = %{conn | status: :busy} - {:disconnect, error, _conn} = Connection.checkout(conn) +# {:disconnect, error, _conn} = Connection.checkout(conn) - assert error.message == "Database is busy" - end - end +# assert error.message == "Database is busy" +# end +# end - describe ".ping/1" do - test "returns the state passed unchanged" do - {:ok, conn} = Connection.connect(database: :memory) +# describe ".ping/1" do +# test "returns the state passed unchanged" do +# {:ok, conn} = Connection.connect(database: :memory) - assert {:ok, conn} == Connection.ping(conn) - end - end +# assert {:ok, conn} == Connection.ping(conn) +# end +# end - describe ".handle_close/3" do - test "releases the underlying prepared statement" do - {:ok, conn} = Connection.connect(database: :memory) +# describe ".handle_close/3" do +# test "releases the underlying prepared statement" do +# {:ok, conn} = Connection.connect(database: :memory) - {:ok, query, _result, conn} = - %Query{statement: "create table users (id integer primary key, name text)"} - |> Connection.handle_execute([], [], conn) +# {:ok, query, _result, conn} = +# %Query{statement: "create table users (id integer primary key, name text)"} +# |> Connection.handle_execute([], [], conn) - assert {:ok, nil, conn} == Connection.handle_close(query, [], conn) - - {:ok, query, conn} = - %Query{statement: "select * from users where id < ?"} - |> Connection.handle_prepare([], conn) - - assert {:ok, nil, conn} == Connection.handle_close(query, [], conn) - end - end -end +# assert {:ok, nil, conn} == Connection.handle_close(query, [], conn) + +# {:ok, query, conn} = +# %Query{statement: "select * from users where id < ?"} +# |> Connection.handle_prepare([], conn) + +# assert {:ok, nil, conn} == Connection.handle_close(query, [], conn) +# end +# end +# end diff --git a/test/exqlite/extensions_test.exs b/test/exqlite/extensions_test.exs index c0e757b7..e8c77dd0 100644 --- a/test/exqlite/extensions_test.exs +++ b/test/exqlite/extensions_test.exs @@ -9,16 +9,15 @@ defmodule Exqlite.ExtensionsTest do :ok = Basic.enable_load_extension(conn) {:ok, [[nil]], _} = - Basic.load_extension(conn, ExSqlean.path_for("re")) |> Basic.rows() + Basic.load_extension(conn, ExSqlean.path_for("re")) {:ok, [[1]], _} = Basic.exec(conn, "select regexp_like('the year is 2021', '2021')") - |> Basic.rows() :ok = Basic.disable_load_extension(conn) {:error, "not authorized"} = - Basic.load_extension(conn, ExSqlean.path_for("re")) |> Basic.rows() + Basic.load_extension(conn, ExSqlean.path_for("re")) end test "works for 're' (regex)" do @@ -28,15 +27,13 @@ defmodule Exqlite.ExtensionsTest do :ok = Basic.enable_load_extension(conn) {:ok, [[nil]], _} = - Basic.load_extension(conn, ExSqlean.path_for("re")) |> Basic.rows() + Basic.load_extension(conn, ExSqlean.path_for("re")) {:ok, [[0]], _} = Basic.exec(conn, "select regexp_like('the year is 2021', '2k21')") - |> Basic.rows() {:ok, [[1]], _} = Basic.exec(conn, "select regexp_like('the year is 2021', '2021')") - |> Basic.rows() end test "stats extension" do @@ -49,7 +46,6 @@ defmodule Exqlite.ExtensionsTest do {:ok, [[50.5]], ["median(value)"]} = Basic.exec(conn, "select median(value) from generate_series(1, 100)") - |> Basic.rows() end end end diff --git a/test/exqlite/integration_test.exs b/test/exqlite/integration_test.exs index e860cc90..3ec396a8 100644 --- a/test/exqlite/integration_test.exs +++ b/test/exqlite/integration_test.exs @@ -1,201 +1,201 @@ -defmodule Exqlite.IntegrationTest do - use ExUnit.Case - - alias Exqlite.Connection - alias Exqlite.Sqlite3 - alias Exqlite.Query - - test "simple prepare execute and close" do - path = Temp.path!() - {:ok, db} = Sqlite3.open(path) - :ok = Sqlite3.execute(db, "create table test (id ingeger primary key, stuff text)") - :ok = Sqlite3.close(db) - - {:ok, conn} = Connection.connect(database: path) - - {:ok, query, _} = - %Exqlite.Query{statement: "SELECT * FROM test WHERE id = :id"} - |> Connection.handle_prepare([2], conn) - - {:ok, _query, result, conn} = Connection.handle_execute(query, [2], [], conn) - assert result - - {:ok, _, conn} = Connection.handle_close(query, [], conn) - assert conn - - File.rm(path) - end - - test "transaction handling with concurrent connections" do - path = Temp.path!() - - {:ok, conn1} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory - ) - - {:ok, conn2} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory - ) - - {:ok, _result, conn1} = Connection.handle_begin([], conn1) - assert conn1.transaction_status == :transaction - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) - {:ok, _result, conn1} = Connection.handle_rollback([], conn1) - assert conn1.transaction_status == :idle - - {:ok, _result, conn2} = Connection.handle_begin([], conn2) - assert conn2.transaction_status == :transaction - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _query, _result, conn2} = Connection.handle_execute(query, [], [], conn2) - {:ok, _result, conn2} = Connection.handle_rollback([], conn2) - assert conn2.transaction_status == :idle - - File.rm(path) - end - - test "handles busy correctly" do - path = Temp.path!() - - {:ok, conn1} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory, - busy_timeout: 0 - ) - - {:ok, conn2} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory, - busy_timeout: 0 - ) - - {:ok, _result, conn1} = Connection.handle_begin([mode: :immediate], conn1) - assert conn1.transaction_status == :transaction - {:disconnect, _err, conn2} = Connection.handle_begin([mode: :immediate], conn2) - assert conn2.transaction_status == :idle - {:ok, _result, conn1} = Connection.handle_commit([mode: :immediate], conn1) - assert conn1.transaction_status == :idle - {:ok, _result, conn2} = Connection.handle_begin([mode: :immediate], conn2) - assert conn2.transaction_status == :transaction - {:ok, _result, conn2} = Connection.handle_commit([mode: :immediate], conn2) - assert conn2.transaction_status == :idle - - Connection.disconnect(nil, conn1) - Connection.disconnect(nil, conn2) - - File.rm(path) - end - - test "transaction with interleaved connections" do - path = Temp.path!() - - {:ok, conn1} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory - ) - - {:ok, conn2} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory - ) - - {:ok, _result, conn1} = Connection.handle_begin([mode: :immediate], conn1) - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) - - # transaction overlap - {:ok, _result, conn2} = Connection.handle_begin([], conn2) - assert conn2.transaction_status == :transaction - {:ok, _result, conn1} = Connection.handle_rollback([], conn1) - assert conn1.transaction_status == :idle - - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _query, _result, conn2} = Connection.handle_execute(query, [], [], conn2) - {:ok, _result, conn2} = Connection.handle_rollback([], conn2) - assert conn2.transaction_status == :idle - - Connection.disconnect(nil, conn1) - Connection.disconnect(nil, conn2) - - File.rm(path) - end - - test "transaction handling with single connection" do - path = Temp.path!() - - {:ok, conn1} = - Connection.connect( - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory - ) - - {:ok, _result, conn1} = Connection.handle_begin([], conn1) - assert conn1.transaction_status == :transaction - - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) - {:ok, _result, conn1} = Connection.handle_rollback([], conn1) - assert conn1.transaction_status == :idle - - {:ok, _result, conn1} = Connection.handle_begin([], conn1) - assert conn1.transaction_status == :transaction - - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) - {:ok, _result, conn1} = Connection.handle_rollback([], conn1) - assert conn1.transaction_status == :idle - - File.rm(path) - end - - test "exceeding timeout" do - path = Temp.path!() - - {:ok, conn} = - DBConnection.start_link(Connection, - idle_interval: 5_000, - database: path, - journal_mode: :wal, - cache_size: -64_000, - temp_store: :memory - ) - - query = %Query{statement: "create table foo(id integer, val integer)"} - {:ok, _, _} = DBConnection.execute(conn, query, []) - - values = for i <- 1..10_001, do: "(#{i}, #{i})" - - query = %Query{ - statement: "insert into foo(id, val) values #{Enum.join(values, ",")}" - } - - {:ok, _, _} = DBConnection.execute(conn, query, []) - - query = %Query{statement: "select * from foo"} - {:ok, _, _} = DBConnection.execute(conn, query, [], timeout: 1) - - File.rm(path) - end -end +# defmodule Exqlite.IntegrationTest do +# use ExUnit.Case + +# alias Exqlite.Connection +# alias Exqlite.Sqlite3 +# alias Exqlite.Query + +# test "simple prepare execute and close" do +# path = Temp.path!() +# {:ok, db} = Sqlite3.open(path) +# :ok = Sqlite3.execute(db, "create table test (id ingeger primary key, stuff text)") +# :ok = Sqlite3.close(db) + +# {:ok, conn} = Connection.connect(database: path) + +# {:ok, query, _} = +# %Exqlite.Query{statement: "SELECT * FROM test WHERE id = :id"} +# |> Connection.handle_prepare([2], conn) + +# {:ok, _query, result, conn} = Connection.handle_execute(query, [2], [], conn) +# assert result + +# {:ok, _, conn} = Connection.handle_close(query, [], conn) +# assert conn + +# File.rm(path) +# end + +# test "transaction handling with concurrent connections" do +# path = Temp.path!() + +# {:ok, conn1} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory +# ) + +# {:ok, conn2} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory +# ) + +# {:ok, _result, conn1} = Connection.handle_begin([], conn1) +# assert conn1.transaction_status == :transaction +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) +# {:ok, _result, conn1} = Connection.handle_rollback([], conn1) +# assert conn1.transaction_status == :idle + +# {:ok, _result, conn2} = Connection.handle_begin([], conn2) +# assert conn2.transaction_status == :transaction +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _query, _result, conn2} = Connection.handle_execute(query, [], [], conn2) +# {:ok, _result, conn2} = Connection.handle_rollback([], conn2) +# assert conn2.transaction_status == :idle + +# File.rm(path) +# end + +# test "handles busy correctly" do +# path = Temp.path!() + +# {:ok, conn1} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory, +# busy_timeout: 0 +# ) + +# {:ok, conn2} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory, +# busy_timeout: 0 +# ) + +# {:ok, _result, conn1} = Connection.handle_begin([mode: :immediate], conn1) +# assert conn1.transaction_status == :transaction +# {:disconnect, _err, conn2} = Connection.handle_begin([mode: :immediate], conn2) +# assert conn2.transaction_status == :idle +# {:ok, _result, conn1} = Connection.handle_commit([mode: :immediate], conn1) +# assert conn1.transaction_status == :idle +# {:ok, _result, conn2} = Connection.handle_begin([mode: :immediate], conn2) +# assert conn2.transaction_status == :transaction +# {:ok, _result, conn2} = Connection.handle_commit([mode: :immediate], conn2) +# assert conn2.transaction_status == :idle + +# Connection.disconnect(nil, conn1) +# Connection.disconnect(nil, conn2) + +# File.rm(path) +# end + +# test "transaction with interleaved connections" do +# path = Temp.path!() + +# {:ok, conn1} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory +# ) + +# {:ok, conn2} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory +# ) + +# {:ok, _result, conn1} = Connection.handle_begin([mode: :immediate], conn1) +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) + +# # transaction overlap +# {:ok, _result, conn2} = Connection.handle_begin([], conn2) +# assert conn2.transaction_status == :transaction +# {:ok, _result, conn1} = Connection.handle_rollback([], conn1) +# assert conn1.transaction_status == :idle + +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _query, _result, conn2} = Connection.handle_execute(query, [], [], conn2) +# {:ok, _result, conn2} = Connection.handle_rollback([], conn2) +# assert conn2.transaction_status == :idle + +# Connection.disconnect(nil, conn1) +# Connection.disconnect(nil, conn2) + +# File.rm(path) +# end + +# test "transaction handling with single connection" do +# path = Temp.path!() + +# {:ok, conn1} = +# Connection.connect( +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory +# ) + +# {:ok, _result, conn1} = Connection.handle_begin([], conn1) +# assert conn1.transaction_status == :transaction + +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) +# {:ok, _result, conn1} = Connection.handle_rollback([], conn1) +# assert conn1.transaction_status == :idle + +# {:ok, _result, conn1} = Connection.handle_begin([], conn1) +# assert conn1.transaction_status == :transaction + +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _query, _result, conn1} = Connection.handle_execute(query, [], [], conn1) +# {:ok, _result, conn1} = Connection.handle_rollback([], conn1) +# assert conn1.transaction_status == :idle + +# File.rm(path) +# end + +# test "exceeding timeout" do +# path = Temp.path!() + +# {:ok, conn} = +# DBConnection.start_link(Connection, +# idle_interval: 5_000, +# database: path, +# journal_mode: :wal, +# cache_size: -64_000, +# temp_store: :memory +# ) + +# query = %Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _, _} = DBConnection.execute(conn, query, []) + +# values = for i <- 1..10_001, do: "(#{i}, #{i})" + +# query = %Query{ +# statement: "insert into foo(id, val) values #{Enum.join(values, ",")}" +# } + +# {:ok, _, _} = DBConnection.execute(conn, query, []) + +# query = %Query{statement: "select * from foo"} +# {:ok, _, _} = DBConnection.execute(conn, query, [], timeout: 1) + +# File.rm(path) +# end +# end diff --git a/test/exqlite/query_test.exs b/test/exqlite/query_test.exs index 46e255fc..8a329d0e 100644 --- a/test/exqlite/query_test.exs +++ b/test/exqlite/query_test.exs @@ -4,21 +4,16 @@ defmodule Exqlite.QueryTest do setup :create_conn! test "table reader integration", %{conn: conn} do - assert {:ok, _} = Exqlite.query(conn, "CREATE TABLE tab(x integer, y text);", []) + assert {:ok, _} = + Exqlite.RWConnection.query(conn, "CREATE TABLE tab(x integer, y text);") assert {:ok, _} = - Exqlite.query( + Exqlite.RWConnection.query( conn, - "INSERT INTO tab(x, y) VALUES (1, 'a'), (2, 'b'), (3, 'c');", - [] + "INSERT INTO tab(x, y) VALUES (1, 'a'), (2, 'b'), (3, 'c');" ) - assert {:ok, res} = - Exqlite.query( - conn, - "SELECT * FROM tab;", - [] - ) + assert {:ok, res} = Exqlite.RWConnection.query(conn, "SELECT * FROM tab;") assert res |> Table.to_rows() |> Enum.to_list() == [ %{"x" => 1, "y" => "a"}, @@ -31,11 +26,9 @@ defmodule Exqlite.QueryTest do assert Enum.to_list(columns["y"]) == ["a", "b", "c"] end - defp create_conn!(_) do - opts = [database: "#{Temp.path!()}.db"] - - {:ok, pid} = start_supervised(Exqlite.child_spec(opts)) - - [conn: pid] + defp create_conn!(_context) do + path = Temp.path!() + on_exit(fn -> File.rm(path) end) + {:ok, conn: start_supervised!({Exqlite.RWConnection, database: "#{path}.db"})} end end diff --git a/test/exqlite/rw_connection_test.exs b/test/exqlite/rw_connection_test.exs new file mode 100644 index 00000000..ea6ccd14 --- /dev/null +++ b/test/exqlite/rw_connection_test.exs @@ -0,0 +1,186 @@ +defmodule Exqlite.RWConnectionTest do + use ExUnit.Case + alias Exqlite.{RWConnection, Result} + + setup do + path = Temp.path!() + on_exit(fn -> File.rm!(path) end) + {:ok, path: path} + end + + describe "start_link" do + test "starts connection process", %{path: path} do + assert {:ok, conn} = RWConnection.start_link(database: path) + + assert {:ok, %Result{columns: ["journal_mode"], rows: [["wal"]]}} = + RWConnection.read_query(conn, "pragma journal_mode") + + assert {:ok, %Exqlite.Result{columns: ["foreign_keys"], rows: [[1]]}} = + RWConnection.read_query(conn, "pragma foreign_keys") + end + end + + describe "query" do + setup :conn + + test "handles incorrect queries", %{conn: conn} do + assert {:error, %Exqlite.Error{message: "incomplete input"}} = + RWConnection.query(conn, "select") + + assert {:error, %Exqlite.Error{message: "arguments_wrong_length"}} = + RWConnection.query(conn, "select ? + ?", [1]) + + assert {:error, %Exqlite.Error{message: "arguments_wrong_length"}} = + RWConnection.query(conn, "select ? + ?", [1, 2, 3]) + + assert {:error, %Exqlite.Error{message: ~s[near "eh": syntax error]}} = + RWConnection.query(conn, "eh") + end + + test "locks connection", %{conn: conn} do + assert :sys.get_state(conn).lock == :none + + task = + Task.async(fn -> + RWConnection.query(conn, burn(1_000_000)) + end) + + :timer.sleep(100) + assert {writer_ref, statement_ref} = :sys.get_state(conn).lock + assert is_reference(writer_ref) + assert is_reference(statement_ref) + + Task.await(task) + assert :sys.get_state(conn).lock == :none + end + + test "queues commands when locked", %{conn: conn} do + Task.async(fn -> RWConnection.query(conn, burn(1_000_000)) end) + Task.async(fn -> RWConnection.query(conn, burn(1_000_000)) end) + task = Task.async(fn -> RWConnection.query(conn, burn(1_000_000)) end) + + :timer.sleep(100) + assert :queue.len(:sys.get_state(conn).queue) == 2 + + Task.await(task) + assert :queue.len(:sys.get_state(conn).queue) == 0 + assert :sys.get_state(conn).lock == :none + end + + test "drains reads before locking", %{conn: conn} do + Task.async(fn -> RWConnection.read_query(conn, burn(1_000_000)) end) + Task.async(fn -> RWConnection.read_query(conn, burn(1_000_000)) end) + task = Task.async(fn -> RWConnection.query(conn, burn(1_000_000)) end) + + :timer.sleep(100) + assert :sys.get_state(conn).lock == :drain + assert map_size(:sys.get_state(conn).reads) == 2 + assert :queue.len(:sys.get_state(conn).queue) == 1 + + Task.await(task) + + assert :sys.get_state(conn).lock == :none + assert map_size(:sys.get_state(conn).reads) == 0 + assert :queue.len(:sys.get_state(conn).queue) == 0 + end + end + + describe "read_query" do + setup :conn + + test "handles incorrect queries", %{conn: conn} do + assert {:error, %Exqlite.Error{message: "incomplete input"}} = + RWConnection.read_query(conn, "select") + + assert {:error, %Exqlite.Error{message: "arguments_wrong_length"}} = + RWConnection.read_query(conn, "select ? + ?", [1]) + + assert {:error, %Exqlite.Error{message: "arguments_wrong_length"}} = + RWConnection.read_query(conn, "select ? + ?", [1, 2, 3]) + + assert {:error, %Exqlite.Error{message: ~s[near "eh": syntax error]}} = + RWConnection.read_query(conn, "eh") + end + + test "doesn't lock connection", %{conn: conn} do + assert :sys.get_state(conn).lock == :none + + task = Task.async(fn -> RWConnection.read_query(conn, burn(1_000_000)) end) + + :timer.sleep(100) + assert :sys.get_state(conn).lock == :none + assert map_size(:sys.get_state(conn).reads) == 1 + + Task.await(task) + assert :sys.get_state(conn).lock == :none + assert map_size(:sys.get_state(conn).reads) == 0 + end + + test "concurrent", %{conn: conn} do + test = self() + + Task.async(fn -> + began = System.monotonic_time() + RWConnection.read_query(conn, burn(1_000_000)) + send(test, {:t1, began, System.monotonic_time()}) + end) + + Task.async(fn -> + began = System.monotonic_time() + RWConnection.read_query(conn, burn(1_000_000)) + send(test, {:t2, began, System.monotonic_time()}) + end) + + assert_receive {:t1, t1_began, t1_over}, :timer.seconds(5) + assert_receive {:t2, t2_began, t2_over}, :timer.seconds(5) + + assert t1_over > t2_began + assert t2_over > t1_began + end + + test "queues reads when connection is locked", %{conn: conn} do + Task.async(fn -> RWConnection.query(conn, burn(1_000_000)) end) + task = Task.async(fn -> RWConnection.read_query(conn, burn(1_000_000)) end) + + :timer.sleep(100) + assert {_, _} = _refs = :sys.get_state(conn).lock + assert map_size(:sys.get_state(conn).reads) == 0 + assert :queue.len(:sys.get_state(conn).queue) == 1 + + Task.await(task) + assert :sys.get_state(conn).lock == :none + assert map_size(:sys.get_state(conn).reads) == 0 + assert :queue.len(:sys.get_state(conn).queue) == 0 + end + + test "queues reads when draining", %{conn: conn} do + Task.async(fn -> RWConnection.read_query(conn, burn(1_000_000)) end) + Task.async(fn -> RWConnection.query(conn, burn(1_000_000)) end) + task = Task.async(fn -> RWConnection.read_query(conn, burn(1_000_000)) end) + + :timer.sleep(100) + assert :sys.get_state(conn).lock == :drain + assert map_size(:sys.get_state(conn).reads) == 1 + assert :queue.len(:sys.get_state(conn).queue) == 2 + + Task.await(task) + assert :sys.get_state(conn).lock == :none + assert map_size(:sys.get_state(conn).reads) == 0 + assert :queue.len(:sys.get_state(conn).queue) == 0 + end + end + + defp burn(count) when is_integer(count) do + """ + WITH RECURSIVE generate_series(value) AS ( + SELECT 1 UNION ALL SELECT value+1 FROM generate_series WHERE value < #{count} + ) + SELECT COUNT(*) FROM (SELECT value FROM generate_series); + """ + end + + defp conn(%{path: path}) do + {:ok, conn} = RWConnection.start_link(database: path) + {:ok, conn: conn} + end +end diff --git a/test/exqlite/timeout_segfault_test.exs b/test/exqlite/timeout_segfault_test.exs index 35776d50..dcff4052 100644 --- a/test/exqlite/timeout_segfault_test.exs +++ b/test/exqlite/timeout_segfault_test.exs @@ -1,41 +1,41 @@ -defmodule Exqlite.TimeoutSegfaultTest do - use ExUnit.Case +# defmodule Exqlite.TimeoutSegfaultTest do +# use ExUnit.Case - @moduletag :slow_test +# @moduletag :slow_test - setup do - {:ok, path} = Temp.path() - on_exit(fn -> File.rm(path) end) +# setup do +# {:ok, path} = Temp.path() +# on_exit(fn -> File.rm(path) end) - %{path: path} - end +# %{path: path} +# end - test "segfault", %{path: path} do - {:ok, conn} = - DBConnection.start_link(Exqlite.Connection, - busy_timeout: 50_000, - pool_size: 50, - timeout: 1, - database: path, - journal_mode: :wal - ) +# test "segfault", %{path: path} do +# {:ok, conn} = +# DBConnection.start_link(Exqlite.Connection, +# busy_timeout: 50_000, +# pool_size: 50, +# timeout: 1, +# database: path, +# journal_mode: :wal +# ) - query = %Exqlite.Query{statement: "create table foo(id integer, val integer)"} - {:ok, _, _} = DBConnection.execute(conn, query, []) +# query = %Exqlite.Query{statement: "create table foo(id integer, val integer)"} +# {:ok, _, _} = DBConnection.execute(conn, query, []) - values = for i <- 1..1000, do: "(#{i}, #{i})" - statement = "insert into foo(id, val) values #{Enum.join(values, ",")}" - insert_query = %Exqlite.Query{statement: statement} +# values = for i <- 1..1000, do: "(#{i}, #{i})" +# statement = "insert into foo(id, val) values #{Enum.join(values, ",")}" +# insert_query = %Exqlite.Query{statement: statement} - 1..5000 - |> Task.async_stream(fn _ -> - try do - DBConnection.execute(conn, insert_query, [], timeout: 1) - catch - kind, reason -> - IO.puts("Error: #{inspect(kind)} reason: #{inspect(reason)}") - end - end) - |> Stream.run() - end -end +# 1..5000 +# |> Task.async_stream(fn _ -> +# try do +# DBConnection.execute(conn, insert_query, [], timeout: 1) +# catch +# kind, reason -> +# IO.puts("Error: #{inspect(kind)} reason: #{inspect(reason)}") +# end +# end) +# |> Stream.run() +# end +# end