248 lines
9.4 KiB
Erlang
248 lines
9.4 KiB
Erlang
%% ===================================================================
|
||
%% EventHub – infra_mnesia (final version with automatic cluster cleanup)
|
||
%% ===================================================================
|
||
-module(infra_mnesia).
|
||
-behaviour(gen_server).
|
||
|
||
-include("records.hrl").
|
||
|
||
-export([start_link/0, init_tables/0, wait_for_tables/0]).
|
||
-export([add_cluster_nodes/1]).
|
||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||
terminate/2, code_change/3]).
|
||
|
||
%% Every Mnesia table this module creates and waits for.
-define(TABLES, [
    user,
    session,
    admin,
    admin_session,
    calendar,
    calendar_share,
    calendar_specialist,
    event,
    recurrence_exception,
    booking,
    review,
    report,
    banned_word,
    ticket,
    subscription,
    admin_audit,
    notification,
    stats,
    schema_migration
]).

%% ?TABLES without the two session tables (those are created as ram_copies).
-define(DISC_TABLES, ?TABLES -- [session, admin_session]).

%% Timeout handed to mnesia:wait_for_tables/2, in milliseconds.
-define(TABLE_WAIT_TIMEOUT, 5000).

%% Delay between two {cleanup} messages, in milliseconds (30 seconds).
-define(CLEANUP_INTERVAL, 30000).
|
||
|
||
%% ===================================================================
|
||
%% API
|
||
%% ===================================================================
|
||
|
||
%% Starts the gen_server and registers it locally under the module name.
%% Returns whatever gen_server:start_link/4 returns.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
|
||
|
||
%% Asks the server to prepare the schema, create tables and indices and run
%% the pending migrations. Returns the server's reply (`ok` on success).
%%
%% The request may stop/start Mnesia, create many tables and apply
%% migrations, so the default 5 second gen_server:call timeout is not used:
%% with it the caller could exit with `timeout` while the server is still
%% working and later deliver a stray reply.
init_tables() ->
    gen_server:call(?MODULE, init_tables, infinity).
|
||
|
||
%% Asks the server to wait until the tables in ?TABLES are loaded and
%% returns the server's reply.
%%
%% The server itself waits up to ?TABLE_WAIT_TIMEOUT milliseconds, so the
%% call timeout must be strictly larger; otherwise the caller can time out
%% at the very moment the server is about to answer.
wait_for_tables() ->
    gen_server:call(?MODULE, wait_for_tables, ?TABLE_WAIT_TIMEOUT + 1000).
|
||
|
||
%% Asks the server to connect Mnesia to `Nodes` and to add local disc copies
%% of the disc tables. Returns the server's reply (`ok` on success).
%%
%% The request polls for tables and copies table data, which can easily take
%% longer than the default 5 second gen_server:call timeout, so no call
%% timeout is applied here.
add_cluster_nodes(Nodes) ->
    gen_server:call(?MODULE, {add_nodes, Nodes}, infinity).
|
||
|
||
%% ===================================================================
|
||
%% gen_server callbacks
|
||
%% ===================================================================
|
||
|
||
%% gen_server callback: the server starts with an empty map as its state.
%% All real setup is deferred to the `init_tables` call.
init([]) ->
    InitialState = #{},
    {ok, InitialState}.
|
||
|
||
%% gen_server callback for synchronous requests.
%%
%% init_tables      - prepares the schema (standalone or joining the nodes
%%                    configured in `eventhub/extra_db_nodes`), creates the
%%                    tables and indices, subscribes the stats collector,
%%                    starts the periodic cleanup and runs the migrations.
%%                    Replies `ok`; any failed step crashes the server.
%% {add_nodes, Ns}  - connects to additional nodes. Replies `ok`.
%% wait_for_tables  - replies with the result of mnesia:wait_for_tables/2,
%%                    i.e. `ok`, `{timeout, Tabs}` or `{error, Reason}`.
handle_call(init_tables, _From, State) ->
    case application:get_env(eventhub, extra_db_nodes, []) of
        [] ->
            ok = maybe_recreate_schema();
        ExtraNodes ->
            ok = join_cluster(ExtraNodes)
    end,
    lists:foreach(fun create_table/1, ?TABLES),
    ok = create_indices(),
    ok = stats_collector:subscribe(),
    NewState = ensure_cleanup_timer(State),
    ok = migration_engine:init_migrations_table(),
    _ = migration_engine:apply_pending(),
    {reply, ok, NewState};
handle_call({add_nodes, Nodes}, _From, State) ->
    ok = do_add_nodes(Nodes),
    {reply, ok, State};
