Skip to content

Commit

Permalink
Merge pull request #58 from belltoy/gun2.0
Browse files Browse the repository at this point in the history
Upgrade to Gun2.0
  • Loading branch information
zhongwencool authored Mar 21, 2023
2 parents 0b14f17 + 515327d commit 0cdd103
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 103 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,29 @@ Gen proto and client file
```erlang
rebar3 etcd gen
```

Migration from eetcd 0.3.x to 0.4.x
-----

eetcd 0.4.x now dependents on Gun 2.0, which introduced some breaking changes,
and propagate to eetcd.

The prior transport options are split into `tcp_opts` and `tls_opts` and moved
inside the new `eetcd:opts()` parameter. As a result, the functions `eetcd:open/4,5`
have been replaced with `eetcd:open/2,3`.

Likewise, the transport options for `eetcd_maintenance` APIs are split into
`tcp_opts` and `tls_opts` as well.

- The function `eetcd:open/4,5` has been replaced with `eetcd:open/3`.
- The function `eetcd_maintenance:defragment/3` has been replaced with `eetcd_maintenance:defragment/2`.
- The function `eetcd_maintenance:status/3` has been replaced with `eetcd_maintenance:status/2`.
- The function `eetcd_maintenance:has_kv/4` has been replaced with `eetcd_maintenance:has_kv/3`.

New options `{domain_lookup_timeout, Interval}` and `{tls_handshake_timeout, Interval}`
have been added for `eetcd:open/3`. Alone with the prior `{connect_timeout, Interval}`,
it allows the underlining Gun library to get separate events when connecting,
the domain lookup, connection and TLS handshakes.
- `tls_opts` Passed to Gun.

