diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84c4bde..c74a588 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,6 +75,6 @@ jobs: otp-version: ${{matrix.otp}} rebar3-version: '3.20.0' version-type: strict - - run: rebar3 do ct -v -c, cover -v + - run: rebar3 do xref, dialyzer, gradualizer, ct -v -c, cover -v env: TERM: xterm-color diff --git a/include/eetcd.hrl b/include/eetcd.hrl index 76db3d8..e424fb6 100644 --- a/include/eetcd.hrl +++ b/include/eetcd.hrl @@ -10,6 +10,7 @@ -type value() :: iodata(). -type context() :: map(). -type name() :: atom() | reference(). +-type new_context() :: atom() | reference() | context(). -type eetcd_error() :: timeout|{grpc_error,grpc_status()}|{gun_down,any()}|{gun_conn_error,any()}|{gun_stream_error,any()}|eetcd_conn_unavailable. -type grpc_status() :: #{'grpc-status' => integer(), 'grpc-message' => binary()}. diff --git a/rebar.config b/rebar.config index 55a9715..b5c015b 100644 --- a/rebar.config +++ b/rebar.config @@ -28,6 +28,20 @@ {rebar3_eetcd_plugin, "0.3.2"} ]}. +{project_plugins, [ + {gradualizer, {git, "https://github.com/josefs/Gradualizer.git", {branch, "master"}}} +]}. + +{gradualizer_opts, [ + {include, ["include"]}, + + %% Use exclude_modules instead of exclude, See: + %% https://github.com/josefs/Gradualizer/issues/483 + {exclude_modules, [ + ]}, + {color, always} +]}. + {xref_checks, [ undefined_function_calls, undefined_functions, @@ -40,6 +54,7 @@ {xref_ignores, [ auth_pb, gogo_pb, + health_pb, kv_pb, router_pb ]}. diff --git a/src/eetcd_auth.erl b/src/eetcd_auth.erl index c276e80..088e353 100644 --- a/src/eetcd_auth.erl +++ b/src/eetcd_auth.erl @@ -26,7 +26,7 @@ %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec auth_enable(name()|context()) -> +-spec auth_enable(new_context()) -> {ok,router_pb:'Etcd.AuthEnableResponse'()}|{error,eetcd_error()}. auth_enable(Context) -> eetcd_auth_gen:auth_enable(new(Context)). @@ -45,7 +45,7 @@ auth_enable(Context) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec auth_disable(name()|context()) -> +-spec auth_disable(new_context()) -> {ok,router_pb:'Etcd.AuthDisableResponse'()}|{error,eetcd_error()}. auth_disable(Context) -> eetcd_auth_gen:auth_disable(new(Context)). @@ -64,7 +64,7 @@ auth_disable(Context) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_add(name()|context(), iodata(), iodata()) -> +-spec user_add(new_context(), iodata(), iodata()) -> {ok,router_pb:'Etcd.AuthUserAddResponse'()}|{error,eetcd_error()}. user_add(Context, Name, Password) -> C1 = new(Context), @@ -86,10 +86,10 @@ user_add(Context, Name, Password) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_add(name()|context(), iodata()) -> +-spec user_add(new_context(), iodata()) -> {ok,router_pb:'Etcd.AuthUserAddResponse'()}|{error,eetcd_error()}. user_add(Context, Name) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), C3 = maps:put(options, #{no_password => true}, C2), eetcd_auth_gen:user_add(C3). @@ -108,10 +108,10 @@ user_add(Context, Name) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_delete(name()|context(), iodata()) -> +-spec user_delete(new_context(), iodata()) -> {ok,router_pb:'Etcd.AuthUserDeleteResponse'()}|{error,eetcd_error()}. user_delete(Context, Name) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), eetcd_auth_gen:user_delete(C2). @@ -129,10 +129,10 @@ user_delete(Context, Name) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_change_password(name()|context(), iodata(), iodata()) -> +-spec user_change_password(new_context(), iodata(), iodata()) -> {ok,router_pb:'Etcd.AuthUserChangePasswordResponse'()}|{error,eetcd_error()}. user_change_password(Context, Name, Password) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), C3 = maps:put(password, Password, C2), eetcd_auth_gen:user_change_password(C3). @@ -151,10 +151,10 @@ user_change_password(Context, Name, Password) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_grant_role(name()|context(), iodata(), iodata()) -> +-spec user_grant_role(new_context(), iodata(), iodata()) -> {ok,router_pb:'Etcd.AuthUserGrantRoleResponse'()}|{error,eetcd_error()}. user_grant_role(Context, User, Role) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(user, User, C1), C3 = maps:put(role, Role, C2), eetcd_auth_gen:user_grant_role(C3). @@ -173,10 +173,10 @@ user_grant_role(Context, User, Role) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_get(name()|context(), iodata()) -> +-spec user_get(new_context(), iodata()) -> {ok,router_pb:'Etcd.AuthUserGetResponse'()}|{error,eetcd_error()}. user_get(Context, Name) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), eetcd_auth_gen:user_get(C2). @@ -194,10 +194,10 @@ user_get(Context, Name) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_list(name()|context()) -> +-spec user_list(new_context()) -> {ok,router_pb:'Etcd.AuthUserListResponse'()}|{error,eetcd_error()}. user_list(Context) -> - eetcd_auth_gen:user_list(eetcd:new(Context)). + eetcd_auth_gen:user_list(new(Context)). %%% @doc UserRevokeRole revokes a role of a user. %%%
@@ -213,10 +213,10 @@ user_list(Context) -> %%%
%%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec user_revoke_role(name()|context(), iodata(), iodata()) -> +-spec user_revoke_role(new_context(), iodata(), iodata()) -> {ok,router_pb:'Etcd.AuthUserRevokeRoleResponse'()}|{error,eetcd_error()}. user_revoke_role(Context, Name, Role) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), C3 = maps:put(role, Role, C2), eetcd_auth_gen:user_revoke_role(C3). @@ -235,10 +235,10 @@ user_revoke_role(Context, Name, Role) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec role_add(name()|context(), iodata()) -> +-spec role_add(new_context(), iodata()) -> {ok,router_pb:'Etcd.AuthRoleAddResponse'()}|{error,eetcd_error()}. role_add(Context, Name) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), eetcd_auth_gen:role_add(C2). @@ -257,10 +257,10 @@ role_add(Context, Name) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec role_grant_permission(name()|context(), iodata(), iodata(), iodata(), 'READ' | 'WRITE' | 'READWRITE') -> +-spec role_grant_permission(new_context(), iodata(), iodata(), iodata(), 'READ' | 'WRITE' | 'READWRITE') -> {ok,router_pb:'Etcd.AuthRoleGrantPermissionResponse'()}|{error,eetcd_error()}. role_grant_permission(Context, Name, Key, RangeEnd, PermType) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(name, Name, C1), Permission = #{key => Key, range_end => RangeEnd, permType => PermType}, C3 = maps:put(perm, Permission, C2), @@ -280,10 +280,10 @@ role_grant_permission(Context, Name, Key, RangeEnd, PermType) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec role_get(name()|context(), iodata()) -> +-spec role_get(new_context(), iodata()) -> {ok,router_pb:'Etcd.AuthRoleGetResponse'()}|{error,eetcd_error()}. role_get(Context, Role) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(role, Role, C1), eetcd_auth_gen:role_get(C2). @@ -301,10 +301,10 @@ role_get(Context, Role) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec role_list(name()|context()) -> +-spec role_list(new_context()) -> {ok,router_pb:'Etcd.AuthRoleListResponse'()}|{error,eetcd_error()}. role_list(Context) -> - eetcd_auth_gen:role_list(eetcd:new(Context)). + eetcd_auth_gen:role_list(new(Context)). %%% @doc RoleRevokePermission revokes a permission from a role. %%%
@@ -320,10 +320,10 @@ role_list(Context) -> %%%
%%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec role_revoke_permission(name()|context(), iodata(), iodata(), iodata()) -> +-spec role_revoke_permission(new_context(), iodata(), iodata(), iodata()) -> {ok,router_pb:'Etcd.AuthRoleRevokePermissionResponse'()}|{error,eetcd_error()}. role_revoke_permission(Context, Role, Key, RangeEnd) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(role, Role, C1), C3 = maps:put(key, Key, C2), C4 = maps:put(range_end, RangeEnd, C3), @@ -343,15 +343,15 @@ role_revoke_permission(Context, Role, Key, RangeEnd) -> %%% %%% {@link eetcd_auth:with_timeout/2} {@link eetcd_auth:new/1} %%% @end --spec role_delete(name()|context(), iodata()) -> +-spec role_delete(new_context(), iodata()) -> {ok,router_pb:'Etcd.AuthRoleDeleteResponse'()}|{error,eetcd_error()}. role_delete(Context, Role) -> - C1 = eetcd:new(Context), + C1 = new(Context), C2 = maps:put(role, Role, C1), eetcd_auth_gen:role_delete(C2). %%% @doc Create context for request. --spec new(atom()|reference()) -> context(). +-spec new(new_context()) -> context(). new(Context) -> eetcd:new(Context). %% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply, diff --git a/src/eetcd_cluster.erl b/src/eetcd_cluster.erl index c0dcdb8..74d94cc 100644 --- a/src/eetcd_cluster.erl +++ b/src/eetcd_cluster.erl @@ -23,7 +23,7 @@ %%% %%% {@link eetcd_cluster:with_timeout/2} {@link eetcd_cluster:new/1} %%% @end --spec member_list(context()|name()) -> +-spec member_list(new_context()) -> {ok,router_pb:'Etcd.MemberListResponse'()}|{error,eetcd_error()}. member_list(Context) -> eetcd_cluster_gen:member_list(new(Context)). @@ -41,9 +41,9 @@ member_list(Context) -> eetcd_cluster_gen:member_list(new(Context)). %%% %%% {@link eetcd_cluster:with_timeout/2} {@link eetcd_cluster:new/1} %%% @end --spec member_add(context()|name(), PeerURLs) -> - {ok,router_pb:'Etcd.MemberListResponse'()} - | {error, {'grpc_error', non_neg_integer(), binary()}} | {error, term()} +-spec member_add(new_context(), PeerURLs) -> + {ok,router_pb:'Etcd.MemberAddResponse'()} + | {error, eetcd:eetcd_error()} when PeerURLs :: [iodata()]. member_add(Context, PeerAddrs) when is_list(PeerAddrs) -> C1 = new(Context), @@ -65,8 +65,8 @@ member_add(Context, PeerAddrs) when is_list(PeerAddrs) -> %%% %%% {@link eetcd_cluster:with_timeout/2} {@link eetcd_cluster:new/1} %%% @end --spec member_add_as_learner(context()|name(), PeerURLs) -> - {ok,router_pb:'Etcd.MemberListResponse'()} +-spec member_add_as_learner(new_context(), PeerURLs) -> + {ok,router_pb:'Etcd.MemberAddResponse'()} | {error, {'grpc_error', non_neg_integer(), binary()}} | {error, term()} when PeerURLs :: [iodata()]. member_add_as_learner(Context, PeerAddrs) when is_list(PeerAddrs) -> @@ -89,7 +89,7 @@ member_add_as_learner(Context, PeerAddrs) when is_list(PeerAddrs) -> %%% %%% {@link eetcd_cluster:with_timeout/2} {@link eetcd_cluster:new/1} %%% @end --spec member_remove(context()|name(), pos_integer()) -> +-spec member_remove(new_context(), pos_integer()) -> {ok,router_pb:'Etcd.MemberRemoveResponse'()}|{error,eetcd_error()}. member_remove(Context, Id) when is_integer(Id) -> C1 = new(Context), @@ -110,7 +110,7 @@ member_remove(Context, Id) when is_integer(Id) -> %%% %%% {@link eetcd_cluster:with_timeout/2} {@link eetcd_cluster:new/1} %%% @end --spec member_update(context()|name(), pos_integer(), [list()]) -> +-spec member_update(new_context(), pos_integer(), [list()]) -> {ok,router_pb:'Etcd.MemberUpdateResponse'()}|{error,eetcd_error()}. member_update(Context, Id, PeerAddrs) when is_integer(Id) andalso is_list(PeerAddrs) -> @@ -133,7 +133,7 @@ member_update(Context, Id, PeerAddrs) %%% %%% {@link eetcd_cluster:with_timeout/2} {@link eetcd_cluster:new/1} %%% @end --spec member_promote(context()|name(), pos_integer()) -> +-spec member_promote(new_context(), pos_integer()) -> {ok,router_pb:'Etcd.MemberPromoteResponse'()}|{error,eetcd_error()}. member_promote(Context, Id) when is_integer(Id) -> C1 = new(Context), @@ -141,7 +141,7 @@ member_promote(Context, Id) when is_integer(Id) -> eetcd_cluster_gen:member_promote(C2). %%% @doc Create context for request. --spec new(atom()|reference()) -> context(). +-spec new(new_context()) -> context(). new(Context) -> eetcd:new(Context). %% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply, diff --git a/src/eetcd_conn.erl b/src/eetcd_conn.erl index 8251bbd..09f853f 100644 --- a/src/eetcd_conn.erl +++ b/src/eetcd_conn.erl @@ -48,10 +48,16 @@ round_robin_select(Name) -> [Uniq] -> Uniq; Lists -> Length = erlang:length(Lists), + %% Incr {_, Incr, Lenght, 1} is not negative (>= 0) and the result(Index) would be + %% greater than (>) Threshold(Length). + %% And the Length is greater than (>) 1 according to the case clauses. Index = ets:update_counter(?ETCD_CONNS, Name, {1, 1, Length, 1}, {1, Name}), - case lists:nth(Index, Lists) of - {Gun, undefined} -> {ok, Gun, ?HEADERS}; - {Gun, Token} -> {ok, Gun, [{<<"authorization">>, Token} | ?HEADERS]} + case Index of + I when is_integer(I), I >=1 -> + case lists:nth(I, Lists) of + {Gun, undefined} -> {ok, Gun, ?HEADERS}; + {Gun, Token} -> {ok, Gun, [{<<"authorization">>, Token} | ?HEADERS]} + end end end. @@ -119,10 +125,14 @@ init({Name, Hosts, Options}) -> random -> AutoSyncInterval > 0 andalso ?LOG_WARNING("~s run under random mode, disabled auto_sync member list", [Name]), - Length = erlang:length(Hosts), - Data1 = Data#{endpoints => shuffle(Hosts), mode => random}, - Index = rand:uniform(Length), - connect_one(Index, 2 * Length, Data1, Length) + case erlang:length(Hosts) of + Length when Length >= 1 -> + Data1 = Data#{endpoints => shuffle(Hosts), mode => random}, + Index = rand:uniform(Length), + connect_one(Index, 2 * Length, Data1, Length); + _ -> + {stop, eetcd_conn_unavailable} + end end. callback_mode() -> [handle_event_function]. @@ -458,6 +468,31 @@ handle_do_sync(Data = #{sync_ref := SyncRef}) -> reconnect_conns(NewData1) end. +filtermap_url(Url, Transport) -> + ParseResult = uri_string:parse(Url), + do_filtermap_url(ParseResult, Transport, Url). + +do_filtermap_url(#{host := Host, port := Port, scheme := Scheme}, Transport, Url) + when erlang:is_binary(Host) -> + case {erlang:bit_size(Host) > 0, Scheme, Transport} of + {true, <<"http">>, tcp} -> + {true, {erlang:binary_to_list(Host), Port}}; + {true, <<"https">>, tls} -> + {true, {erlang:binary_to_list(Host), Port}}; + {true, <<"https">>, ssl} -> + {true, {erlang:binary_to_list(Host), Port}}; + _ -> + %% Note: because of the design of eetcd_conn, we need + %% the member lists' URL use the same transport + %% options to the active connections. + ?LOG_WARNING("Not matched schemes from member list ~s", + [Url]), + false + end; +do_filtermap_url(_, _, Url) -> + ?LOG_WARNING("Url ~s from member list is not a valid etcd url", [Url]), + false. + do_sync_memberlist(#{mode := random} = Data) -> Data; do_sync_memberlist(#{active_conns := [], name := Name} = Data) -> @@ -491,31 +526,7 @@ do_sync_memberlist(#{name := Name, ClientUrls1 = lists:flatten(ClientUrls0), ClientUrls2 = lists:filtermap( - fun(Url) -> - case uri_string:parse(Url) of - #{host := Host, port := Port, scheme := Scheme} - when erlang:bit_size(Host) > 0 -> - case {Scheme, Transport} of - {<<"http">>, tcp} -> - {true, {erlang:binary_to_list(Host), Port}}; - {<<"https">>, tls} -> - {true, {erlang:binary_to_list(Host), Port}}; - {<<"https">>, ssl} -> - {true, {erlang:binary_to_list(Host), Port}}; - _ -> - %% Note: because of the design of eetcd_conn, we need - %% the member lists' URL use the same transport - %% options to the active connections. - ?LOG_WARNING("Not matched schemes from member list ~s", - [Url]), - false - end; - _I -> - ?LOG_WARNING("Url ~s from member list is not a valid etcd url", - [Url]), - false - end - end, ClientUrls1), + fun(Url) -> filtermap_url(Url, Transport) end, ClientUrls1), ClientUrls = lists:usort(ClientUrls2), diff --git a/src/eetcd_election.erl b/src/eetcd_election.erl index 89e0047..69bb07c 100644 --- a/src/eetcd_election.erl +++ b/src/eetcd_election.erl @@ -8,11 +8,22 @@ -export([observe/3, observe_stream/2]). -export_type([campaign_ctx/0, observe_ctx/0]). --type observe_ctx() :: #{leader => map(), http2_pid => pid(), monitor_ref => reference(), stream_ref => reference()}. --type campaign_ctx() :: #{campaign => map()|'waiting_campaign_response', http2_pid => pid(), monitor_ref => reference(), stream_ref => reference()}. +-type leader_key() :: router_pb:'Etcd.LeaderKey'(). +-type observe_ctx() :: #{ + leader => map() | election_no_leader, + http2_pid => pid(), + monitor_ref => reference(), + stream_ref => gun:stream_ref() +}. +-type campaign_ctx() :: #{ + campaign := leader_key() | waiting_campaign_response, + http2_pid => pid(), + monitor_ref => reference(), + stream_ref => gun:stream_ref() +}. %%% @doc Creates a blank context for a request. --spec new(name()|context()) -> context(). +-spec new(new_context()) -> context(). new(Ctx) -> eetcd:new(Ctx). %% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply, @@ -29,7 +40,7 @@ with_name(Ctx, Name) -> %%% @doc lease is the ID of the lease attached to leadership of the election. If the %% lease expires or is revoked before resigning leadership, then the %% leadership is transferred to the next campaigner, if any. --spec with_lease(context(), LeaseID :: pos_integer()) -> context(). +-spec with_lease(context(), LeaseID :: integer()) -> context(). with_lease(Ctx, LeaseID) -> maps:put(lease, LeaseID, Ctx). @@ -44,9 +55,9 @@ with_value(Ctx, Value) -> %%% key is an opaque key representing the ownership of the election. If the key is deleted, then leadership is lost. %%% rev is the creation revision of the key. It can be used to test for ownership of an election during transactions by testing the key's creation revision matches rev. %%% lease is the lease ID of the election leader. --spec with_leader(context(), Leader :: binary()) -> context(). -with_leader(Ctx, Leader) -> - maps:put(leader, Leader, Ctx). +-spec with_leader(context(), LeaderKey :: leader_key()) -> context(). +with_leader(Ctx, LeaderKey) -> + maps:put(leader, LeaderKey, Ctx). %%% @doc %%% Campaign waits to acquire leadership in an election, returning a LeaderKey @@ -74,7 +85,7 @@ with_leader(Ctx, Leader) -> campaign(Ctx) -> eetcd_election_gen:campaign(Ctx). --spec campaign(Ctx :: context()|name(), Name :: binary(), LeaseId :: integer(), Value :: binary()) -> +-spec campaign(Ctx :: new_context(), Name :: binary(), LeaseId :: integer(), Value :: binary()) -> {ok, router_pb:'Etcd.CampaignResponse'()} | {error, eetcd_error()}. campaign(Ctx, Name, LeaseId, Value) -> Ctx1 = new(Ctx), @@ -127,16 +138,15 @@ campaign_request(ConnName, Name, LeaseId, Value) -> campaign_response(CCtx, Msg) -> case resp_stream(CCtx, Msg) of {ok, Bin} -> - #{monitor_ref := MRef} = CCtx, - erlang:demonitor(MRef, [flush]), + case maps:get(monitor_ref, CCtx, undefined) of + MRef when is_reference(MRef) -> + erlang:demonitor(MRef, [flush]); + _ -> + ok + end, {ok, #{leader := Leader}, <<>>} = eetcd_grpc:decode(identity, Bin, 'Etcd.CampaignResponse'), - {ok, #{ - campaign => Leader, - http2_pid => undefined, - monitor_ref => undefined, - stream_ref => undefined - }}; + {ok, #{campaign => Leader}}; Other -> Other end. @@ -163,11 +173,11 @@ campaign_response(CCtx, Msg) -> proclaim(Ctx) -> eetcd_election_gen:proclaim(Ctx). --spec proclaim(Ctx :: context()|name(), Leader :: binary(), Value :: binary()) -> +-spec proclaim(Ctx :: new_context(), LeaderKey :: leader_key(), Value :: binary()) -> {ok, router_pb:'Etcd.ProclaimResponse'()} | {error, eetcd_error()}. -proclaim(Ctx, Leader, Val) -> +proclaim(Ctx, LeaderKey, Val) -> Ctx1 = new(Ctx), - Ctx2 = with_leader(Ctx1, Leader), + Ctx2 = with_leader(Ctx1, LeaderKey), Ctx3 = with_value(Ctx2, Val), eetcd_election_gen:proclaim(Ctx3). @@ -192,11 +202,11 @@ proclaim(Ctx, Leader, Val) -> resign(Ctx) -> eetcd_election_gen:resign(Ctx). --spec resign(Ctx :: context()|name(), Leader :: binary()) -> +-spec resign(Ctx :: new_context(), LeaderKey :: leader_key()) -> {ok, router_pb:'Etcd.ResignResponse'()} | {error, eetcd_error()}. -resign(Ctx, Leader) -> +resign(Ctx, LeaderKey) -> Ctx1 = new(Ctx), - Ctx2 = with_leader(Ctx1, Leader), + Ctx2 = with_leader(Ctx1, LeaderKey), eetcd_election_gen:resign(Ctx2). %%% @doc @@ -219,7 +229,7 @@ resign(Ctx, Leader) -> leader(Ctx) -> eetcd_election_gen:leader(Ctx). --spec leader(Ctx :: context()|name(), Name :: binary()) -> +-spec leader(Ctx :: new_context(), Name :: binary()) -> {ok, router_pb:'Etcd.LeaderResponse'()} | {error, eetcd_error()}. leader(Ctx, Name) -> Ctx1 = new(Ctx), @@ -263,7 +273,7 @@ observe(ConnName, Name, Timeout) -> http2_pid => Gun, monitor_ref => MRef, stream_ref => StreamRef, - leader => 'election_no_leader' + leader => election_no_leader } }; {error, _} = Err2 -> @@ -305,14 +315,14 @@ resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef}, {gun_error, Pid, SRef, Reason}) -> %% stream error erlang:demonitor(MRef, [flush]), gun:cancel(Pid, SRef), - {error, {stream_error, Reason}}; + {error, {gun_stream_error, Reason}}; resp_stream(#{http2_pid := Pid, stream_ref := SRef, monitor_ref := MRef}, {gun_error, Pid, Reason}) -> %% gun connection process state error erlang:demonitor(MRef, [flush]), gun:cancel(Pid, SRef), - {error, {conn_error, Reason}}; + {error, {gun_conn_error, Reason}}; resp_stream(#{http2_pid := Pid, monitor_ref := MRef}, {'DOWN', MRef, process, Pid, Reason}) -> %% gun connection down erlang:demonitor(MRef, [flush]), - {error, {http2_down, Reason}}; + {error, {gun_down, Reason}}; resp_stream(_OCtx, _UnKnow) -> unknown. diff --git a/src/eetcd_kv.erl b/src/eetcd_kv.erl index 59f7e94..c3a2d5e 100644 --- a/src/eetcd_kv.erl +++ b/src/eetcd_kv.erl @@ -21,8 +21,9 @@ ]). %%% @doc Create context for request. --spec new(atom()|reference()) -> context(). +-spec new(new_context()) -> context(). new(Context) -> eetcd:new(Context). + -spec new() -> context(). new() -> #{}. @@ -62,12 +63,12 @@ with_range_end(Context, End) -> %% @doc Limit the number of results to return from `get' request. %% If with_limit is given a 0 limit, it is treated as no limit. --spec with_limit(context(), non_neg_integer()) -> context(). +-spec with_limit(context(), integer()) -> context(). with_limit(Context, End) -> maps:put(limit, End, Context). %% @doc Specifies the store revision for `get' request. --spec with_rev(context(), pos_integer()) -> context(). +-spec with_rev(context(), integer()) -> context(). with_rev(Context, Rev) -> maps:put(revision, Rev, Context). @@ -219,7 +220,7 @@ with_physical(Context) -> put(Context) -> eetcd_kv_gen:put(Context). %%% @doc Put puts a key-value pair into etcd with options {@link put/1} --spec put(name()|context(), key(), value()) -> +-spec put(new_context(), key(), value()) -> {ok, router_pb:'Etcd.PutResponse'()}|{error, eetcd_error()}. put(Context, Key, Value) -> C1 = new(Context), @@ -269,8 +270,9 @@ put(Context, Key, Value) -> -spec get(context()) -> {ok, router_pb:'Etcd.RangeResponse'()}|{error, eetcd_error()}. get(Context) when is_map(Context) -> eetcd_kv_gen:range(Context). + %%% @doc Get retrieves keys with options. --spec get(context()|name(), key()) -> +-spec get(new_context(), key()) -> {ok, router_pb:'Etcd.RangeResponse'()}|{error, eetcd_error()}. get(Context, Key) -> C0 = new(Context), @@ -299,7 +301,7 @@ get(Context, Key) -> {ok, router_pb:'Etcd.DeleteRangeResponse'()}|{error, eetcd_error()}. delete(Context) when is_map(Context) -> eetcd_kv_gen:delete_range(Context). %%% @doc Delete deletes a key with options --spec delete(name()|context(), key()) -> +-spec delete(new_context(), key()) -> {ok, router_pb:'Etcd.DeleteRangeResponse'()}|{error, eetcd_error()}. delete(Context, Key) -> C0 = new(Context), @@ -327,7 +329,7 @@ delete(Context, Key) -> {ok, router_pb:'Etcd.CompactionResponse'()}|{error, eetcd_error()}. compact(Context) when is_map(Context) -> eetcd_kv_gen:compact(Context). %% @doc Compact compacts etcd KV history before the given revision with options --spec compact(name()|context(), integer()) -> +-spec compact(new_context(), integer()) -> {ok, router_pb:'Etcd.CompactionResponse'()}|{error, eetcd_error()}. compact(Context, Revision) -> C0 = new(Context), @@ -357,7 +359,7 @@ compact(Context, Revision) -> %%% {@link eetcd_op:put/1} {@link eetcd_op:get/1} %%% {@link eetcd_op:delete/1} {@link eetcd_op:txn/1} %%% @end --spec txn(name()|context(), [router_pb:'Etcd.Compare'()], [router_pb:'Etcd.RequestOp'()], [router_pb:'Etcd.RequestOp'()]) -> +-spec txn(new_context(), [router_pb:'Etcd.Compare'()], [router_pb:'Etcd.RequestOp'()], [router_pb:'Etcd.RequestOp'()]) -> {ok, router_pb:'Etcd.TxnResponse'()}|{error, eetcd_error()}. txn(Context, If, Then, Else) -> C1 = new(Context), diff --git a/src/eetcd_lease.erl b/src/eetcd_lease.erl index 8397b3c..e787b10 100644 --- a/src/eetcd_lease.erl +++ b/src/eetcd_lease.erl @@ -16,7 +16,7 @@ -define(TRY_RECONNECTING, try_reconnecting). %%% @doc Create context for request. --spec new(name()|context()) -> context(). +-spec new(new_context()) -> context(). new(Context) -> eetcd:new(Context). %% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply, @@ -26,7 +26,7 @@ new(Context) -> eetcd:new(Context). with_timeout(Context, Timeout) -> eetcd:with_timeout(Context, Timeout). %% @doc Grant creates a new lease with the provided TTL in seconds. --spec grant(context(), pos_integer()) -> +-spec grant(new_context(), pos_integer()) -> {ok, router_pb:'Etcd.LeaseGrantResponse'()}|{error, eetcd_error()}. grant(Context, TTL) -> C1 = new(Context), @@ -34,7 +34,7 @@ grant(Context, TTL) -> eetcd_lease_gen:lease_grant(C2). %% @doc Revoke revokes the given lease. --spec revoke(name()|context(), pos_integer()) -> +-spec revoke(new_context(), pos_integer()) -> {ok, router_pb:'Etcd.LeaseGrantResponse'()}|{error, eetcd_error()}. revoke(Context, LeaseID) -> C1 = new(Context), @@ -43,21 +43,21 @@ revoke(Context, LeaseID) -> %% @doc TimeToLive retrieves the lease information of the given lease ID. %% The 3rd argument is a option of `NeedAttachedKeys'. --spec time_to_live(context(), pos_integer(), boolean()) -> - {ok, router_pb:'Etcd.LeaseGrantResponse'()}|{error, eetcd_error()}. +-spec time_to_live(new_context(), pos_integer(), boolean()) -> + {ok, router_pb:'Etcd.LeaseTimeToLiveResponse'()}|{error, eetcd_error()}. time_to_live(Context, LeaseID, WithKeys) when is_boolean(WithKeys) -> C1 = new(Context), C2 = maps:put('ID', LeaseID, C1), C3 = maps:put(keys, WithKeys, C2), case eetcd_lease_gen:lease_time_to_live(C3) of {ok, #{'TTL' := TTL}} when TTL =< 0 -> - {error, #{'grpc-status' => ?GRPC_STATUS_NOT_FOUND, 'grpc-message' => ?LeaseNotFound}}; + {error, {grpc_error, #{'grpc-status' => ?GRPC_STATUS_NOT_FOUND, 'grpc-message' => ?LeaseNotFound}}}; {ok, _Reps} = Status -> Status; {error, _Reason} = Err -> Err end. %% @doc Leases retrieves all leases. --spec leases(context()) -> +-spec leases(new_context()) -> {ok, router_pb:'Etcd.LeaseLeasesResponse'()}|{error, eetcd_error()}. leases(ConnName) -> C1 = new(ConnName), @@ -106,7 +106,12 @@ close() -> end, 0, supervisor:which_children(eetcd_lease_sup)). --spec start_link(pid(), name(), integer()) -> {ok, pid()}. +-spec start_link(pid(), name(), integer()) -> Result + when Result :: {ok, Pid} | ignore | {error, Error}, + Pid :: pid(), + Error :: {already_started, Pid} | term(). +%% The gen_server:start_ret() return type was introduced since OTP 25, +%% but for backward compatibility, we still use the old return type. start_link(Caller, Name, LeaseID) -> gen_server:start_link(?MODULE, [Caller, Name, LeaseID], []). diff --git a/src/eetcd_lock.erl b/src/eetcd_lock.erl index 1858cee..a23da9c 100644 --- a/src/eetcd_lock.erl +++ b/src/eetcd_lock.erl @@ -6,7 +6,7 @@ %%% @doc Creates a blank context for a request. --spec new(atom()|reference()) -> context(). +-spec new(new_context()) -> context(). new(Context) -> eetcd:new(Context). %% @doc Timeout is an integer greater than zero which specifies how many milliseconds to wait for a reply, @@ -44,7 +44,7 @@ with_key(Context, Key) -> lock(Context) -> eetcd_lock_gen:lock(Context). --spec lock(Ctx :: context()|name(), Name :: binary(), LeaseID :: pos_integer()) -> {ok, router_pb:'Etcd.LockResponse'()} | {error, eetcd_error()}. +-spec lock(Ctx :: new_context(), Name :: binary(), LeaseID :: pos_integer()) -> {ok, router_pb:'Etcd.LockResponse'()} | {error, eetcd_error()}. lock(Context0, Name, LeaseID) -> Context1 = new(Context0), Context = with_lease(with_name(Context1, Name), LeaseID), @@ -53,7 +53,7 @@ lock(Context0, Name, LeaseID) -> %%% @doc Unlock takes a key returned by Lock and releases the hold on lock. The %%% next Lock caller waiting for the lock will then be woken up and given %%% ownership of the lock. --spec unlock(Ctx :: context()|name(), Key :: binary()) -> {ok, router_pb:'Etcd.UnlockRequest'()} | {error, eetcd_error()}. +-spec unlock(Ctx :: new_context(), Key :: binary()) -> {ok, router_pb:'Etcd.UnlockResponse'()} | {error, eetcd_error()}. unlock(Context0, Key) -> Context1 = new(Context0), Context = with_key(Context1, Key), diff --git a/src/eetcd_maintenance.erl b/src/eetcd_maintenance.erl index f321a89..a41351c 100644 --- a/src/eetcd_maintenance.erl +++ b/src/eetcd_maintenance.erl @@ -5,7 +5,7 @@ -export([defragment/2, status/2, hash_kv/3, move_leader/2]). %%% @doc AlarmList gets all active alarms. --spec alarm_list(name()|context()) -> +-spec alarm_list(new_context()) -> {ok,router_pb:'Etcd.AlarmResponse'()}|{error,eetcd_error()}. alarm_list(ConnName) -> C1 = eetcd:new(ConnName), @@ -15,7 +15,7 @@ alarm_list(ConnName) -> eetcd_maintenance_gen:alarm(C4). %%% @doc AlarmDisarm disarms a given alarm. --spec alarm_disarm(name()|context(), integer(), integer()) -> +-spec alarm_disarm(new_context(), integer(), integer()) -> {ok,router_pb:'Etcd.AlarmResponse'()}|{error,eetcd_error()}. alarm_disarm(Context, MemberId, Alarm) -> C1 = eetcd:new(Context), @@ -25,7 +25,7 @@ alarm_disarm(Context, MemberId, Alarm) -> eetcd_maintenance_gen:alarm(C4). %%% @doc AlarmDisarmAll disarms all alarm. --spec alarm_disarm_all(name()|context()) -> +-spec alarm_disarm_all(new_context()) -> router_pb:'Etcd.AlarmResponse'(). alarm_disarm_all(ConnName) -> {ok, Acc0 = #{alarms := List}} = alarm_list(ConnName), @@ -79,7 +79,7 @@ hash_kv(Endpoint, Options, Rev) -> %%% @doc MoveLeader requests current leader to transfer its leadership to the transferee. %%% Request must be made to the leader. --spec move_leader(name()|context(), pos_integer()) -> +-spec move_leader(new_context(), pos_integer()) -> {ok,router_pb:'Etcd.MoveLeaderResponse'()}|{error,eetcd_error()}. move_leader(Context, TargetID) -> C1 = eetcd:new(Context), diff --git a/src/eetcd_stream.erl b/src/eetcd_stream.erl index 598d144..a7a16f5 100644 --- a/src/eetcd_stream.erl +++ b/src/eetcd_stream.erl @@ -13,7 +13,7 @@ Name :: name(), Path :: iodata(), GunPid :: pid(), - Http2Ref :: reference(). + Http2Ref :: gun:stream_ref(). new(Name, Path) -> case eetcd_conn:round_robin_select(Name) of {ok, Pid, Headers} -> @@ -28,7 +28,7 @@ new(Name, Path) -> EtcdMsgName :: atom(), Http2Path :: iodata(), GunPid :: pid(), - Http2Ref :: reference(). + Http2Ref :: gun:stream_ref(). new(Name, Msg, MsgName, Path) -> case new(Name, Path) of {ok, Pid, Ref} -> @@ -39,7 +39,7 @@ new(Name, Msg, MsgName, Path) -> -spec data(GunPid, Http2Ref, EtcdMsg, EtcdMsgName, IsFin) -> Http2Ref when GunPid :: pid(), - Http2Ref :: reference(), + Http2Ref :: gun:stream_ref(), EtcdMsg :: map(), EtcdMsgName :: atom(), IsFin :: fin | nofin. @@ -73,7 +73,7 @@ unary(Request, RequestName, Path, ResponseType) -> Http2Path :: iodata(), EtcdResponseType :: atom(), Http2Headers :: list(), - EtcdResponse :: tuple(). + EtcdResponse :: {ok, term()} | {error, eetcd_error()}. unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -> Timeout = maps:get(eetcd_reply_timeout, Request, 9000), EncodeBody = eetcd_grpc:encode(identity, maps:remove(eetcd_reply_timeout, Request), RequestName), @@ -82,12 +82,7 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) - Res = case await(Pid, StreamRef, Timeout, MRef) of {response, nofin, 200, _Headers} -> - case await_body(Pid, StreamRef, Timeout, MRef) of - {ok, ResBody, _Trailers} -> - {ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType), - {ok, Resp}; - {error, _} = Error1 -> Error1 - end; + await_body(Pid, StreamRef, Timeout, MRef, ResponseType); {response, fin, 200, RespHeaders} -> case eetcd_grpc:grpc_status(RespHeaders) of #{'grpc-status' := ?GRPC_STATUS_UNAUTHENTICATED, @@ -96,14 +91,9 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) - StreamRef1 = gun:request(Pid, <<"POST">>, Path, NewHeaders, EncodeBody), case await(Pid, StreamRef1, Timeout, MRef) of {response, nofin, 200, _Headers} -> - case await_body(Pid, StreamRef1, Timeout, MRef) of - {ok, ResBody, _Trailers} -> - {ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType), - {ok, Resp}; - {error, _} = Error1 -> Error1 - end; - {response, fin, 200, RespHeaders} -> - {error, {grpc_error, eetcd_grpc:grpc_status(RespHeaders)}} + await_body(Pid, StreamRef1, Timeout, MRef, ResponseType); + {response, fin, 200, RespHeaders1} -> + {error, {grpc_error, eetcd_grpc:grpc_status(RespHeaders1)}} end; Error3 -> {error, {grpc_error, Error3}} end; @@ -112,6 +102,22 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) - erlang:demonitor(MRef, [flush]), Res. +%% `gun:await/2,3,4`, `gun:await_body/2,3,4` and `gun:await_up/1,2,3` don't distinguish the error types until v2.0.0. +%% They can be a timeout, a connection error, a stream error or a down error (when the Gun process exited while waiting). +%% so we copy some code from gun v2.0.0 to replace `gun:await/4` +%% TODO remove this when upgrade gun to v2.0.0 +-spec await(ServerPid, StreamRef, Timeout, MonitorRef) -> + {response, IsFin, StatusCode, Headers} | + {data, IsFin, Data} | {error, eetcd_error()} + when + ServerPid :: pid(), + StreamRef :: gun:stream_ref(), + Timeout :: timeout(), + MonitorRef :: reference(), + IsFin :: fin | nofin, + StatusCode :: non_neg_integer(), + Headers :: [{binary(), binary()}], + Data :: binary(). await(ServerPid, StreamRef, Timeout, MRef) -> case gun:await(ServerPid, StreamRef, Timeout, MRef) of {response, _, _, _}=Resp -> @@ -125,8 +131,13 @@ await(ServerPid, StreamRef, Timeout, MRef) -> await(ServerPid, StreamRef, Timeout, MRef) end. -await_body(ServerPid, StreamRef, Timeout, MRef) -> - transfer_error(gun:await_body(ServerPid, StreamRef, Timeout, MRef)). +await_body(ServerPid, StreamRef, Timeout, MRef, ResponseType) -> + case transfer_error(gun:await_body(ServerPid, StreamRef, Timeout, MRef)) of + {ok, ResBody, _Trailers} -> + {ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType), + {ok, Resp}; + {error, _} = Error -> Error + end. transfer_error({error, {stream_error, Reason}}) -> {error, {gun_stream_error, Reason}}; diff --git a/src/eetcd_sup.erl b/src/eetcd_sup.erl index fc29d5c..4cc4646 100644 --- a/src/eetcd_sup.erl +++ b/src/eetcd_sup.erl @@ -18,7 +18,7 @@ %% API functions %%==================================================================== --spec start_link() -> {ok, pid()}. +-spec start_link() -> supervisor:startlink_ret(). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). diff --git a/src/eetcd_watch.erl b/src/eetcd_watch.erl index d50bcab..27ecb43 100644 --- a/src/eetcd_watch.erl +++ b/src/eetcd_watch.erl @@ -4,7 +4,7 @@ -export_type([watch_conn/0]). -type watch_conn() :: #{http2_pid => pid(), monitor_ref => reference(), - stream_ref => reference(), + stream_ref => gun:stream_ref(), %% A buffer for incompleted response frame unprocessed => binary(), @@ -121,13 +121,13 @@ with_range_end(Context, End) -> %% @doc @equiv watch(name(), context(), 5000). -spec watch(name(), context()) -> {ok, watch_conn(), WatchId :: pos_integer()} | - {error, {stream_error | conn_error | http2_down, term()} | timeout}. + {error, eetcd_error()}. watch(Name, CreateReq) -> watch(Name, CreateReq, undefined, 5000). -spec watch(name(), context(), Timeout :: pos_integer() | watch_conn() | undefined) -> {ok, watch_conn(), WatchId :: pos_integer()} | - {error, {stream_error | conn_error | http2_down, term()} | timeout}. + {error, eetcd_error()}. watch(Name, CreateReq, Timeout) when is_integer(Timeout) -> watch(Name, CreateReq, undefined, Timeout); watch(Name, CreateReq, WatchConn) -> @@ -148,7 +148,7 @@ watch(Name, CreateReq, WatchConn) -> %% in events that are sent to the created watcher through stream channel. -spec watch(name(), context(), watch_conn() | undefined, pos_integer()) -> {ok, watch_conn(), WatchId :: pos_integer()} | - {error, {stream_error | conn_error | http2_down, term()} | timeout}. + {error, eetcd_error()}. watch(_Name, CreateReq, #{http2_pid := Gun, stream_ref := StreamRef, monitor_ref := MRef} = WatchConn, Timeout) @@ -163,7 +163,7 @@ watch(Name, CreateReq, undefined, Timeout) -> %% Do watch request with a new watch stream. -spec watch_new_(context(), pid(), reference(), pos_integer()) -> {ok, watch_conn(), WatchId :: pos_integer()} | - {error, {stream_error | conn_error | http2_down, term()} | timeout}. + {error, eetcd_error()}. watch_new_(CreateReq, Gun, StreamRef, Timeout) -> Request = #{request_union => {create_request, CreateReq}}, MRef = erlang:monitor(process, Gun), @@ -207,7 +207,7 @@ watch_new_(CreateReq, Gun, StreamRef, Timeout) -> %% Do watch request with the re-used watch stream. -spec watch_reuse_(context(), watch_conn(), pos_integer()) -> {ok, watch_conn(), WatchId :: pos_integer()} | - {error, {stream_error | conn_error | http2_down, term()} | timeout}. + {error, eetcd_error()}. watch_reuse_(CreateReq, #{http2_pid := Gun, stream_ref := StreamRef, monitor_ref := MRef, @@ -244,7 +244,7 @@ watch_reuse_(CreateReq, #{http2_pid := Gun, }; {ok, #{created := false} = ReceivedMessage, _} -> - {error, {stream_error, ReceivedMessage}} + {error, {gun_stream_error, ReceivedMessage}} end; {error, _} = Err2 -> @@ -258,13 +258,13 @@ watch_reuse_(CreateReq, #{http2_pid := Gun, %%that is, a gun_* message received on the gun connection. %%If it is, then this function will parse the message, turn it into watch responses, and possibly take action given the responses. %%If there's no error, this function returns {ok, WatchConn, 'Etcd.WatchResponse'()}|{more, WatchConn} -%%If there's an error, {error, {grpc_error, stream_error | conn_error | http2_down, term()} | timeout} is returned. +%%If there's an error, {error, eetcd_error()} is returned. %%If the given message is not from the gun connection, this function returns unknown. -spec watch_stream(watch_conn(), Message) -> {ok, watch_conn(), router_pb:'Etcd.WatchResponse'()} | {more, watch_conn()} | unknown - | {error, {grpc_error, stream_error | conn_error | http2_down, term()}} when + | {error, eetcd_error()} when Message :: term(). watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed, watch_ids := Ids} = Conn, @@ -287,21 +287,21 @@ watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef}, {gun_trailers, Pid, SRef, [{<<"grpc-status">>, Status}, {<<"grpc-message">>, Msg}]}) -> erlang:demonitor(MRef, [flush]), gun:cancel(Pid, SRef), - {error, {grpc_error, ?GRPC_ERROR(Status, Msg)}}; + {error, ?GRPC_ERROR(Status, Msg)}; watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef}, {gun_error, Pid, SRef, Reason}) -> %% stream error erlang:demonitor(MRef, [flush]), gun:cancel(Pid, SRef), - {error, {stream_error, Reason}}; + {error, {gun_stream_error, Reason}}; watch_stream(#{http2_pid := Pid, stream_ref := SRef, monitor_ref := MRef}, {gun_error, Pid, Reason}) -> %% gun connection process state error erlang:demonitor(MRef, [flush]), gun:cancel(Pid, SRef), - {error, {conn_error, Reason}}; + {error, {gun_conn_error, Reason}}; watch_stream(#{http2_pid := Pid, monitor_ref := MRef}, {'DOWN', MRef, process, Pid, Reason}) -> %% gun connection down erlang:demonitor(MRef, [flush]), - {error, {http2_down, Reason}}; + {error, {gun_down, Reason}}; watch_stream(_Conn, _UnKnow) -> unknown. %% @doc Rev returns the current revision of the KV the stream watches on. @@ -315,7 +315,7 @@ rev(#{revision := Rev}) -> Rev. %% Notice that this function will cancel all the watches in the same stream. -spec unwatch(watch_conn(), Timeout) -> {ok, Responses, OtherEvents} - | {error, {stream_error | conn_error | http2_down, term()} | timeout, Responses, OtherEvents} when + | {error, eetcd_error(), Responses, OtherEvents} when Timeout :: pos_integer(), Responses :: [router_pb:'Etcd.WatchResponse'()], OtherEvents :: [router_pb:'Etcd.WatchResponse'()]. diff --git a/test/eetcd_election_SUITE.erl b/test/eetcd_election_SUITE.erl index ef64b6d..5482b6d 100644 --- a/test/eetcd_election_SUITE.erl +++ b/test/eetcd_election_SUITE.erl @@ -150,12 +150,12 @@ campaign_async(_Config) -> {ok, Campaign2} = eetcd_election_leader_example:get_campaign(Pid2), ?assertMatch(#{campaign := #{name := LeaderKey}}, Campaign1), ?assertMatch(#{campaign := 'waiting_campaign_response'}, Campaign2), - + true = eetcd_election_leader_example:resign(Pid1), timer:sleep(500), {ok, Campaign3} = eetcd_election_leader_example:get_campaign(Pid2), ?assertMatch(#{campaign := #{name := LeaderKey}}, Campaign3), - + eetcd_election_leader_example:stop(Pid1), eetcd_election_leader_example:stop(Pid2), ok. @@ -174,4 +174,4 @@ new_lease(Sec) -> {ok, #{'ID' := Id}} = eetcd_lease:grant(?Name, Sec), {ok, _Pid} = eetcd_lease:keep_alive(?Name, Id), Id. - \ No newline at end of file +