handle_call(wait_for_tables, _From, State) ->
    %% Hand the real outcome back to the caller instead of always claiming
    %% success: on a timeout the tables are NOT ready.
    Result = mnesia:wait_for_tables(?TABLES, ?TABLE_WAIT_TIMEOUT),
    {reply, Result, State}.

%% Starts the periodic cleanup only once per server lifetime. Without this
%% guard every repeated `init_tables` call would add another self-rearming
%% {cleanup} timer chain.
ensure_cleanup_timer(#{cleanup_timer_started := true} = State) ->
    State;
ensure_cleanup_timer(State) ->
    ok = start_cleanup_timer(),
    State#{cleanup_timer_started => true}.
|
||
|
||
%% gen_server callback: every cast is dropped and the state is kept as is.
handle_cast(_Request, State) ->
    {noreply, State}.
|
||
%% gen_server callback for plain messages.
%%
%% {cleanup} - runs one dead-node pruning pass and schedules the next
%%             {cleanup} message ?CLEANUP_INTERVAL milliseconds from now.
%% anything else is dropped.
handle_info({cleanup}, State) ->
    prune_dead_nodes(),
    ok = start_cleanup_timer(),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.
|
||
|
||
%% gen_server callback: there is nothing to release on shutdown.
terminate(_Why, _State) ->
    ok.
|
||
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
|
||
|
||
%% ===================================================================
|
||
%% Schema management
|
||
%% ===================================================================
|
||
|
||
%% Makes sure Mnesia runs with a schema in its configured directory.
%%
%% If the Mnesia directory does not exist, Mnesia is stopped, a fresh schema
%% is created for the local node and Mnesia is started again. If the
%% directory exists, the existing schema is reused and Mnesia is started
%% only when it is not already running.
%%
%% Returns `ok`, or the error returned by mnesia:start/0. Raises
%% `{schema_creation_failed, Reason}` when the schema cannot be created.
maybe_recreate_schema() ->
    MnesiaDir = mnesia:system_info(directory),
    case filelib:is_dir(MnesiaDir) of
        false ->
            io:format("Mnesia directory not found. Creating fresh schema...~n"),
            create_fresh_schema();
        true ->
            io:format("Mnesia directory exists (~s). Reusing existing schema.~n", [MnesiaDir]),
            ensure_mnesia_started()
    end.

%% Stops Mnesia, replaces the local schema with a fresh one and restarts.
create_fresh_schema() ->
    _ = mnesia:stop(),
    %% Best effort: there may be nothing to delete.
    _ = mnesia:delete_schema([node()]),
    %% A failed schema creation must not be reported as success; the table
    %% creation that follows would fail in a far less obvious way.
    case mnesia:create_schema([node()]) of
        ok -> ok;
        {error, Reason} -> error({schema_creation_failed, Reason})
    end,
    mnesia:start().

%% Starts Mnesia unless it already reports itself as running.
ensure_mnesia_started() ->
    case mnesia:system_info(is_running) of
        yes -> ok;
        _ -> mnesia:start()
    end.
|
||
|
||
%% Restarts Mnesia with `Nodes` configured as `extra_db_nodes`, switches the
%% local schema copy to disc, waits until the disc tables are known locally
%% and adds a local disc copy of each of them. Returns `ok`.
join_cluster(Nodes) ->
    %% is_running may also report `starting` or `stopping`; only a plain
    %% `no` means there is nothing to stop. Matching just yes/no would raise
    %% case_clause in the transitional states.
    case mnesia:system_info(is_running) of
        no -> ok;
        _Other -> mnesia:stop()
    end,
    application:set_env(mnesia, extra_db_nodes, Nodes),
    mnesia:start(),
    ensure_schema_disc(),
    wait_for_tables_available(),
    lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES).
|
||
|
||
%% Connects the running Mnesia to `Nodes`.
%%
%% The new nodes are prepended to the `eventhub/extra_db_nodes` application
%% setting, Mnesia is told about them via change_config/2 (a failure there
%% crashes with badmatch), and then the same local-copy steps as in
%% join_cluster/1 are performed. Returns `ok`.
do_add_nodes(Nodes) ->
    Configured = application:get_env(eventhub, extra_db_nodes, []),
    Combined = Nodes ++ Configured,
    application:set_env(eventhub, extra_db_nodes, Combined),
    {ok, _Connected} = mnesia:change_config(extra_db_nodes, Nodes),
    ensure_schema_disc(),
    wait_for_tables_available(),
    lists:foreach(
        fun(Tab) -> add_local_disc_copy(Tab) end,
        ?DISC_TABLES
    ).