Read more details of Gun options in the [Gun 2.0 manual](https://ninenines.eu/docs/en/gun/2.0/manual/gun/).
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{erl_opts, [{i, "./_build/default/plugins/gpb/include"}]}.
{deps, [
{gun, "1.3.3"}
{gun, "2.0.0"}
]}.

{gpb_opts, [
Expand Down
12 changes: 6 additions & 6 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{"1.2.0",
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1},
{<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0}]}.
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1},
{<<"gun">>,{pkg,<<"gun">>,<<"2.0.0">>},0}]}.
[
{pkg_hash,[
{<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>},
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]},
{<<"cowlib">>, <<"0B9FF9C346629256C42EBE1EEB769A83C6CB771A6EE5960BD110AB0B9B872063">>},
{<<"gun">>, <<"2326BC0FD6D9CF628419708270D6FE8B02B8D002CF992E4165A77D997B1DEFD0">>}]},
{pkg_hash_ext,[
{<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>},
{<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>}]}
{<<"cowlib">>, <<"2B3E9DA0B21C4565751A6D4901C20D1B4CC25CBB7FD50D91D2AB6DD287BC86A9">>},
{<<"gun">>, <<"6613CB7C62930DC8D58263C44DDA72F8556346BA88358FC929DCBC5F76D04569">>}]}
].
2 changes: 1 addition & 1 deletion src/eetcd.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application, eetcd,
[
{description, "ETCD V3 client"},
{vsn, "0.3.6"},
{vsn, "0.4.0"},
{registered, [eetcd_sup, eetcd_conn_sup, eetcd_lease_sup]},
{mod, {eetcd_app, []}},
{applications, [kernel, stdlib, gun]},
Expand Down
63 changes: 34 additions & 29 deletions src/eetcd.erl
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
-module(eetcd).
-include("eetcd.hrl").
%% API
-export([open/2, open/4, open/5, close/1]).
-export([open/2, open/3, close/1]).
-export([info/0]).
-export([new/1, with_timeout/2]).
-export([get_prefix_range_end/1]).
-export_type([opts/0]).

-type opts() :: [ {mode, connect_all | random} |
{transport, tcp | tls | ssl} |
{name, string()} |
{password, string()} |
{auto_sync_interval_ms, non_neg_integer()} |
{retry, non_neg_integer()} |
{retry_timeout, pos_integer()} |
{connect_timeout, timeout()} |
{domain_lookup_timeout, timeout()} |
{tls_handshake_timeout, timeout()} |
{tcp_opts, [gen_tcp:connect_option()]} |
{tls_opts, [ssl:tls_client_option()]}
].

%% @doc Connects to a etcd server on TCP port
%% Port on the host with IP address Address, such as:
%% `open(test,["127.0.0.1:2379","127.0.0.1:2479","127.0.0.1:2579"]).'
-spec open(name(), [string()]) -> {ok, pid()} | {error, any()}.
open(Name, Hosts) ->
open(Name, Hosts, [], tcp, []).

%% @doc Connects to a etcd server.
-spec open(name(),
[string()],
tcp | tls | ssl,
[gen_tcp:connect_option()] | [ssl:connect_option()]) ->
{ok, pid()} | {error, any()}.
open(Name, Hosts, Transport, TransportOpts) ->
open(Name, Hosts, [], Transport, TransportOpts).
open(Name, Hosts, [{transport, tcp}]).

%% @doc Connects to a etcd server.
%% ssl:connect_option() see all options in ssl_api.hrl
%% such as [{certfile, Certfile}, {keyfile, Keyfile}] or [{cert, Cert}, {key, Key}].
%%
%% Default mode is `connect_all', it creates multiple sub-connections (one sub-connection per each endpoint).
%% The balancing policy is round robin.
Expand All @@ -39,6 +43,8 @@ open(Name, Hosts, Transport, TransportOpts) ->
%% When the client receives an error, it randomly picks another normal endpoint.
%%
%% `{connect_timeout, Interval}' is the connection timeout. Defaults to one second (1000).
%% `{domain_lookup_timeout, Interval}' is the domain_lookup_timeout timeout. Defaults to one second (1000).
%% `{tls_handshake_timeout, Interval}' is the tls_handshake_timeout timeout. Defaults to three second (3000).
%% `{retry, Attempts}' is the number of times it will try to reconnect on failure before giving up. Defaults to zero (disabled).
%% `{retry_timeout, Interval}' is the time between retries in milliseconds.
%%
Expand All @@ -49,25 +55,24 @@ open(Name, Hosts, Transport, TransportOpts) ->
%% list via the MemberList API of etcd, and will try to connect any new endpoints if in `connect_all'
%% mode.
%%
%% `[{name, string()},{password, string()}]' generates an authentication token based on a given user name and password.
%% `[{name, string()}, {password, string()}]' generates an authentication token based on a given user name and password.
%%
%% `{tcp_opts, [gen_tcp:connect_option()]}' and `{tls_opts, [ssl:tls_client_option()]}' are the
%% options for gun:open/3 in Gun 2.0.
%%
%% See all TCP options in {@link gen_tcp} module.
%%
%% See all TLS client options in {@link ssl} module,
%% such as `[{certfile, Certfile}, {keyfile, Keyfile}] or [{cert, Cert}, {key, Key}]'.
%%
%% Read more details of gun options in the
%% [https://ninenines.eu/docs/en/gun/2.0/manual/gun/ Gun 2.0 manual].
%%
%% You can use `eetcd:info/0' to see the internal connection status.
-spec open(name(),
[string()],
[
{mode, connect_all | random}
| {name, string()}
| {password, string()}
| {retry, non_neg_integer()}
| {retry_timeout, pos_integer()}
| {connect_timeout, timeout()}
],
tcp | tls | ssl,
[gen_tcp:connect_option()] | [ssl:connect_option()]) ->
{ok, pid()} | {error, any()}.
open(Name, Hosts, Options, Transport, TransportOpts) ->
-spec open(name(), [string()], opts()) -> {ok, pid()} | {error, any()}.
open(Name, Hosts, Options) ->
Cluster = [begin [IP, Port] = string:tokens(Host, ":"), {IP, list_to_integer(Port)} end || Host <- Hosts],
eetcd_conn_sup:start_child([{Name, Cluster, Options, Transport, TransportOpts}]).
eetcd_conn_sup:start_child([{Name, Cluster, Options}]).

%% @doc close connections with etcd server.
-spec close(name()) -> ok | {error, eetcd_conn_unavailable}.
Expand Down
9 changes: 6 additions & 3 deletions src/eetcd_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,18 @@ flush_token(Gun, Headers) ->
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
init({Name, Hosts, Options, Transport, TransportOpts}) ->
init({Name, Hosts, Options}) ->
erlang:process_flag(trap_exit, true),
GunOpts = #{protocols => [http2],
connect_timeout => proplists:get_value(connect_timeout, Options, 1000),
domain_lookup_timeout => proplists:get_value(domain_lookup_timeout, Options, 1000),
tls_handshake_timeout => proplists:get_value(tls_handshake_timeout, Options, 3000),
http2_opts => #{keepalive => 45000},
retry => proplists:get_value(retry, Options, 0),
retry_timeout => proplists:get_value(retry_timeout, Options, 5000),
transport => Transport,
transport_opts => TransportOpts
transport => proplists:get_value(transport, Options, tcp),
tcp_opts => proplists:get_value(tcp_opts, Options, []),
tls_opts => proplists:get_value(tls_opts, Options, [])
},
AutoSyncInterval = proplists:get_value(auto_sync_interval_ms, Options, 0),
Data0 = #{
Expand Down
24 changes: 12 additions & 12 deletions src/eetcd_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
-include("eetcd.hrl").
%% API
-export([alarm_list/1, alarm_disarm/3, alarm_disarm_all/1]).
-export([defragment/3, status/3, hash_kv/4, move_leader/2]).
-export([defragment/2, status/2, hash_kv/3, move_leader/2]).

