Сбор статистики через триггеры #16
This commit is contained in:
@@ -234,3 +234,11 @@
|
|||||||
is_read :: boolean(),
|
is_read :: boolean(),
|
||||||
created_at :: calendar:datetime()
|
created_at :: calendar:datetime()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-record(stats, {
|
||||||
|
id :: binary(),
|
||||||
|
metric :: atom(),
|
||||||
|
entity_id :: binary(),
|
||||||
|
value :: integer() | {integer(), integer()},
|
||||||
|
timestamp :: calendar:datetime()
|
||||||
|
}).
|
||||||
@@ -18,7 +18,8 @@
|
|||||||
booking,
|
booking,
|
||||||
review, report, banned_word,
|
review, report, banned_word,
|
||||||
ticket, subscription,
|
ticket, subscription,
|
||||||
admin_audit, notification
|
admin_audit, notification,
|
||||||
|
stats
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(DISC_TABLES, ?TABLES -- [session, admin_session]).
|
-define(DISC_TABLES, ?TABLES -- [session, admin_session]).
|
||||||
@@ -58,6 +59,7 @@ handle_call(init_tables, _From, State) ->
|
|||||||
end,
|
end,
|
||||||
lists:foreach(fun create_table/1, ?TABLES),
|
lists:foreach(fun create_table/1, ?TABLES),
|
||||||
ok = create_indices(),
|
ok = create_indices(),
|
||||||
|
ok = stats_collector:subscribe(),
|
||||||
ok = start_cleanup_timer(),
|
ok = start_cleanup_timer(),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
@@ -212,6 +214,7 @@ table_opts(ticket) -> [{disc_copies, [node()]}, {attributes, record_info(fields,
|
|||||||
table_opts(subscription) -> [{disc_copies, [node()]}, {attributes, record_info(fields, subscription)}];
|
table_opts(subscription) -> [{disc_copies, [node()]}, {attributes, record_info(fields, subscription)}];
|
||||||
table_opts(admin_audit) -> [{disc_copies, [node()]}, {attributes, record_info(fields, admin_audit)}];
|
table_opts(admin_audit) -> [{disc_copies, [node()]}, {attributes, record_info(fields, admin_audit)}];
|
||||||
table_opts(notification) -> [{disc_copies, [node()]}, {attributes, record_info(fields, notification)}];
|
table_opts(notification) -> [{disc_copies, [node()]}, {attributes, record_info(fields, notification)}];
|
||||||
|
table_opts(stats) -> [{disc_copies, [node()]}, {attributes, record_info(fields, stats)}];
|
||||||
table_opts(session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, session)}];
|
table_opts(session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, session)}];
|
||||||
table_opts(admin_session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, admin_session)}].
|
table_opts(admin_session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, admin_session)}].
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,12 @@ init([]) ->
|
|||||||
shutdown => 5000,
|
shutdown => 5000,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [infra_mnesia]},
|
modules => [infra_mnesia]},
|
||||||
|
#{id => stats_collector,
|
||||||
|
start => {stats_collector, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 5000,
|
||||||
|
type => worker,
|
||||||
|
modules => [stats_collector]},
|
||||||
#{id => archive_manager,
|
#{id => archive_manager,
|
||||||
start => {archive_manager, start_link, []},
|
start => {archive_manager, start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
|
|||||||
133
src/infra/stats_collector.erl
Normal file
133
src/infra/stats_collector.erl
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
-module(stats_collector).
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include("records.hrl").
|
||||||
|
|
||||||
|
-export([start_link/0, get_stats/0, subscribe/0]).
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(FLUSH_INTERVAL, 300000).
|
||||||
|
|
||||||
|
start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
get_stats() -> gen_server:call(?MODULE, get_stats).
|
||||||
|
subscribe() -> gen_server:call(?MODULE, subscribe).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
ets:new(stats_ets, [set, public, named_table, {keypos, 1}]),
|
||||||
|
erlang:send_after(?FLUSH_INTERVAL, self(), flush),
|
||||||
|
{ok, #{}}.
|
||||||
|
|
||||||
|
handle_call(subscribe, _From, State) ->
|
||||||
|
mnesia:subscribe({table, event, simple}),
|
||||||
|
mnesia:subscribe({table, booking, simple}),
|
||||||
|
mnesia:subscribe({table, review, simple}),
|
||||||
|
{reply, ok, State};
|
||||||
|
handle_call(get_stats, _From, State) ->
|
||||||
|
{reply, ets:tab2list(stats_ets), State};
|
||||||
|
handle_call(_Msg, _From, State) ->
|
||||||
|
{reply, {error, unknown}, State}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, State) -> {noreply, State}.
|
||||||
|
|
||||||
|
handle_info(flush, State) ->
|
||||||
|
flush_stats(),
|
||||||
|
erlang:send_after(?FLUSH_INTERVAL, self(), flush),
|
||||||
|
{noreply, State};
|
||||||
|
handle_info({mnesia_table_event, {write, Record, _ActivityId}}, State) ->
|
||||||
|
Table = element(1, Record),
|
||||||
|
process_write(Table, Record, []),
|
||||||
|
{noreply, State};
|
||||||
|
handle_info({write, Table, New, Old, _ActivityId}, State) ->
|
||||||
|
process_write(Table, New, Old),
|
||||||
|
{noreply, State};
|
||||||
|
handle_info(_Info, State) -> {noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) -> ok.
|
||||||
|
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
||||||
|
|
||||||
|
ensure_counter(Key) ->
|
||||||
|
case ets:member(stats_ets, Key) of
|
||||||
|
false -> ets:insert(stats_ets, {Key, 0});
|
||||||
|
true -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
process_write(event, New, Old) ->
|
||||||
|
CId = element(#event.calendar_id, New),
|
||||||
|
StNew = element(#event.status, New),
|
||||||
|
StOld = if Old =:= [] -> []; true -> element(#event.status, Old) end,
|
||||||
|
if StNew =:= active, StOld =/= active ->
|
||||||
|
ensure_counter({events_created, CId}),
|
||||||
|
ets:update_counter(stats_ets, {events_created, CId}, {2, 1});
|
||||||
|
StNew =:= cancelled, StOld =/= cancelled ->
|
||||||
|
ensure_counter({events_cancelled, CId}),
|
||||||
|
ets:update_counter(stats_ets, {events_cancelled, CId}, {2, 1});
|
||||||
|
true -> ok
|
||||||
|
end;
|
||||||
|
process_write(booking, New, Old) ->
|
||||||
|
EvId = element(#booking.event_id, New),
|
||||||
|
StNew = element(#booking.status, New),
|
||||||
|
StOld = if Old =:= [] -> []; true -> element(#booking.status, Old) end,
|
||||||
|
CId = case mnesia:dirty_read({event, EvId}) of
|
||||||
|
[#event{calendar_id = C}] -> C;
|
||||||
|
_ -> undefined
|
||||||
|
end,
|
||||||
|
if CId =/= undefined ->
|
||||||
|
if StNew =:= confirmed, StOld =/= confirmed ->
|
||||||
|
ensure_counter({bookings_confirmed, CId}),
|
||||||
|
ets:update_counter(stats_ets, {bookings_confirmed, CId}, {2, 1});
|
||||||
|
StNew =:= cancelled, StOld =/= cancelled ->
|
||||||
|
ensure_counter({bookings_cancelled, CId}),
|
||||||
|
ets:update_counter(stats_ets, {bookings_cancelled, CId}, {2, 1});
|
||||||
|
true -> ok
|
||||||
|
end,
|
||||||
|
if Old =:= [] ->
|
||||||
|
ensure_counter({bookings_created, CId}),
|
||||||
|
ets:update_counter(stats_ets, {bookings_created, CId}, {2, 1});
|
||||||
|
true -> ok
|
||||||
|
end;
|
||||||
|
true -> ok
|
||||||
|
end;
|
||||||
|
process_write(review, New, Old) ->
|
||||||
|
CId = case element(#review.target_type, New) of
|
||||||
|
event ->
|
||||||
|
EvId = element(#review.target_id, New),
|
||||||
|
case mnesia:dirty_read({event, EvId}) of
|
||||||
|
[#event{calendar_id = C}] -> C;
|
||||||
|
_ -> undefined
|
||||||
|
end;
|
||||||
|
calendar ->
|
||||||
|
element(#review.target_id, New)
|
||||||
|
end,
|
||||||
|
Rating = element(#review.rating, New),
|
||||||
|
if CId =/= undefined ->
|
||||||
|
if Old =:= [] ->
|
||||||
|
ensure_counter({reviews_created, CId}),
|
||||||
|
ets:update_counter(stats_ets, {reviews_created, CId}, {2, 1}),
|
||||||
|
ensure_counter({reviews_sum, CId}),
|
||||||
|
ets:update_counter(stats_ets, {reviews_sum, CId}, {2, Rating}),
|
||||||
|
ensure_counter({reviews_count, CId}),
|
||||||
|
ets:update_counter(stats_ets, {reviews_count, CId}, {2, 1});
|
||||||
|
true ->
|
||||||
|
OldRating = element(#review.rating, Old),
|
||||||
|
if Rating =/= OldRating ->
|
||||||
|
ensure_counter({reviews_sum, CId}),
|
||||||
|
ets:update_counter(stats_ets, {reviews_sum, CId}, {2, Rating - OldRating});
|
||||||
|
true -> ok
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
true -> ok
|
||||||
|
end;
|
||||||
|
process_write(_, _, _) -> ok.
|
||||||
|
|
||||||
|
flush_stats() ->
|
||||||
|
All = ets:tab2list(stats_ets),
|
||||||
|
lists:foreach(fun({{Metric, EntityId}, Value}) ->
|
||||||
|
mnesia:dirty_write(#stats{
|
||||||
|
id = list_to_binary(io_lib:format("~p_~p_~p", [Metric, EntityId, os:system_time(millisecond)])),
|
||||||
|
metric = Metric,
|
||||||
|
entity_id = EntityId,
|
||||||
|
value = Value,
|
||||||
|
timestamp = calendar:local_time()
|
||||||
|
})
|
||||||
|
end, All),
|
||||||
|
ets:match_delete(stats_ets, '_').
|
||||||
Reference in New Issue
Block a user