|
||
|
||
%% Makes sure the local node holds the schema table as disc_copies.
%%
%% Returns `ok` when the schema is (or already was) on disc and raises
%% `{failed_schema_disc, Reason}` for any other abort.
ensure_schema_disc() ->
    case lists:member(node(), mnesia:table_info(schema, disc_copies)) of
        true ->
            ok;
        false ->
            io:format("Changing schema copy to disc...~n"),
            case mnesia:change_table_copy_type(schema, node(), disc_copies) of
                {atomic, ok} ->
                    ok;
                %% Mnesia reports an unchanged storage type as the 4-tuple
                %% {already_exists, Tab, Node, Type}; the old 3-tuple pattern
                %% never matched it and turned this harmless case into an error.
                {aborted, {already_exists, schema, _Node, _Type}} ->
                    ok;
                {aborted, {already_exists, _, _}} ->
                    ok;
                {aborted, Reason} ->
                    error({failed_schema_disc, Reason})
            end
    end.
|
||
|
||
%% Adds a disc_copies replica of `Tab` on the local node unless the node is
%% already listed among the table's disc copies.
%%
%% Failures are only printed; the function never raises on an aborted
%% add_table_copy and returns the result of the last expression (`ok`).
add_local_disc_copy(Tab) ->
    case lists:member(node(), mnesia:table_info(Tab, disc_copies)) of
        true ->
            ok;
        false ->
            io:format("Adding local disc copy of table ~p...~n", [Tab]),
            case mnesia:add_table_copy(Tab, node(), disc_copies) of
                {atomic, ok} ->
                    ok;
                %% add_table_copy aborts with {already_exists, Tab, Node}; the
                %% old 2-tuple pattern never matched, so this case was
                %% reported as a failure.
                {aborted, {already_exists, Tab, _Node}} ->
                    ok;
                {aborted, {already_exists, _}} ->
                    ok;
                {aborted, Reason} ->
                    io:format("Could not add disc copy for ~p: ~p~n", [Tab, Reason])
            end
    end.
|
||
|
||
%% Blocks until every table in ?DISC_TABLES is known to the local Mnesia,
%% checking the tables one after another in list order.
wait_for_tables_available() ->
    lists:foreach(fun wait_for_table/1, ?DISC_TABLES).
|
||
|
||
%% Polls every 100 ms until `Tab` shows up in mnesia:system_info(tables).
%%
%% The wait is bounded by ?TABLE_WAIT_TIMEOUT milliseconds. Previously the
%% loop had no limit, so a table that never appeared (for example because
%% the remote node could not be reached) blocked the server forever.
%% Raises `{table_not_available, Tab}` when the limit is reached.
wait_for_table(Tab) ->
    wait_for_table(Tab, ?TABLE_WAIT_TIMEOUT div 100).

%% `AttemptsLeft` is the number of 100 ms sleeps still allowed.
wait_for_table(Tab, AttemptsLeft) ->
    case lists:member(Tab, mnesia:system_info(tables)) of
        true ->
            ok;
        false when AttemptsLeft > 0 ->
            timer:sleep(100),
            wait_for_table(Tab, AttemptsLeft - 1);
        false ->
            error({table_not_available, Tab})
    end.
|
||
|
||
%% ===================================================================
|
||
%% Automatic cleanup of dead nodes
|
||
%% ===================================================================
|
||
|
||
%% Schedules a single {cleanup} message to the calling process, to be
%% delivered after ?CLEANUP_INTERVAL milliseconds. Always returns `ok`;
%% the timer reference is not kept.
start_cleanup_timer() ->
    _TimerRef = erlang:send_after(?CLEANUP_INTERVAL, self(), {cleanup}),
    ok.
|
||
|
||
%% Removes every db node that does not answer a ping from the Mnesia schema.
%%
%% For each such node the disc copies of the tables in ?DISC_TABLES are
%% deleted first and then the node's schema copy. Failed deletions are
%% printed and otherwise ignored, so one stubborn table does not stop the
%% pass. Returns `ok`.
%%
%% NOTE(review): a single failed net_adm:ping/1 is the only liveness test,
%% so a node that is merely unreachable at that moment is removed as well -
%% confirm this is intended.
prune_dead_nodes() ->
    %% Read db_nodes once so that "alive" and "dead" are derived from the
    %% same snapshot.
    OtherNodes = mnesia:system_info(db_nodes) -- [node()],
    {_Alive, Dead} =
        lists:partition(fun(Node) -> net_adm:ping(Node) =:= pong end, OtherNodes),
    lists:foreach(fun remove_dead_node/1, Dead).