%%% @doc AlarmList gets all active alarms.
-spec alarm_list(name()|context()) ->
Expand Down Expand Up @@ -46,30 +46,30 @@ alarm_disarm_all(ConnName) ->
%%% Defragment is an expensive operation. User should avoid defragmenting multiple members at the same time.
%%% To defragment multiple members in the cluster, user need to call defragment multiple
%%% times with different endpoints.
-spec defragment(iodata(), tcp | tls | ssl, [gen_tcp:connect_option()] | [ssl:connect_option()]) ->
-spec defragment(iodata(), eetcd:opts()) ->
{ok,router_pb:'Etcd.DefragmentResponse'()}|{error,eetcd_error()}.
defragment(Endpoint, Transport, TransportOpts) ->
defragment(Endpoint, Options) ->
Fun = fun(Conn) -> eetcd_maintenance_gen:defragment(eetcd:new(Conn)) end,
dial(Endpoint, Transport, TransportOpts, Fun).
dial(Endpoint, Options, Fun).

%%% @doc Status gets the status of the endpoint.
-spec status(iodata(), tcp | tls | ssl, [gen_tcp:connect_option()] | [ssl:connect_option()]) ->
-spec status(iodata(), eetcd:opts()) ->
{ok,router_pb:'Etcd.StatusResponse'()}|{error,eetcd_error()}.
status(Endpoint, Transport, TransportOpts) ->
status(Endpoint, Options) ->
Fun = fun(Conn) -> eetcd_maintenance_gen:status(eetcd:new(Conn)) end,
dial(Endpoint, Transport, TransportOpts, Fun).
dial(Endpoint, Options, Fun).

%%% @doc HashKV returns a hash of the KV state at the time of the RPC.
%%% If revision is zero, the hash is computed on all keys. If the revision
%%% is non-zero, the hash is computed on all keys at or below the given revision.
-spec hash_kv(iodata(), tcp | tls | ssl, [gen_tcp:connect_option()] | [ssl:connect_option()], pos_integer()) ->
-spec hash_kv(iodata(), eetcd:opts(), pos_integer()) ->
{ok,router_pb:'Etcd.HashKVResponse'()}|{error,eetcd_error()}.
hash_kv(Endpoint, Transport, TransportOpts, Rev) ->
hash_kv(Endpoint, Options, Rev) ->
Fun = fun(Conn) ->
Context = maps:put(revision, Rev, eetcd:new(Conn)),
eetcd_maintenance_gen:hash_kv(Context)
end,
dial(Endpoint, Transport, TransportOpts, Fun).
dial(Endpoint, Options, Fun).

