From af0a36185ba6bae0c76c11e571aa31c707b4bf2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=A1=D0=B0?= =?UTF-8?q?=D0=B1=D0=B8=D0=BB=D0=B8=D0=BD?= Date: Mon, 4 May 2026 20:56:27 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A1=D0=B1=D0=BE=D1=80=20=D1=81=D1=82=D0=B0?= =?UTF-8?q?=D1=82=D0=B8=D1=81=D1=82=D0=B8=D0=BA=D0=B8=20=D1=87=D0=B5=D1=80?= =?UTF-8?q?=D0=B5=D0=B7=20=D1=82=D1=80=D0=B8=D0=B3=D0=B3=D0=B5=D1=80=D1=8B?= =?UTF-8?q?=20https://git.sabilin.com/EventHub/EventHubBack/issues/16?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/records.hrl | 8 ++ src/infra/infra_mnesia.erl | 5 +- src/infra/infra_sup.erl | 6 ++ src/infra/stats_collector.erl | 133 ++++++++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 src/infra/stats_collector.erl diff --git a/include/records.hrl b/include/records.hrl index 29c9925..ebc66a5 100644 --- a/include/records.hrl +++ b/include/records.hrl @@ -233,4 +233,12 @@ body :: binary(), is_read :: boolean(), created_at :: calendar:datetime() +}). + +-record(stats, { + id :: binary(), + metric :: atom(), + entity_id :: binary(), + value :: integer() | {integer(), integer()}, + timestamp :: calendar:datetime() }). \ No newline at end of file diff --git a/src/infra/infra_mnesia.erl b/src/infra/infra_mnesia.erl index 167a458..c611a07 100644 --- a/src/infra/infra_mnesia.erl +++ b/src/infra/infra_mnesia.erl @@ -18,7 +18,8 @@ booking, review, report, banned_word, ticket, subscription, - admin_audit, notification + admin_audit, notification, + stats ]). -define(DISC_TABLES, ?TABLES -- [session, admin_session]). @@ -58,6 +59,7 @@ handle_call(init_tables, _From, State) -> end, lists:foreach(fun create_table/1, ?TABLES), ok = create_indices(), + ok = stats_collector:subscribe(), ok = start_cleanup_timer(), {reply, ok, State}; @@ -212,6 +214,7 @@ table_opts(ticket) -> [{disc_copies, [node()]}, {attributes, record_info(fields, table_opts(subscription) -> [{disc_copies, [node()]}, {attributes, record_info(fields, subscription)}]; table_opts(admin_audit) -> [{disc_copies, [node()]}, {attributes, record_info(fields, admin_audit)}]; table_opts(notification) -> [{disc_copies, [node()]}, {attributes, record_info(fields, notification)}]; +table_opts(stats) -> [{disc_copies, [node()]}, {attributes, record_info(fields, stats)}]; table_opts(session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, session)}]; table_opts(admin_session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, admin_session)}]. diff --git a/src/infra/infra_sup.erl b/src/infra/infra_sup.erl index 0dca659..f73deb5 100644 --- a/src/infra/infra_sup.erl +++ b/src/infra/infra_sup.erl @@ -21,6 +21,12 @@ init([]) -> shutdown => 5000, type => worker, modules => [infra_mnesia]}, + #{id => stats_collector, + start => {stats_collector, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [stats_collector]}, #{id => archive_manager, start => {archive_manager, start_link, []}, restart => permanent, diff --git a/src/infra/stats_collector.erl b/src/infra/stats_collector.erl new file mode 100644 index 0000000..f0cd353 --- /dev/null +++ b/src/infra/stats_collector.erl @@ -0,0 +1,133 @@ +-module(stats_collector). +-behaviour(gen_server). + +-include("records.hrl"). + +-export([start_link/0, get_stats/0, subscribe/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(FLUSH_INTERVAL, 300000). + +start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +get_stats() -> gen_server:call(?MODULE, get_stats). +subscribe() -> gen_server:call(?MODULE, subscribe). + +init([]) -> + ets:new(stats_ets, [set, public, named_table, {keypos, 1}]), + erlang:send_after(?FLUSH_INTERVAL, self(), flush), + {ok, #{}}. + +handle_call(subscribe, _From, State) -> + mnesia:subscribe({table, event, simple}), + mnesia:subscribe({table, booking, simple}), + mnesia:subscribe({table, review, simple}), + {reply, ok, State}; +handle_call(get_stats, _From, State) -> + {reply, ets:tab2list(stats_ets), State}; +handle_call(_Msg, _From, State) -> + {reply, {error, unknown}, State}. + +handle_cast(_Msg, State) -> {noreply, State}. + +handle_info(flush, State) -> + flush_stats(), + erlang:send_after(?FLUSH_INTERVAL, self(), flush), + {noreply, State}; +handle_info({mnesia_table_event, {write, Record, _ActivityId}}, State) -> + Table = element(1, Record), + process_write(Table, Record, []), + {noreply, State}; +handle_info({write, Table, New, Old, _ActivityId}, State) -> + process_write(Table, New, Old), + {noreply, State}; +handle_info(_Info, State) -> {noreply, State}. + +terminate(_Reason, _State) -> ok. +code_change(_OldVsn, State, _Extra) -> {ok, State}. + +ensure_counter(Key) -> + case ets:member(stats_ets, Key) of + false -> ets:insert(stats_ets, {Key, 0}); + true -> ok + end. + +process_write(event, New, Old) -> + CId = element(#event.calendar_id, New), + StNew = element(#event.status, New), + StOld = if Old =:= [] -> []; true -> element(#event.status, Old) end, + if StNew =:= active, StOld =/= active -> + ensure_counter({events_created, CId}), + ets:update_counter(stats_ets, {events_created, CId}, {2, 1}); + StNew =:= cancelled, StOld =/= cancelled -> + ensure_counter({events_cancelled, CId}), + ets:update_counter(stats_ets, {events_cancelled, CId}, {2, 1}); + true -> ok + end; +process_write(booking, New, Old) -> + EvId = element(#booking.event_id, New), + StNew = element(#booking.status, New), + StOld = if Old =:= [] -> []; true -> element(#booking.status, Old) end, + CId = case mnesia:dirty_read({event, EvId}) of + [#event{calendar_id = C}] -> C; + _ -> undefined + end, + if CId =/= undefined -> + if StNew =:= confirmed, StOld =/= confirmed -> + ensure_counter({bookings_confirmed, CId}), + ets:update_counter(stats_ets, {bookings_confirmed, CId}, {2, 1}); + StNew =:= cancelled, StOld =/= cancelled -> + ensure_counter({bookings_cancelled, CId}), + ets:update_counter(stats_ets, {bookings_cancelled, CId}, {2, 1}); + true -> ok + end, + if Old =:= [] -> + ensure_counter({bookings_created, CId}), + ets:update_counter(stats_ets, {bookings_created, CId}, {2, 1}); + true -> ok + end; + true -> ok + end; +process_write(review, New, Old) -> + CId = case element(#review.target_type, New) of + event -> + EvId = element(#review.target_id, New), + case mnesia:dirty_read({event, EvId}) of + [#event{calendar_id = C}] -> C; + _ -> undefined + end; + calendar -> + element(#review.target_id, New) + end, + Rating = element(#review.rating, New), + if CId =/= undefined -> + if Old =:= [] -> + ensure_counter({reviews_created, CId}), + ets:update_counter(stats_ets, {reviews_created, CId}, {2, 1}), + ensure_counter({reviews_sum, CId}), + ets:update_counter(stats_ets, {reviews_sum, CId}, {2, Rating}), + ensure_counter({reviews_count, CId}), + ets:update_counter(stats_ets, {reviews_count, CId}, {2, 1}); + true -> + OldRating = element(#review.rating, Old), + if Rating =/= OldRating -> + ensure_counter({reviews_sum, CId}), + ets:update_counter(stats_ets, {reviews_sum, CId}, {2, Rating - OldRating}); + true -> ok + end + end; + true -> ok + end; +process_write(_, _, _) -> ok. + +flush_stats() -> + All = ets:tab2list(stats_ets), + lists:foreach(fun({{Metric, EntityId}, Value}) -> + mnesia:dirty_write(#stats{ + id = list_to_binary(io_lib:format("~p_~p_~p", [Metric, EntityId, os:system_time(millisecond)])), + metric = Metric, + entity_id = EntityId, + value = Value, + timestamp = calendar:local_time() + }) + end, All), + ets:match_delete(stats_ets, '_'). \ No newline at end of file