From 62554ae537cee8f4fe2164c4221c72c730b0e27d Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Tue, 12 Aug 2014 21:08:05 +0200 Subject: [PATCH 1/3] exometer_report keeps state in ets --- include/exometer.hrl | 2 + src/exometer_admin.erl | 17 +- src/exometer_report.erl | 614 ++++++++++++++++++----------------- src/exometer_report_snmp.erl | 26 +- test/exometer_snmp_SUITE.erl | 1 + 5 files changed, 350 insertions(+), 310 deletions(-) diff --git a/include/exometer.hrl b/include/exometer.hrl index 4915383..eac8541 100644 --- a/include/exometer.hrl +++ b/include/exometer.hrl @@ -10,6 +10,8 @@ -define(EXOMETER_SHARED, exometer_shared). -define(EXOMETER_ENTRIES, exometer_entries). +-define(EXOMETER_SUBS, exometer_subscriptions). +-define(EXOMETER_REPORTERS, exometer_reporters). -record(exometer_event, { time = exometer_util:timestamp(), diff --git a/src/exometer_admin.erl b/src/exometer_admin.erl index 6975a22..b3f481a 100644 --- a/src/exometer_admin.erl +++ b/src/exometer_admin.erl @@ -333,8 +333,19 @@ terminate(_, _) -> ok. code_change(_, S, _) -> + case ets:info(?EXOMETER_REPORTERS, name) of + undefined -> create_reporter_tabs(); + _ -> ok + end, {ok, S}. +create_reporter_tabs() -> + Heir = {heir, whereis(exometer_sup), []}, + ets:new(?EXOMETER_REPORTERS, [public, named_table, set, + {keypos, 2}, Heir]), + ets:new(?EXOMETER_SUBS, [public, named_table, ordered_set, + {keypos, 2}, Heir]). + create_ets_tabs() -> case ets:info(?EXOMETER_SHARED, name) of @@ -344,7 +355,11 @@ create_ets_tabs() -> ets:new(?EXOMETER_SHARED, [public, named_table, ordered_set, {keypos, 2}]), ets:new(?EXOMETER_ENTRIES, [public, named_table, ordered_set, - {keypos, 2}]); + {keypos, 2}]), + ets:new(?EXOMETER_REPORTERS, [public, named_table, set, + {keypos, 2}]), + ets:new(?EXOMETER_SUBS, [public, named_table, ordered_set, + {keypos, 2}]); _ -> true end. diff --git a/src/exometer_report.erl b/src/exometer_report.erl index 34cdf01..1049f65 100644 --- a/src/exometer_report.erl +++ b/src/exometer_report.erl @@ -435,10 +435,12 @@ disable_me(Mod, St) -> %% instance. Note that the reporter type must recognize the request. %% @end call_reporter(Reporter, Msg) -> - case lists:keyfind(Reporter, 1, list_reporters()) of - {_, Pid} -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{pid = Pid}] when is_pid(Pid) -> exometer_proc:call(Pid, Msg); - false -> + [#reporter{status = disabled}] -> + {error, disabled}; + [] -> {error, {no_such_reporter, Reporter}} end. @@ -449,10 +451,12 @@ call_reporter(Reporter, Msg) -> %% instance. Note that the reporter type must recognize the message. %% @end cast_reporter(Reporter, Msg) -> - case lists:keyfind(Reporter, 1, list_reporters()) of - {_, Pid} -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{pid = Pid}] when is_pid(Pid) -> exometer_proc:cast(Pid, Msg); - false -> + [#reporter{status = disabled}] -> + {error, disabled}; + [] -> {error, {no_such_reporter, Reporter}} end. @@ -485,7 +489,7 @@ setopts(Metric, Options, Status) -> %% `Mod:exometer_newentry(Entry, St)'. %% @end new_entry(Entry) -> - call({new_entry, Entry}). + cast({new_entry, Entry}). %%%=================================================================== %%% gen_server callbacks @@ -517,41 +521,35 @@ do_start_reporters(S) -> %% { reporters, [ {reporter1, [{opt1, val}, ...]}, {reporter2, [...]}]} %% Traverse list of reporter and launch reporter gen servers as dynamic %% supervisor children. - Reporters0 = case lists:keyfind(reporters, 1, Opts) of - {reporters, ReporterList} -> - ReporterRecs = make_reporter_recs(ReporterList), - assert_no_duplicates(ReporterRecs), - lists:foldr( - fun(#reporter{name = Reporter, - status = Status, - opts = ROpts} = R, Acc) -> - Restart = get_restart(ROpts), - {Pid, MRef} = - if Status =:= enabled -> - spawn_reporter(Reporter, ROpts); - true -> {undefined, undefined} - end, - [ R#reporter{pid = Pid, - mref = MRef, - restart = Restart} | Acc] - end, [], ReporterRecs); - false -> - [] - end, + case lists:keyfind(reporters, 1, Opts) of + {reporters, ReporterList} -> + ReporterRecs = make_reporter_recs(ReporterList), + assert_no_duplicates(ReporterRecs), + lists:foreach( + fun(#reporter{name = Reporter, + status = Status, + opts = ROpts} = R) -> + Restart = get_restart(ROpts), + {Pid, MRef} = + if Status =:= enabled -> + spawn_reporter(Reporter, ROpts); + true -> {undefined, undefined} + end, + ets:insert(?EXOMETER_REPORTERS, + R#reporter{pid = Pid, + mref = MRef, + restart = Restart}) + end, ReporterRecs); + false -> + [] + end, %% Dig out configured 'static' subscribers - SubsList = - case lists:keyfind(subscribers, 1, Opts) of - {subscribers, Subscribers} -> - lists:foldr(fun(Subscr, Acc) -> - init_subscriber(Subscr, Acc, Reporters0) - end, [], Subscribers); - false -> [] - end, - - S#st{ - reporters = Reporters0, - subscribers = SubsList - }. + case lists:keyfind(subscribers, 1, Opts) of + {subscribers, Subscribers} -> + lists:foreach(fun init_subscriber/1, Subscribers); + false -> [] + end, + S#st{}. make_reporter_recs([{R, Opts}|T]) -> [#reporter{name = R, @@ -613,11 +611,11 @@ handle_call({subscribe, datapoint = DataPoint, retry_failed_metrics = RetryFailedMetrics, extra = Extra} , Interval }, - _From, #st{reporters = Rs, subscribers = Subs} = St) -> + _From, #st{} = St) -> %% Verify that the given metric/data point actually exist. - case lists:keyfind(Reporter, #reporter.name, Rs) of - #reporter{status = Status} -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{status = Status}] -> case is_valid_metric(Metric, DataPoint) of true -> if Status =:= enabled -> @@ -625,14 +623,14 @@ handle_call({subscribe, DataPoint, Interval, Extra}; true -> ignore end, - Sub = subscribe_(Reporter, Metric, DataPoint, - Interval, RetryFailedMetrics, - Extra, Status), - {reply, ok, St#st{ subscribers = [Sub | Subs] }}; + subscribe_(Reporter, Metric, DataPoint, + Interval, RetryFailedMetrics, + Extra, Status), + {reply, ok, St}; %% Nope - Not found. false -> {reply, not_found, St } end; - false -> + [] -> {reply, unknown_reporter, St} end; @@ -640,35 +638,26 @@ handle_call({unsubscribe, #key{reporter = Reporter, metric = Metric, datapoint = DataPoint, - extra = Extra}}, _, - #st{subscribers = Subs} = St) -> - - {Res, NSubs} = unsubscribe_(Reporter, Metric, DataPoint, Extra, Subs), - {reply, Res, St#st{ subscribers = NSubs } }; + extra = Extra}}, _, #st{} = St) -> + Res = unsubscribe_(Reporter, Metric, DataPoint, Extra), + {reply, Res, St}; handle_call({unsubscribe_all, Reporter, Metric}, _, - #st{subscribers=Subs0}=St) -> - Subs1 = lists:foldl( - fun - (#subscriber{key=#key{metric=Metric1}=Key, t_ref=TRef}, Acc) - when Metric == Metric1 -> - #key{datapoint=Dp, extra=Extra} = Key, - try Reporter ! {exometer_unsubscribe, Metric, Dp, Extra} - catch error:_ -> ok end, - cancel_timer(TRef), - Acc; - (Sub, Acc) -> - [Sub | Acc] - end, [], Subs0), - {reply, ok, St#st{subscribers=Subs1}}; + #st{}=St) -> + Subs = ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Reporter, + metric = Metric}, + _ = '_'}, [], ['$_']}]), + lists:foreach(fun unsubscribe_/1, Subs), + {reply, ok, St}; handle_call({list_metrics, Path}, _, St) -> DP = lists:foldr(fun(Metric, Acc) -> - retrieve_metric(Metric, St#st.subscribers, Acc) + retrieve_metric(Metric, Acc) end, [], exometer:find_entries(Path)), {reply, {ok, DP}, St}; -handle_call({list_subscriptions, Reporter}, _, #st{subscribers = Subs0} = St) -> +handle_call({list_subscriptions, Reporter}, _, #st{} = St) -> Subs1 = lists:foldl( fun (#subscriber{key=#key{reporter=Rep}}=Sub, Acc) when Reporter == Rep -> @@ -681,55 +670,53 @@ handle_call({list_subscriptions, Reporter}, _, #st{subscribers = Subs0} = St) -> [{Metric, Dp, Interval, Extra} | Acc]; (_, Acc) -> Acc - end, [], Subs0), + end, [], ets:select(?EXOMETER_SUBS, [{'_',[],['$_']}])), {reply, Subs1, St}; -handle_call(list_reporters, _, #st{reporters = Reporters} = St) -> - Info = [{N, Pid} || #reporter{name = N, pid = Pid} <- Reporters], +handle_call(list_reporters, _, #st{} = St) -> + Info = ets:select(?EXOMETER_REPORTERS, + [{#reporter{name = '$1', pid = '$2', _ = '_'}, + [], [{{'$1', '$2'}}]}]), {reply, Info, St}; -handle_call({add_reporter, Reporter, Opts}, _, #st{reporters = Rs} = St) -> - case lists:keymember(Reporter, #reporter.name, Rs) of +handle_call({add_reporter, Reporter, Opts}, _, #st{} = St) -> + case ets:member(?EXOMETER_REPORTERS, Reporter) of true -> {reply, {error, already_running}, St}; false -> try {Pid, MRef} = spawn_reporter(Reporter, Opts), - Rs1 = [#reporter {name = Reporter, - module = get_module(Reporter, Opts), - opts = Opts, - pid = Pid, - mref = MRef} | Rs], - {reply, ok, St#st{reporters = Rs1}} + R = #reporter {name = Reporter, + module = get_module(Reporter, Opts), + opts = Opts, + pid = Pid, + mref = MRef}, + ets:insert(?EXOMETER_REPORTERS, R), + {reply, ok, St} catch error:Reason -> {reply, {error, Reason}, St} end end; -handle_call({remove_reporter, Reporter}, _, St0) -> - case do_remove_reporter(Reporter, St0) of - {ok, St1} -> - {reply, ok, St1}; +handle_call({remove_reporter, Reporter}, _, St) -> + case do_remove_reporter(Reporter) of + ok -> + {reply, ok, St}; E -> - {reply, E, St0} + {reply, E, St} end; -handle_call({change_reporter_status, Reporter, Status}, _, St0) -> - case change_reporter_status(Status, Reporter, St0) of - {ok, St1} -> - {reply, ok, St1}; +handle_call({change_reporter_status, Reporter, Status}, _, St) -> + case change_reporter_status(Reporter, Status) of + ok -> + {reply, ok, St}; E -> - {reply, E, St0} + {reply, E, St} end; -handle_call({setopts, Metric, Options, Status}, _, #st{reporters=Rs}=St) -> +handle_call({setopts, Metric, Options, Status}, _, #st{}=St) -> [erlang:send(Pid, {exometer_setopts, Metric, Options, Status}) - || #reporter{pid = Pid} <- Rs], - {reply, ok, St}; - -handle_call({new_entry, Entry}, _, #st{reporters=Rs}=St) -> - [erlang:send(Pid, {exometer_newentry, Entry}) - || #reporter{pid = Pid} <- Rs], + || Pid <- reporter_pids()], {reply, ok, St}; handle_call(_Request, _From, State) -> @@ -744,27 +731,28 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({remove_reporter, Reporter, Reason}, St0) -> +handle_cast({new_entry, Entry}, #st{} = St) -> + [try erlang:send(Pid, {exometer_newentry, Entry}) catch error:_ -> ok end + || Pid <- reporter_pids()], + maybe_enable_subscriptions(Entry), + {noreply, St}; + +handle_cast({remove_reporter, Reporter, Reason}, St) -> Terminate = case Reason of user -> true; _ -> false end, - case do_remove_reporter(Reporter, St0, Terminate) of - {ok, St1} -> - {noreply, St1}; - _ -> - {noreply, St0} - end; -handle_cast({disable, Pid}, #st{reporters = Rs} = St) -> - case lists:keyfind(Pid, #reporter.pid, Rs) of - #reporter{name = Reporter} -> - {ok, St1} = change_reporter_status(disabled, Reporter, St), - {noreply, St1}; - false -> - {noreply, St} - end; + do_remove_reporter(Reporter, Terminate), + {noreply, St}; +handle_cast({disable, Pid}, #st{} = St) -> + case reporter_by_pid(Pid) of + [#reporter{} = Reporter] -> + do_change_reporter_status(Reporter, disabled); + [] -> ok + end, + {noreply, St}; handle_cast(_Msg, State) -> {noreply, State}. @@ -781,10 +769,9 @@ handle_info({ report, #key{ reporter = Reporter, metric = Metric, datapoint = DataPoint, retry_failed_metrics = RetryFailedMetrics, - extra = Extra} = Key, Interval}, - #st{subscribers = Subs} = St) -> - case lists:keyfind(Key, #subscriber.key, Subs) of - #subscriber{} = Sub -> + extra = Extra} = Key, Interval}, #st{} = St) -> + case ets:member(?EXOMETER_SUBS, Key) of + true -> case {RetryFailedMetrics, get_values(Metric, DataPoint)} of %% We found a value, or values. {_, [_|_] = Found} -> @@ -798,10 +785,9 @@ handle_info({ report, #key{ reporter = Reporter, %% Replace the pid_subscriber info with a record having %% the new timer ref. - {noreply, St#st{subscribers = - lists:keyreplace( - Key, #subscriber.key, Subs, - Sub#subscriber{ t_ref = TRef })}}; + ets:update_element(?EXOMETER_SUBS, Key, + [{#subscriber.t_ref, TRef}]), + {noreply, St}; %% We did not find a value, but we should try again. {true, _ } -> @@ -817,10 +803,9 @@ handle_info({ report, #key{ reporter = Reporter, %% Replace the pid_subscriber info with a record having %% the new timer ref. - {noreply, St#st{subscribers = - lists:keyreplace( - Key, #subscriber.key, Subs, - Sub#subscriber{ t_ref = TRef })}}; + ets:update_element(?EXOMETER_SUBS, Key, + [{#subscriber.t_ref, TRef}]), + {noreply, St}; %% We did not find a value, and we should not retry. _ -> %% Entry removed while timer in progress. @@ -834,61 +819,80 @@ handle_info({ report, #key{ reporter = Reporter, {noreply, St} end; -handle_info({'DOWN', Ref, process, _Pid, Reason}, - #st{reporters = Rs} = S) -> - S1 = case lists:keyfind(Ref, #reporter.mref, Rs) of - #reporter {module = Module, restart = Restart} = R -> - case add_restart(Restart) of - {remove, How} -> - case How of - {M, F} when is_atom(M), is_atom(F) -> - try M:F(Module, Reason) catch _:_ -> ok end; - _ -> - ok - end, - S; - {restart, Restart1} -> - restart_reporter(R#reporter{restart = Restart1}, S) - end; - _ -> S - end, - {noreply, S1}; +handle_info({'DOWN', Ref, process, _Pid, Reason}, #st{} = S) -> + case reporter_by_mref(Ref) of + [#reporter{module = Module, restart = Restart} = R] -> + case add_restart(Restart) of + {remove, How} -> + case How of + {M, F} when is_atom(M), is_atom(F) -> + try M:F(Module, Reason) catch _:_ -> ok end; + _ -> + ok + end, + S; + {restart, Restart1} -> + restart_reporter(R#reporter{restart = Restart1}) + end; + _ -> S + end, + {noreply, S}; handle_info(_Info, State) -> ?warning("exometer_report:info(??): ~p~n", [ _Info ]), {noreply, State}. -restart_reporter(#reporter{name = Name, opts = Opts} = R, - #st{subscribers = Subs, reporters = Reporters} = S) -> +restart_reporter(#reporter{name = Name, opts = Opts, restart = Restart}) -> {Pid, MRef} = spawn_reporter(Name, Opts), - Subs1 = re_subscribe(Subs, Name), - R1 = R#reporter{pid = Pid, mref = MRef, status = enabled}, - S#st{subscribers = Subs1, - reporters = lists:keyreplace(Name, #reporter.name, Reporters, R1)}. - -re_subscribe([#subscriber{key = #key{reporter = RName, - metric = Metric, - datapoint = DataPoint, - extra = Extra} = Key, - t_ref = OldTRef, - interval = Interval} = S | Subs], RName) -> - RName ! {exometer_subscribe, Metric, DataPoint, Interval, Extra}, + [resubscribe(S) || + S <- ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Name, + _ = '_'}, + _ = '_'}, [], ['$_']}])], + ets:update_element(?EXOMETER_REPORTERS, Name, + [{#reporter.pid, Pid}, + {#reporter.mref, MRef}, + {#reporter.restart, Restart}, + {#reporter.status, enabled}]), + ok. + +%% If there are already subscriptions, enable them. +maybe_enable_subscriptions(Entry) -> + lists:foreach( + fun(#subscriber{key = #key{reporter = RName}} = S) -> + case get_reporter_status(RName) of + enabled -> + resubscribe(S); + _ -> + ok + end + end, ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{metric = Entry, + _ = '_'}, + _ = '_'}, [], ['$_']}])). + +resubscribe(#subscriber{key = #key{reporter = RName, + metric = Metric, + datapoint = DataPoint, + extra = Extra} = Key, + t_ref = OldTRef, + interval = Interval}) -> + try_send(RName, {exometer_subscribe, Metric, DataPoint, Interval, Extra}), cancel_timer(OldTRef), TRef = erlang:send_after(Interval, self(), {report, Key, Interval}), - [S#subscriber{t_ref = TRef} | re_subscribe(Subs, RName)]; -re_subscribe([S|Subs], R) -> - [S|re_subscribe(Subs, R)]; -re_subscribe([], _) -> - []. + ets:update_element(?EXOMETER_SUBS, Key, [{#subscriber.t_ref, TRef}]). + -cancel_subscr_timers(Reporter, Subs) -> - lists:map( - fun(#subscriber{key = #key{reporter = R}, - t_ref = TRef} = S) when R =:= Reporter -> +cancel_subscr_timers(Reporter) -> + lists:foreach( + fun(#subscriber{key = Key, t_ref = TRef}) -> cancel_timer(TRef), - S#subscriber{t_ref = undefined}; - (S) -> S - end, Subs). + ets:update_element(?EXOMETER_SUBS, Key, + [{#subscriber.t_ref, undefined}]) + end, ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Reporter, + _ = '_'}, + _ = '_'}, [], ['$_']}])). cancel_timer(undefined) -> false; @@ -907,8 +911,8 @@ cancel_timer(TRef) -> %% @spec terminate(Reason, State) -> void() %% @end %%-------------------------------------------------------------------- -terminate(_Reason, #st{reporters=Rs}) -> - rpc:pmap({?MODULE, terminate_reporter}, [], Rs), +terminate(_Reason, _) -> + [terminate_reporter(R) || R <- ets:tab2list(?EXOMETER_REPORTERS)], ok. %%-------------------------------------------------------------------- @@ -928,7 +932,7 @@ terminate(_Reason, #st{reporters=Rs}) -> %% opts = [] :: [{atom(), any()}], %% restart = #restart{} %% }). -code_change(_OldVan, #st{reporters = Rs} = S, _Extra) -> +code_change(_OldVan, #st{reporters = Rs, subscribers = Ss} = S, _Extra) -> Rs1 = lists:map( fun({reporter,Pid,MRef,Module,Opts,Restart}) -> #reporter{name = Module, pid = Pid, mref = MRef, @@ -940,7 +944,9 @@ code_change(_OldVan, #st{reporters = Rs} = S, _Extra) -> restart = Restart}; (#reporter{} = R) -> R end, Rs), - {ok, S#st{reporters = Rs1}}; + [ets:insert(?EXOMETER_REPORTERS, R) || R <- Rs1], + [ets:insert(?EXOMETER_SUBS, Sub) || Sub <- Ss], + {ok, S#st{reporters = [], subscribers = []}}; code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -948,6 +954,25 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +reporter_pids() -> + ets:select(?EXOMETER_REPORTERS, + [{#reporter{pid = '$1', _ = '_'}, [], ['$1']}]). + +reporter_by_pid(Pid) -> + ets:select(?EXOMETER_REPORTERS, + [{#reporter{pid = Pid, _='_'}, [], ['$_']}]). + +reporter_by_mref(Ref) -> + ets:select(?EXOMETER_REPORTERS, + [{#reporter{mref = Ref, _='_'}, [], ['$_']}]). + +try_send(To, Msg) -> + try To ! Msg + catch + error:_ -> + Msg + end. + is_valid_metric({find, Name}, _DataPoint) when is_list(Name) -> true; is_valid_metric({select, Name}, _DataPoint) when is_list(Name) -> @@ -1054,36 +1079,40 @@ subscribe_(Reporter, Metric, DataPoint, Interval, RetryFailedMetrics, extra = Extra, retry_failed_metrics = RetryFailedMetrics }, - - %% FIXME: Validate Metric and datapoint - %% ?info("Subscribe_(Intv(~p), self(~p))~n", [ Interval, self()]), - #subscriber{key = Key, - interval = Interval, - t_ref = maybe_send_after(Status, Key, Interval)}. + ets:insert(?EXOMETER_SUBS, + #subscriber{key = Key, + interval = Interval, + t_ref = maybe_send_after(Status, Key, Interval)}). maybe_send_after(enabled, Key, Interval) -> erlang:send_after(Interval, self(), {report, Key, Interval}); maybe_send_after(_, _, _) -> undefined. -unsubscribe_(Reporter, Metric, DataPoint, Extra, Subs) -> - ?info("unsubscribe_(~p, ~p, ~p, ~p, ~p)~n", - [ Reporter, Metric, DataPoint, Extra, Subs]), - case lists:keytake(#key{reporter = Reporter, - metric = Metric, - datapoint = DataPoint, - extra = Extra}, - #subscriber.key, Subs) of - {value, #subscriber{t_ref = TRef}, Rem} -> - %% FIXME: Validate Metric and datapoint - try Reporter ! { exometer_unsubscribe, Metric, DataPoint, Extra } - catch error:_ -> ok end, - cancel_timer(TRef), - {ok, Rem}; - _ -> - {not_found, Subs} +unsubscribe_(Reporter, Metric, DataPoint, Extra) -> + ?info("unsubscribe_(~p, ~p, ~p, ~p)~n", + [ Reporter, Metric, DataPoint, Extra]), + case ets:lookup(?EXOMETER_SUBS, #key{reporter = Reporter, + metric = Metric, + datapoint = DataPoint, + extra = Extra}) of + [#subscriber{} = Sub] -> + unsubscribe_(Sub); + [] -> + not_found end. +unsubscribe_(#subscriber{key = #key{reporter = Reporter, + metric = Metric, + datapoint = DataPoint, + extra = Extra} = Key, t_ref = TRef}) -> + try_send( + Reporter, {exometer_unsubscribe, Metric, DataPoint, Extra}), + cancel_timer(TRef), + ets:delete(?EXOMETER_SUBS, Key), + ok. + + report_value(Reporter, Metric, DataPoint, Extra, Val) -> try Reporter ! {exometer_report, Metric, DataPoint, Extra, Val}, true @@ -1092,17 +1121,21 @@ report_value(Reporter, Metric, DataPoint, Extra, Val) -> exit:_ -> false end. -retrieve_metric({Metric, Type, Enabled}, Subscribers, Acc) -> +retrieve_metric({Metric, Type, Enabled}, Acc) -> + Cands = ets:select( + ?EXOMETER_SUBS, + [{#subscriber{key = #key{metric = Metric, _='_'}, + _ = '_'}, [], ['$_']}]), [ { Metric, exometer:info(Metric, datapoints), - get_subscribers(Metric, Type, Enabled, Subscribers), Enabled } | Acc ]. + get_subscribers(Metric, Type, Enabled, Cands), Enabled } | Acc ]. -find_entries_in_list(find, Path, List) -> - Pat = Path ++ '_', - Spec = ets:match_spec_compile([{ {Pat, '_', '_'}, [], ['$_'] }]), - ets:match_spec_run(List, Spec); -find_entries_in_list(select, Pat, List) -> - Spec = ets:match_spec_compile(Pat), - ets:match_spec_run(List, Spec). +%% find_entries_in_list(find, Path, List) -> +%% Pat = Path ++ '_', +%% Spec = ets:match_spec_compile([{ {Pat, '_', '_'}, [], ['$_'] }]), +%% ets:match_spec_run(List, Spec); +%% find_entries_in_list(select, Pat, List) -> +%% Spec = ets:match_spec_compile(Pat), +%% ets:match_spec_run(List, Spec). get_subscribers(_Metric, _Type, _Status, []) -> []; @@ -1118,20 +1151,20 @@ get_subscribers(Metric, Type, Status, ?debug("get_subscribers(~p, ~p, ~p): match~n", [ Metric, SDataPoint, SReporter]), [ { SReporter, SDataPoint } | get_subscribers(Metric, Type, Status, T) ]; -get_subscribers(Metric, Type, Status, - [ #subscriber { - key = #key { - metric = {How, Path}, - reporter = SReporter, - datapoint = SDataPoint - }} | T ]) -> - case find_entries_in_list(How, Path, [{Metric, Type, Status}]) of - [] -> - get_subscribers(Metric, Type, Status, T); - [_] -> - [ { SReporter, SDataPoint } - | get_subscribers(Metric, Type, Status, T) ] - end; +%% get_subscribers(Metric, Type, Status, +%% [ #subscriber { +%% key = #key { +%% metric = {How, Path}, +%% reporter = SReporter, +%% datapoint = SDataPoint +%% }} | T ]) -> +%% case find_entries_in_list(How, Path, [{Metric, Type, Status}]) of +%% [] -> +%% get_subscribers(Metric, Type, Status, T); +%% [_] -> +%% [ { SReporter, SDataPoint } +%% | get_subscribers(Metric, Type, Status, T) ] +%% end; %% This subscription does not match Metric. get_subscribers(Metric, Type, Status, @@ -1147,19 +1180,19 @@ get_subscribers(Metric, Type, Status, %% Purge all subscriptions associated with a specific reporter %% (that just went down). -purge_subscriptions(R, Subs) -> +purge_subscriptions(R) -> %% Go through all #subscriber elements in Subs and %% cancel the timer of those who match the provided reporter %% %% Return new #subscriber list with all original subscribers %% that do not reference reporter R. - lists:foldr(fun(#subscriber { key = #key {reporter = Rptr}, - t_ref = TRef}, Acc) when Rptr =:= R-> - cancel_timer(TRef), - Acc; - (Subscriber, Acc) -> - [ Subscriber | Acc ] - end, [], Subs). + Subs = ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = R, _='_'}, + _ = '_'}, [], ['$_']}]), + lists:foreach(fun(#subscriber {key = Key, t_ref = TRef}) -> + cancel_timer(TRef), + ets:delete(?EXOMETER_SUBS, Key) + end, Subs). %% Called by the spawn_monitor() call in init %% Loop and run reporters. @@ -1242,57 +1275,49 @@ call(Req) -> cast(Req) -> gen_server:cast(?MODULE, Req). +init_subscriber({Reporter, Metric, DataPoint, Interval, RetryFailedMetrics}) -> + Status = get_reporter_status(Reporter), + subscribe_(Reporter, Metric, DataPoint, Interval, + RetryFailedMetrics, undefined, Status); init_subscriber({Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics}, Acc, Rs) -> - Status = get_reporter_status(Reporter, Rs), - [subscribe_(Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics, undefined, Status) | Acc]; - -init_subscriber({Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics, Extra}, Acc, Rs) -> - Status = get_reporter_status(Reporter, Rs), - [subscribe_(Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics, Extra, Status) | Acc]; - -init_subscriber({Reporter, Metric, DataPoint, Interval}, Acc, Rs) -> - Status = get_reporter_status(Reporter, Rs), - [subscribe_(Reporter, Metric, DataPoint, Interval, - true, undefined, Status) | Acc]; - -init_subscriber({apply, {M, F, A}}, Acc, Rs) -> - lists:foldr(fun(Sub, Acc1) -> - init_subscriber(Sub, Acc1, Rs) - end, Acc, apply(M, F, A)); - -init_subscriber({select, Expr}, Acc, Rs) when tuple_size(Expr)==3; - tuple_size(Expr)==4; - tuple_size(Expr)==5 -> + RetryFailedMetrics, Extra}) -> + Status = get_reporter_status(Reporter), + subscribe_(Reporter, Metric, DataPoint, Interval, + RetryFailedMetrics, Extra, Status); +init_subscriber({Reporter, Metric, DataPoint, Interval}) -> + Status = get_reporter_status(Reporter), + subscribe_(Reporter, Metric, DataPoint, Interval, + true, undefined, Status); +init_subscriber({apply, {M, F, A}}) -> + lists:foreach(fun(Sub) -> + init_subscriber(Sub) + end, apply(M, F, A)); +init_subscriber({select, Expr}) when tuple_size(Expr)==3; + tuple_size(Expr)==4; + tuple_size(Expr)==5 -> {Pattern, Reporter, DataPoint, Interval, Retry, Extra} = case Expr of {P, R, D, I} -> {P, R, D, I, true, undefined}; {P, R, D, I, Rf} -> {P, R, D, I, Rf, undefined}; {P, R, D, I, Rf, X} -> {P, R, D, I, Rf, X} end, - Status = get_reporter_status(Reporter, Rs), + Status = get_reporter_status(Reporter), Entries = exometer:select(Pattern), - lists:foldr( - fun({Entry, _, _}, Acc1) -> - [subscribe_(Reporter, Entry, DataPoint, Interval, - Retry, Extra, Status) - | Acc1] - end, Acc, Entries); + lists:foreach( + fun({Entry, _, _}) -> + subscribe_(Reporter, Entry, DataPoint, Interval, + Retry, Extra, Status) + end, Entries); -init_subscriber(Other, Acc, _) -> +init_subscriber(Other) -> ?warning("Incorrect static subscriber spec ~p. " - "Use { Reporter, Metric, DataPoint, Interval [, Extra ]}~n", [ Other ]), - Acc. + "Use { Reporter, Metric, DataPoint, Interval [, Extra ]}~n", + [ Other ]). -get_reporter_status(R, Rs) -> - case lists:keyfind(R, #reporter.name, Rs) of - #reporter{status = St} -> - St; - false -> - disabled +get_reporter_status(R) -> + try ets:lookup_element(?EXOMETER_REPORTERS, R, #reporter.status) + catch + error:_ -> disabled end. add_restart(#restart{spec = Spec, @@ -1367,43 +1392,40 @@ valid_restart(L) when is_list(L) -> end, L), L. -do_remove_reporter(Reporter, St0) -> - do_remove_reporter(Reporter, St0, true). +do_remove_reporter(Reporter) -> + do_remove_reporter(Reporter, true). -do_remove_reporter(Reporter, #st{subscribers=Subs, reporters=Rs}=St0, Terminate) -> - case lists:keyfind(Reporter, #reporter.name, Rs) of - #reporter{} = R -> +do_remove_reporter(Reporter, Terminate) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{} = R] -> case Terminate of true -> terminate_reporter(R); false -> ok end, - St1 = St0#st{reporters = lists:keydelete( - Reporter, #reporter.name, Rs), - subscribers = purge_subscriptions( - Reporter, Subs)}, - {ok, St1}; - false -> + ets:delete(?EXOMETER_REPORTERS, Reporter), + purge_subscriptions(Reporter), + ok; + [] -> {error, not_found} end. -change_reporter_status(New, Reporter, #st{subscribers = Subs, - reporters = Rs} = St0) -> - case lists:keyfind(Reporter, #reporter.name, Rs) of - #reporter{status = disabled} = R when New==enabled -> - St1 = restart_reporter(R, St0), - {ok, St1}; - #reporter{status = enabled} = R when New==disabled -> - Subs1 = cancel_subscr_timers(Reporter, Subs), - terminate_reporter(R), - St1 = St0#st{reporters = lists:keyreplace( - Reporter, #reporter.name, Rs, - R#reporter{status = disabled}), - subscribers = Subs1}, - {ok, St1}; - #reporter{status = New} -> - {ok, St0}; - false -> - {error, not_found} +change_reporter_status(Reporter, New) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [R] -> do_change_reporter_status(R, New); + [] -> {error, not_found} end. + +do_change_reporter_status(#reporter{name = Reporter, + status = Old} = R, New) -> + case {Old, New} of + {disabled, enabled} -> + restart_reporter(R); + {enabled, disabled} -> + cancel_subscr_timers(Reporter), + terminate_reporter(R), + ets:update_element(?EXOMETER_REPORTERS, + Reporter, [{#reporter.status, disabled}]) + end, + ok. diff --git a/src/exometer_report_snmp.erl b/src/exometer_report_snmp.erl index ad29fd0..80816c3 100644 --- a/src/exometer_report_snmp.erl +++ b/src/exometer_report_snmp.erl @@ -51,7 +51,7 @@ -define(OBJECT_GROUP_NAME, <<"allObjects">>). -define(INFORM_GROUP_NAME, <<"allNotifications">>). --type snmp_option() :: {exometer_entry:datapoint(), exometer_report:interval()} | +-type snmp_option() :: {exometer_entry:datapoint(), exometer_report:interval()} | {exometer_entry:datapoint(), exometer_report:interval(), exometer_report:extra()}. -type snmp() :: disabled | [snmp_option()]. @@ -82,7 +82,7 @@ exometer_init(Opts) -> ets:insert(?MIB_NR_MAP, {?MIB_NR_NEXT, 0}), ets:insert(?MIB_NR_MAP, {?MIB_NR_FREE, []}), - % load MIB template which is used through the operation of + % load MIB template which is used through the operation of % the process to dynamically export metrics MibPath0 = proplists:get_value(mib_template, Opts, ?MIB_TEMPLATE), MibWorkPath = proplists:get_value(mib_dir, Opts, ?MIB_DIR), @@ -101,9 +101,9 @@ exometer_init(Opts) -> {ok, Vsn} = load_mib(0, MibPath1, true), State0 = #st{mib_version=Vsn, - mib_file_path=MibPath1, - mib_file=FileBin, - mib_domain=Id, + mib_file_path=MibPath1, + mib_file=FileBin, + mib_domain=Id, mib_funcs_file_path=FuncsPath}, % ensure the mib is synced with exometer in case of reporter restarts State = sync_mib(State0), @@ -178,14 +178,14 @@ exometer_terminate(_, #st{mib_file_path=MibPath0}) -> % @doc Returns the latest mib and its metadata. get_mib() -> - try + try exometer_proc:call(?MODULE, get_mib) catch error:badarg -> {error, not_running} end. -% @doc +% @doc % Callback function used by the SNMP master agent upon operations performed by a manager. % Currently only get operations are handled. % @end @@ -207,7 +207,7 @@ snmp_operation(Op, Val, Key) -> %%%=================================================================== enable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -223,7 +223,7 @@ enable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, end. disable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -239,7 +239,7 @@ disable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, end. enable_inform(E, Dp, Extra, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -261,7 +261,7 @@ enable_inform(E, Dp, Extra, #st{mib_version=Vsn0, end. disable_inform(E, Dp, Extra, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -336,7 +336,7 @@ modify_mib(enable_metric, #exometer_entry{name = Metric} = E, case create_bin(Name, Dp, E) of {ok, Bin} -> L = [ - A, B, + A, B, <<"-- METRIC ", Name/binary, " START\n">>, Bin, <<" ::= { ", Domain/binary, " ">>, Nr1, <<" }\n">>, @@ -640,7 +640,7 @@ update_subscriptions_(M, {[Opt | A], R, Ch, Co}) -> option({Dp, Int}) -> {Dp, Int, undefined}; option({_, _, _}=Opt) -> Opt. --spec compare_subscriptions([snmp_option()], [snmp_option()]) -> +-spec compare_subscriptions([snmp_option()], [snmp_option()]) -> {[snmp_option()], [snmp_option()], [{snmp_option(), snmp_option()}], [snmp_option()]}. compare_subscriptions(Old, New) -> {A, Ch, Co} = lists:foldl( diff --git a/test/exometer_snmp_SUITE.erl b/test/exometer_snmp_SUITE.erl index eea079a..c9077b4 100644 --- a/test/exometer_snmp_SUITE.erl +++ b/test/exometer_snmp_SUITE.erl @@ -455,6 +455,7 @@ deps_code_flags() -> string:join(Deps1, " "). start_manager(Config) -> + io:fwrite(user, "STARTMGR: ~p~n", [Config]), Host = gethostname(), Node = test_manager, Opts = [{boot_timeout, 30}, {monitor_master, true}, From 2308847146780a4823701c5c6f938fb2393ac455 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Mon, 18 Aug 2014 16:53:02 +0200 Subject: [PATCH 2/3] named report intervals --- README.md | 2 +- doc/README.md | 2 +- doc/exometer_report.md | 120 ++++++++++++++- src/exometer_function.erl | 14 ++ src/exometer_report.erl | 317 +++++++++++++++++++++++++++++++++----- 5 files changed, 410 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index d4dfdb2..92e7f50 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. -__Version:__ Aug 5 2014 16:25:27 +__Version:__ Aug 18 2014 16:34:44 __Authors:__ Ulf Wiger ([`ulf.wiger@feuerlabs.com`](mailto:ulf.wiger@feuerlabs.com)), Magnus Feuer ([`magnus.feuer@feuerlabs.com`](mailto:magnus.feuer@feuerlabs.com)). diff --git a/doc/README.md b/doc/README.md index 3d1696a..ecc76c2 100644 --- a/doc/README.md +++ b/doc/README.md @@ -4,7 +4,7 @@ Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. -__Version:__ Aug 5 2014 16:25:27 +__Version:__ Aug 18 2014 16:34:44 __Authors:__ Ulf Wiger ([`ulf.wiger@feuerlabs.com`](mailto:ulf.wiger@feuerlabs.com)), Magnus Feuer ([`magnus.feuer@feuerlabs.com`](mailto:magnus.feuer@feuerlabs.com)). diff --git a/doc/exometer_report.md b/doc/exometer_report.md index a57ea18..5ca65c1 100644 --- a/doc/exometer_report.md +++ b/doc/exometer_report.md @@ -121,12 +121,14 @@ as a list of atoms. -+ `DataPoint`
Specifies the data point within the subscribed-to metric as an atom, or a list of atoms. ++ `DataPoint`
Specifies the data point within the subscribed-to metric +as an atom, or a list of atoms. -+ `Interval`
Specifies the interval, in milliseconds, that the subscribed-to -value will be reported at. ++ `Interval`
Specifies the interval, in milliseconds, that the +subscribed-to value will be reported at, or an atom, referring to a named +interval configured in the reporter. @@ -240,6 +242,30 @@ datapoint() = atom() +### delay() ### + + + +

+delay() = time_ms()
+
+ + + + + +### error() ### + + + +

+error() = {error, any()}
+
+ + + + + ### extra() ### @@ -257,7 +283,7 @@ extra() = any()

-interval() = pos_integer()
+interval() = pos_integer() | atom()
 
@@ -299,12 +325,24 @@ reporter_name() = atom() Restart specification + + + +### time_ms() ### + + + +

+time_ms() = pos_integer()
+
+ + ## Function Index ## -
add_reporter/2Add a reporter.
call_reporter/2Send a custom (synchronous) call to Reporter.
cast_reporter/2Send a custom (asynchronous) cast to Reporter.
disable_me/2Used by a reporter to disable itself.
disable_reporter/1Disable Reporter.
enable_reporter/1Enable Reporter.
list_metrics/0Equivalent to list_metrics([]).
list_metrics/1List all metrics matching Path, together with subscription status.
list_reporters/0List the name and pid of each known reporter.
list_subscriptions/1List all subscriptions for Reporter.
new_entry/1Called by exometer whenever a new entry is created.
remove_reporter/1Remove reporter and all its subscriptions.
remove_reporter/2Remove Reporter (non-blocking call).
setopts/3Called by exometer when options of a metric entry are changed.
start_link/0Starts the server +
add_reporter/2Add a reporter.
call_reporter/2Send a custom (synchronous) call to Reporter.
cast_reporter/2Send a custom (asynchronous) cast to Reporter.
delete_interval/2Delete a named interval.
disable_me/2Used by a reporter to disable itself.
disable_reporter/1Disable Reporter.
enable_reporter/1Enable Reporter.
list_metrics/0Equivalent to list_metrics([]).
list_metrics/1List all metrics matching Path, together with subscription status.
list_reporters/0List the name and pid of each known reporter.
list_subscriptions/1List all subscriptions for Reporter.
new_entry/1Called by exometer whenever a new entry is created.
remove_reporter/1Remove reporter and all its subscriptions.
remove_reporter/2Remove Reporter (non-blocking call).
restart_intervals/1Restart all named intervals, respecting specified delays.
set_interval/3Specify a named interval.
setopts/3Called by exometer when options of a metric entry are changed.
start_link/0Starts the server --------------------------------------------------------------------.
start_reporters/0
subscribe/4Equivalent to subscribe(Reporter, Metric, DataPoint, Interval, []).
subscribe/5Add a subscription to an existing reporter.
terminate_reporter/1
unsubscribe/3Equivalent to unsubscribe(Reporter, Metric, DataPoint, []).
unsubscribe/4Removes a subscription.
unsubscribe_all/2Removes all subscriptions related to Metric in Reporter.
@@ -337,11 +375,23 @@ additional options. is given, the module name defaults to the given reporter name. + `{status, enabled | disabled}` - The operational status of the reporter if enabled, the reporter will report values to its target. If disabled, the reporter process will be terminated and subscription timers canceled, but the subscriptions will remain, and it will also be possible to add new subscriptions to the reporter. + + +`{intervals, [named_interval()]}` +named_interval() :: {Name::atom(), Interval::pos_integer()} +| {Name::atom(), Interval::time_ms(), delay()::time_ms()} +Define named intervals. The name can be used by subscribers, so that all +subsriptions for a given named interval will be reported when the interval +triggers. An optional delay (in ms) can be given: this will cause the first +interval to start in `Delay` milliseconds. When all intervals are named +at the same time, the delay parameter can be used to achieve staggered +reporting. ### call_reporter/2 ### @@ -374,6 +424,18 @@ Send a custom (asynchronous) cast to `Reporter`. This function is used to make an asynchronous cast to a given reporter instance. Note that the reporter type must recognize the message. + + +### delete_interval/2 ### + + +

+delete_interval(Reporter::reporter_name(), Name::atom()) -> ok | error()
+
+
+ +Delete a named interval. + ### disable_me/2 ### @@ -527,6 +589,45 @@ Remove `Reporter` (non-blocking call). This function can be used to order removal of a reporter with a custom reason. Note that the function is asynchronous, making it suitable e.g. for calling from within the reporter itself. + + +### restart_intervals/1 ### + + +

+restart_intervals(Reporter::reporter_name()) -> ok
+
+
+ + +Restart all named intervals, respecting specified delays. + + +This function can be used if named intervals are added incrementally, and +it is important that all intervals trigger separated by the given delays. + + +### set_interval/3 ### + + +

+set_interval(Reporter::reporter_name(), Name::atom(), Time::time_ms() | {time_ms(), delay()}) -> ok | error()
+
+
+ + +Specify a named interval. + + + +See [`add_reporter/2`](#add_reporter-2) for a description of named intervals. +The named interval is here specified as either `Time` (milliseconds) or +`{Time, Delay}`, where a delay in milliseconds is provided. + + +If the named interval exists, it will be replaced with the new definition. +Otherwise, it will be added. Use [`restart_intervals/1`](#restart_intervals-1) if you want +all intervals to be restarted/resynched with corresponding relative delays. ### setopts/3 ### @@ -594,7 +695,10 @@ is either a single data point (an atom) or a list of data points (a list). -`Interval` is the sampling/reporting interval in milliseconds. +`Interval` is the sampling/reporting interval in milliseconds, or an atom, +referring to a named interval configured in the reporter. The named +interval need not be defined yet in the reporter (the subscription will +not trigger until it _is_ defined.) `Extra` can be anything that the chosen reporter understands (default: `[]`). @@ -634,8 +738,8 @@ Removes a subscription. Note that the subscription is identified by the combination -`{Reporter, Metric, DataPoint, Extra}`. The exact information can be extracted -using [`list_subscriptions/1`](#list_subscriptions-1). +`{Reporter, Metric, DataPoint, Extra}`. The exact information can be +extracted using [`list_subscriptions/1`](#list_subscriptions-1). ### unsubscribe_all/2 ### diff --git a/src/exometer_function.erl b/src/exometer_function.erl index 82a7f22..9bd9997 100644 --- a/src/exometer_function.erl +++ b/src/exometer_function.erl @@ -550,6 +550,8 @@ e(A, _) when is_atom(A) -> A; e({T,I}, _) when T==i; T==integer -> I; e({T,A}, _) when T==a; T==atom -> A; e({cons,Eh,Et}, Bs) -> [e(Eh, Bs)|e(Et, Bs)]; +e({hd,E}, Bs) -> hd(e(E, Bs)); +e({tl,E}, Bs) -> tl(e(E, Bs)); e({l, Es}, Bs) -> [e(E, Bs) || E <- Es]; e({T,S}, _) when T==s; T==string -> S; e({T,Es}, Bs) when T==t; T==tuple -> list_to_tuple([e(E,Bs) || E <- Es]); @@ -649,6 +651,18 @@ call1(length , [L]) -> length(L); call1(size , [T]) -> size(T); call1(byte_size, [B]) -> byte_size(B); call1(bit_size , [B]) -> bit_size(B); +call1(tuple_to_list , [T]) -> tuple_to_list(T); +call1(list_to_tuple , [L]) -> list_to_tuple(L); +call1(atom_to_list , [A]) -> atom_to_list(A); +call1(list_to_atom , [L]) -> list_to_atom(L); +call1(list_to_binary, [L]) -> list_to_binary(L); +call1(binary_to_list, [B]) -> binary_to_list(B); +call1(t2l, [T]) -> tuple_to_list(T); +call1(l2t, [L]) -> list_to_tuple(L); +call1(a2l, [A]) -> atom_to_list(A); +call1(l2a, [L]) -> list_to_atom(L); +call1(l2b, [L]) -> list_to_binary(L); +call1(b2l, [B]) -> binary_to_list(B); call1({M,F}, As) when is_atom(M), is_atom(F) -> apply(M, F, As). diff --git a/src/exometer_report.erl b/src/exometer_report.erl index 1049f65..a4fb07e 100644 --- a/src/exometer_report.erl +++ b/src/exometer_report.erl @@ -76,10 +76,12 @@ %% + `Metric'
Specifies the metric that is now subscribed to by the plugin %% as a list of atoms. %% -%% + `DataPoint'
Specifies the data point within the subscribed-to metric as an atom, or a list of atoms. +%% + `DataPoint'
Specifies the data point within the subscribed-to metric +%% as an atom, or a list of atoms. %% -%% + `Interval'
Specifies the interval, in milliseconds, that the subscribed-to -%% value will be reported at. +%% + `Interval'
Specifies the interval, in milliseconds, that the +%% subscribed-to value will be reported at, or an atom, referring to a named +%% interval configured in the reporter. %% %% + `State'
Contains the state returned by the last called plugin function. %% @@ -155,6 +157,9 @@ list_reporters/0, list_subscriptions/1, add_reporter/2, + set_interval/3, + delete_interval/2, + restart_intervals/1, remove_reporter/1, remove_reporter/2, terminate_reporter/1, enable_reporter/1, @@ -181,6 +186,7 @@ -define(SERVER, ?MODULE). +-type error() :: {error, any()}. -type metric() :: exometer:name() | {find, exometer:name()} | {select, ets:match_spec()}. @@ -188,7 +194,11 @@ -type options() :: [{atom(), any()}]. -type mod_state() :: any(). -type value() :: any(). --type interval() :: pos_integer(). +-type interval() :: pos_integer() | atom(). +-type time_ms() :: pos_integer(). +-type delay() :: time_ms(). +-type named_interval() :: {atom(), time_ms()} + | {atom(), time_ms(), delay()}. -type callback_result() :: {ok, mod_state()} | any(). -type extra() :: any(). -type reporter_name() :: atom(). @@ -243,7 +253,7 @@ -record(subscriber, { key :: #key{}, interval :: interval(), - t_ref :: reference() + t_ref :: reference() | undefined }). -record(restart, { @@ -252,12 +262,20 @@ save_n = 10 :: pos_integer()} ). +-record(interval, { + name :: atom(), + time = 0 :: non_neg_integer(), + delay = 0 :: non_neg_integer(), + t_ref :: reference() | undefined + }). + -record(reporter, { name :: atom(), pid :: pid(), mref :: reference(), module :: module(), opts = [] :: [{atom(), any()}], + intervals = [] :: [#interval{}], restart = #restart{}, status = enabled :: enabled | disabled }). @@ -293,7 +311,10 @@ subscribe(Reporter, Metric, DataPoint, Interval) -> %% a static configuration. `Metric' is the name of an exometer entry. `DataPoint' %% is either a single data point (an atom) or a list of data points (a list). %% -%% `Interval' is the sampling/reporting interval in milliseconds. +%% `Interval' is the sampling/reporting interval in milliseconds, or an atom, +%% referring to a named interval configured in the reporter. The named +%% interval need not be defined yet in the reporter (the subscription will +%% not trigger until it is defined.) %% %% `Extra' can be anything that the chosen reporter understands (default: `[]'). %% If the reporter uses {@link exometer_util:report_type/3}, `Extra' should be @@ -318,8 +339,8 @@ unsubscribe(Reporter, Metric, DataPoint) -> %% @doc Removes a subscription. %% %% Note that the subscription is identified by the combination -%% `{Reporter, Metric, DataPoint, Extra}'. The exact information can be extracted -%% using {@link list_subscriptions/1}. +%% `{Reporter, Metric, DataPoint, Extra}'. The exact information can be +%% extracted using {@link list_subscriptions/1}. %% @end unsubscribe(Reporter, Metric, DataPoint, Extra) -> call({unsubscribe, #key{reporter = Reporter, @@ -382,6 +403,16 @@ list_subscriptions(Reporter) -> %% reporter process will be terminated and subscription timers canceled, but %% the subscriptions will remain, and it will also be possible to add new %% subscriptions to the reporter. +%% +%% `{intervals, [named_interval()]}' +%% named_interval() :: {Name::atom(), Interval::pos_integer()} +%% | {Name::atom(), Interval::time_ms(), delay()::time_ms()} +%% Define named intervals. The name can be used by subscribers, so that all +%% subsriptions for a given named interval will be reported when the interval +%% triggers. An optional delay (in ms) can be given: this will cause the first +%% interval to start in `Delay' milliseconds. When all intervals are named +%% at the same time, the delay parameter can be used to achieve staggered +%% reporting. %% @end add_reporter(Reporter, Options) -> call({add_reporter, Reporter, Options}). @@ -391,6 +422,43 @@ add_reporter(Reporter, Options) -> remove_reporter(Reporter) -> call({remove_reporter, Reporter}). +-spec set_interval(reporter_name(), atom(), + time_ms() | {time_ms(), delay()}) -> ok |error(). +%% @doc Specify a named interval. +%% +%% See {@link add_reporter/2} for a description of named intervals. +%% The named interval is here specified as either `Time' (milliseconds) or +%% `{Time, Delay}', where a delay in milliseconds is provided. +%% +%% If the named interval exists, it will be replaced with the new definition. +%% Otherwise, it will be added. Use {@link restart_intervals/1} if you want +%% all intervals to be restarted/resynched with corresponding relative delays. +%% @end +set_interval(Reporter, Name, Time) when is_atom(Name), + is_integer(Time), Time >= 0 -> + call({set_interval, Reporter, Name, Time}); +set_interval(Reporter, Name, {Time, Delay}) when is_atom(Name), + is_integer(Time), Time >= 0, + is_integer(Delay), + Delay >= 0 -> + call({set_interval, Reporter, Name, {Time, Delay}}). + +-spec delete_interval(reporter_name(), atom()) -> ok | error(). +%% @doc Delete a named interval. +%% +delete_interval(Reporter, Name) -> + call({delete_interval, Reporter, Name}). + +-spec restart_intervals(reporter_name()) -> ok. +%% @doc Restart all named intervals, respecting specified delays. +%% +%% This function can be used if named intervals are added incrementally, and +%% it is important that all intervals trigger separated by the given delays. +%% @end +restart_intervals(Reporter) -> + call({restart_intervals, Reporter}). + + -spec enable_reporter(reporter_name()) -> ok | {error, any()}. %% @doc Enable `Reporter'. %% @@ -528,16 +596,20 @@ do_start_reporters(S) -> lists:foreach( fun(#reporter{name = Reporter, status = Status, - opts = ROpts} = R) -> + opts = ROpts, + intervals = Ints0} = R) -> Restart = get_restart(ROpts), - {Pid, MRef} = + {Pid, MRef, Ints} = if Status =:= enabled -> - spawn_reporter(Reporter, ROpts); - true -> {undefined, undefined} + {P1,R1} = spawn_reporter(Reporter, ROpts), + I1 = start_interval_timers(R), + {P1,R1,I1}; + true -> {undefined, undefined, Ints0} end, ets:insert(?EXOMETER_REPORTERS, R#reporter{pid = Pid, mref = MRef, + intervals = Ints, restart = Restart}) end, ReporterRecs); false -> @@ -555,13 +627,50 @@ make_reporter_recs([{R, Opts}|T]) -> [#reporter{name = R, module = get_module(R, Opts), status = proplists:get_value(status, Opts, enabled), - opts = Opts}|make_reporter_recs(T)]; + opts = Opts, + intervals = get_intervals(Opts)}|make_reporter_recs(T)]; make_reporter_recs([]) -> []. get_module(R, Opts) -> proplists:get_value(module, Opts, R). +get_intervals(Opts) -> + case lists:keyfind(intervals, 1, Opts) of + false -> []; + {_, Is} -> + lists:map( + fun({Name, Time}) when is_atom(Name), + is_integer(Time), Time >= 0 -> + #interval{name = Name, time = Time}; + ({Name, Time, Delay}) when is_atom(Name), + is_integer(Time), Time >= 0, + is_integer(Delay), Delay >= 0 -> + #interval{name = Name, time = Time, delay = Delay}; + (Other) -> + error({invalid_interval, Other}) + end, Is) + end. + +start_interval_timers(#reporter{name = R, intervals = Ints}) -> + lists:map(fun(I) -> start_interval_timer(I, R) end, Ints). + +start_interval_timer(#interval{name = Name, delay = Delay, + t_ref = Ref} = I, R) -> + cancel_timer(Ref), + case Delay of + 0 -> + do_start_interval_timer(I, R); + D -> + TRef = erlang:send_after(D, self(), {start_interval, R, Name}), + I#interval{t_ref = TRef} + end. + +do_start_interval_timer(#interval{name = Name, time = Time} = I, R) -> + TRef = erlang:send_after(Time, self(), {report_batch, R, Name}), + I#interval{t_ref = TRef}. + + get_report_env() -> Opts0 = exometer_util:get_env(report, []), {Rs1, Opts1} = split_env(reporters, Opts0), @@ -685,13 +794,13 @@ handle_call({add_reporter, Reporter, Opts}, _, #st{} = St) -> {reply, {error, already_running}, St}; false -> try + [R] = make_reporter_recs([{Reporter, Opts}]), {Pid, MRef} = spawn_reporter(Reporter, Opts), - R = #reporter {name = Reporter, - module = get_module(Reporter, Opts), - opts = Opts, - pid = Pid, - mref = MRef}, - ets:insert(?EXOMETER_REPORTERS, R), + Ints = start_interval_timers(R), + R1 = R#reporter{intervals = Ints, + pid = Pid, + mref = MRef}, + ets:insert(?EXOMETER_REPORTERS, R1), {reply, ok, St} catch error:Reason -> @@ -714,6 +823,61 @@ handle_call({change_reporter_status, Reporter, Status}, _, St) -> E -> {reply, E, St} end; +handle_call({set_interval, Reporter, Name, Int}, _, #st{}=St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + try + I0 = case lists:keyfind(Name, #interval.name, Ints) of + false -> #interval{name = Name}; + Interval -> Interval + end, + I1 = case Int of + {Time, Delay} when is_integer(Time), Time >= 0, + is_integer(Delay), Delay >= 0 -> + I0#interval{time = Time, delay = Delay}; + Time when is_integer(Time), Time >= 0 -> + I0#interval{time = Time} + end, + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keystore( + Name, #interval.name, Ints, + start_interval_timer(I1, Reporter))}]), + {reply, ok, St} + catch + error:Reason -> + {reply, {error, Reason}, St} + end; + [] -> + {reply, {error, not_found}, St} + end; +handle_call({delete_interval, Reporter, Name}, _, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + case lists:keyfind(Name, #interval.name, Ints) of + #interval{t_ref = TRef} -> + cancel_timer(TRef), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keydelete( + Name, #interval.name, Ints)}]), + {reply, ok, St}; + false -> + {reply, {error, not_found}, St} + end; + [] -> + {reply, {error, not_found}, St} + end; +handle_call({restart_intervals, Reporter}, _, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{} = R] -> + Ints = start_interval_timers(R), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, Ints}]), + {reply, ok, St}; + [] -> + {reply, {error, not_found}, St} + end; handle_call({setopts, Metric, Options, Status}, _, #st{}=St) -> [erlang:send(Pid, {exometer_setopts, Metric, Options, Status}) || Pid <- reporter_pids()], @@ -765,6 +929,42 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_info({start_interval, Reporter, Name}, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + case lists:keyfind(Name, #interval.name, Ints) of + #interval{} = I -> + I1 = do_start_interval_timer(I, Reporter), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keyreplace( + Name, #interval.name, Ints, I1)}]); + false -> + ok + end; + [] -> + ok + end, + {noreply, St}; +handle_info({report_batch, Reporter, Name}, #st{} = St) -> + %% Find all entries where reporter is Reporter and interval is Name, + %% and report them. + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [R] -> + Entries = ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Reporter, + _ = '_'}, + interval = Name, + _ = '_'}, [], ['$_']}]), + lists:foreach( + fun(#subscriber{key = Key}) -> + do_report(Key, Name) + end, Entries), + restart_batch_timer(Name, R); + [] -> + skip + end, + {noreply, St}; handle_info({ report, #key{ reporter = Reporter, metric = Metric, datapoint = DataPoint, @@ -772,6 +972,7 @@ handle_info({ report, #key{ reporter = Reporter, extra = Extra} = Key, Interval}, #st{} = St) -> case ets:member(?EXOMETER_SUBS, Key) of true -> + do_report(Key, Interval), case {RetryFailedMetrics, get_values(Metric, DataPoint)} of %% We found a value, or values. {_, [_|_] = Found} -> @@ -780,13 +981,7 @@ handle_info({ report, #key{ reporter = Reporter, || {DP, Val} <- Values] || {Name, Values} <- Found], %% Re-arm the timer for next round - TRef = erlang:send_after(Interval, self(), - {report, Key, Interval}), - - %% Replace the pid_subscriber info with a record having - %% the new timer ref. - ets:update_element(?EXOMETER_SUBS, Key, - [{#subscriber.t_ref, TRef}]), + restart_subscr_timer(Key, Interval), {noreply, St}; %% We did not find a value, but we should try again. @@ -798,14 +993,8 @@ handle_info({ report, #key{ reporter = Reporter, true -> ok end, %% Re-arm the timer for next round - TRef = erlang:send_after(Interval, self(), - {report, Key, Interval}), - - %% Replace the pid_subscriber info with a record having - %% the new timer ref. - ets:update_element(?EXOMETER_SUBS, Key, - [{#subscriber.t_ref, TRef}]), - {noreply, St}; + restart_subscr_timer(Key, Interval), + {noreply, St}; %% We did not find a value, and we should not retry. _ -> %% Entry removed while timer in progress. @@ -883,6 +1072,38 @@ resubscribe(#subscriber{key = #key{reporter = RName, ets:update_element(?EXOMETER_SUBS, Key, [{#subscriber.t_ref, TRef}]). +do_report(#key{reporter = Reporter, + metric = Metric, + datapoint = DataPoint, + retry_failed_metrics = RetryFailedMetrics, + extra = Extra} = Key, Interval) -> + case {RetryFailedMetrics, get_values(Metric, DataPoint)} of + %% We found a value, or values. + {_, [_|_] = Found} -> + %% Distribute metric value to the correct process + [[report_value(Reporter, Name, DP, Extra, Val) + || {DP, Val} <- Values] || {Name, Values} <- Found], + %% Re-arm the timer for next round + restart_subscr_timer(Key, Interval); + %% We did not find a value, but we should try again. + {true, _ } -> + if is_list(Metric) -> + ?debug("Metric(~p) Datapoint(~p) not found." + " Will try again in ~p msec~n", + [Metric, DataPoint, Interval]); + true -> ok + end, + %% Re-arm the timer for next round + restart_subscr_timer(Key, Interval); + %% We did not find a value, and we should not retry. + _ -> + %% Entry removed while timer in progress. + ?warning("Metric(~p) Datapoint(~p) not found. Will not try again~n", + [Metric, DataPoint]) + end, + ok. + + cancel_subscr_timers(Reporter) -> lists:foreach( fun(#subscriber{key = Key, t_ref = TRef}) -> @@ -894,6 +1115,28 @@ cancel_subscr_timers(Reporter) -> _ = '_'}, _ = '_'}, [], ['$_']}])). +restart_subscr_timer(Key, Interval) when is_integer(Interval) -> + TRef = erlang:send_after(Interval, self(), + {report, Key, Interval}), + ets:update_element(?EXOMETER_SUBS, Key, + [{#subscriber.t_ref, TRef}]); +restart_subscr_timer(_, _) -> + true. + +restart_batch_timer(Name, #reporter{name = Reporter, + intervals = Ints}) -> + case lists:keyfind(Name, #interval.name, Ints) of + #interval{time = Time} = I -> + TRef = erlang:send_after(Time, self(), + {report_batch, Reporter, Name}), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keyreplace(Name, #interval.name, Ints, + I#interval{t_ref = TRef})}]); + false -> + false + end. + cancel_timer(undefined) -> false; cancel_timer(TRef) -> @@ -942,6 +1185,10 @@ code_change(_OldVan, #st{reporters = Rs, subscribers = Ss} = S, _Extra) -> #reporter{name = Name, pid = Pid, mref = MRef, module = Module, opts = Opts, restart = Restart}; + ({reporter,Name,Pid,Mref,Module,Opts,Restart,Status}) -> + #reporter{name = Name, pid = Pid, mref = Mref, + module = Module, opts = Opts, + restart = Restart, status = Status}; (#reporter{} = R) -> R end, Rs), [ets:insert(?EXOMETER_REPORTERS, R) || R <- Rs1], @@ -1084,7 +1331,7 @@ subscribe_(Reporter, Metric, DataPoint, Interval, RetryFailedMetrics, interval = Interval, t_ref = maybe_send_after(Status, Key, Interval)}). -maybe_send_after(enabled, Key, Interval) -> +maybe_send_after(enabled, Key, Interval) when is_integer(Interval) -> erlang:send_after(Interval, self(), {report, Key, Interval}); maybe_send_after(_, _, _) -> undefined. From 9223abcaa0b281bbe0e8eab07279daa63d0f3d13 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Wed, 20 Aug 2014 22:59:52 +0200 Subject: [PATCH 3/3] add exometer_report:get_intervals/1 --- doc/exometer_report.md | 13 ++++++++++++- src/exometer_report.erl | 28 ++++++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/doc/exometer_report.md b/doc/exometer_report.md index 5ca65c1..e3fbafd 100644 --- a/doc/exometer_report.md +++ b/doc/exometer_report.md @@ -342,7 +342,7 @@ time_ms() = pos_integer() ## Function Index ## -
add_reporter/2Add a reporter.
call_reporter/2Send a custom (synchronous) call to Reporter.
cast_reporter/2Send a custom (asynchronous) cast to Reporter.
delete_interval/2Delete a named interval.
disable_me/2Used by a reporter to disable itself.
disable_reporter/1Disable Reporter.
enable_reporter/1Enable Reporter.
list_metrics/0Equivalent to list_metrics([]).
list_metrics/1List all metrics matching Path, together with subscription status.
list_reporters/0List the name and pid of each known reporter.
list_subscriptions/1List all subscriptions for Reporter.
new_entry/1Called by exometer whenever a new entry is created.
remove_reporter/1Remove reporter and all its subscriptions.
remove_reporter/2Remove Reporter (non-blocking call).
restart_intervals/1Restart all named intervals, respecting specified delays.
set_interval/3Specify a named interval.
setopts/3Called by exometer when options of a metric entry are changed.
start_link/0Starts the server +
add_reporter/2Add a reporter.
call_reporter/2Send a custom (synchronous) call to Reporter.
cast_reporter/2Send a custom (asynchronous) cast to Reporter.
delete_interval/2Delete a named interval.
disable_me/2Used by a reporter to disable itself.
disable_reporter/1Disable Reporter.
enable_reporter/1Enable Reporter.
get_intervals/1List the named intervals for Reporter.
list_metrics/0Equivalent to list_metrics([]).
list_metrics/1List all metrics matching Path, together with subscription status.
list_reporters/0List the name and pid of each known reporter.
list_subscriptions/1List all subscriptions for Reporter.
new_entry/1Called by exometer whenever a new entry is created.
remove_reporter/1Remove reporter and all its subscriptions.
remove_reporter/2Remove Reporter (non-blocking call).
restart_intervals/1Restart all named intervals, respecting specified delays.
set_interval/3Specify a named interval.
setopts/3Called by exometer when options of a metric entry are changed.
start_link/0Starts the server --------------------------------------------------------------------.
start_reporters/0
subscribe/4Equivalent to subscribe(Reporter, Metric, DataPoint, Interval, []).
subscribe/5Add a subscription to an existing reporter.
terminate_reporter/1
unsubscribe/3Equivalent to unsubscribe(Reporter, Metric, DataPoint, []).
unsubscribe/4Removes a subscription.
unsubscribe_all/2Removes all subscriptions related to Metric in Reporter.
@@ -491,6 +491,17 @@ a restart. If the reporter was already enabled, nothing is changed. + + +### get_intervals/1 ### + + +

+get_intervals(Reporter::reporter_name()) -> [{atom(), [{time, pos_integer()} | {delay, pos_integer()} | {timer_ref, reference()}]}]
+
+
+ +List the named intervals for `Reporter`. ### list_metrics/0 ### diff --git a/src/exometer_report.erl b/src/exometer_report.erl index a4fb07e..c6d6f69 100644 --- a/src/exometer_report.erl +++ b/src/exometer_report.erl @@ -160,6 +160,7 @@ set_interval/3, delete_interval/2, restart_intervals/1, + get_intervals/1, remove_reporter/1, remove_reporter/2, terminate_reporter/1, enable_reporter/1, @@ -458,6 +459,14 @@ delete_interval(Reporter, Name) -> restart_intervals(Reporter) -> call({restart_intervals, Reporter}). +-spec get_intervals(reporter_name()) -> + [{atom(), [{time, pos_integer()} + | {delay, pos_integer()} + | {timer_ref, reference()}]}]. +%% @doc List the named intervals for `Reporter'. +get_intervals(Reporter) -> + call({get_intervals, Reporter}). + -spec enable_reporter(reporter_name()) -> ok | {error, any()}. %% @doc Enable `Reporter'. @@ -628,14 +637,15 @@ make_reporter_recs([{R, Opts}|T]) -> module = get_module(R, Opts), status = proplists:get_value(status, Opts, enabled), opts = Opts, - intervals = get_intervals(Opts)}|make_reporter_recs(T)]; + intervals = get_interval_opts(Opts)}|make_reporter_recs(T)]; make_reporter_recs([]) -> []. get_module(R, Opts) -> proplists:get_value(module, Opts, R). -get_intervals(Opts) -> +-spec get_interval_opts([named_interval() | any()]) -> [#interval{}]. +get_interval_opts(Opts) -> case lists:keyfind(intervals, 1, Opts) of false -> []; {_, Is} -> @@ -878,6 +888,20 @@ handle_call({restart_intervals, Reporter}, _, #st{} = St) -> [] -> {reply, {error, not_found}, St} end; +handle_call({get_intervals, Reporter}, _, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + Info = + [{Name, [{time, T}, + {delay, D}, + {timer_ref, TR}]} || #interval{name = Name, + time = T, + delay = D, + t_ref = TR} <- Ints], + {reply, Info, St}; + [] -> + {reply, {error, not_found}, St} + end; handle_call({setopts, Metric, Options, Status}, _, #st{}=St) -> [erlang:send(Pid, {exometer_setopts, Metric, Options, Status}) || Pid <- reporter_pids()],