%%% Snapshot provides a reader for a point-in-time snapshot of etcd.
%%% If the context "ctx" is canceled or timed out, reading from returned
Expand All @@ -89,10 +89,10 @@ move_leader(Context, TargetID) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
dial(Endpoint, Transport, TransportOpts, Fun) ->
dial(Endpoint, Options, Fun) ->
Conn = make_ref(),
try
case eetcd:open(Conn, [Endpoint], Transport, TransportOpts) of
case eetcd:open(Conn, [Endpoint], Options) of
{ok, _Pid} -> Fun(Conn);
Err ->
Err
Expand Down
64 changes: 24 additions & 40 deletions src/eetcd_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
new(Name, Path) ->
case eetcd_conn:round_robin_select(Name) of
{ok, Pid, Headers} ->
Ref = gun:request(Pid, <<"POST">>, Path, Headers, <<>>),
Ref = gun:headers(Pid, <<"POST">>, Path, Headers),
{ok, Pid, Ref};
Err -> Err
end.
Expand Down Expand Up @@ -82,7 +82,7 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -
Res =
case await(Pid, StreamRef, Timeout, MRef) of
{response, nofin, 200, _Headers} ->
case await_body(Pid, StreamRef, Timeout, MRef, <<>>) of
case await_body(Pid, StreamRef, Timeout, MRef) of
{ok, ResBody, _Trailers} ->
{ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType),
{ok, Resp};
Expand All @@ -96,7 +96,7 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -
StreamRef1 = gun:request(Pid, <<"POST">>, Path, NewHeaders, EncodeBody),
case await(Pid, StreamRef1, Timeout, MRef) of
{response, nofin, 200, _Headers} ->
case await_body(Pid, StreamRef1, Timeout, MRef, <<>>) of
case await_body(Pid, StreamRef1, Timeout, MRef) of
{ok, ResBody, _Trailers} ->
{ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType),
{ok, Resp};
Expand All @@ -112,43 +112,27 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -
erlang:demonitor(MRef, [flush]),
Res.

%% `gun:await/2,3,4`, `gun:await_body/2,3,4` and `gun:await_up/1,2,3` don't distinguish the error types until v2.0.0.
%% They can be a timeout, a connection error, a stream error or a down error (when the Gun process exited while waiting).
%% so we copy some code from gun v2.0.0 to replace `gun:await/4`
%% TODO remove this when upgrade gun to v2.0.0
await(ServerPid, StreamRef, Timeout, MRef) ->
receive
{gun_response, ServerPid, StreamRef, IsFin, Status, Headers} ->
{response, IsFin, Status, Headers};
{gun_data, ServerPid, StreamRef, IsFin, Data} ->
{data, IsFin, Data};
{gun_error, ServerPid, StreamRef, Reason} ->
{error, {gun_stream_error, Reason}};
{gun_error, ServerPid, Reason} ->
{error, {gun_conn_error, Reason}};
{'DOWN', MRef, process, ServerPid, Reason} ->
{error, {gun_down, Reason}}
after Timeout ->
{error, timeout}
case gun:await(ServerPid, StreamRef, Timeout, MRef) of
{response, _, _, _}=Resp ->
Resp;
{data, _, _}=Resp ->
Resp;
{error, _} = Resp ->
transfer_error(Resp);
Other ->
?LOG_INFO("eetcd_await_resp_other ~p", [Other]),
await(ServerPid, StreamRef, Timeout, MRef)
end.

