From ffe10bb19a68b91843604bbfddf9045742feddc4 Mon Sep 17 00:00:00 2001 From: Mikl Kurkov Date: Fri, 12 Apr 2013 13:23:20 +0400 Subject: [PATCH 1/7] Add eredis_sentinel_client/masters auxilary modules --- src/eredis_sentinel_client.erl | 40 +++++++ src/eredis_sentinel_masters.erl | 202 ++++++++++++++++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 src/eredis_sentinel_client.erl create mode 100644 src/eredis_sentinel_masters.erl diff --git a/src/eredis_sentinel_client.erl b/src/eredis_sentinel_client.erl new file mode 100644 index 00000000..01aa3299 --- /dev/null +++ b/src/eredis_sentinel_client.erl @@ -0,0 +1,40 @@ +%% Sentinel client connection functions. +%% +-module(eredis_sentinel_client). +-author("Mikl Kurkov "). + +-include("eredis.hrl"). +-include("eredis_sentinel.hrl"). + +%% API +-export([start_link/2, stop/1, get_master/2]). + +%%% API --------------------------------------------------------------- + +start_link(Host, Port) when is_list(Host), is_integer(Port) -> + eredis:start_link(Host, Port, undefined, "", no_reconnect). + +stop(Pid) when is_pid(Pid) -> + catch eredis:stop(Pid). + +get_master(Pid, MasterName) when is_pid(Pid), is_atom(MasterName) -> + Req = ["SENTINEL", "get-master-addr-by-name", atom_to_list(MasterName)], + try get_master_response(eredis:q(Pid,Req)) of + Result -> + Result + catch Type:Error -> + {error, ?SENTINEL_UNREACHABLE} + end. + + + +%%% Internal ---------------------------------------------------------- + +get_master_response({ok, [HostBin, PortBin]}) -> + Host = binary_to_list(HostBin), + Port = list_to_integer(binary_to_list(PortBin)), + {ok, {Host, Port}}; +get_master_response({ok, undefined}) -> + {error, ?MASTER_UNKNOWN}; +get_master_response({error, <<"IDONTKNOW", _Rest/binary >>}) -> + {error, ?MASTER_UNREACHABLE}. 
diff --git a/src/eredis_sentinel_masters.erl b/src/eredis_sentinel_masters.erl new file mode 100644 index 00000000..1b71b94f --- /dev/null +++ b/src/eredis_sentinel_masters.erl @@ -0,0 +1,202 @@ +%% ADT for sentinel masters data. +%% Keeps current masters host/port and list of subscribers. +%% Notifies subscribers when master changes. + +-module(eredis_sentinel_masters). +-author("Mikl Kurkov "). + +%% Imports +-import(lists, [sort/1]). + +%% API +-export([new/0, update/4, subscribe/3, unsubscribe/2]). + +%% Records +-record(master, { + name :: master_name(), + host :: master_host(), + port :: master_port(), + pids :: [pid()]}). + +%% Types +-type master_host() :: string(). +-type master_port() :: integer(). +-type master_name() :: atom(). +-type masters() :: [#master{}]. + +%%% API --------------------------------------------------------------- + +%% @doc Masters initialization +-spec new() -> {ok, masters()}. +new() -> + {ok,[]}. + +%% @doc Add new master or update if it already exists +-spec update(masters(), master_name(), master_host(), master_port()) -> {ok,masters()}. +update(Masters, MasterName, Host, Port) + when is_list(Masters), + is_atom(MasterName), + is_list(Host), + is_integer(Port) + -> + case find(Masters, MasterName) of + {ok, Master} -> + NewMaster = update_master(Master, Host, Port); + undefined -> + NewMaster = new_master(MasterName, Host, Port) + end, + {ok, set_master(Masters, NewMaster)}. + +%% @doc Subscribe process to master updates. +-spec subscribe(masters(), master_name(), pid()) -> {ok, masters()} | {error, no_master_found}. +subscribe(Masters, MasterName, Pid) + when is_list(Masters), is_atom(MasterName), is_pid(Pid) -> + case find(Masters, MasterName) of + {ok, Master} -> + NewMaster = add_pid(Master, Pid), + {ok, set_master(Masters, NewMaster)}; + undefined -> + {error, no_master_found} + end. + +%% @doc Unsubscribe process from all masters. +-spec unsubscribe(masters(), pid()) -> {ok, masters()}. 
+unsubscribe(Masters, Pid) + when is_list(Masters), is_pid(Pid) -> + RemovePid = fun(M) -> rm_pid(M, Pid) end, + {ok, lists:map(RemovePid, Masters)}. + + +%%% Internal ---------------------------------------------------------- + +new_master(MasterName, Host, Port) -> + #master{name = MasterName, host = Host, port = Port, pids =[]}. + +find(Masters,MasterName) -> + case lists:keysearch(MasterName, #master.name, Masters) of + {value, Master} -> + {ok, Master}; + false -> + undefined + end. + +set_master(Masters, Master) -> + lists:keystore(Master#master.name, #master.name, Masters, Master). + +update_master(#master{host=Host, port=Port}=Master, Host, Port) -> + Master; +update_master(Master, Host, Port) -> + MasterNew = Master#master{host=Host,port=Port}, + notify_pids(MasterNew), + MasterNew. + +notify_pids(#master{pids=Pids, name=Name, host=Host, port=Port}) -> + Message = {sentinel, {reconnect, Name, Host, Port}}, + [ P ! Message || P <- Pids], + ok. + +add_pid(#master{pids=Pids} = Master, Pid) -> + Master#master{pids = lists:umerge(Pids, [Pid])}. + +rm_pid(#master{pids=Pids} = Master, Pid) -> + Master#master{pids = Pids -- [Pid]}. + + +%%% Tests ------------------------------------------------------------- + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +two_masters() -> + {ok, M0} = new(), + {ok, M1} = update(M0, master1, "host1", 1), + {ok, M2} = update(M1, master2, "host2", 2), + M2. + +add_master_test() -> + {ok, M3} = update(two_masters(), master3, "host3", 3), + ?assertMatch( + [#master{name=master1, host="host1", port=1, pids=[]}, + #master{name=master2, host="host2", port=2, pids=[]}, + #master{name=master3, host="host3", port=3, pids=[]} + ], + sort(M3)). + +update_master_host_test() -> + {ok, M3} = update(two_masters(), master2, "host22", 2), + ?assertMatch( + [#master{name=master1, host="host1", port=1, pids=[]}, + #master{name=master2, host="host22", port=2, pids=[]}], + sort(M3)). 
+ +update_master_port_test() -> + {ok, M3} = update(two_masters(), master2, "host2", 22), + ?assertMatch( + [#master{name=master1, host="host1", port=1, pids=[]}, + #master{name=master2, host="host2", port=22, pids=[]}], + sort(M3)). + +find_test() -> + ?assertMatch(undefined, + find(two_masters(), master3)), + ?assertMatch({ok, #master{name=master1, host="host1", port=1, pids=[]}}, + find(two_masters(), master1)), + ?assertMatch({ok, #master{name=master2, host="host2", port=2, pids=[]}}, + find(two_masters(), master2)). + +subscribe_test() -> + Pid = self(), + ?assertMatch({error, no_master_found}, + subscribe(two_masters(), master3, self())), + ?assertMatch([#master{name=master1,host="host1",port=1, pids=[Pid]}, + #master{name=master2, host="host2", port=2, pids=[]}], + sort(ok(subscribe(two_masters(), master1, Pid)) )), + ?assertMatch([#master{name=master1,host="host1",port=1, pids=[Pid]}, + #master{name=master2, host="host2", port=2, pids=[]}], + sort(ok(subscribe( + ok(subscribe(two_masters(), master1, Pid)), + master1, Pid)))). + +unsubscribe_test() -> + Pid1 = list_to_pid("<0.1.0>"), + Pid2 = list_to_pid("<0.2.0>"), + {ok,Ms1} = subscribe(two_masters(), master1, Pid1), + {ok,Ms2} = subscribe(Ms1, master2, Pid2), + {ok,Ms3} = unsubscribe(Ms2, Pid1), + ?assertMatch([#master{name=master1, host="host1", port=1, pids=[]}, + #master{name=master2, host="host2", port=2, pids=[Pid2]}], + sort(Ms3)). + +update_notify_test() -> + Pid = self(), + {ok, Ms1} = subscribe(two_masters(), master1, Pid), + {ok, Ms2} = update(Ms1, master1, "host1", 1), + ?assertMatch([], get_messages()), + + {ok, Ms3} = update(Ms2, master1, "host11", 1), + ?assertMatch(ok, get_message({sentinel, {reconnect, master1, "host11", 1}})), + ?assertMatch([], get_messages()), + + {ok, _} = update(Ms3, master1, "host11", 2), + ?assertMatch(ok, get_message({sentinel, {reconnect, master1, "host11", 2}})), + ?assertMatch([], get_messages()). 
+ + +%%% Test helpers ------------------------------------------------------ + +ok({ok,S})-> S. + +get_message(Message) -> + receive + Message -> + ok + after 0 -> + {no_message, Message} + end. + +get_messages() -> + {messages, Res} = erlang:process_info(self(), messages), + Res. + + +-endif. From 31e71af8fadfbf2268c4f1c961f5fe96bd5a077c Mon Sep 17 00:00:00 2001 From: Mikl Kurkov Date: Tue, 16 Apr 2013 19:03:41 +0400 Subject: [PATCH 2/7] Add eredis_sentinel main module with test specs --- include/eredis_sentinel.hrl | 12 ++ priv/redis_cache1.conf | 7 + priv/redis_cache2.conf | 7 + priv/redis_sentinel1.conf | 21 +++ priv/redis_sentinel2.conf | 21 +++ priv/redis_session1.conf | 6 + priv/redis_session2.conf | 7 + src/eredis_sentinel.erl | 229 +++++++++++++++++++++++++++++++++ test/eredis_sentinel_tests.erl | 186 ++++++++++++++++++++++++++ 9 files changed, 496 insertions(+) create mode 100644 include/eredis_sentinel.hrl create mode 100644 priv/redis_cache1.conf create mode 100644 priv/redis_cache2.conf create mode 100644 priv/redis_sentinel1.conf create mode 100644 priv/redis_sentinel2.conf create mode 100644 priv/redis_session1.conf create mode 100644 priv/redis_session2.conf create mode 100644 src/eredis_sentinel.erl create mode 100644 test/eredis_sentinel_tests.erl diff --git a/include/eredis_sentinel.hrl b/include/eredis_sentinel.hrl new file mode 100644 index 00000000..20f5b5cf --- /dev/null +++ b/include/eredis_sentinel.hrl @@ -0,0 +1,12 @@ +%% Types +-type master_name() :: atom(). +-type master_host() :: string(). +-type master_port() :: integer(). + +%% Sentinel constants +-define(SENTINEL_PORT, 26379). + +% Sentinel errors +-define(SENTINEL_UNREACHABLE, sentinel_unreachable). +-define(MASTER_UNKNOWN, master_unknown). +-define(MASTER_UNREACHABLE, master_unreachable). 
diff --git a/priv/redis_cache1.conf b/priv/redis_cache1.conf new file mode 100644 index 00000000..2ba31dae --- /dev/null +++ b/priv/redis_cache1.conf @@ -0,0 +1,7 @@ +bind 127.0.0.1 +port 6382 +pidfile ./redis_cache1.pid +daemonize yes +timeout 0 +logfile stdout +#slaveof 127.0.0.1 6381 diff --git a/priv/redis_cache2.conf b/priv/redis_cache2.conf new file mode 100644 index 00000000..d47ee135 --- /dev/null +++ b/priv/redis_cache2.conf @@ -0,0 +1,7 @@ +bind 127.0.0.1 +port 6383 +pidfile ./redis_cache2.pid +daemonize yes +timeout 0 +logfile stdout +slaveof 127.0.0.1 6382 diff --git a/priv/redis_sentinel1.conf b/priv/redis_sentinel1.conf new file mode 100644 index 00000000..6765cfc1 --- /dev/null +++ b/priv/redis_sentinel1.conf @@ -0,0 +1,21 @@ +port 26380 +daemonize yes +pidfile ./redis_sentinel1.pid + +sentinel monitor session 127.0.0.1 6380 1 +sentinel down-after-milliseconds session 500 +sentinel failover-timeout session 10000 +sentinel can-failover session yes +sentinel parallel-syncs session 1 + +sentinel monitor cache 127.0.0.1 6382 1 +sentinel down-after-milliseconds cache 1000 +sentinel failover-timeout cache 10000 +sentinel can-failover cache yes +sentinel parallel-syncs cache 1 + +sentinel monitor badmaster localhost 6385 1 +sentinel down-after-milliseconds badmaster 1000 +sentinel failover-timeout badmaster 10000 +sentinel can-failover badmaster yes +sentinel parallel-syncs badmaster 1 diff --git a/priv/redis_sentinel2.conf b/priv/redis_sentinel2.conf new file mode 100644 index 00000000..9524cc01 --- /dev/null +++ b/priv/redis_sentinel2.conf @@ -0,0 +1,21 @@ +port 26381 +daemonize yes +pidfile ./redis_sentinel2.pid + +sentinel monitor session 127.0.0.1 6380 1 +sentinel down-after-milliseconds session 500 +sentinel failover-timeout session 10000 +sentinel can-failover session yes +sentinel parallel-syncs session 1 + +sentinel monitor cache 127.0.0.1 6382 1 +sentinel down-after-milliseconds cache 1000 +sentinel failover-timeout cache 10000 +sentinel 
can-failover cache yes +sentinel parallel-syncs cache 1 + +sentinel monitor badmaster localhost 6385 1 +sentinel down-after-milliseconds badmaster 1000 +sentinel failover-timeout badmaster 10000 +sentinel can-failover badmaster yes +sentinel parallel-syncs badmaster 1 diff --git a/priv/redis_session1.conf b/priv/redis_session1.conf new file mode 100644 index 00000000..f2d36898 --- /dev/null +++ b/priv/redis_session1.conf @@ -0,0 +1,6 @@ +bind 127.0.0.1 +port 6380 +pidfile ./redis_session1.pid +daemonize yes +timeout 0 +logfile stdout diff --git a/priv/redis_session2.conf b/priv/redis_session2.conf new file mode 100644 index 00000000..622376a3 --- /dev/null +++ b/priv/redis_session2.conf @@ -0,0 +1,7 @@ +bind 127.0.0.1 +port 6381 +pidfile ./redis_session2.pid +daemonize yes +timeout 0 +logfile stdout +slaveof 127.0.0.1 6380 diff --git a/src/eredis_sentinel.erl b/src/eredis_sentinel.erl new file mode 100644 index 00000000..54df4262 --- /dev/null +++ b/src/eredis_sentinel.erl @@ -0,0 +1,229 @@ +%% +%% Erlang Redis Sentinel connection module +%% +%% Redis Sentinel is a standart way to failover master in the cluster of redis nodes. +%% It is separate process that monitors cluster master and slaves. Normally there are multiple sentinels looking for cluster. +%% After master shutdown they collaborate to ellect new master from slave nodes. +%% +%% More information here: +%% Sentinel documentation - http://redis.io/topics/sentinel +%% Recomendations for clients authors - http://redis.io/topics/sentinel-clients +%% +%% Every sentinel can monitor multiple redis clusters. Every cluster has it name (master name). +%% At one time only one redis node in cluster can be master, the others are slaves. +%% This module starts process that keep track of all clusters and sentinels that whatch for them. +%% When client wants to connect to cluster it asks sentinels of that cluster which node is master and returns this +%% information to client. 
+%% +%% Usage: +%% {ok, SentinelConn} = eredis_sentinel:start_link(["sentinel-1.lan", {"sentinel-2.lan",26340}]), +%% {ok, Host, Port} = eredis_sentinel:get_master(session). +%% When client needs notifications about master changes: +%% {ok, Host, Port} = eredis_sentinel:get_master(session, [notify]}. +%% When sentinel process will see that master changed it will send notifications to all subscribers for that master: +%% {sentinel, {master_reconnect, Host, Port}} +%% +%% TBD: it is possible to have different sets of sentinels for different clusters. Do we need support for this case? +%% TODO: now eredis_sentinel knows that master changes only after get_master request. It is possible to receive notifications +%% about this through redis pub/sub channel. + +-module(eredis_sentinel). +-author("Mikl Kurkov "). + +-include("eredis.hrl"). +-include("eredis_sentinel.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0, start_link/1, stop/0]). +-export([get_master/1, get_master/2, get_current_sentinel/0]). + +%% GenServer +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(sentinel, { + host :: string(), + port :: integer() + }). + +-record(errors, { + sentinel_unreachable = 0 :: integer(), + master_unknown = 0 :: integer(), + master_unreachable = 0 :: integer(), + total = 0 :: integer() + }). + +-record(state, { + sentinels :: [#sentinel{}], + conn_pid :: undefined | pid(), + masters :: eredis_sentinel_master:masters(), + errors :: #errors{} + }). + +-record(get_master_req, { + master :: master_name(), + notify :: boolean() + }). + +%%% API --------------------------------------------------------------- +start_link() -> + start_link(["localhost"]). + +start_link(Conf) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []). + +stop() -> + gen_server:call(?MODULE, stop). + +get_master(MasterName) -> + get_master(MasterName, false). 
+ +get_master(MasterName, Notify) when is_atom(MasterName), is_boolean(Notify) -> + gen_server:call(?MODULE, #get_master_req{master=MasterName, notify=Notify}). + +get_current_sentinel() -> + gen_server:call(?MODULE, get_current_sentinel). + +%%% GenServer --------------------------------------------------------- + +init(Sentinels) -> + process_flag(trap_exit, true), + State = #state{ + sentinels = [read_sentinel(S) || S <- Sentinels], + conn_pid = undefined, + masters = [], + errors = #errors{} }, + {ok, State}. + +handle_call(#get_master_req{master=MasterName, notify=Notify}, {FromPid,_Tag}, State) -> + case query_master(MasterName, State#state{errors = #errors{}}) of + {ok, {Host,Port}, S1} -> + {ok,Masters1} = eredis_sentinel_masters:update(S1#state.masters, MasterName, Host, Port), + {ok,Masters2} = + case Notify of + true -> + eredis_sentinel_masters:subscribe(Masters1, MasterName, FromPid); + false -> + {ok, Masters1} + end, + {reply, {ok, {Host,Port}}, S1#state{masters=Masters2}}; + {error, Error, S1} -> + {reply, {error, Error}, S1} + end; + +handle_call(get_current_sentinel, _From, #state{sentinels=[S|_],conn_pid=ConnPid} = State) -> + Res = {S#sentinel.host, S#sentinel.port, ConnPid}, + {reply, {ok, Res}, State}; + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +%% Current sentinel connection broken +handle_info({'EXIT', Pid, _Reason}, #state{conn_pid = Pid} = S) -> + {noreply, S#state{conn_pid = undefined}}; + +%% Ignore late exit messages +handle_info({'EXIT', _Pid, _Reason}, S) -> + {noreply, S}; + +handle_info(_Info, State) -> + {stop, {unhandled_message, _Info}, State}. + +terminate(_Reason, State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +%%% Internal ---------------------------------------------------------- + +read_sentinel(Host) when is_list(Host) -> + #sentinel{host=Host, port=?SENTINEL_PORT}; +read_sentinel({Host,Port}) when is_list(Host), is_integer(Port) -> + #sentinel{host=Host, port=Port}. + + +%% Finding the new master host for a named cluster: +%% * First try to query the already connected sentinel if we have one. +%% * If this fails, try to connect to and query all sentinels starting from the last connected one. +%% * If a connected sentinel returns ip:port - return {ok, {Host, Port}} and remember the connection pid. +%% * In case no sentinel returns a valid answer - respond with an error: +%% * If we failed to connect to all sentinels - return {error, sentinel_unreachable} +%% * If all connected sentinels return null - return {error, master_unknown} +%% * If some of the connected sentinels return -IDONTKNOW - return {error, master_unreachable} + +-spec query_master(master_name(), #state{}) -> + {ok, {master_host(), master_port()}, #state{}} | {error, any(), #state{}}. 
+ +%% All sentinels return errors +query_master(MasterName, #state{errors=Errors,sentinels=Sentinels} = S) + when Errors#errors.total >= length(Sentinels) -> + #errors{sentinel_unreachable=SU, master_unknown=MUK, master_unreachable=MUR} = Errors, + if + SU == length(Sentinels) -> + {error, ?SENTINEL_UNREACHABLE,S}; + MUK > 0, MUR == 0 -> + {error, ?MASTER_UNKNOWN,S}; + true -> + {error, ?MASTER_UNREACHABLE,S} + end; + +%% No connected sentinel +query_master(MasterName, #state{conn_pid=undefined, sentinels = [#sentinel{host=H,port=P} | _] } = S) -> + case eredis_sentinel_client:start_link(H,P) of + {ok, ConnPid} -> + query_master(MasterName, S#state{conn_pid=ConnPid}); + {error, E} -> + error_logger:error_msg("Error connecting to sentinel at ~p:~p : ~p~n",[H,P,E]), + Errors = update_errors(?SENTINEL_UNREACHABLE, S#state.errors), + Sentinels = rotate(S#state.sentinels), + query_master(MasterName, S#state{sentinels=Sentinels, errors=Errors}) + end; + +%% Sentinel connected +query_master(MasterName, #state{conn_pid=ConnPid, sentinels=[#sentinel{host=H,port=P}|_]} = S) when is_pid(ConnPid)-> + case eredis_sentinel_client:get_master(ConnPid, MasterName) of + {ok, HostPort} -> + {ok, HostPort, S}; + {error, Error} -> + error_logger:error_msg("Master request for ~p to sentinel ~p:~p failed with ~p~n", [MasterName,H,P,Error]), + eredis_sentinel_client:stop(ConnPid), + Errors = update_errors(Error, S#state.errors), + Sentinels = rotate(S#state.sentinels), + query_master(MasterName, S#state{conn_pid = undefined, errors=Errors, sentinels=Sentinels}) + end. + +update_errors(E, #errors{sentinel_unreachable=SU, master_unknown=MUK, master_unreachable=MUR, total = T} = Errors) -> + Errors1 = + case E of + ?SENTINEL_UNREACHABLE -> + Errors#errors{sentinel_unreachable = SU + 1}; + ?MASTER_UNKNOWN -> + Errors#errors{master_unknown = MUK + 1}; + ?MASTER_UNREACHABLE -> + Errors#errors{master_unreachable = MUR + 1} + end, + Errors1#errors{total = T + 1}. 
+ + +rotate([]) -> []; +rotate([X|Xs]) -> Xs ++ [X]. + +all(Ts) -> lists:all(fun(T) -> T end, Ts). + + +%%% Unit tests -------------------------------------------------------- + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +rotate_test() -> + ?assertEqual([], rotate([])), + ?assertEqual([1], rotate([1])), + ?assertEqual([2,3,1], rotate([1,2,3])). + +-endif. diff --git a/test/eredis_sentinel_tests.erl b/test/eredis_sentinel_tests.erl new file mode 100644 index 00000000..9bda706b --- /dev/null +++ b/test/eredis_sentinel_tests.erl @@ -0,0 +1,186 @@ +-module(eredis_sentinel_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(SERVERS, [ + {session1, 6380}, + {session2, 6381}, + {cache1, 6382}, + {cache2, 6383} + ]). + +-define(SENTINELS, [ + {sentinel1, 26380}, + {sentinel2, 26381} + ]). + +-define(CONFIG, [{"localhost",26380}, {"localhost",26381}]). + +-compile(export_all). + +%% Suite initialization +start_cluster() -> + [start_redis(Name) || {Name, _Port} <- ?SERVERS], + [start_sentinel(Name) || {Name, _Port} <- ?SENTINELS], + ok. + +stop_cluster(_) -> + kill_servers([sentinel1, sentinel2, session1, session2, cache1, cache2], 2000), + ok. + +%%% Working with redis cluster +start_redis(Name) -> + run_cmd("redis-server ~s/redis_~s.conf", [code:priv_dir(eredis),Name]). + +start_sentinel(Name) -> + run_cmd("redis-sentinel ~p/redis_~s.conf ", [code:priv_dir(eredis), Name]). + +server(Name) -> + proplists:lookup(Name, ?SERVERS ++ ?SENTINELS). + +run_cmd(CmdFmt, Args) -> + Cmd = lists:flatten(io_lib:format(CmdFmt, Args)), + os:cmd(Cmd). + +%%% Test definition + +sentinel_test_() -> + error_logger:tty(false), + Ts = + [{setup, + fun start_cluster/0, + fun stop_cluster/1, + T + } || T <- tests() ], + {inorder, Ts}. 
+ +tests() -> + [ + {"it returns sentinel_unreachable if no connections to sentinels", + fun t_no_sentinel_connection/0}, + + {"it returns master_unknown if no one sentinel knows about this cluster", + fun t_master_unknown/0}, + + {"it returns master_unreachable if no one sentinel knows master host for cluster", + fun t_master_unreachable/0}, + + {"it returns valid master host data", + fun t_normal_operation/0}, + + {"it uses the same connection for several master requests", + fun t_connection_reuse/0}, + + {"it connects to the next sentinel if current failed", + fun t_failed_sentinel/0}, + + {timeout, 30, + {"it returns new master host/port on redis failover", + fun t_failover/0}} + + ]. + + +%%% Tests + +t_no_sentinel_connection() -> + {ok,_Pid} = eredis_sentinel:start_link(["unreachablehost"]), + ?assertMatch({error, sentinel_unreachable}, eredis_sentinel:get_master(session)). + +t_master_unknown() -> + {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), + ?assertMatch({error, master_unknown}, eredis_sentinel:get_master(unknonwmaster)). + +t_master_unreachable() -> + {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), + ?assertMatch({error, master_unreachable}, eredis_sentinel:get_master(badmaster)). + +t_normal_operation() -> + {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), + ?assertMatch({ok, {"127.0.0.1", 6382}}, eredis_sentinel:get_master(cache)). + +t_connection_reuse() -> + {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), + {ok, {"localhost", 26380, ConnPid}} = eredis_sentinel:get_current_sentinel(), + ?assertMatch({ok, {"127.0.0.1", 6382}}, eredis_sentinel:get_master(cache)), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), + ?assertMatch({ok, {"localhost", 26380, ConnPid}}, eredis_sentinel:get_current_sentinel()). 
+ +t_failed_sentinel() -> + {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), + {ok, {"localhost", 26380, ConnPid1}} = eredis_sentinel:get_current_sentinel(), + + eredis_sentinel_client:stop(ConnPid1), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), + {ok, {"localhost", 26380, ConnPid2}} = eredis_sentinel:get_current_sentinel(), + ?assert(ConnPid1 =/= ConnPid2), + + kill_servers([sentinel1], 2000), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), + ?assertMatch({ok, {"localhost", 26381, _}}, eredis_sentinel:get_current_sentinel()). + +t_failover() -> + {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), + ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session, true)), + % this sleep need to sentinels finds out slaves of master + timer:sleep(1000), + % just change master, not kill it because real failover need up to 35 seconds to complete + change_master(6380,6381), + % waiting sentinels to see new master + timer:sleep(2000), + ?assertMatch({ok, {"127.0.0.1", 6381}}, eredis_sentinel:get_master(session)), + ?assertMatch([{sentinel, {reconnect, session, "127.0.0.1", 6381}}], get_messages()). + + +%%% Internal ---------------------------------------------------------- + +get_messages() -> + get_messages([]). +get_messages(Acc) -> + receive + M -> + get_messages([M|Acc]) + after 0 -> + Acc + end. + +%% Killing external OS processes + +kill_servers(Names, Timeout) -> + [kill_server(N) || N <- Names], + wait_or_die(Names, [], Timeout). + +kill_server(Name) -> + Cmd = lists:flatten(io_lib:format("kill $(cat ./redis_~s.pid)", [Name])), + os:cmd(Cmd). 
+ +wait_or_die(Names, Acc, Timeout) when Timeout =< 0 -> + throw({"Killing servers timeout", Names++Acc}); +wait_or_die([], [], _) -> + ok; +wait_or_die([], Names, Timeout) -> + timer:sleep(100), + kill_servers(Names, Timeout - 100); +wait_or_die([Name|Names], Acc, Timeout) -> + case is_server_alive(Name) of + true -> + wait_or_die(Names, [Name | Acc ], Timeout); + _ -> + wait_or_die(Names, Acc, Timeout) + end. + +is_server_alive(Name) -> + Cmd = lists:flatten(io_lib:format("kill -0 $(cat ./redis_~s.pid) && echo -n ok", [Name])), + "ok" == os:cmd(Cmd). + +%% Failover imitation + +change_master(FromPort, ToPort) -> + {ok, From} = eredis:start_link("localhost", FromPort), + {ok, To} = eredis:start_link("localhost", ToPort), + eredis:q(To, ["slaveof", "no", "one"]), + eredis:q(From, ["slaveof", "localhost", ToPort]), + ok. From 9a8481533b527fbc8066bce873340a53b0b61b6f Mon Sep 17 00:00:00 2001 From: Mikl Kurkov Date: Wed, 17 Apr 2013 19:52:43 +0400 Subject: [PATCH 3/7] Add sentinel support to eredis_client --- priv/redis_sentinel1.conf | 2 +- priv/redis_sentinel2.conf | 2 +- src/eredis_client.erl | 80 ++++++++++++++++++++++++++++------ src/eredis_sentinel.erl | 6 +-- src/eredis_sentinel_client.erl | 3 +- test/eredis_sentinel_tests.erl | 51 +++++++++++++++++----- 6 files changed, 114 insertions(+), 30 deletions(-) diff --git a/priv/redis_sentinel1.conf b/priv/redis_sentinel1.conf index 6765cfc1..d011f454 100644 --- a/priv/redis_sentinel1.conf +++ b/priv/redis_sentinel1.conf @@ -3,7 +3,7 @@ daemonize yes pidfile ./redis_sentinel1.pid sentinel monitor session 127.0.0.1 6380 1 -sentinel down-after-milliseconds session 500 +sentinel down-after-milliseconds session 1000 sentinel failover-timeout session 10000 sentinel can-failover session yes sentinel parallel-syncs session 1 diff --git a/priv/redis_sentinel2.conf b/priv/redis_sentinel2.conf index 9524cc01..34a634c8 100644 --- a/priv/redis_sentinel2.conf +++ b/priv/redis_sentinel2.conf @@ -3,7 +3,7 @@ daemonize yes pidfile 
./redis_sentinel2.pid sentinel monitor session 127.0.0.1 6380 1 -sentinel down-after-milliseconds session 500 +sentinel down-after-milliseconds session 1000 sentinel failover-timeout session 10000 sentinel can-failover session yes sentinel parallel-syncs session 1 diff --git a/src/eredis_client.erl b/src/eredis_client.erl index 39d8aeba..44298b18 100644 --- a/src/eredis_client.erl +++ b/src/eredis_client.erl @@ -39,6 +39,9 @@ port :: integer() | undefined, password :: binary() | undefined, database :: binary() | undefined, + + sentinel :: undefined | atom(), + reconnect_sleep :: reconnect_sleep() | undefined, socket :: port() | undefined, @@ -68,6 +71,14 @@ stop(Pid) -> %%==================================================================== init([Host, Port, Database, Password, ReconnectSleep]) -> + Sentinel = + case Host of + "sentinel:"++MasterStr -> + list_to_atom(MasterStr); + _ -> + undefined + end, + State = #state{host = Host, port = Port, database = read_database(Database), @@ -75,7 +86,9 @@ init([Host, Port, Database, Password, ReconnectSleep]) -> reconnect_sleep = ReconnectSleep, parser_state = eredis_parser:init(), - queue = queue:new()}, + queue = queue:new(), + sentinel = Sentinel + }, case connect(State) of {ok, NewState} -> @@ -126,18 +139,18 @@ handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect} = Stat {stop, normal, State#state{socket = undefined}}; handle_info({tcp_closed, _Socket}, State) -> - Self = self(), - spawn(fun() -> reconnect_loop(Self, State) end), - - %% Throw away the socket and the queue, as we will never get a - %% response to the requests sent on the old socket. The absence of - %% a socket is used to signal we are "down" - {noreply, State#state{socket = undefined, queue = queue:new()}}; + {ok, StateNew} = start_reconnect(State#state{socket = undefined}), + {noreply, StateNew}; %% Redis is ready to accept requests, the given Socket is a socket %% already connected and authenticated. 
-handle_info({connection_ready, Socket}, #state{socket = undefined} = State) -> - {noreply, State#state{socket = Socket}}; +%% Also keep master Host/Port in case it changed during reconnection +handle_info({connection_ready, Socket, Host, Port}, #state{socket = undefined} = State) -> + {noreply, State#state{socket = Socket, host=Host, port=Port}}; + +%% Notification from eredis_sentinel about new master +handle_info({sentinel, {reconnect, _MasterName, Host, Port}}, State) -> + do_sentinel_reconnect(Host, Port, State); %% eredis can be used in Poolboy, but it requires to support a simple API %% that Poolboy uses to manage the connections. @@ -250,7 +263,17 @@ safe_reply(From, Value) -> %% the correct database. These commands are synchronous and if Redis %% returns something we don't expect, we crash. Returns {ok, State} or %% {SomeError, Reason}. -connect(State) -> +connect(#state{sentinel = undefined} = State) -> + connect1(State); +connect(#state{sentinel = Master} = State) -> + case eredis_sentinel:get_master(Master, true) of + {ok, {Host, Port}} -> + connect1(State#state{host=Host, port=Port}); + {error, Error} -> + {error, {sentinel_error, Error}} + end. + +connect1(State) -> case gen_tcp:connect(State#state.host, State#state.port, ?SOCKET_OPTS) of {ok, Socket} -> case authenticate(Socket, State#state.password) of @@ -301,9 +324,9 @@ do_sync_command(Socket, Command) -> %% connection, give the socket to the redis client. reconnect_loop(Client, #state{reconnect_sleep = ReconnectSleep} = State) -> case catch(connect(State)) of - {ok, #state{socket = Socket}} -> + {ok, #state{socket = Socket, host=Host, port=Port}} -> gen_tcp:controlling_process(Socket, Client), - Client ! {connection_ready, Socket}; + Client ! 
{connection_ready, Socket, Host, Port}; {error, _Reason} -> timer:sleep(ReconnectSleep), reconnect_loop(Client, State); @@ -319,3 +342,34 @@ read_database(undefined) -> undefined; read_database(Database) when is_integer(Database) -> list_to_binary(integer_to_list(Database)). + + +%% Handle sentinel "reconnect to new master" message +%% 1. we already connected to new master - ignore +do_sentinel_reconnect(Host, Port, #state{host=Host,port=Port}=State) -> + {noreply, State}; +%% 2. we are waiting for reconnecting already - ignore +do_sentinel_reconnect(_Host, _Port, #state{socket=undefined}=State) -> + {noreply, State}; +%% 3. we are not supposed to reconnect - stop processing +do_sentinel_reconnect(_Host, _Port, #state{reconnect_sleep=no_reconnect}=State) -> + {stop, sentinel_reconnect, State}; +%% 4. we are connected to wrong master - reconnect +do_sentinel_reconnect(Host, Port, State) -> + {ok, StateNew} = start_reconnect(State#state{host=Host, port=Port}), + {noreply, StateNew}. + +%% @doc Start reconnecting loop, close old connection if present. +-spec start_reconnect(#state{}) -> {ok, #state{}}. +start_reconnect(#state{socket=undefined} = State) -> + Self = self(), + spawn(fun() -> reconnect_loop(Self, State) end), + + %% Throw away the socket and the queue, as we will never get a + %% response to the requests sent on the old socket. The absence of + %% a socket is used to signal we are "down" + %% TODO shouldn't we need to send error reply to waiting clients? + {ok, State#state{queue = queue:new()}}; +start_reconnect(#state{socket=Socket} = State) -> + gen_tcp:close(Socket), + start_reconnect(State#state{socket=undefined}). diff --git a/src/eredis_sentinel.erl b/src/eredis_sentinel.erl index 54df4262..e19b96cd 100644 --- a/src/eredis_sentinel.erl +++ b/src/eredis_sentinel.erl @@ -133,7 +133,7 @@ handle_info({'EXIT', _Pid, _Reason}, S) -> handle_info(_Info, State) -> {stop, {unhandled_message, _Info}, State}. 
-terminate(_Reason, State) -> +terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> @@ -160,7 +160,7 @@ read_sentinel({Host,Port}) when is_list(Host), is_integer(Port) -> {ok, {master_host(), master_port()}, #state{}} | {error, any(), #state{}}. %% All sentinels return errors -query_master(MasterName, #state{errors=Errors,sentinels=Sentinels} = S) +query_master(_MasterName, #state{errors=Errors,sentinels=Sentinels} = S) when Errors#errors.total >= length(Sentinels) -> #errors{sentinel_unreachable=SU, master_unknown=MUK, master_unreachable=MUR} = Errors, if @@ -213,8 +213,6 @@ update_errors(E, #errors{sentinel_unreachable=SU, master_unknown=MUK, master_unr rotate([]) -> []; rotate([X|Xs]) -> Xs ++ [X]. -all(Ts) -> lists:all(fun(T) -> T end, Ts). - %%% Unit tests -------------------------------------------------------- diff --git a/src/eredis_sentinel_client.erl b/src/eredis_sentinel_client.erl index 01aa3299..c7a24f01 100644 --- a/src/eredis_sentinel_client.erl +++ b/src/eredis_sentinel_client.erl @@ -23,7 +23,8 @@ get_master(Pid, MasterName) when is_pid(Pid), is_atom(MasterName) -> Result -> Result catch Type:Error -> - {error, ?SENTINEL_UNREACHABLE} + error_logger:error_msg("Sentinel error getting master ~p : ~p:~p", [MasterName, Type, Error]), + {error, ?SENTINEL_UNREACHABLE} end. diff --git a/test/eredis_sentinel_tests.erl b/test/eredis_sentinel_tests.erl index 9bda706b..7470dffe 100644 --- a/test/eredis_sentinel_tests.erl +++ b/test/eredis_sentinel_tests.erl @@ -76,7 +76,10 @@ tests() -> {timeout, 30, {"it returns new master host/port on redis failover", - fun t_failover/0}} + fun t_failover/0}}, + + {"eredis should understand sentine:master_name notation", + fun t_eredis_support/0} ]. @@ -122,17 +125,32 @@ t_failed_sentinel() -> ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session)), ?assertMatch({ok, {"localhost", 26381, _}}, eredis_sentinel:get_current_sentinel()). 
+t_eredis_support() -> + {ok, _Pid} = eredis_sentinel:start_link(?CONFIG), + {ok, Conn} = eredis:start_link("sentinel:session", 0), + ?assertMatch({ok,[<<"port">>, <<"6380">>]}, eredis:q(Conn, ["config", "get", "port"])), + {ok, Conn2} = eredis:start_link("sentinel:cache", 0), + ?assertMatch({ok,[<<"port">>, <<"6382">>]}, eredis:q(Conn2, ["config", "get", "port"])). + + t_failover() -> {ok,_Pid} = eredis_sentinel:start_link(?CONFIG), ?assertMatch({ok, {"127.0.0.1", 6380}}, eredis_sentinel:get_master(session, true)), - % this sleep need to sentinels finds out slaves of master + {ok, Conn} = eredis:start_link("sentinel:session", 0), + ?assertMatch({ok,[<<"port">>, <<"6380">>]}, eredis:q(Conn, ["config", "get", "port"])), + + % this sleep need to sentinels find out slaves of master timer:sleep(1000), % just change master, not kill it because real failover need up to 35 seconds to complete change_master(6380,6381), % waiting sentinels to see new master timer:sleep(2000), ?assertMatch({ok, {"127.0.0.1", 6381}}, eredis_sentinel:get_master(session)), - ?assertMatch([{sentinel, {reconnect, session, "127.0.0.1", 6381}}], get_messages()). + ?assertMatch([{sentinel, {reconnect, session, "127.0.0.1", 6381}}], get_messages()), + ?assert(is_process_alive(Conn)), + wait_redis_connect(Conn, 2000), + ?assertMatch({ok,[<<"port">>, <<"6381">>]}, eredis:q(Conn, ["config", "get", "port"])). + %%% Internal ---------------------------------------------------------- @@ -151,31 +169,44 @@ get_messages(Acc) -> kill_servers(Names, Timeout) -> [kill_server(N) || N <- Names], - wait_or_die(Names, [], Timeout). + wait_server_die(Names, [], Timeout). kill_server(Name) -> Cmd = lists:flatten(io_lib:format("kill $(cat ./redis_~s.pid)", [Name])), os:cmd(Cmd). 
-wait_or_die(Names, Acc, Timeout) when Timeout =< 0 -> +wait_server_die(Names, Acc, Timeout) when Timeout =< 0 -> throw({"Killing servers timeout", Names++Acc}); -wait_or_die([], [], _) -> +wait_server_die([], [], _) -> ok; -wait_or_die([], Names, Timeout) -> +wait_server_die([], Names, Timeout) -> timer:sleep(100), kill_servers(Names, Timeout - 100); -wait_or_die([Name|Names], Acc, Timeout) -> +wait_server_die([Name|Names], Acc, Timeout) -> case is_server_alive(Name) of true -> - wait_or_die(Names, [Name | Acc ], Timeout); + wait_server_die(Names, [Name | Acc ], Timeout); _ -> - wait_or_die(Names, Acc, Timeout) + wait_server_die(Names, Acc, Timeout) end. is_server_alive(Name) -> Cmd = lists:flatten(io_lib:format("kill -0 $(cat ./redis_~s.pid) && echo -n ok", [Name])), "ok" == os:cmd(Cmd). + +%% Waiting redis client to connect to redis +wait_redis_connect(Conn, Timeout) when Timeout =< 0 -> + {error, "Waiting redis connection timeout"}; +wait_redis_connect(Conn, Timeout) -> + case eredis:q(Conn, ["PING"]) of + {error, no_connection} -> + timer:sleep(100), + wait_redis_connect(Conn, Timeout - 100); + {ok,<<"PONG">>} -> + ok + end. + %% Failover imitation change_master(FromPort, ToPort) -> From a19ebd70f67bb9f875d301bdd4d8e889cfe26e8b Mon Sep 17 00:00:00 2001 From: Mikl Kurkov Date: Thu, 18 Apr 2013 14:06:50 +0400 Subject: [PATCH 4/7] Add check that sentinel is installed on the system or skip sentinel tests --- test/eredis_sentinel_tests.erl | 52 ++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/test/eredis_sentinel_tests.erl b/test/eredis_sentinel_tests.erl index 7470dffe..c6e23649 100644 --- a/test/eredis_sentinel_tests.erl +++ b/test/eredis_sentinel_tests.erl @@ -30,13 +30,12 @@ stop_cluster(_) -> %%% Working with redis cluster start_redis(Name) -> - run_cmd("redis-server ~s/redis_~s.conf", [code:priv_dir(eredis),Name]). 
+ run_cmd("redis-server ~s/redis_~s.conf", [code:priv_dir(eredis),Name]), + ?assert(is_server_alive(Name)). start_sentinel(Name) -> - run_cmd("redis-sentinel ~p/redis_~s.conf ", [code:priv_dir(eredis), Name]). - -server(Name) -> - proplists:lookup(Name, ?SERVERS ++ ?SENTINELS). + run_cmd("redis-sentinel ~p/redis_~s.conf ", [code:priv_dir(eredis), Name]), + ?assert(is_server_alive(Name)). run_cmd(CmdFmt, Args) -> Cmd = lists:flatten(io_lib:format(CmdFmt, Args)), @@ -45,14 +44,21 @@ run_cmd(CmdFmt, Args) -> %%% Test definition sentinel_test_() -> - error_logger:tty(false), - Ts = - [{setup, - fun start_cluster/0, - fun stop_cluster/1, - T - } || T <- tests() ], - {inorder, Ts}. + case check_env() of + ok -> + error_logger:tty(false), + Ts = + [{setup, + fun start_cluster/0, + fun stop_cluster/1, + T + } || T <- tests() ], + {inorder, Ts}; + {error, Error} -> + [fun() -> + ?debugFmt("~n~nWARNING! SENTINEL TESTS ARE SKIPPED!~nError: ~s~n", [Error]) + end] + end. tests() -> [ @@ -196,7 +202,7 @@ is_server_alive(Name) -> %% Waiting redis client to connect to redis -wait_redis_connect(Conn, Timeout) when Timeout =< 0 -> +wait_redis_connect(_Conn, Timeout) when Timeout =< 0 -> {error, "Waiting redis connection timeout"}; wait_redis_connect(Conn, Timeout) -> case eredis:q(Conn, ["PING"]) of @@ -208,10 +214,26 @@ wait_redis_connect(Conn, Timeout) -> end. %% Failover imitation - change_master(FromPort, ToPort) -> {ok, From} = eredis:start_link("localhost", FromPort), {ok, To} = eredis:start_link("localhost", ToPort), eredis:q(To, ["slaveof", "no", "one"]), eredis:q(From, ["slaveof", "localhost", ToPort]), ok. + +%% Check that sentinel tests can be run +check_env() -> + case {check_prog("redis-server"),check_prog("redis-sentinel")} of + {"",""} -> + ok; + {E1, E2} -> + {error, string:join([E || E <- [E1,E2], E =/= []], "\n")} + end. + +check_prog(ProgName) -> + case os:cmd("which " ++ ProgName) of + [] -> + ProgName ++ " not found"; + _ -> + "" + end. 
From 9afd7c2fa78182e9d5672782116c0bae19d15bb2 Mon Sep 17 00:00:00 2001 From: Mikl Kurkov Date: Thu, 18 Apr 2013 17:57:50 +0400 Subject: [PATCH 5/7] Add sentinel chapter to README.md --- README.md | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 16c16003..00edd40f 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Supported Redis features: * Pipelining * Authentication & multiple dbs * Pubsub + * Sentinel failover ## Example @@ -53,14 +54,14 @@ Pubsub: received {message,<<"foo">>,<<"bar">>,<0.34.0>} Pattern Subscribe: - - 1> eredis_sub:psub_example(). + + 1> eredis_sub:psub_example(). received {subscribed,<<"foo*">>,<0.33.0>} {<0.33.0>,<0.36.0>} 2> eredis_sub:ppub_example(). received {pmessage,<<"foo*">>,<<"foo123">>,<<"bar">>,<0.33.0>} ok - 3> + 3> EUnit tests: @@ -110,6 +111,52 @@ stampede of clients just waiting for a failed connection attempt or Note: If Eredis is starting up and cannot connect, it will fail immediately with `{connection_error, Reason}`. +## Redis sentinel support + +### Overview + +Starting from version 2.4.16 and 2.6.0-rc6 redis shipped with +standart monitoring and automatic failover tool called Sentinel. +It started as separate process that monitors redis instances and automatically +switch to new master if the current one fails. After this all slaves are reconfigured +to get data from new master automatically by sentinel. +More information is here - http://redis.io/topics/sentinel +When working with cluster that uses sentinel, clients should ask sentinel processes +about current master instance. + +### Working with sentinels + +To enable sentinel support for eredis app: +1. Start eredis_sentinel main process under supervisor with list of all sentinels as argument: + + > eredis_sentinel:start_link([{"host1.lan", 20367}, {"host2.lan", 20367}]). + +2. 
When starting eredis clients use string `sentinel:master_name` instead host:
+
+    > eredis:start_link("sentinel:mymaster", 0).
+
+Port is ignored in this case, but needed as eredis:start_link/1 is a special form used in poolboy integration.
+
+Eredis client will ask `eredis_sentinel` about current master for mymaster cluster and
+connect to it. `eredis_sentinel` also tracks all clients and in case that master changes
+it will send notifications to all interested clients.
+
+`eredis_sentinel` implements the algorithm described in ["Guidelines for Redis clients with
+support for Redis Sentinel"](http://redis.io/topics/sentinel-clients).
+If it is unable to discover the master for some cluster it returns an error code describing the source of the problem:
+
+1. `sentinel_unreachable` - couldn't connect to any of the sentinels
+2. `master_unknown` - sentinels do not know about this cluster name
+3. `master_unreachable` - there is no valid master for this cluster now
+
+### Testing sentinel support
+
+`eredis_sentinel` has a testing suite which uses a real redis cluster with sentinel monitoring.
+So to run these tests you must be allowed to run the `redis-server` and `redis-sentinel` executables.
+The test suite is integrated as part of the common eredis eunit test suite.
+Before starting, it checks that `redis-server` and `redis-sentinel` are installed and prints a warning if not.
+Every test case starts with a fresh cluster using config files from `priv/redis_*.conf`; at the end of each case the cluster is shut down. 
+ ## Pubsub Thanks to Dave Peticolas (jdavisp3), eredis supports From 1f36b98ee581f6724768d5f1549b3a5f7caebd6b Mon Sep 17 00:00:00 2001 From: Mikl Kurkov Date: Thu, 18 Apr 2013 18:54:54 +0400 Subject: [PATCH 6/7] Fix remove stalled pids from subscriber list of sentinel master --- src/eredis_sentinel_masters.erl | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/eredis_sentinel_masters.erl b/src/eredis_sentinel_masters.erl index 1b71b94f..f622e21b 100644 --- a/src/eredis_sentinel_masters.erl +++ b/src/eredis_sentinel_masters.erl @@ -86,14 +86,13 @@ set_master(Masters, Master) -> update_master(#master{host=Host, port=Port}=Master, Host, Port) -> Master; update_master(Master, Host, Port) -> - MasterNew = Master#master{host=Host,port=Port}, - notify_pids(MasterNew), - MasterNew. + notify_pids(Master#master{host=Host,port=Port}). -notify_pids(#master{pids=Pids, name=Name, host=Host, port=Port}) -> +-spec notify_pids(#master{}) -> #master{}. +notify_pids(#master{pids=Pids, name=Name, host=Host, port=Port}=Master) -> Message = {sentinel, {reconnect, Name, Host, Port}}, - [ P ! Message || P <- Pids], - ok. + NewPids = [ begin Pid ! Message, Pid end || Pid <- Pids, is_process_alive(Pid) ], + Master#master{pids=NewPids}. add_pid(#master{pids=Pids} = Master, Pid) -> Master#master{pids = lists:umerge(Pids, [Pid])}. 
@@ -169,13 +168,23 @@ unsubscribe_test() -> update_notify_test() -> Pid = self(), + PidFailed = spawn(fun() -> ok end), + exit(PidFailed, kill), + ?assert(is_process_alive(PidFailed) == false), + {ok, Ms1} = subscribe(two_masters(), master1, Pid), - {ok, Ms2} = update(Ms1, master1, "host1", 1), + {ok, Ms11} = subscribe(Ms1, master1, PidFailed), + {ok, Ms2} = update(Ms11, master1, "host1", 1), ?assertMatch([], get_messages()), + {ok,Master1} = find(Ms2, master1), + ?assertMatch(true, lists:member(PidFailed, Master1#master.pids)), {ok, Ms3} = update(Ms2, master1, "host11", 1), ?assertMatch(ok, get_message({sentinel, {reconnect, master1, "host11", 1}})), ?assertMatch([], get_messages()), + %% non alive pids should be removed from pids + {ok,Master11} = find(Ms3, master1), + ?assertMatch(false, lists:member(PidFailed, Master11#master.pids)), {ok, _} = update(Ms3, master1, "host11", 2), ?assertMatch(ok, get_message({sentinel, {reconnect, master1, "host11", 2}})), From 80c2ffd35c6e90015ece4f32a3d78c2700d663f5 Mon Sep 17 00:00:00 2001 From: mkurkov Date: Thu, 18 Apr 2013 19:46:32 +0400 Subject: [PATCH 7/7] Clean up sentinel part in README.md --- README.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 00edd40f..2bc54b27 100644 --- a/README.md +++ b/README.md @@ -127,17 +127,19 @@ about current master instance. ### Working with sentinels To enable sentinel support for eredis app: -1. Start eredis_sentinel main process under supervisor with list of all sentinels as argument: - > eredis_sentinel:start_link([{"host1.lan", 20367}, {"host2.lan", 20367}]). +Start eredis_sentinel main process under supervisor with list of all sentinels as argument: -2. When starting eredis clients use string `sentinel:master_name` instead host: + eredis_sentinel:start_link([{"host1.lan", 20367}, {"host2.lan", 20367}]). - > eredis:start_link("sentinel:mymaster", 0). 
+
+When starting eredis clients use string `sentinel:master_name` instead of host:
+
+    eredis:start_link("sentinel:mymaster", 0).
 
 Port is ignored in this case, but needed as eredis:start_link/1 is a special form used in poolboy integration.
 
-Eredis client will ask `eredis_sentinel` about current master for mymaster cluster and
+`eredis_client` process will ask `eredis_sentinel` about current master for `mymaster` cluster and
 connect to it. `eredis_sentinel` also tracks all clients and in case that master changes
 it will send notifications to all interested clients.