From 83ce92afa41fe1a7d9a6a03ca369676dd60512d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=A1=D0=B0?= =?UTF-8?q?=D0=B1=D0=B8=D0=BB=D0=B8=D0=BD?= Date: Sun, 3 May 2026 21:02:29 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=B0=D1=81=D1=82=D1=80=D0=BE=D0=B9?= =?UTF-8?q?=D0=BA=D0=B0=20=D1=80=D0=B5=D0=BF=D0=BB=D0=B8=D0=BA=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D0=B8.=20=D0=A4=D0=B8=D0=BD=D0=B0=D0=BB=20https://git.sa?= =?UTF-8?q?bilin.com/EventHub/EventHubBack/issues/14?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 8 +-- docker/docker-compose.swarm.yml | 2 + src/eventhub_app.erl | 64 +++++++++++++++++------- src/infra/cluster_discovery.erl | 87 ++++++++++++++------------------- src/infra/infra_mnesia.erl | 80 ++++++++++++++++++++++-------- 5 files changed, 148 insertions(+), 93 deletions(-) diff --git a/Makefile b/Makefile index 9ed9251..e8248b9 100644 --- a/Makefile +++ b/Makefile @@ -278,9 +278,9 @@ docker-clean: docker-stop ## Очистить Docker образы и volumes @echo "✅ Docker очищен" docker-swarm-deploy: ## Запустить кластер - RELEASE_COOKIE=$$(grep RELEASE_COOKIE docker/.env | cut -d '=' -f2) \ - JWT_SECRET=$$(grep JWT_SECRET docker/.env | cut -d '=' -f2) \ - docker stack deploy -c docker/docker-compose.swarm.yml eventhub + @set RELEASE_COOKIE=$$(grep RELEASE_COOKIE docker/.env | cut -d '=' -f2) + @set JWT_SECRET=$$(grep JWT_SECRET docker/.env | cut -d '=' -f2) + @docker stack deploy -c docker/docker-compose.swarm.yml eventhub --detach=true @echo "✅ Кластер запущен" docker-swarm-stop: ## Запустить кластер @@ -298,6 +298,8 @@ docker-swarm-status: ## Показать состояние кластера @docker exec $$(docker ps -qf "name=eventhub_eventhub" | head -n 1) /app/bin/eventhub eval 'length(nodes()).' @echo "Список узлов:" @docker exec $$(docker ps -qf "name=eventhub_eventhub" | head -n 1) /app/bin/eventhub eval 'nodes().' + @echo "Список узлов Mnesia:" + @docker exec $$(docker ps -qf "name=eventhub_eventhub" | head -n 1) /app/bin/eventhub eval 'mnesia:table_info(user, disc_copies).' docker-swarm-reg-admin: @docker exec $$(docker ps -qf "name=eventhub_eventhub" | head -n 1) /app/bin/eventhub eval 'core_admin:create(<<"admin">>,<<"123456">>,superadmin).' diff --git a/docker/docker-compose.swarm.yml b/docker/docker-compose.swarm.yml index 639b766..1a835ef 100644 --- a/docker/docker-compose.swarm.yml +++ b/docker/docker-compose.swarm.yml @@ -64,6 +64,8 @@ services: - ADMIN_MODER_PASSWORD=${ADMIN_MODER_PASSWORD:-Moderator123!} - ADMIN_SUPPORT_EMAIL=${ADMIN_SUPPORT_EMAIL:-support@eventhub.local} - ADMIN_SUPPORT_PASSWORD=${ADMIN_SUPPORT_PASSWORD:-Support123!} + - CLUSTER_MODE=true + - DNS_NAME=eventhub-node networks: eventhub-net: aliases: diff --git a/src/eventhub_app.erl b/src/eventhub_app.erl index 0545e2e..7ee391a 100644 --- a/src/eventhub_app.erl +++ b/src/eventhub_app.erl @@ -8,22 +8,40 @@ start(_StartType, _StartArgs) -> application:ensure_all_started(cowboy), case infra_sup:start_link() of {ok, Pid} -> - % Чтение extra_db_nodes из переменной окружения (если задана) - case os:getenv("MNESIA_EXTRA_DB_NODES") of - false -> ok; - NodesStr -> - Nodes = [list_to_atom(N) || N <- string:tokens(NodesStr, ",")], + % Определяем список узлов кластера, если режим CLUSTER_MODE=true + Nodes = case os:getenv("CLUSTER_MODE", "false") of + "true" -> + DnsName = os:getenv("DNS_NAME", "eventhub-node"), + try inet:getaddrs(DnsName, inet) of + {ok, IPs} when is_list(IPs), IPs /= [] -> + % Получаем имена всех узлов Erlang, зарегистрированных в EPMD + AllNodes = lists:flatmap(fun(IP) -> + case erl_epmd:names(IP) of + {ok, Names} -> + [list_to_atom(Name ++ "@" ++ Name) || {Name, _Port} <- Names, + lists:prefix("eventhub-node", Name)]; + _ -> [] + end + end, IPs), + % Исключаем свой узел, чтобы не подключаться к самому себе + AllNodes -- [node()]; + _ -> [] + catch + _:_ -> + io:format("DNS lookup failed, starting as first node~n"), + [] + end; + _ -> [] + end, + case Nodes of + [] -> + io:format("Cluster: no nodes found or first node~n"); + _ -> + io:format("Cluster: discovered nodes ~p, joining cluster~n", [Nodes]), application:set_env(eventhub, extra_db_nodes, Nodes) end, ok = infra_mnesia:init_tables(), ok = infra_mnesia:wait_for_tables(), - % Включаем авто‑обнаружение только в режиме remote/swarm - ClusterMode = os:getenv("CLUSTER_MODE", "local"), - case ClusterMode of - "swarm" -> cluster_discovery:discover_and_replicate(); - "remote" -> cluster_discovery:discover_and_replicate(); - _ -> ok - end, start_http(), % Пользовательский API (8080) start_admin_http(), % Административный API (8445) application:ensure_all_started(prometheus), @@ -40,7 +58,7 @@ stop(_State) -> ok. %% Пользовательский HTTP (порт 8080) — только публичные эндпоинты %% =================================================================== start_http() -> - Port = application:get_env(eventhub, http_port, 8080), + Port = get_env_int(http_port, 8080), Dispatch = cowboy_router:compile([ {'_', [ {"/metrics/[:registry]", prometheus_cowboy2_handler, []}, @@ -77,7 +95,7 @@ start_http() -> %% Административный HTTP (порт 8445) — все админские эндпоинты %% =================================================================== start_admin_http() -> - Port = application:get_env(eventhub, admin_http_port, 8445), + PortAdmin = get_env_int(admin_http_port, 8445), Dispatch = cowboy_router:compile([ {'_', [ % ================== БАЗОВЫЕ ================== @@ -114,18 +132,20 @@ start_admin_http() -> Middlewares = [cowboy_router, cowboy_handler], Env = #{dispatch => Dispatch}, - cowboy:start_clear(admin_http, [{port, Port}], #{env => Env, middlewares => Middlewares}), - io:format("Admin HTTP server started on port ~p~n", [Port]), + cowboy:start_clear(admin_http, [{port, PortAdmin}], #{env => Env, middlewares => Middlewares}), + io:format("Admin HTTP server started on port ~p~n", [PortAdmin]), % WebSocket для пользователей WsDispatch = cowboy_router:compile([{'_', [{"/ws", ws_handler, []}]}]), - cowboy:start_clear(ws, [{port, 8081}], #{env => #{dispatch => WsDispatch}}), + PortWs = get_env_int(ws_port, 8081), + cowboy:start_clear(ws, [{port, PortWs}], #{env => #{dispatch => WsDispatch}}), % WebSocket для админов AdminWsDispatch = cowboy_router:compile([{'_', [{"/admin/ws", admin_ws_handler, []}]}]), - cowboy:start_clear(admin_ws, [{port, 8446}], #{env => #{dispatch => AdminWsDispatch}}), + PortAdminWs = get_env_int(admin_ws_port, 8446), + cowboy:start_clear(admin_ws, [{port, PortAdminWs}], #{env => #{dispatch => AdminWsDispatch}}), - io:format("WebSocket started on ports 8081 (user) and 8446 (admin)~n"). + io:format("WebSocket started on ports ~p (user) and ~p (admin)~n", [PortWs, PortAdminWs]). %% ---------- Инициализация администраторов ---------- init_default_admins() -> @@ -156,4 +176,10 @@ init_default_admins() -> io:format("Default support created: ~s~n", [SupportEmail]); _ -> io:format("Admins already exist. Skipping creation.~n") + end. + +get_env_int(Key, Default) -> + case application:get_env(eventhub, Key, Default) of + Val when is_list(Val) -> list_to_integer(Val); + Val when is_integer(Val) -> Val end. \ No newline at end of file diff --git a/src/infra/cluster_discovery.erl b/src/infra/cluster_discovery.erl index dd240a4..db1b3c6 100644 --- a/src/infra/cluster_discovery.erl +++ b/src/infra/cluster_discovery.erl @@ -1,64 +1,51 @@ -%% =================================================================== -%% EventHub – Cluster Discovery (с репликацией, задача #14) -%% =================================================================== -module(cluster_discovery). - -export([discover_and_replicate/0]). -%% ------------------------------------------------------------------ -%% @doc Основная точка входа: запускает процесс обнаружения и репликации -%% ------------------------------------------------------------------ -discover_and_replicate() -> - spawn(fun() -> - io:format("Cluster discovery started (mode: ~s)~n", [os:getenv("CLUSTER_MODE", "local")]), - discover_loop() - end). +-define(DNS_ALIAS, get_dns_name()). +-define(RETRY_INTERVAL, 5000). -%% ------------------------------------------------------------------ -%% Внутренние функции -%% ------------------------------------------------------------------ +discover_and_replicate() -> + io:format("Starting cluster DNS discovery via epmd (~s)...~n", [?DNS_ALIAS]), + discover_loop(). discover_loop() -> - % Получаем список IP-адресов через DNS - case inet_res:getbyname("eventhub-node", in, a) of - {ok, {hostent, _, _, in, 4, Ips}} -> - Nodes = [list_to_atom("eventhub@" ++ inet:ntoa(Ip)) || Ip <- Ips], - io:format("Discovered nodes: ~p~n", [Nodes]), - lists:foreach(fun(Node) -> - case net_kernel:connect_node(Node) of - true -> - io:format("Connected to ~p, joining Mnesia cluster...~n", [Node]), - join_and_replicate(Node); - false -> - io:format("Failed to connect to ~p~n", [Node]) + case inet:getaddrs(?DNS_ALIAS, inet) of + {ok, IPs} when is_list(IPs) -> + lists:foreach(fun(IP) -> + IPStr = inet:ntoa(IP), + io:format("Checking epmd on ~s...~n", [IPStr]), + case erl_epmd:names(IP) of + {ok, List} -> + lists:foreach(fun({Name, _Port}) -> + Node = list_to_atom(Name ++ "@" ++ Name), + io:format(" Trying net_kernel:connect_node(~s)...~n", [Node]), + case net_kernel:connect_node(Node) of + true -> io:format(" *** Connected to ~s ***~n", [Node]), join_and_replicate(Node); %io:format(" *** Connected to ~s ***~n", [Node]); + false -> io:format(" *** Failed to connect to ~s ***~n", [Node]); + ignored -> ok + end + end, List); + {error, Reason} -> + io:format(" epmd error on ~s: ~p~n", [IPStr, Reason]) end - end, Nodes); + end, IPs); {error, Reason} -> - io:format("DNS lookup failed: ~p, retrying in 5s...~n", [Reason]), - timer:sleep(5000), - discover_loop() - end. + io:format("DNS lookup failed (~p), retrying...~n", [Reason]) + end, + timer:sleep(?RETRY_INTERVAL), + discover_loop(). %% ------------------------------------------------------------------ %% @doc Добавляет удалённый узел в Mnesia и реплицирует данные %% ------------------------------------------------------------------ join_and_replicate(Node) -> - % Добавляем узел в список extra_db_nodes, чтобы Mnesia знала о нём - ExtraNodes = application:get_env(eventhub, extra_db_nodes, []), - application:set_env(eventhub, extra_db_nodes, [Node | ExtraNodes]), + case lists:member(Node, mnesia:system_info(db_nodes)) of + true -> + io:format("Node ~p already in Mnesia cluster, skipping~n", [Node]); + false -> + io:format("Adding node ~p to Mnesia cluster...~n", [Node]), + infra_mnesia:add_cluster_nodes([Node]) + end. - % Подключаем узел к кластеру Mnesia - mnesia:change_config(extra_db_nodes, [Node]), - - % Реплицируем все пользовательские таблицы на себя - Tables = mnesia:system_info(tables) -- [schema], - lists:foreach(fun(Tab) -> - case lists:member(node(), mnesia:table_info(Tab, disc_copies)) of - false -> - io:format("Adding local disc copy of table ~p...~n", [Tab]), - mnesia:add_table_copy(Tab, node(), disc_copies); - true -> - ok - end - end, Tables), - ok. \ No newline at end of file +get_dns_name() -> + os:getenv("DNS_NAME", "eventhub-node"). % значение по умолчанию оставим \ No newline at end of file diff --git a/src/infra/infra_mnesia.erl b/src/infra/infra_mnesia.erl index 0048a72..167a458 100644 --- a/src/infra/infra_mnesia.erl +++ b/src/infra/infra_mnesia.erl @@ -1,5 +1,5 @@ %% =================================================================== -%% EventHub – infra_mnesia (финальная рабочая версия, задача #14) +%% EventHub – infra_mnesia (финальная версия с автоочисткой кластера) %% =================================================================== -module(infra_mnesia). -behaviour(gen_server). @@ -7,6 +7,7 @@ -include("records.hrl"). -export([start_link/0, init_tables/0, wait_for_tables/0]). +-export([add_cluster_nodes/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -20,10 +21,9 @@ admin_audit, notification ]). -%% Таблицы, которые должны иметь дисковые копии на каждом узле -define(DISC_TABLES, ?TABLES -- [session, admin_session]). - -define(TABLE_WAIT_TIMEOUT, 5000). +-define(CLEANUP_INTERVAL, 30000). % 30 секунд %% =================================================================== %% API @@ -38,6 +38,9 @@ init_tables() -> wait_for_tables() -> gen_server:call(?MODULE, wait_for_tables). +add_cluster_nodes(Nodes) -> + gen_server:call(?MODULE, {add_nodes, Nodes}). + %% =================================================================== %% gen_server callbacks %% =================================================================== @@ -55,6 +58,11 @@ handle_call(init_tables, _From, State) -> end, lists:foreach(fun create_table/1, ?TABLES), ok = create_indices(), + ok = start_cleanup_timer(), + {reply, ok, State}; + +handle_call({add_nodes, Nodes}, _From, State) -> + ok = do_add_nodes(Nodes), {reply, ok, State}; handle_call(wait_for_tables, _From, State) -> @@ -62,12 +70,17 @@ handle_call(wait_for_tables, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. +handle_info({cleanup}, State) -> + prune_dead_nodes(), + erlang:send_after(?CLEANUP_INTERVAL, self(), {cleanup}), + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. + terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %% =================================================================== -%% Локальное создание схемы (только если нет extra_db_nodes) +%% Управление схемой %% =================================================================== maybe_recreate_schema() -> @@ -88,10 +101,6 @@ maybe_recreate_schema() -> end end. -%% =================================================================== -%% Присоединение к кластеру – добавляет локальные дисковые копии таблиц -%% =================================================================== - join_cluster(Nodes) -> case mnesia:system_info(is_running) of yes -> mnesia:stop(); @@ -100,22 +109,16 @@ join_cluster(Nodes) -> application:set_env(mnesia, extra_db_nodes, Nodes), mnesia:start(), ensure_schema_disc(), - % Дожидаемся появления всех таблиц в локальной схеме wait_for_tables_available(), lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES). -wait_for_tables_available() -> - lists:foreach(fun(Tab) -> - wait_for_table(Tab) - end, ?DISC_TABLES). - -wait_for_table(Tab) -> - case lists:member(Tab, mnesia:system_info(tables)) of - true -> ok; - false -> - timer:sleep(100), - wait_for_table(Tab) - end. +do_add_nodes(Nodes) -> + ExtraNodes = application:get_env(eventhub, extra_db_nodes, []), + application:set_env(eventhub, extra_db_nodes, Nodes ++ ExtraNodes), + {ok, _} = mnesia:change_config(extra_db_nodes, Nodes), + ensure_schema_disc(), + wait_for_tables_available(), + lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES). ensure_schema_disc() -> case lists:member(node(), mnesia:table_info(schema, disc_copies)) of @@ -142,6 +145,41 @@ add_local_disc_copy(Tab) -> true -> ok end. +wait_for_tables_available() -> + lists:foreach(fun(Tab) -> wait_for_table(Tab) end, ?DISC_TABLES). + +wait_for_table(Tab) -> + case lists:member(Tab, mnesia:system_info(tables)) of + true -> ok; + false -> + timer:sleep(100), + wait_for_table(Tab) + end. + +%% =================================================================== +%% Автоматическая очистка мёртвых узлов +%% =================================================================== + +start_cleanup_timer() -> + erlang:send_after(?CLEANUP_INTERVAL, self(), {cleanup}), + ok. + +prune_dead_nodes() -> + AliveNodes = lists:filter(fun(Node) -> + Node =/= node() andalso net_adm:ping(Node) =:= pong + end, mnesia:system_info(db_nodes)), + DeadNodes = mnesia:system_info(db_nodes) -- [node() | AliveNodes], + lists:foreach(fun(Node) -> + io:format("Removing dead node ~p from Mnesia schema...~n", [Node]), + lists:foreach(fun(Tab) -> + case lists:member(Node, mnesia:table_info(Tab, disc_copies)) of + true -> catch mnesia:del_table_copy(Tab, Node); + false -> ok + end + end, ?DISC_TABLES), + catch mnesia:del_table_copy(schema, Node) + end, DeadNodes). + %% =================================================================== %% Создание / открытие таблиц %% ===================================================================