await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
receive
{gun_data, ServerPid, StreamRef, nofin, Data} ->
await_body(ServerPid, StreamRef, Timeout, MRef,
<<Acc/binary, Data/binary>>);
{gun_data, ServerPid, StreamRef, fin, Data} ->
{ok, <<Acc/binary, Data/binary>>};
%% It's OK to return trailers here because the client specifically requested them
%% Trailers are grpc_status and grpc_message headers
{gun_trailers, ServerPid, StreamRef, Trailers} ->
{ok, Acc, Trailers};
{gun_error, ServerPid, StreamRef, Reason} ->
{error, {gun_stream_error, Reason}};
{gun_error, ServerPid, Reason} ->
{error, {gun_conn_error, Reason}};
{'DOWN', MRef, process, ServerPid, Reason} ->
{error, {gun_down, Reason}}
after Timeout ->
{error, timeout}
end.
await_body(ServerPid, StreamRef, Timeout, MRef) ->
transfer_error(gun:await_body(ServerPid, StreamRef, Timeout, MRef)).

transfer_error({error, {stream_error, Reason}}) ->
{error, {gun_stream_error, Reason}};
transfer_error({error, {connection_error, Reason}}) ->
{error, {gun_conn_error, Reason}};
transfer_error({error, {down, Reason}}) ->
{error, {gun_down, Reason}};
transfer_error(Other) ->
Other.
16 changes: 8 additions & 8 deletions test/eetcd_auth_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ auth(_Config) ->
user(Name) ->
{ok, #{}} = eetcd_auth:auth_enable(Name),
%% list
{ok, AuthRoleListResponse} = eetcd_auth:user_list(Name),
{ok, _AuthRoleListResponse} = eetcd_auth:user_list(Name),
%% add
{ok, AuthRoleAddResponse} = eetcd_auth:user_add(Name, <<"name1">>, <<"12345">>),
{ok, _AuthRoleAddResponse} = eetcd_auth:user_add(Name, <<"name1">>, <<"12345">>),
%% get
{ok, AuthUserGetResponse} = eetcd_auth:user_get(Name, <<"name1">>),
{ok, _AuthUserGetResponse} = eetcd_auth:user_get(Name, <<"name1">>),
%% change password
{ok, AuthUserChangePasswordResponse} = eetcd_auth:user_change_password(Name, <<"name1">>,<<"54321">>),
{ok, _AuthUserChangePasswordResponse} = eetcd_auth:user_change_password(Name, <<"name1">>,<<"54321">>),
%% get
{ok, AuthUserGetResponse1} = eetcd_auth:user_get(Name, <<"name1">>),
{ok, _AuthUserGetResponse1} = eetcd_auth:user_get(Name, <<"name1">>),
%% delete
{ok, AuthUserDeleteResponse} = eetcd_auth:user_delete(Name, <<"name1">>),
{ok, _AuthUserDeleteResponse} = eetcd_auth:user_delete(Name, <<"name1">>),
%% list
{ok, AuthRoleListResponse2} = eetcd_auth:user_list(Name),
{ok, AuthDisableResponse2} = eetcd_auth:auth_disable(Name),
{ok, _AuthRoleListResponse2} = eetcd_auth:user_list(Name),
{ok, _AuthDisableResponse2} = eetcd_auth:auth_disable(Name),
ok.

2 changes: 1 addition & 1 deletion test/eetcd_lease_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ groups() ->
init_per_suite(Config) ->
application:ensure_all_started(eetcd),
{ok, _Pid} = eetcd:open(?Name, ["127.0.0.1:2379", "127.0.0.1:2479", "127.0.0.1:2579"],
[{mode, random}], tcp, []),
[{mode, random}, {transport, tcp}]),
Config.

init_per_testcase(_TestCase, Config) ->
Expand Down
2 changes: 1 addition & 1 deletion test/eetcd_lock_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ groups() ->
init_per_suite(Config) ->
application:ensure_all_started(eetcd),
{ok, _Pid} = eetcd:open(?Name, ["127.0.0.1:2379", "127.0.0.1:2479", "127.0.0.1:2579"],
[{mode, random}], tcp, []),
[{mode, random}, {transport, tcp}]),
Config.

init_per_testcase(_TestCase, Config) ->
Expand Down
Loading

0 comments on commit 0cdd103

Please sign in to comment.