diff --git a/.gitignore b/.gitignore
index e37b086..b8b2ca7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,4 +23,5 @@ rebar3.crashdump
 /.tool-versions
 /rebar.lock
 /build/
-docker/.env
\ No newline at end of file
+docker/.env
+/Mnesia.*/
\ No newline at end of file
diff --git a/Makefile b/Makefile
index e6ad211..9ed9251 100644
--- a/Makefile
+++ b/Makefile
@@ -76,6 +76,7 @@ stop: ## Остановить приложение
 	@echo "Остановка приложения..."
 	@pkill -f "rebar3 shell --sname $(SNAME)" || true
 	@pkill -f "beam.*$(SNAME)" || true
+	@pkill -f "beam\.smp.*$(SNAME)" || true
 	@echo "✓ Приложение остановлено"
 
 restart: stop run ## Перезапустить приложение
diff --git a/src/config/sys.config b/src/config/sys.config
index 128d631..2c32a2a 100644
--- a/src/config/sys.config
+++ b/src/config/sys.config
@@ -1,14 +1,11 @@
 [
     {eventhub, [
-        {http_port, 8080},
-        {ws_port, 8081},
-        {admin_http_port, 8445},
-        {admin_ws_port, 8446},
+        {http_port, "${HTTP_PORT:-8080}"},
+        {ws_port, "${WS_PORT:-8081}"},
+        {admin_http_port, "${ADMIN_HTTP_PORT:-8445}"},
+        {admin_ws_port, "${ADMIN_WS_PORT:-8446}"},
         {jwt_secret, <<"${JWT_SECRET:-change_me_in_production}">>}
     ]},
-    {mnesia, [
-        {dir, "${MNESIA_DIR:-Mnesia.${NODE}}"}
-    ]},
     {kernel, [
         {logger_level, info},
         {logger, [
diff --git a/src/eventhub_app.erl b/src/eventhub_app.erl
index e1395e4..0545e2e 100644
--- a/src/eventhub_app.erl
+++ b/src/eventhub_app.erl
@@ -8,15 +8,21 @@ start(_StartType, _StartArgs) ->
     application:ensure_all_started(cowboy),
     case infra_sup:start_link() of
         {ok, Pid} ->
+            % Seed extra_db_nodes from MNESIA_EXTRA_DB_NODES (node names split on commas and spaces)
+            case os:getenv("MNESIA_EXTRA_DB_NODES") of
+                false -> ok;
+                NodesStr ->
+                    Nodes = [list_to_atom(N) || N <- string:lexemes(NodesStr, ", ")],
+                    application:set_env(eventhub, extra_db_nodes, Nodes)
+            end,
             ok = infra_mnesia:init_tables(),
             ok = infra_mnesia:wait_for_tables(),
             % Включаем авто‑обнаружение только в режиме remote/swarm
             ClusterMode = os:getenv("CLUSTER_MODE", "local"),
-            if ClusterMode =:= "swarm" orelse ClusterMode =:= "remote" ->
-                    spawn(fun cluster_discovery:discover/0);
-               true ->
-                    % Локальный запуск – подключаемся по старинке или вообще не подключаемся
-                    ok
+            case ClusterMode of
+                "swarm" -> cluster_discovery:discover_and_replicate();
+                "remote" -> cluster_discovery:discover_and_replicate();
+                _ -> ok
             end,
             start_http(), % Пользовательский API (8080)
             start_admin_http(), % Административный API (8445)
diff --git a/src/infra/cluster_discovery.erl b/src/infra/cluster_discovery.erl
index ef11faa..dd240a4 100644
--- a/src/infra/cluster_discovery.erl
+++ b/src/infra/cluster_discovery.erl
@@ -1,36 +1,64 @@
+%% ===================================================================
+%% EventHub – Cluster Discovery (with replication, task #14)
+%% ===================================================================
 -module(cluster_discovery).
--export([discover/0]).
 
--define(DNS_ALIAS, "eventhub-node").
--define(RETRY_INTERVAL, 5000).
+-export([discover_and_replicate/0]).
 
-discover() ->
-    io:format("Starting cluster DNS discovery via epmd (~s)...~n", [?DNS_ALIAS]),
-    discover_loop().
+%% ------------------------------------------------------------------
+%% @doc Main entry point: spawns the process that runs discovery and replication
+%% ------------------------------------------------------------------
+discover_and_replicate() ->
+    spawn(fun() ->
+        io:format("Cluster discovery started (mode: ~s)~n", [os:getenv("CLUSTER_MODE", "local")]),
+        discover_loop()
+    end).
+
+%% ------------------------------------------------------------------
+%% Internal functions
+%% ------------------------------------------------------------------
 discover_loop() ->
-    case inet:getaddrs(?DNS_ALIAS, inet) of
-        {ok, IPs} when is_list(IPs) ->
-            lists:foreach(fun(IP) ->
-                IPStr = inet:ntoa(IP),
-%%                io:format("Checking epmd on ~s...~n", [IPStr]),
-                case erl_epmd:names(IP) of
-                    {ok, List} ->
-                        lists:foreach(fun({Name, _Port}) ->
-                            Node = list_to_atom(Name ++ "@" ++ Name),
-%%                            io:format(" Trying net_kernel:connect_node(~s)...~n", [Node]),
-                            case net_kernel:connect_node(Node) of
-                                true -> ok; %io:format(" *** Connected to ~s ***~n", [Node]);
-                                false -> io:format(" *** Failed to connect to ~s ***~n", [Node]);
-                                ignored -> ok
-                            end
-                        end, List);
-                    {error, Reason} ->
-                        io:format(" epmd error on ~s: ~p~n", [IPStr, Reason])
+    % Resolve the A records of the service alias; getbyname/2 takes (Name, Type), the class is implied
+    case inet_res:getbyname("eventhub-node", a) of
+        {ok, {hostent, _, _, inet, 4, Ips}} ->
+            Nodes = [list_to_atom("eventhub@" ++ inet:ntoa(Ip)) || Ip <- Ips],
+            io:format("Discovered nodes: ~p~n", [Nodes]),
+            lists:foreach(fun(Node) ->
+                case net_kernel:connect_node(Node) of
+                    true ->
+                        io:format("Connected to ~p, joining Mnesia cluster...~n", [Node]),
+                        join_and_replicate(Node);
+                    NotConnected when NotConnected =:= false; NotConnected =:= ignored ->
+                        io:format("Failed to connect to ~p~n", [Node])
                 end
-            end, IPs);
+            end, Nodes);
         {error, Reason} ->
-            io:format("DNS lookup failed (~p), retrying...~n", [Reason])
-    end,
-    timer:sleep(?RETRY_INTERVAL),
-    discover_loop().
\ No newline at end of file
+            io:format("DNS lookup failed: ~p, retrying in 5s...~n", [Reason]),
+            timer:sleep(5000),
+            discover_loop()
+    end.
+
+%% ------------------------------------------------------------------
+%% @doc Joins the remote node's Mnesia cluster and adds local disc copies of its tables
+%% ------------------------------------------------------------------
+join_and_replicate(Node) ->
+    % Record the node, without duplicates, in the eventhub extra_db_nodes setting
+    ExtraNodes = application:get_env(eventhub, extra_db_nodes, []),
+    application:set_env(eventhub, extra_db_nodes, lists:usort([Node | ExtraNodes])),
+    % Join the Mnesia cluster and report the outcome instead of discarding it
+    JoinResult = mnesia:change_config(extra_db_nodes, [Node]),
+    io:format("Mnesia join via ~p: ~p~n", [Node, JoinResult]),
+
+    % Copy every user table (everything except schema) onto this node
+    Tables = mnesia:system_info(tables) -- [schema],
+    lists:foreach(fun(Tab) ->
+        case lists:member(node(), mnesia:table_info(Tab, disc_copies)) of
+            false ->
+                CopyResult = mnesia:add_table_copy(Tab, node(), disc_copies),
+                io:format("Adding local disc copy of table ~p: ~p~n", [Tab, CopyResult]);
+            true ->
+                ok
+        end
+    end, Tables),
+    ok.
\ No newline at end of file
diff --git a/src/infra/infra_mnesia.erl b/src/infra/infra_mnesia.erl
index eaa9c3e..0048a72 100644
--- a/src/infra/infra_mnesia.erl
+++ b/src/infra/infra_mnesia.erl
@@ -1,5 +1,5 @@
 %% ===================================================================
-%% EventHub – infra_mnesia (стабильная версия с автоочисткой при fresh старте)
+%% EventHub – infra_mnesia (final working version, task #14)
 %% ===================================================================
 -module(infra_mnesia).
 -behaviour(gen_server).
@@ -20,6 +20,9 @@
     admin_audit,
     notification
 ]).
+%% Tables that must have disc copies on every node
+-define(DISC_TABLES, ?TABLES -- [session, admin_session]).
+
 -define(TABLE_WAIT_TIMEOUT, 5000).
 
 %% ===================================================================
@@ -43,8 +46,13 @@ init([]) ->
     {ok, #{}}.
 
 handle_call(init_tables, _From, State) ->
-    ok = maybe_recreate_schema(),
-    ok = ensure_cluster_join(),
+    ExtraNodes = application:get_env(eventhub, extra_db_nodes, []),
+    case ExtraNodes of
+        [] ->
+            ok = maybe_recreate_schema();
+        _ ->
+            ok = join_cluster(ExtraNodes)
+    end,
     lists:foreach(fun create_table/1, ?TABLES),
     ok = create_indices(),
     {reply, ok, State};
@@ -59,7 +67,7 @@ terminate(_Reason, _State) -> ok.
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
 
 %% ===================================================================
-%% Проверка директории Mnesia и при необходимости пересоздание схемы
+%% Local schema creation (only when no extra_db_nodes are configured)
 %% ===================================================================
 
 maybe_recreate_schema() ->
@@ -81,23 +89,61 @@ maybe_recreate_schema() ->
     end.
 
 %% ===================================================================
-%% Кластер
+%% Joining the cluster – adds local disc copies of the tables
 %% ===================================================================
 
-ensure_cluster_join() ->
-    ExtraNodes = application:get_env(eventhub, extra_db_nodes, []),
-    case ExtraNodes of
-        [] -> ok;
-        Nodes ->
-            ok = mnesia:change_config(extra_db_nodes, Nodes),
-            case lists:member(node(), mnesia:table_info(schema, disc_copies)) of
-                false -> mnesia:add_table_copy(schema, node(), disc_copies);
-                true -> ok
-            end
+join_cluster(Nodes) ->
+    case mnesia:system_info(is_running) of
+        yes -> stopped = mnesia:stop();
+        no -> ok
+    end,
+    application:set_env(mnesia, extra_db_nodes, Nodes),
+    ok = mnesia:start(),
+    ensure_schema_disc(),
+    % Wait until every disc table is visible in the local schema (polls without a deadline)
+    wait_for_tables_available(),
+    lists:foreach(fun add_local_disc_copy/1, ?DISC_TABLES).
+
+wait_for_tables_available() ->
+    lists:foreach(fun(Tab) ->
+        wait_for_table(Tab)
+    end, ?DISC_TABLES).
+
+wait_for_table(Tab) ->
+    case lists:member(Tab, mnesia:system_info(tables)) of
+        true -> ok;
+        false ->
+            timer:sleep(100),
+            wait_for_table(Tab)
+    end.
+
+ensure_schema_disc() ->
+    case lists:member(node(), mnesia:table_info(schema, disc_copies)) of
+        false ->
+            io:format("Changing schema copy to disc...~n"),
+            case mnesia:change_table_copy_type(schema, node(), disc_copies) of
+                {atomic, ok} -> ok;
+                {aborted, {already_exists, _, _, _}} -> ok;
+                {aborted, Reason} -> error({failed_schema_disc, Reason})
+            end;
+        true -> ok
+    end.
+
+add_local_disc_copy(Tab) ->
+    case lists:member(node(), mnesia:table_info(Tab, disc_copies)) of
+        false ->
+            io:format("Adding local disc copy of table ~p...~n", [Tab]),
+            case mnesia:add_table_copy(Tab, node(), disc_copies) of
+                {atomic, ok} -> ok;
+                {aborted, {already_exists, _, _}} -> ok;
+                {aborted, Reason} ->
+                    io:format("Could not add disc copy for ~p: ~p~n", [Tab, Reason])
+            end;
+        true -> ok
     end.
 
 %% ===================================================================
-%% Создание таблиц
+%% Creating / opening tables
 %% ===================================================================
 
 create_table(Table) ->