%% Deletes the disc table copies and the schema copy held by `Node`.
remove_dead_node(Node) ->
    io:format("Removing dead node ~p from Mnesia schema...~n", [Node]),
    lists:foreach(
        fun(Tab) ->
            case lists:member(Node, mnesia:table_info(Tab, disc_copies)) of
                true -> drop_table_copy(Tab, Node);
                false -> ok
            end
        end,
        ?DISC_TABLES
    ),
    drop_table_copy(schema, Node).

%% Best-effort del_table_copy that reports failures instead of hiding them
%% behind an old-style `catch`.
drop_table_copy(Tab, Node) ->
    try mnesia:del_table_copy(Tab, Node) of
        {atomic, ok} ->
            ok;
        {aborted, Reason} ->
            io:format("Could not remove copy of ~p on ~p: ~p~n", [Tab, Node, Reason])
    catch
        Class:Error ->
            io:format("Could not remove copy of ~p on ~p: ~p~n", [Tab, Node, {Class, Error}])
    end.
|
||
|
||
%% ===================================================================
|
||
%% Creating / opening tables
|
||
%% ===================================================================
|
||
|
||
%% Creates `Table` with the options from table_opts/1.
%%
%% A table that already exists counts as success. Any other abort raises
%% `{table_creation_failed, Table, Reason}`.
create_table(Table) ->
    case mnesia:create_table(Table, table_opts(Table)) of
        {aborted, {already_exists, _}} ->
            ok;
        {aborted, Reason} ->
            error({table_creation_failed, Table, Reason});
        {atomic, ok} ->
            ok
    end.
|
||
|
||
%% ===================================================================
|
||
%% Table storage options
|
||
%% ===================================================================
|
||
|
||
%% Returns the mnesia:create_table/2 options for `Table`: a single local
%% replica of the table's storage type plus the attribute list taken from
%% the record of the same name. Fails with function_clause for a table that
%% has no record listed in table_fields/1.
table_opts(Table) ->
    [{storage_type(Table), [node()]}, {attributes, table_fields(Table)}].

%% The two session tables live in RAM only; every other table is on disc.
storage_type(session) -> ram_copies;
storage_type(admin_session) -> ram_copies;
storage_type(_Table) -> disc_copies.

%% record_info/2 needs a literal record name, hence one clause per table.
table_fields(user) -> record_info(fields, user);
table_fields(admin) -> record_info(fields, admin);
table_fields(calendar) -> record_info(fields, calendar);
table_fields(calendar_share) -> record_info(fields, calendar_share);
table_fields(calendar_specialist) -> record_info(fields, calendar_specialist);
table_fields(event) -> record_info(fields, event);
table_fields(recurrence_exception) -> record_info(fields, recurrence_exception);
table_fields(booking) -> record_info(fields, booking);
table_fields(review) -> record_info(fields, review);
table_fields(report) -> record_info(fields, report);
table_fields(banned_word) -> record_info(fields, banned_word);
table_fields(ticket) -> record_info(fields, ticket);
table_fields(subscription) -> record_info(fields, subscription);
table_fields(admin_audit) -> record_info(fields, admin_audit);
table_fields(notification) -> record_info(fields, notification);
table_fields(stats) -> record_info(fields, stats);
table_fields(schema_migration) -> record_info(fields, schema_migration);
table_fields(session) -> record_info(fields, session);
table_fields(admin_session) -> record_info(fields, admin_session).
|
||
|
||
%% ===================================================================
|
||
%% Indices
|
||
%% ===================================================================
|
||
|
||
%% Adds the secondary indices used by the application, table by table and
%% in the listed order.
%%
%% An index that already exists is fine. Any other failure is printed, so
%% it no longer goes unnoticed, but it does not abort start-up. Always
%% returns `ok`.
create_indices() ->
    Indices = [
        {event, [calendar_id, start_time, event_type, master_id, specialist_id, status]},
        {booking, [event_id, user_id, status]},
        {calendar, [owner_id, status, short_name, category]},
        {calendar_specialist, [calendar_id, user_id]},
        {user, [nickname, email]},
        {notification, [user_id, is_read]}
    ],
    lists:foreach(
        fun({Tab, Attrs}) ->
            lists:foreach(fun(Attr) -> ensure_index(Tab, Attr) end, Attrs)
        end,
        Indices
    ),
    ok.

%% Adds one index and reports anything other than success or
%% "already exists".
ensure_index(Tab, Attr) ->
    case mnesia:add_table_index(Tab, Attr) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, Tab, _Pos}} ->
            ok;
        {aborted, Reason} ->
            io:format("Could not add index ~p on table ~p: ~p~n", [Attr, Tab, Reason])
    end.