diff --git a/.gitignore b/.gitignore index 87e2ec70..4858b743 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ test/ct_log_cache logs/* .#* include/crypto_compat.hrl +deps/* diff --git a/README.md b/README.md index b603ba72..ac5d4fc5 100644 --- a/README.md +++ b/README.md @@ -284,6 +284,18 @@ or (after building emysql.app and the database, as explained above), start a_hel General Notes on using Emysql, including the actual specs: +#### Resolve Emysql dependency via rebar.config if possible + +```Erlang +{deps, +[ {'emysql', ".*", { git, "git://github.com/Eonblast/Emysql.git", "master" } } ] +``` +or some other branch +```Erlang +{deps, +[ {'emysql', ".*", { git, "git://github.com/jacktang/Emysql.git", "taodi" } } ] +``` + #### Starting an Application The Emysql driver is an Erlang gen-server, and, application. diff --git a/include/emysql.hrl b/include/emysql.hrl index 1320289f..f1fb97e2 100644 --- a/include/emysql.hrl +++ b/include/emysql.hrl @@ -34,9 +34,9 @@ port :: number(), database :: string(), encoding :: utf8 | latin1 | {utf8, utf8_unicode_ci} | {utf8, utf8_general_ci}, - available=queue:new() :: queue(), - locked=gb_trees:empty() :: gb_tree(), - waiting=queue:new() :: queue(), + available=queue:new() :: queue:queue(), + locked=gb_trees:empty() :: gb_tree:tree(), + waiting=queue:new() :: queue:queue(), start_cmds=[] :: string(), conn_test_period=0 :: number(), connect_timeout=infinity :: number() | infinity, @@ -180,3 +180,8 @@ % number of result set columns. -define(SERVER_STATUS_METADATA_CHANGED, 1024). +% Wrap the query result in erlang record +-define(AS_REC(RecAtom), [RecAtom, record_info(fields, RecAtom)]). +-define(AS_VAL, as_val). +% +-define(INPUT(RecAtom, Records), [Records, record_info(fields, RecAtom)]). diff --git a/rebar.config b/rebar.config index e61983a8..b21cb518 100644 --- a/rebar.config +++ b/rebar.config @@ -1,9 +1,20 @@ % -*- Erlang -*- % vim: ts=4 sw=4 et ft=erlang {erl_opts, [ - nowarn_deprecated_type + nowarn_deprecated_type, + {parse_transform, lager_transform} ]}. + {pre_hooks,[ {"linux|bsd|darwin|solaris", compile, "escript ./support/crypto_compat.escript"}, {"win32", compile, "escript.exe support/crypto_compat.escript"} ]}. + +{deps, [ + {'espec', ".*", { git, "git://github.com/lucaspiller/espec.git", "master" } }, + {'lager', ".*", { git, "git://github.com/basho/lager.git", "master" } }, + {'erl_utils', ".*", { git, "git://github.com/jacktang/erl_utils.git", "master" } } +]}. + + +{xref_warnings, true}. diff --git a/src/emysql.app.src b/src/emysql.app.src index 1eca61a5..230c9c24 100644 --- a/src/emysql.app.src +++ b/src/emysql.app.src @@ -10,6 +10,11 @@ {registered, [emysql_conn_mgr, emysql_sup]}, {applications, [kernel, stdlib, crypto]}, {env, [ - {default_timeout, 5000}, - {conn_test_period, 28000}]} + {default_timeout, 5000}, + {conn_test_period, 28000}, + {default_conn_opt, [ {host, "localhost"}, + {user, "root"}, + {database, test} + ]} + ]} ]}. diff --git a/src/emysql.erl b/src/emysql.erl index a373de9a..50b699ab 100644 --- a/src/emysql.erl +++ b/src/emysql.erl @@ -114,6 +114,8 @@ default_timeout/0 ]). +-export([transaction/2, transaction/3, abort/1]). + %% Result Conversion API -export([ as_dict/1, @@ -631,6 +633,16 @@ execute(PoolId, StmtName, Args, Timeout, nonblocking) when is_atom(StmtName), is unavailable end. +transaction(PoolId, Fun) -> + transaction(PoolId, Fun, default_timeout()). + +transaction(PoolId, Fun, Timeout) -> + Connection = emysql_conn_mgr:wait_for_connection(PoolId), + monitor_work(Connection, Timeout, [Connection, transaction, Fun]). + +abort(Reason) -> + throw(Reason). + %% @doc Return the field names of a result packet %% @end -spec field_names(Result) -> [Name] @@ -669,7 +681,7 @@ result_type(#eof_packet{}) -> eof. -spec as_dict(Result) -> Dict when Result :: #result_packet{}, - Dict :: dict(). + Dict :: dict:dict(). as_dict(Res) -> emysql_conv:as_dict(Res). diff --git a/src/emysql_conn.erl b/src/emysql_conn.erl index 01a5c74b..4cf0bad2 100644 --- a/src/emysql_conn.erl +++ b/src/emysql_conn.erl @@ -86,13 +86,37 @@ set_encoding(Connection, Encoding) -> canonicalize_query(Q) when is_binary(Q) -> Q; canonicalize_query(QL) when is_list(QL) -> iolist_to_binary(QL). +execute(Connection, transaction, Fun) when is_function(Fun) -> + case begin_transaction(Connection) of + #ok_packet{} -> + try Fun(Connection) of + Val -> + case commit_transaction(Connection) of + #ok_packet{} -> + {atomic, Val}; + #error_packet{} = ErrorPacket -> + {aborted, {commit_error, ErrorPacket}} + end + catch + throw:Reason -> + rollback_transaction(Connection), + {aborted, Reason}; + Class:Exception -> + rollback_transaction(Connection), + erlang:raise(Class, Exception, erlang:get_stacktrace()) + end; + #error_packet{} = ErrorPacket -> + {aborted, {begin_error, ErrorPacket}} + end; execute(Connection, StmtName, []) when is_atom(StmtName) -> prepare_statement(Connection, StmtName), StmtNameBin = atom_to_binary(StmtName, utf8), + execute_trace:execute(StmtName), Packet = <>, send_recv(Connection, Packet); execute(Connection, Query, []) -> QB = canonicalize_query(Query), + execute_trace:execute(raw_query, QB), Packet = <>, send_recv(Connection, Packet); execute(Connection, Query, Args) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) -> @@ -103,6 +127,7 @@ execute(Connection, Query, Args) when (is_list(Query) orelse is_binary(Query)) a OK when is_record(OK, ok_packet) -> ParamNamesBin = list_to_binary(string:join([[$@ | integer_to_list(I)] || I <- lists:seq(1, length(Args))], ", ")), % todo: utf8? Packet = <>, % todo: utf8? + execute_trace:execute(StmtName, Args), send_recv(Connection, Packet); Error -> Error @@ -126,6 +151,7 @@ prepare(Connection, Name, Statement) when is_atom(Name) -> prepare(Connection, atom_to_list(Name), Statement); prepare(Connection, Name, Statement) -> StatementBin = encode(Statement, binary), + execute_trace:prepare(Name, Statement), Packet = <>, % todo: utf8? case send_recv(Connection, Packet) of OK when is_record(OK, ok_packet) -> @@ -137,9 +163,22 @@ prepare(Connection, Name, Statement) -> unprepare(Connection, Name) when is_atom(Name)-> unprepare(Connection, atom_to_list(Name)); unprepare(Connection, Name) -> + execute_trace:unprepare(Name), Packet = <>, % todo: utf8? send_recv(Connection, Packet). +begin_transaction(Connection) -> + execute_trace:begin_transaction(), + emysql_conn:execute(Connection, <<"BEGIN">>, []). + +rollback_transaction(Connection) -> + execute_trace:rollback_transaction(), + emysql_conn:execute(Connection, <<"ROLLBACK">>, []). + +commit_transaction(Connection) -> + execute_trace:commit_transaction(), + emysql_conn:execute(Connection, <<"COMMIT">>, []). + open_n_connections(PoolId, N) -> case emysql_conn_mgr:find_pool(PoolId, emysql_conn_mgr:pools()) of {Pool, _} -> diff --git a/src/emysql_conn_mgr.erl b/src/emysql_conn_mgr.erl index b14a08d6..97536a06 100644 --- a/src/emysql_conn_mgr.erl +++ b/src/emysql_conn_mgr.erl @@ -40,7 +40,7 @@ -include("emysql.hrl"). --record(state, {pools, lockers = dict:new() :: dict()}). +-record(state, {pools, lockers = dict:new() :: dict:dict()}). %%==================================================================== %% API diff --git a/src/emysql_conv.erl b/src/emysql_conv.erl index 28af9dd8..2d4bd556 100644 --- a/src/emysql_conv.erl +++ b/src/emysql_conv.erl @@ -11,6 +11,7 @@ as_json/1, as_proplist/1, as_record/3, + as_record/5, as_record/4 ]). @@ -28,7 +29,7 @@ as_proplist(#result_packet{field_list=_Cols,rows=_Rows}) when is_list(_Cols), as_proplist(#result_packet{field_list=_Cols,rows=_Rows}) when is_list(_Cols), _Rows =:= [] -> []; -as_proplist(Res = #result_packet{field_list=Cols,rows=Rows}) when is_list(Cols), +as_proplist(#result_packet{field_list=Cols,rows=Rows} = Res) when is_list(Cols), is_list(Rows) -> Fields = emysql:field_names(Res), [begin @@ -36,7 +37,7 @@ as_proplist(Res = #result_packet{field_list=Cols,rows=Rows}) when is_list(Cols), end || R <- Rows]. %% @see emysql:as_record/1 -as_record(Result = #result_packet{}, RecordName, Fields, Fun) when is_atom(RecordName), is_list(Fields), is_function(Fun) -> +as_record(#result_packet{} = Result, RecordName, Fields, Fun, AccIn) when is_atom(RecordName), is_list(Fields), is_function(Fun) -> Columns = Result#result_packet.field_list, S = lists:seq(1, length(Columns)), @@ -50,13 +51,28 @@ as_record(Result = #result_packet{}, RecordName, Fields, Fun) when is_atom(Recor end end, Fs = [ F(FieldName) || FieldName <- Fields ], - F1 = fun(Row) -> - RecordData = [ Fx(Row) || Fx <- Fs ], - Fun(list_to_tuple([RecordName|RecordData])) - end, - [ F1(Row) || Row <- Result#result_packet.rows ]. -as_record(Result = #result_packet{}, RecordName, Fields) when is_atom(RecordName), is_list(Fields) -> + case AccIn of + undefined -> + F1 = fun(Row) -> + RecordData = [ Fx(Row) || Fx <- Fs ], + Fun(list_to_tuple([RecordName|RecordData])) + end, + [ F1(Row) || Row <- Result#result_packet.rows ]; + _ -> + lists:foldl(fun(Row, {RsAccIn, FunAccIn}) -> + RecordData = [ Fx(Row) || Fx <- Fs ], + Rec = list_to_tuple([RecordName|RecordData]), + NFunAccIn = Fun(Rec, FunAccIn), + {lists:append(RsAccIn, [Rec]), NFunAccIn} + end, {[], AccIn}, Result#result_packet.rows) + end. + +as_record(#result_packet{} = Result, RecordName, Fields, Fun) when is_atom(RecordName), is_list(Fields), is_function(Fun) -> + as_record(Result, RecordName, Fields, Fun, undefined). + + +as_record(#result_packet{} = Result, RecordName, Fields) when is_atom(RecordName), is_list(Fields) -> as_record(Result, RecordName, Fields, fun(A) -> A end). %% @see emysql:as_json/1 diff --git a/src/emysql_pool.erl b/src/emysql_pool.erl new file mode 100644 index 00000000..52cb07d8 --- /dev/null +++ b/src/emysql_pool.erl @@ -0,0 +1,182 @@ +%%%------------------------------------------------------------------- +%%% @author Jack Tang +%%% @copyright (C) 2014, Jack Tang +%%% @doc +%%% +%%% @end +%%% Created : 10 Apr 2014 by Jack Tang +%%%------------------------------------------------------------------- +-module(emysql_pool). + +-behaviour(gen_server). + +%% API +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {pools}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +pool(Id) -> + gen_server:call(?SERVER, {pool, Id}). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link(MySqlOpts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [MySqlOpts], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initiates the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([undefined]) -> + {ok, #state{pools = []}}; +init([MySqlOpt]) when is_list(MySqlOpt) -> % MySqlOpt is proplist + Pools = start_pools(MySqlOpt), + {ok, #state{pools = Pools}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call({pool, Id}, _From, #state{pools = Pools}=State) -> + Reply = case lists:member(Id, Pools) of + true -> {ok, ok}; + false -> {error, not_found} + end, + {reply, Reply, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, #state{pools = Pools} = State) -> + lists:foreach( + fun(Pool) -> + emysql:remove_pool(Pool) + end, Pools), + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +start_pools(MySqlOpt) -> + Host = proplists:get_value(host, MySqlOpt, "localhost"), + Port = proplists:get_value(port, MySqlOpt, 3306), + UserName = proplists:get_value(user, MySqlOpt, "root"), + Password = proplists:get_value(password, MySqlOpt), + Encoding = proplists:get_value(encoding, MySqlOpt, utf8), + Database = proplists:get_value(database, MySqlOpt), + Pools = proplists:get_value(pools, MySqlOpt, []), + + lists:foldl( + fun({PoolName, PoolSize}, AccIn) -> + case emysql:add_pool(PoolName, + [{size, type_utils:any_to_integer(PoolSize)}, + {user, type_utils:any_to_list(UserName)}, + {password, type_utils:any_to_list(Password)}, + {host, type_utils:any_to_list(Host)}, + {port, type_utils:any_to_integer(Port)}, + {database, type_utils:any_to_list(Database)}, + {encoding, Encoding}] ) of + ok -> [PoolName | AccIn]; + {error, Reason} -> + io:format("EMysqlPool add pool<~p> failed: ~p", + [PoolName, Reason]), + {error, Reason} + end + end, [], Pools). + diff --git a/src/emysql_query.erl b/src/emysql_query.erl new file mode 100644 index 00000000..20d99a8b --- /dev/null +++ b/src/emysql_query.erl @@ -0,0 +1,320 @@ +%%%------------------------------------------------------------------- +%%% @author Jack Tang +%%% @copyright (C) 2014, Jack Tang +%%% @doc +%%% +%%% @end +%%% Created : 24 Apr 2014 by Jack Tang +%%%------------------------------------------------------------------- +-module(emysql_query). + +%% API +-export([find/2, find/3, find/4]). +-export([find_first/4]). +-export([find_each/4, find_each/5, find_each/6]). + +-include("emysql.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== + + +%%-------------------------------------------------------------------- +%% @doc +%% e.g: find(my_pool, ["SELECT name FROM users where age > ? and time > ?", 12, "2013-12-03" ]) +%% +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +find(ConnOrPool, [RawSql | Values]) -> + case ConnOrPool of + #emysql_connection{} = Conn -> + emysql_conn:execute(Conn, RawSql, Values); + Pool -> + emysql:execute(Pool, RawSql, Values) + end. + +%%-------------------------------------------------------------------- +%% @doc +%% e.g: find(my_pool, users, [ {select, ["name", "age"]}, +%% {where, ["age > ? AND color in (?)", Age, "blue, red"]}, +%% {order, "id desc"}, +%% {limit, 100 }], ?AS_REC(user)) +%% or +%% +%% find(my_pool, users, [ {select, ["name", "age"]}, +%% {where, [{age, '>', Age}, {color, in, ["blue", "red"]} ]}, +%% {order, "id desc"}, +%% {limit, 100 }], ?AS_REC(user)) +%% +%% or +%% +%% find(my_pool, users, [ {select, ["count(unique name)"]}, +%% {where, [{age, '>', Age}]}, +%% {order, "id desc"}, +%% {limit, 100 }], ?AS_VAL) +%% +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +find(ConnOrPool, Table, SqlOptions) -> + {FindSql, CondVals} = build_sql(Table, SqlOptions), + case ConnOrPool of + #emysql_connection{} = Conn -> + emysql_conn:execute(Conn, FindSql, CondVals); + Pool -> + emysql:execute(Pool, FindSql, CondVals) + end. +find(ConnOrPool, Table, SqlOptions, ?AS_VAL) -> + case find(ConnOrPool, Table, SqlOptions) of + #result_packet{rows = [[R] ]} -> R; + #result_packet{rows = Rs } -> + lists:foldl(fun([R], AccIn) -> + lists:append(AccIn, [R]) + end, [], Rs); + #error_packet{code = Code, msg = Msg} -> + throw({Code, Msg}) + end; + +find(ConnOrPool, Table, SqlOptions, [Rec, RecFields] = _AsRec) -> + Result = find(ConnOrPool, Table, SqlOptions), + emysql_conv:as_record(Result, Rec, RecFields). + + +%%-------------------------------------------------------------------- +%% @doc +%% e.g: find(my_pool, users, [ {select, ["name", "age"]}, +%% {where, ["age > ?", Age]}, +%% {order, "id desc"} ], ?AS_REC(user)) +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +find_first(ConnOrPool, Table, SqlOptions, ?AS_VAL) -> + NSqlOptions = proplists:delete(limit, SqlOptions), + find(ConnOrPool, Table, [{limit, 1} | NSqlOptions], ?AS_VAL); + +find_first(ConnOrPool, Table, SqlOptions, [_Rec, _RecFields] = AsRec) -> + NSqlOptions = proplists:delete(limit, SqlOptions), + case find(ConnOrPool, Table, [{limit, 1} | NSqlOptions], AsRec) of + [ ] -> undefined; + [Val] -> Val + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% +%% e.g: +%% Fun = fun(#user{} = U) -> +%% do_something_to(U) +%% end, +%% find_each(my_pool, users, ?AS_REC(user), Fun). +%% +%% find_each(my_pool, users, [ {select, ["name", "age"]}, +%% {where, ["age > ?", Age]}, +%% % {order, "id desc"}, +%% {order, [id, "DESC"] }, +%% {limit, 100 } ], 10, ?AS_REC(user), Fun). +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +find_each(ConnOrPool, Table, AsRec, Fun) -> + find_each(ConnOrPool, Table, [], 1000, AsRec, Fun, undefined). + +find_each(ConnOrPool, Table, AsRec, Fun, AccIn) -> + find_each(ConnOrPool, Table, [], 1000, AsRec, Fun, AccIn). + +find_each(ConnOrPool, Table, SqlOptions, AsRec, Fun, AccIn) -> + find_each(ConnOrPool, Table, SqlOptions, 1000, AsRec, Fun, AccIn). + +find_each(ConnOrPool, Table, SqlOptions, BatchSize, AsRec, Fun, AccIn) -> + BaseId = undefined, + + {[FindSql, FindCondVals], + [CountSql, CountCondVals], + [OField, OSort]} = build_sql(Table, SqlOptions, BatchSize, BaseId), + + Result = case ConnOrPool of + #emysql_connection{} = Conn -> + emysql_conn:execute(Conn, CountSql, CountCondVals); + Pool -> + emysql:execute(Pool, CountSql, CountCondVals) + end, + case Result of + #result_packet{rows = [[Total]]} -> + Limit = proplists:get_value(limit, SqlOptions), + Remain = case {Limit, Total > Limit} of + {undefined, _} -> Total; + {_, true} -> Limit; + {_, false} -> Total + end, + %BaseId = case OSort of + % "ASC" -> 0; + % "DESC" -> infinite + % end, + %do_find_each(ConnOrPool, Table, FindSql, FindCondVals, BatchSize, + % AsRec, Fun, AccIn, Remain, BaseId); + do_find_each(ConnOrPool, Table, SqlOptions, BatchSize, AsRec, Fun, AccIn, Remain, BaseId); + #error_packet{code = Code, msg = Msg} -> + throw({Code, Msg}); + _ -> + ok + end. + + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +build_sql(Table, SqlOptions) -> + {[FindSql, FindCondVals], _, _} = build_sql(Table, SqlOptions, undefined, undefined), + {FindSql, FindCondVals}. + +build_sql(Table, SqlOptions, BatchSize, BaseId) -> + SelectFields = + case proplists:get_value(select, SqlOptions) of + undefined -> "*"; + V when is_list(V) -> string:join(V, ","); + _ -> throw(bad_sql_select) + end, + + {Where, CondVals} = case proplists:get_value(where, SqlOptions) of + undefined -> {"", [ ]}; + [ ] -> {"", [ ]}; + [ Head | _T] = L when is_tuple(Head) -> + {Stmt, Vals} = + lists:foldl( + fun(Item, {StmtAcc, ValAcc}) -> + case Item of + {K, between, [V1, V2]} -> + S = type_utils:any_to_list(K) ++ " BETWEEN ? AND ? ", + {[S | StmtAcc], [V1, V2 | ValAcc]}; + {K, in, V3} when is_list(V3) -> + InStmt = string:join(lists:duplicate(length(V3), "?"), ","), + S = type_utils:any_to_list(K) ++ " IN (" ++ InStmt ++ ") ", + + NV3 = lists:foldl(fun(V3Item, ValAcc2) -> + [V3Item | ValAcc2] + end, ValAcc, V3), + {[S | StmtAcc], NV3 }; + {K, OP, V4} -> + S = string:join([type_utils:any_to_list(K), + type_utils:any_to_list(OP), + "?"], " "), + {[S | StmtAcc], [V4 | ValAcc] }; + {K, V4} -> + S = type_utils:any_to_list(K) ++ " = ?", + {[S | StmtAcc], [V4 | ValAcc]}; + _ -> + {StmtAcc, ValAcc} + end + end, {[], []}, L), + {"WHERE " ++ string:join(Stmt, " AND "), Vals}; + [ Cond ] -> {"WHERE " ++ Cond, [ ]}; + [Cond | Values] -> {"WHERE " ++ Cond, Values} + end, + + [Order, OField, OSort] = + case proplists:get_value(order, SqlOptions) of + undefined -> ["", id, "ASC"]; + [OrderField, OrderSort] -> + OrderStr1 = "ORDER BY " ++ type_utils:any_to_list(OrderField) ++ " " ++ type_utils:any_to_list(OrderSort), + [OrderStr1, OrderField, OrderSort]; + [[OrderField, OrderSort] | _] = OrderOpt -> + MultiOrder = lists:foldl(fun([OField0, OSort0], OAcc) -> + lists:append(OAcc, [type_utils:any_to_list(OField0) ++ " " ++ type_utils:any_to_list(OSort0)]) + end, [], OrderOpt), + OrderStr2 = "ORDER BY " ++ string:join(MultiOrder, ", "), + [OrderStr2, OrderField, OrderSort]; %% TODO: should return multi-order + OrderBy -> + OrderStr3 = "ORDER BY " ++ type_utils:any_to_list(OrderBy), + [OrderStr3, undefined, undefined] + end, + + {Where2, CondVals2} = + case {BaseId, OSort, Where} of + {infinate, _, _} -> {Where, CondVals}; + {undefined, _, _} -> {Where, CondVals}; + {_, "ASC", ""} -> {"WHERE " ++ type_utils:any_to_list(OField) ++ " > ?", [BaseId]}; + {_, "DESC", ""} -> {"WHERE " ++ type_utils:any_to_list(OField) ++ " < ?", [BaseId]}; + {_, "ASC", _} -> + {Where ++ " AND " ++ type_utils:any_to_list(OField) ++ " > ?", + lists:append(CondVals, [BaseId])}; + {_, "DESC", _ } -> + {Where ++ " AND " ++ type_utils:any_to_list(OField) ++ " < ?", + lists:append(CondVals, [BaseId])} + end, + + Limit = + case BatchSize of + undefined -> + case proplists:get_value(limit, SqlOptions) of + undefined -> ""; + [LV1, LV2] -> + "LIMIT " ++ type_utils:any_to_list(LV1) ++ ", " ++ type_utils:any_to_list(LV2); + LV3 -> "LIMIT " ++ type_utils:any_to_list(LV3) + end; + _ -> + "LIMIT " ++ type_utils:any_to_list(BatchSize) + end, + + Table2 = type_utils:any_to_list(Table), + FindSql = string:join(["SELECT", SelectFields, "FROM", Table2, Where2, Order, Limit], " "), + CountSql = string:join(["SELECT COUNT(1) FROM", Table2, Where], " "), + {[FindSql, CondVals2], [CountSql, CondVals], [OField, OSort]}. + + +do_find_each(ConnOrPool, Table, SqlOptions, BatchSize, [Rec, RecFields] = AsRec, + Fun, AccIn, Remain, BaseId) -> + {[Sql, CondVals], + [_CountSql, _CountCondVals], + [OrderField, _OSort]} = build_sql(Table, SqlOptions, BatchSize, BaseId), + case OrderField of + undefined -> + throw(order_field_is_undefined); + _ -> + Result = case ConnOrPool of + #emysql_connection{} = Conn -> + emysql_conn:execute(Conn, Sql, CondVals); + Pool -> + emysql:execute(Pool, Sql, CondVals) + end, + case Result of + #result_packet{rows = Rows} -> + case emysql_conv:as_record(Result, Rec, RecFields, Fun, AccIn) of + {[], NAccIn} -> NAccIn; + {Rs, NAccIn} -> + LastRow = lists_utils:last(Rs), + NextId = tuple_utils:value(OrderField, LastRow, RecFields), + case Remain - BatchSize > 0 of + true -> + do_find_each(ConnOrPool, Table, SqlOptions, BatchSize, AsRec, + Fun, NAccIn, (Remain - BatchSize), NextId); + false -> + NAccIn + end; + Rs when is_list(Rs) -> + LastRow = lists_utils:last(Rs), + NextId = case erlang:is_record(LastRow, Rec) of + true -> + tuple_utils:value(OrderField, LastRow, RecFields); + _ -> + undefined + end, + case Remain - BatchSize > 0 of + true -> + do_find_each(ConnOrPool, Table, SqlOptions, BatchSize, AsRec, + Fun, undefined, (Remain - BatchSize), NextId); + false -> + ok + end + end; + _ -> + AccIn + end + + end. diff --git a/src/emysql_saver.erl b/src/emysql_saver.erl new file mode 100644 index 00000000..4143a3be --- /dev/null +++ b/src/emysql_saver.erl @@ -0,0 +1,365 @@ +%%%------------------------------------------------------------------- +%%% @author Jack Tang +%%% @copyright (C) 2014, Jack Tang +%%% @doc +%%% +%%% @end +%%% Created : 28 Apr 2014 by Jack Tang +%%%------------------------------------------------------------------- +-module(emysql_saver). + +%% API +-export([save/3, save/4]). +-export([find_or_create_by/4]). +-export([update_or_create_by/4, + update_or_create_by/5]). + +-include("emysql.hrl"). +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Options = [{insert_ignore, true}, +%% {batch_size, 1000}, +%% {update_attrs, [data] }] +%% emysql_saver:save(pool, test, ?INPUT(#test{data = "hello"}, Options). +%% +%% or +%% +%% Options = [{insert_ignore, true}, +%% {batch_size, 1000}, +%% {unique_fields, [out_id, login] } ] +%% emysql_saver:save(pool, test, ?INPUT(#test{data = "hello"}, Options). +%% +%% or +%% +%% emysql_saver:save(pool, test, ?INPUT(#test{data = "hello"})). +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +save(_ConnOrPool, _Table, undefined) -> + ok; +save(_ConnOrPool, _Table, [[], _]) -> + ok; +save(ConnOrPool, Table, RecordInput) -> + DefOpt = [{auto_id, true}, + {batch_size, 1000}], + save(ConnOrPool, Table, RecordInput, DefOpt). + +save(ConnOrPool, Table, [Record0, Fields] = _RecordInput, Options) -> + [FieldPK | _ ] = Fields, + Records = case Record0 of + V1 when is_list(V1) -> V1; + V2 -> [V2] + end, + case build_sql(Table, Records, Fields, Options) of + {insert, Sqls} -> + X = lists:foldl( + fun({Sql, Values}, AccIn) -> + Result = case ConnOrPool of + #emysql_connection{} = Conn -> + emysql_conn:execute(Conn, Sql, Values); + Pool -> + emysql:execute(Pool, Sql, Values) + end, + case Result of + #ok_packet{affected_rows = Affected, insert_id = InsertId} -> + case {Affected, length(Records), FieldPK} of + {1, 1, id} -> + [Record] = Records, + [setelement(2, Record, InsertId) | AccIn]; + _ -> AccIn + end; + #error_packet{code = Code, msg = Msg} -> + throw({Code, Msg}) + end + end, [], Sqls), + case X of + [] -> ok; + [V] -> V + end; + {update, {Sql, Values} } -> + Result = case ConnOrPool of + #emysql_connection{} = Conn -> + emysql_conn:execute(Conn, Sql, Values); + Pool -> + emysql:execute(Pool, Sql, Values) + end, + case Result of + #ok_packet{affected_rows = Affected} -> + case {Affected, length(Records)} of + {1, 1} -> + [Record] = Records, + Record; + _ -> Records + end; + #error_packet{code = Code, msg = Msg} -> + throw({Code, Msg}) + end + end. + +%%-------------------------------------------------------------------- +%% @doc +%% +%% emysql_saver:find_or_create_by(pool, test, +%% ["select * from test where data = ?", "hello"], +%% fun() -> +%% ?INPUT(test, #test{data = "find me"}) +%% end) +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +find_or_create_by(ConnOrPool, Table, FindSql, CreateFun) -> + Result = emysql_query:find(ConnOrPool, FindSql), + case CreateFun() of + [Record, Fields] -> + case Result of + #result_packet{rows = Rows} when length(Rows) =:= 0 -> + save(ConnOrPool, Table, [Record, Fields]); + #result_packet{} -> + [R | _] = emysql_conv:as_record(Result, element(1, Record), Fields), + R; + Other -> + Other + end; + _ -> + io:format("Forget to wrap return value using ?INPUT?", []) + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Options = [{ignore_null, true}] +%% emysql_saver:update_or_create_by(pool, test, +%% ["select * from test where data = ?", "hello"], +%% fun() -> +%% ?INPUT(test, #test{data = "find me"}) +%% end, Options) +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +update_or_create_by(ConnOrPool, Table, FindSql, Fun) -> + update_or_create_by(ConnOrPool, Table, FindSql, Fun, []). + +update_or_create_by(ConnOrPool, Table, FindSql, Fun, Options) -> + Result = emysql_query:find(ConnOrPool, FindSql), + case Fun() of + [Record, Fields] -> + case Result of + #result_packet{rows = Rows} when length(Rows) =:= 0 -> + save(ConnOrPool, Table, [Record, Fields]); + #result_packet{} -> + case emysql_conv:as_record(Result, element(1, Record), Fields) of + [RecInDb] -> + case merge_rec(RecInDb, Record, Fields, Options) of + {changed, MergedRec} -> + save(ConnOrPool, Table, [MergedRec, Fields]); + _ -> + RecInDb + end; + L -> + io:format("Only one record expectedh here, but now there are ~p", + [length(L)]), + throw({error, many_records}) + end; + Other -> + Other + end; + _ -> + io:format("Forget to wrap return value using ?INPUT?", []) + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +%% insert into table(f1, f2, f3) values(v1, v2, v3); +%% insert into table(f1, f2, f3) values(v11, v12, v13), (v21, v22, v23), ...; +%% update table set f1 = ? +build_sql(_Table, [] = _Records, _Fields, _Options) -> + {error, no_input}; + +build_sql(Table, [Record | _Tail] = Records, Fields, Options) -> + case element(2, Record) of + undefined -> % insert + {UpdateFields, UpdateFIndex, _UVals} = select_update_fields_index(insert, Record, + Fields, Options), + + SqlAndValues = generate_insert_sql(Table, UpdateFields, UpdateFIndex, Records, Options), + {insert, SqlAndValues}; + _ -> % update + {UpdateFields, _FIndex, UpdateVals} = select_update_fields_index(update, Record, + Fields, Options), + [PK | _] = Fields, + PKPair = {type_utils:any_to_list(PK), element(2, Record)}, + SqlAndValues = generate_update_sql(Table, UpdateFields, UpdateVals, PKPair), + {update, SqlAndValues} + end. + +select_update_fields_index(InsertOrUpdate, Record, Fields, Options) -> + {UAOptIsSet, UpdateAttrs} = + case proplists:get_value(update_attrs, Options) of + undefined -> {no, []}; + V -> {yes, V} + end, + + AutoId = case proplists:get_value(auto_id, Options) of + true -> true; + _ -> false + end, + + {_, UpdateFields, UpdateFIndex, UpdateVals} = + lists:foldl( + fun(Field, {Index, EffectedFields, EffectedIndex, Vals}) -> + case {Field, AutoId, UAOptIsSet, InsertOrUpdate, lists:member(Field, UpdateAttrs)} of + {id, true, _, _, _} -> + {Index + 1, EffectedFields, EffectedIndex, Vals}; + {created_at, _, _, insert, _} -> + NEffectedFields = [type_utils:any_to_list(Field) | EffectedFields], + NEffectedIndex = [ timestamp | EffectedIndex], + Val = datetime_utils:localtime_as_string(), + {Index + 1, NEffectedFields, NEffectedIndex, [Val | Vals]}; + {created_at, _, _, update, _} -> + {Index + 1, EffectedFields, EffectedIndex, Vals}; + {updated_at, _, _, _, _} -> + NEffectedFields = [type_utils:any_to_list(Field) | EffectedFields], + NEffectedIndex = [ timestamp | EffectedIndex], + Val = datetime_utils:localtime_as_string(), + {Index + 1, NEffectedFields, NEffectedIndex, [Val | Vals]}; + {_, _, no, _, _} -> + NEffectedFields = [type_utils:any_to_list(Field) | EffectedFields], + NEffectedIndex = [(Index + 1) | EffectedIndex], + Val = element(Index + 1, Record), + {Index + 1, NEffectedFields, NEffectedIndex, [Val | Vals]}; + {_, _, yes, _, true} -> + NEffectedFields = [type_utils:any_to_list(Field) | EffectedFields], + NEffectedIndex = [(Index + 1) | EffectedIndex], + Val = element(Index + 1, Record), + {Index + 1, NEffectedFields, NEffectedIndex, [Val | Vals]}; + {_, _, yes, _, false} -> + {Index + 1, EffectedFields, EffectedIndex, Vals} + end + end, {1, [], [], []}, Fields), + {UpdateFields, UpdateFIndex, UpdateVals}. + +generate_insert_sql(Table, UpdateFields, UpdateFIndex, Records, Options) -> + BatchSize = proplists:get_value(batch_size, Options, 1000), + SqlHead = case proplists:get_value(insert_ignore, Options, false) of + true -> + "INSERT IGNORE INTO " ++ type_utils:any_to_list(Table); + _ -> + "INSERT INTO " ++ type_utils:any_to_list(Table) + end, + SqlFields = string:join(lists:reverse(UpdateFields), ","), + ValuesInSql = "(" ++ string:join(lists:duplicate(length(UpdateFields), "?"), ",") ++ ")", + + BatchCount = length(Records) div BatchSize, + BatchRemainSize = length(Records) rem BatchSize, + + UniqueFields = proplists:get_value(unique_fields, Options, []), + + Batch1Sql = case BatchCount of + 0 -> undefined; + _ -> + SqlValues1 = string:join(lists:duplicate(BatchSize, ValuesInSql), ","), + Sql1 = string:join([SqlHead, "(", SqlFields, ") VALUES ", SqlValues1], " "), + case UniqueFields of + [] -> Sql1; + Fs1 when is_list(Fs1) -> + OnDupSubSql1 = lists:foldl( + fun(UniqueField, OnDupAcc) -> + UFieldStr = type_utils:any_to_list(UniqueField), + lists:append(OnDupAcc, [UFieldStr ++ "= VALUES(" ++ UFieldStr ++ ")"]) + end, [], UniqueFields), + Sql1 ++ " ON DUPLICATE KEY UPDATE " ++ string:join(OnDupSubSql1, ", "); + _ -> Sql1 + end + end, + Batch2Sql = case BatchRemainSize of + 0 -> undefined; + _ -> + SqlValues2 = string:join(lists:duplicate(BatchRemainSize, ValuesInSql), ","), + Sql2 = string:join([SqlHead, "(", SqlFields, ") VALUES ", SqlValues2], " "), + case UniqueFields of + [] -> Sql2; + Fs2 when is_list(Fs2) -> + OnDupSubSql2 = lists:foldl( + fun(UniqueField, OnDupAcc) -> + UFieldStr = type_utils:any_to_list(UniqueField), + lists:append(OnDupAcc, [UFieldStr ++ "= VALUES(" ++ UFieldStr ++ ")"]) + end, [], UniqueFields), + Sql2 ++ " ON DUPLICATE KEY UPDATE " ++ string:join(OnDupSubSql2, ", "); + _ -> Sql2 + end + end, + + RecBatchValues = + lists_utils:split(BatchSize, Records, + fun(NRecords) -> + lists:foldl( + fun(RecItem, AccIn) -> + lists:foldl( + fun(Idx, AccIn2) -> + Val = case Idx of + timestamp -> + datetime_utils:localtime_as_string(); + _ -> element(Idx, RecItem) + end, + [Val | AccIn2] % That's why reverse SqlFields + end, AccIn, UpdateFIndex) + end, [], NRecords) + end), + + case BatchRemainSize of + 0 -> + % 当保存条数整除1000时,lists RecBatchValues最后的元素为[],要删除掉 + [{Batch1Sql, FRecBatchValues} || FRecBatchValues <- lists:delete([], RecBatchValues)]; + + _ -> + {Batch1Values, Batch2Values} = lists:split(BatchCount, RecBatchValues), + case Batch1Sql of + undefined -> [{Batch2Sql, lists:merge(Batch2Values)}]; + _ -> + lists:merge([{Batch1Sql, FBatch1Values} || FBatch1Values <- Batch1Values], + [{Batch2Sql, lists:merge(Batch2Values)}]) + end + end. + +generate_update_sql(Table, UpdateFields, UpdateVals, {FieldPK, PKVal}) -> + SqlHead = "UPDATE " ++ type_utils:any_to_list(Table), + SqlTail = "WHERE " ++ FieldPK ++ " = ?", + SqlSet = string:join([ F ++ " = ?" || F <- UpdateFields], ","), + Sql = string:join([SqlHead, "SET ", SqlSet, SqlTail], " "), + + {Sql, lists:append(UpdateVals, [PKVal])}. + +merge_rec(Rec1, Rec2, Fields, Options) -> + IgnoreNil = proplists:get_value(ignore_nil, Options, false), + [_, RecChanged, MergedRec] = + lists:foldl(fun(Field, [Index, Changed, RecAcc]) -> + Val1 = element(Index + 1, Rec1), + Val2 = element(Index + 1, Rec2), + case {Field, Changed, IgnoreNil, Val1 =:= Val2, Val2} of + {id, _, _, _, _} -> + [Index + 1, Changed, RecAcc]; + {_, false, true, false, undefined} -> + [Index + 1, true, RecAcc]; + {_, false, true, false, _} -> + [Index + 1, true, setelement(Index + 1, RecAcc, Val2)]; + {_, false, _, true, _} -> + [Index + 1, false, RecAcc]; + {_, true, true, true, _} -> + [Index + 1, true, RecAcc]; + {_, _, false, false, _} -> + [Index + 1, true, setelement(Index + 1, RecAcc, Val2)]; + {_, true, false, true, _} -> + [Index + 1, true, RecAcc] + end + end, [1, false, Rec1], Fields), + case RecChanged of + true -> {changed, MergedRec}; + _ -> {unchanged, MergedRec} + end. diff --git a/src/emysql_sup.erl b/src/emysql_sup.erl index 096abc68..7c3a7d5e 100644 --- a/src/emysql_sup.erl +++ b/src/emysql_sup.erl @@ -32,7 +32,12 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init(_) -> - {ok, {{one_for_one, 10, 10}, [ - {emysql_statements, {emysql_statements, start_link, []}, permanent, 5000, worker, [emysql_statements]}, - {emysql_conn_mgr, {emysql_conn_mgr, start_link, []}, permanent, 5000, worker, [emysql_conn_mgr]} + {ok, {{one_for_one, 10, 10}, + [ + {emysql_statements, + {emysql_statements, start_link, []}, permanent, 5000, worker, [emysql_statements]}, + {emysql_conn_mgr, + {emysql_conn_mgr, start_link, []}, permanent, 5000, worker, [emysql_conn_mgr]}, + {execute_trace, + {execute_trace, start_link, []}, permanent, 5000, worker, [execute_trace]} ]}}. diff --git a/src/execute_trace.erl b/src/execute_trace.erl new file mode 100644 index 00000000..1e4e21e7 --- /dev/null +++ b/src/execute_trace.erl @@ -0,0 +1,250 @@ +%%%------------------------------------------------------------------- +%%% @author Jack Tang +%%% @copyright (C) 2014, Jack Tang +%%% @doc +%%% +%%% @end +%%% Created : 29 Apr 2014 by Jack Tang +%%%------------------------------------------------------------------- +-module(execute_trace). + +-behaviour(gen_server). + +%% API +-export([prepare/2, + unprepare/1, + execute/1, + execute/2, + begin_transaction/0, + commit_transaction/0, + rollback_transaction/0]). + +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { stmts }). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +prepare(Name, Statement) -> + gen_server:cast(?SERVER, {prepare, type_utils:any_to_binary(Name), Statement}). + +%%-------------------------------------------------------------------- +%% @doc +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +unprepare(Name) -> + gen_server:cast(?SERVER, {unprepare, type_utils:any_to_binary(Name)}). + +%%-------------------------------------------------------------------- +%% @doc +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +execute(StmtName) -> + execute(StmtName, []). +execute(raw_query, Query) -> + gen_server:cast(?SERVER, {raw_query, Query}); +execute(StmtName, Params) -> + gen_server:cast(?SERVER, {execute, type_utils:any_to_binary(StmtName), Params}). + +%%-------------------------------------------------------------------- +%% @doc +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +begin_transaction() -> + gen_server:cast(?SERVER, begin_transaction). + +%%-------------------------------------------------------------------- +%% @doc +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +commit_transaction() -> + gen_server:cast(?SERVER, commit). + + +%%-------------------------------------------------------------------- +%% @doc +%% +%% @spec +%% @end +%%-------------------------------------------------------------------- +rollback_transaction() -> + gen_server:cast(?SERVER, rollback). + + + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initiates the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{ stmts = orddict:new() }}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast({prepare, Name, Statement}, #state{stmts = Stmts}=State) -> + NStmts = case orddict:find(Name, Stmts) of + error -> + orddict:store(Name, [Statement, 1], Stmts); + {ok, [Statement, Ref]} -> + orddict:store(Name, [Statement, Ref + 1], Stmts); + {ok, [TheStatement, _]} -> + lager:error("Prepare statement (~p) conflicted: ~p <-> ~p", + [Name, TheStatement, Statement]), + Stmts + end, + {noreply, State#state{stmts = NStmts}}; + +handle_cast({unprepare, Name}, #state{stmts = Stmts} = State) -> + NStmts = case orddict:find(Name, Stmts) of + error -> Stmts; + {ok, [_Statement, 1]} -> + orddict:erase(Name, Stmts); + {ok, [Statement, Ref]} -> + orddict:store(Name, [Statement, Ref - 1], Stmts) + end, + {noreply, State#state{stmts = NStmts}}; + +handle_cast({execute, Name, Params}, #state{stmts = Stmts}=State) -> + case orddict:find(Name, Stmts) of + error -> + lager:warning("No statement (~p) found", [Name]); + {ok, [Statement, _Ref]} -> + lager:debug("Execute ~p: ~p", [Name, Statement]), + lager:debug("Params ~p: ~p", [Name, Params]) + end, + {noreply, State}; + +handle_cast({raw_query, Query}, #state{}=State) -> + lager:debug("Query: ~p", [Query]), + {noreply, State}; + + +handle_cast(begin_transaction, State) -> + lager:debug("BEGIN TRANSACTION"), + {noreply, State}; + +handle_cast(commit_transaction, State) -> + lager:debug("COMMIT"), + {noreply, State}; + +handle_cast(rollback_transaction, State) -> + lager:debug("ROLLBACK"), + {noreply, State}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================