Настройка репликации. Часть 1 #14

This commit is contained in:
2026-05-03 14:55:57 +03:00
parent 4fdf380f15
commit 3d61830f1c
6 changed files with 137 additions and 58 deletions

1
.gitignore vendored
View File

@@ -24,3 +24,4 @@ rebar3.crashdump
/rebar.lock /rebar.lock
/build/ /build/
docker/.env docker/.env
/Mnesia.*/

View File

@@ -76,6 +76,7 @@ stop: ## Остановить приложение
@echo "Остановка приложения..." @echo "Остановка приложения..."
@pkill -f "rebar3 shell --sname $(SNAME)" || true @pkill -f "rebar3 shell --sname $(SNAME)" || true
@pkill -f "beam.*$(SNAME)" || true @pkill -f "beam.*$(SNAME)" || true
@pkill -f beam.smp || true
@echo "✓ Приложение остановлено" @echo "✓ Приложение остановлено"
restart: stop run ## Перезапустить приложение restart: stop run ## Перезапустить приложение

View File

@@ -1,14 +1,11 @@
[ [
{eventhub, [ {eventhub, [
{http_port, 8080}, {http_port, "${HTTP_PORT:-8080}"},
{ws_port, 8081}, {ws_port, "${WS_PORT:-8081}"},
{admin_http_port, 8445}, {admin_http_port, "${ADMIN_HTTP_PORT:-8445}"},
{admin_ws_port, 8446}, {admin_ws_port, "${ADMIN_WS_PORT:-8446}"},
{jwt_secret, <<"${JWT_SECRET:-change_me_in_production}">>} {jwt_secret, <<"${JWT_SECRET:-change_me_in_production}">>}
]}, ]},
{mnesia, [
{dir, "${MNESIA_DIR:-Mnesia.${NODE}}"}
]},
{kernel, [ {kernel, [
{logger_level, info}, {logger_level, info},
{logger, [ {logger, [

View File

@@ -8,15 +8,21 @@ start(_StartType, _StartArgs) ->
application:ensure_all_started(cowboy), application:ensure_all_started(cowboy),
case infra_sup:start_link() of case infra_sup:start_link() of
{ok, Pid} -> {ok, Pid} ->
% Чтение extra_db_nodes из переменной окружения (если задана)
case os:getenv("MNESIA_EXTRA_DB_NODES") of
false -> ok;
NodesStr ->
Nodes = [list_to_atom(N) || N <- string:tokens(NodesStr, ",")],
application:set_env(eventhub, extra_db_nodes, Nodes)
end,
ok = infra_mnesia:init_tables(), ok = infra_mnesia:init_tables(),
ok = infra_mnesia:wait_for_tables(), ok = infra_mnesia:wait_for_tables(),
% Включаем авто‑обнаружение только в режиме remote/swarm % Включаем авто‑обнаружение только в режиме remote/swarm
ClusterMode = os:getenv("CLUSTER_MODE", "local"), ClusterMode = os:getenv("CLUSTER_MODE", "local"),
if ClusterMode =:= "swarm" orelse ClusterMode =:= "remote" -> case ClusterMode of
spawn(fun cluster_discovery:discover/0); "swarm" -> cluster_discovery:discover_and_replicate();
true -> "remote" -> cluster_discovery:discover_and_replicate();
% Локальный запуск подключаемся по старинке или вообще не подключаемся _ -> ok
ok
end, end,
start_http(), % Пользовательский API (8080) start_http(), % Пользовательский API (8080)
start_admin_http(), % Административный API (8445) start_admin_http(), % Административный API (8445)

View File

@@ -1,36 +1,64 @@
%% ===================================================================
%% EventHub Cluster Discovery (с репликацией, задача #14)
%% ===================================================================
-module(cluster_discovery). -module(cluster_discovery).
-export([discover/0]).
-define(DNS_ALIAS, "eventhub-node"). -export([discover_and_replicate/0]).
-define(RETRY_INTERVAL, 5000).
discover() -> %% ------------------------------------------------------------------
io:format("Starting cluster DNS discovery via epmd (~s)...~n", [?DNS_ALIAS]), %% @doc Основная точка входа: запускает процесс обнаружения и репликации
discover_loop(). %% ------------------------------------------------------------------
discover_and_replicate() ->
spawn(fun() ->
io:format("Cluster discovery started (mode: ~s)~n", [os:getenv("CLUSTER_MODE", "local")]),
discover_loop()
end).
%% ------------------------------------------------------------------
%% Внутренние функции
%% ------------------------------------------------------------------
discover_loop() -> discover_loop() ->
case inet:getaddrs(?DNS_ALIAS, inet) of % Получаем список IP-адресов через DNS
{ok, IPs} when is_list(IPs) -> case inet_res:getbyname("eventhub-node", in, a) of
lists:foreach(fun(IP) -> {ok, {hostent, _, _, in, 4, Ips}} ->
IPStr = inet:ntoa(IP), Nodes = [list_to_atom("eventhub@" ++ inet:ntoa(Ip)) || Ip <- Ips],
%% io:format("Checking epmd on ~s...~n", [IPStr]), io:format("Discovered nodes: ~p~n", [Nodes]),
case erl_epmd:names(IP) of lists:foreach(fun(Node) ->
{ok, List} -> case net_kernel:connect_node(Node) of
lists:foreach(fun({Name, _Port}) -> true ->
Node = list_to_atom(Name ++ "@" ++ Name), io:format("Connected to ~p, joining Mnesia cluster...~n", [Node]),
%% io:format(" Trying net_kernel:connect_node(~s)...~n", [Node]), join_and_replicate(Node);
case net_kernel:connect_node(Node) of false ->
true -> ok; %io:format(" *** Connected to ~s ***~n", [Node]); io:format("Failed to connect to ~p~n", [Node])
false -> io:format(" *** Failed to connect to ~s ***~n", [Node]);
ignored -> ok
end
end, List);
{error, Reason} ->
io:format(" epmd error on ~s: ~p~n", [IPStr, Reason])
end end
end, IPs); end, Nodes);
{error, Reason} -> {error, Reason} ->
io:format("DNS lookup failed (~p), retrying...~n", [Reason]) io:format("DNS lookup failed: ~p, retrying in 5s...~n", [Reason]),
end, timer:sleep(5000),
timer:sleep(?RETRY_INTERVAL), discover_loop()
discover_loop(). end.
%% ------------------------------------------------------------------
%% @doc Добавляет удалённый узел в Mnesia и реплицирует данные
%% ------------------------------------------------------------------
join_and_replicate(Node) ->
% Добавляем узел в список extra_db_nodes, чтобы Mnesia знала о нём
ExtraNodes = application:get_env(eventhub, extra_db_nodes, []),
application:set_env(eventhub, extra_db_nodes, [Node | ExtraNodes]),
% Подключаем узел к кластеру Mnesia
mnesia:change_config(extra_db_nodes, [Node]),
% Реплицируем все пользовательские таблицы на себя
Tables = mnesia:system_info(tables) -- [schema],
lists:foreach(fun(Tab) ->
case lists:member(node(), mnesia:table_info(Tab, disc_copies)) of
false ->
io:format("Adding local disc copy of table ~p...~n", [Tab]),
mnesia:add_table_copy(Tab, node(), disc_copies);
true ->
ok
end
end, Tables),
ok.

View File

@@ -1,5 +1,5 @@
%% =================================================================== %% ===================================================================
%% EventHub infra_mnesia (стабильная версия с автоочисткой при fresh старте) %% EventHub infra_mnesia (финальная рабочая версия, задача #14)
%% =================================================================== %% ===================================================================
-module(infra_mnesia). -module(infra_mnesia).
-behaviour(gen_server). -behaviour(gen_server).
@@ -20,6 +20,9 @@
admin_audit, notification admin_audit, notification
]). ]).
%% Таблицы, которые должны иметь дисковые копии на каждом узле
-define(DISC_TABLES, ?TABLES -- [session, admin_session]).
-define(TABLE_WAIT_TIMEOUT, 5000). -define(TABLE_WAIT_TIMEOUT, 5000).
%% =================================================================== %% ===================================================================
@@ -43,8 +46,13 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call(init_tables, _From, State) -> handle_call(init_tables, _From, State) ->
ok = maybe_recreate_schema(), ExtraNodes = application:get_env(eventhub, extra_db_nodes, []),
ok = ensure_cluster_join(), case ExtraNodes of
[] ->
ok = maybe_recreate_schema();
_ ->
ok = join_cluster(ExtraNodes)
end,
lists:foreach(fun create_table/1, ?TABLES), lists:foreach(fun create_table/1, ?TABLES),
ok = create_indices(), ok = create_indices(),
{reply, ok, State}; {reply, ok, State};
@@ -59,7 +67,7 @@ terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% =================================================================== %% ===================================================================
%% Проверка директории Mnesia и при необходимости пересоздание схемы %% Локальное создание схемы (только если нет extra_db_nodes)
%% =================================================================== %% ===================================================================
maybe_recreate_schema() -> maybe_recreate_schema() ->
@@ -81,23 +89,61 @@ maybe_recreate_schema() ->
end. end.
%% =================================================================== %% ===================================================================
%% Кластер %% Присоединение к кластеру добавляет локальные дисковые копии таблиц
%% =================================================================== %% ===================================================================
ensure_cluster_join() -> join_cluster(Nodes) ->
ExtraNodes = application:get_env(eventhub, extra_db_nodes, []), case mnesia:system_info(is_running) of
case ExtraNodes of yes -> mnesia:stop();
[] -> ok; no -> ok
Nodes -> end,
ok = mnesia:change_config(extra_db_nodes, Nodes), application:set_env(mnesia, extra_db_nodes, Nodes),
case lists:member(node(), mnesia:table_info(schema, disc_copies)) of mnesia:start(),
false -> mnesia:add_table_copy(schema, node(), disc_copies); ensure_schema_disc(),
true -> ok % Дожидаемся появления всех таблиц в локальной схеме
end wait_for_tables_available(),
lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES).
wait_for_tables_available() ->
lists:foreach(fun(Tab) ->
wait_for_table(Tab)
end, ?DISC_TABLES).
wait_for_table(Tab) ->
case lists:member(Tab, mnesia:system_info(tables)) of
true -> ok;
false ->
timer:sleep(100),
wait_for_table(Tab)
end.
ensure_schema_disc() ->
case lists:member(node(), mnesia:table_info(schema, disc_copies)) of
false ->
io:format("Changing schema copy to disc...~n"),
case mnesia:change_table_copy_type(schema, node(), disc_copies) of
{atomic, ok} -> ok;
{aborted, {already_exists, _, _}} -> ok;
{aborted, Reason} -> error({failed_schema_disc, Reason})
end;
true -> ok
end.
add_local_disc_copy(Tab) ->
case lists:member(node(), mnesia:table_info(Tab, disc_copies)) of
false ->
io:format("Adding local disc copy of table ~p...~n", [Tab]),
case mnesia:add_table_copy(Tab, node(), disc_copies) of
{atomic, ok} -> ok;
{aborted, {already_exists, _}} -> ok;
{aborted, Reason} ->
io:format("Could not add disc copy for ~p: ~p~n", [Tab, Reason])
end;
true -> ok
end. end.
%% =================================================================== %% ===================================================================
%% Создание таблиц %% Создание / открытие таблиц
%% =================================================================== %% ===================================================================
create_table(Table) -> create_table(Table) ->