Настройка репликации. Финал #14
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
%% ===================================================================
|
||||
%% EventHub – infra_mnesia (финальная рабочая версия, задача #14)
|
||||
%% EventHub – infra_mnesia (финальная версия с автоочисткой кластера)
|
||||
%% ===================================================================
|
||||
-module(infra_mnesia).
|
||||
-behaviour(gen_server).
|
||||
@@ -7,6 +7,7 @@
|
||||
-include("records.hrl").
|
||||
|
||||
-export([start_link/0, init_tables/0, wait_for_tables/0]).
|
||||
-export([add_cluster_nodes/1]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
@@ -20,10 +21,9 @@
|
||||
admin_audit, notification
|
||||
]).
|
||||
|
||||
%% Таблицы, которые должны иметь дисковые копии на каждом узле
|
||||
-define(DISC_TABLES, ?TABLES -- [session, admin_session]).
|
||||
|
||||
-define(TABLE_WAIT_TIMEOUT, 5000).
|
||||
-define(CLEANUP_INTERVAL, 30000). % 30 секунд
|
||||
|
||||
%% ===================================================================
|
||||
%% API
|
||||
@@ -38,6 +38,9 @@ init_tables() ->
|
||||
wait_for_tables() ->
|
||||
gen_server:call(?MODULE, wait_for_tables).
|
||||
|
||||
add_cluster_nodes(Nodes) ->
|
||||
gen_server:call(?MODULE, {add_nodes, Nodes}).
|
||||
|
||||
%% ===================================================================
|
||||
%% gen_server callbacks
|
||||
%% ===================================================================
|
||||
@@ -55,6 +58,11 @@ handle_call(init_tables, _From, State) ->
|
||||
end,
|
||||
lists:foreach(fun create_table/1, ?TABLES),
|
||||
ok = create_indices(),
|
||||
ok = start_cleanup_timer(),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call({add_nodes, Nodes}, _From, State) ->
|
||||
ok = do_add_nodes(Nodes),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call(wait_for_tables, _From, State) ->
|
||||
@@ -62,12 +70,17 @@ handle_call(wait_for_tables, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(_Msg, State) -> {noreply, State}.
|
||||
handle_info({cleanup}, State) ->
|
||||
prune_dead_nodes(),
|
||||
erlang:send_after(?CLEANUP_INTERVAL, self(), {cleanup}),
|
||||
{noreply, State};
|
||||
handle_info(_Info, State) -> {noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) -> ok.
|
||||
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
||||
|
||||
%% ===================================================================
|
||||
%% Локальное создание схемы (только если нет extra_db_nodes)
|
||||
%% Управление схемой
|
||||
%% ===================================================================
|
||||
|
||||
maybe_recreate_schema() ->
|
||||
@@ -88,10 +101,6 @@ maybe_recreate_schema() ->
|
||||
end
|
||||
end.
|
||||
|
||||
%% ===================================================================
|
||||
%% Присоединение к кластеру – добавляет локальные дисковые копии таблиц
|
||||
%% ===================================================================
|
||||
|
||||
join_cluster(Nodes) ->
|
||||
case mnesia:system_info(is_running) of
|
||||
yes -> mnesia:stop();
|
||||
@@ -100,22 +109,16 @@ join_cluster(Nodes) ->
|
||||
application:set_env(mnesia, extra_db_nodes, Nodes),
|
||||
mnesia:start(),
|
||||
ensure_schema_disc(),
|
||||
% Дожидаемся появления всех таблиц в локальной схеме
|
||||
wait_for_tables_available(),
|
||||
lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES).
|
||||
|
||||
wait_for_tables_available() ->
|
||||
lists:foreach(fun(Tab) ->
|
||||
wait_for_table(Tab)
|
||||
end, ?DISC_TABLES).
|
||||
|
||||
wait_for_table(Tab) ->
|
||||
case lists:member(Tab, mnesia:system_info(tables)) of
|
||||
true -> ok;
|
||||
false ->
|
||||
timer:sleep(100),
|
||||
wait_for_table(Tab)
|
||||
end.
|
||||
do_add_nodes(Nodes) ->
|
||||
ExtraNodes = application:get_env(eventhub, extra_db_nodes, []),
|
||||
application:set_env(eventhub, extra_db_nodes, Nodes ++ ExtraNodes),
|
||||
{ok, _} = mnesia:change_config(extra_db_nodes, Nodes),
|
||||
ensure_schema_disc(),
|
||||
wait_for_tables_available(),
|
||||
lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES).
|
||||
|
||||
ensure_schema_disc() ->
|
||||
case lists:member(node(), mnesia:table_info(schema, disc_copies)) of
|
||||
@@ -142,6 +145,41 @@ add_local_disc_copy(Tab) ->
|
||||
true -> ok
|
||||
end.
|
||||
|
||||
wait_for_tables_available() ->
|
||||
lists:foreach(fun(Tab) -> wait_for_table(Tab) end, ?DISC_TABLES).
|
||||
|
||||
wait_for_table(Tab) ->
|
||||
case lists:member(Tab, mnesia:system_info(tables)) of
|
||||
true -> ok;
|
||||
false ->
|
||||
timer:sleep(100),
|
||||
wait_for_table(Tab)
|
||||
end.
|
||||
|
||||
%% ===================================================================
|
||||
%% Автоматическая очистка мёртвых узлов
|
||||
%% ===================================================================
|
||||
|
||||
start_cleanup_timer() ->
|
||||
erlang:send_after(?CLEANUP_INTERVAL, self(), {cleanup}),
|
||||
ok.
|
||||
|
||||
prune_dead_nodes() ->
|
||||
AliveNodes = lists:filter(fun(Node) ->
|
||||
Node =/= node() andalso net_adm:ping(Node) =:= pong
|
||||
end, mnesia:system_info(db_nodes)),
|
||||
DeadNodes = mnesia:system_info(db_nodes) -- [node() | AliveNodes],
|
||||
lists:foreach(fun(Node) ->
|
||||
io:format("Removing dead node ~p from Mnesia schema...~n", [Node]),
|
||||
lists:foreach(fun(Tab) ->
|
||||
case lists:member(Node, mnesia:table_info(Tab, disc_copies)) of
|
||||
true -> catch mnesia:del_table_copy(Tab, Node);
|
||||
false -> ok
|
||||
end
|
||||
end, ?DISC_TABLES),
|
||||
catch mnesia:del_table_copy(schema, Node)
|
||||
end, DeadNodes).
|
||||
|
||||
%% ===================================================================
|
||||
%% Создание / открытие таблиц
|
||||
%% ===================================================================
|
||||
|
||||
Reference in New Issue
Block a user