From 12a9f5a1a6ae9504c4b9150de025ec034e21f948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=A1=D0=B0?= =?UTF-8?q?=D0=B1=D0=B8=D0=BB=D0=B8=D0=BD?= Date: Mon, 4 May 2026 23:15:59 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D1=81=D1=85=D0=B5=D0=BC=D1=8B=20=D0=B4?= =?UTF-8?q?=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20=D0=B1=D0=B5=D0=B7=20=D0=BF?= =?UTF-8?q?=D0=BE=D1=82=D0=B5=D1=80=D1=8C=20https://git.sabilin.com/EventH?= =?UTF-8?q?ub/EventHubBack/issues/17?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/records.hrl | 5 + src/infra/infra_mnesia.erl | 5 +- src/infra/infra_sup.erl | 8 +- src/infra/migration_engine.erl | 120 ++++++++++++++++++ src/migrations/20260501120000_base_schema.erl | 6 + .../20260504150000_test_migration.erl | 11 ++ src/migrations/README.md | 23 ++++ 7 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 src/infra/migration_engine.erl create mode 100644 src/migrations/20260501120000_base_schema.erl create mode 100644 src/migrations/20260504150000_test_migration.erl create mode 100644 src/migrations/README.md diff --git a/include/records.hrl b/include/records.hrl index ebc66a5..a069897 100644 --- a/include/records.hrl +++ b/include/records.hrl @@ -241,4 +241,9 @@ entity_id :: binary(), value :: integer() | {integer(), integer()}, timestamp :: calendar:datetime() +}). + +-record(schema_migration, { + version :: string(), + applied_at :: calendar:datetime() }). \ No newline at end of file diff --git a/src/infra/infra_mnesia.erl b/src/infra/infra_mnesia.erl index c611a07..5c93eeb 100644 --- a/src/infra/infra_mnesia.erl +++ b/src/infra/infra_mnesia.erl @@ -19,7 +19,7 @@ review, report, banned_word, ticket, subscription, admin_audit, notification, - stats + stats, schema_migration ]). -define(DISC_TABLES, ?TABLES -- [session, admin_session]). @@ -61,6 +61,8 @@ handle_call(init_tables, _From, State) -> ok = create_indices(), ok = stats_collector:subscribe(), ok = start_cleanup_timer(), + ok = migration_engine:init_migrations_table(), + _ = migration_engine:apply_pending(), {reply, ok, State}; handle_call({add_nodes, Nodes}, _From, State) -> @@ -215,6 +217,7 @@ table_opts(subscription) -> [{disc_copies, [node()]}, {attributes, record_info(f table_opts(admin_audit) -> [{disc_copies, [node()]}, {attributes, record_info(fields, admin_audit)}]; table_opts(notification) -> [{disc_copies, [node()]}, {attributes, record_info(fields, notification)}]; table_opts(stats) -> [{disc_copies, [node()]}, {attributes, record_info(fields, stats)}]; +table_opts(schema_migration) -> [{disc_copies, [node()]}, {attributes, record_info(fields, schema_migration)}]; table_opts(session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, session)}]; table_opts(admin_session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, admin_session)}]. diff --git a/src/infra/infra_sup.erl b/src/infra/infra_sup.erl index f73deb5..8215a99 100644 --- a/src/infra/infra_sup.erl +++ b/src/infra/infra_sup.erl @@ -32,6 +32,12 @@ init([]) -> restart => permanent, shutdown => 5000, type => worker, - modules => [archive_manager]} + modules => [archive_manager]}, + #{id => migration_engine, + start => {migration_engine, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [migration_engine]} ], {ok, {SupFlags, Children}}. \ No newline at end of file diff --git a/src/infra/migration_engine.erl b/src/infra/migration_engine.erl new file mode 100644 index 0000000..f1499cb --- /dev/null +++ b/src/infra/migration_engine.erl @@ -0,0 +1,120 @@ +-module(migration_engine). +-behaviour(gen_server). + +-include("records.hrl"). + +%% API +-export([start_link/0, init_migrations_table/0, apply_pending/0, + rollback/1, status/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(TABLE, schema_migrations). + +%% ------------------------------ +%% API +%% ------------------------------ + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init_migrations_table() -> + gen_server:call(?MODULE, init_table). + +apply_pending() -> + gen_server:call(?MODULE, apply_pending). + +rollback(Version) -> + gen_server:call(?MODULE, {rollback, Version}). + +status() -> + gen_server:call(?MODULE, status). + +%% ------------------------------ +%% gen_server callbacks +%% ------------------------------ + +init([]) -> + {ok, #{}}. + +handle_call(init_table, _From, State) -> + case lists:member(?TABLE, mnesia:system_info(tables)) of + true -> ok; + false -> + mnesia:create_table(?TABLE, [ + {disc_copies, [node()]}, + {attributes, record_info(fields, schema_migration)}, + {type, set} + ]) + end, + {reply, ok, State}; + +handle_call(apply_pending, _From, State) -> + Result = do_apply_pending(), + {reply, Result, State}; + +handle_call({rollback, Version}, _From, State) -> + Result = do_rollback(Version), + {reply, Result, State}; + +handle_call(status, _From, State) -> + Applied = applied_versions(), + Pending = pending_versions() -- Applied, + {reply, #{applied => lists:map(fun atom_to_list/1, Applied), + pending => lists:map(fun atom_to_list/1, Pending)}, State}. + +handle_cast(_Msg, State) -> {noreply, State}. +handle_info(_Msg, State) -> {noreply, State}. +terminate(_Reason, _State) -> ok. +code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%% ------------------------------ +%% Внутренняя логика +%% ------------------------------ + +do_apply_pending() -> + Pending = pending_versions() -- applied_versions(), + lists:foreach(fun(Module) -> code:ensure_loaded(Module) end, Pending), + lists:foldl(fun(Version, Acc) -> + try Version:up() of + _ -> mark_applied(Version), Acc + catch _:Reason -> + [{error, atom_to_list(Version), Reason} | Acc] + end + end, [], Pending). + +do_rollback(TargetStr) -> + Target = list_to_atom(TargetStr), + Applied = applied_versions(), + ToRollback = lists:sort(fun(A,B) -> A > B end, + [V || V <- Applied, V > Target]), + lists:foreach(fun(Version) -> + code:ensure_loaded(Version), + try Version:down() of + _ -> unmark_applied(Version) + catch _:Reason -> + throw({rollback_failed, atom_to_list(Version), Reason}) + end + end, ToRollback). + +applied_versions() -> + [list_to_atom(V) || #schema_migration{version = V} <- + mnesia:dirty_match_object(#schema_migration{_ = '_'})]. + +pending_versions() -> + AllMods = code:all_available(), + [list_to_atom(Module) || Module <- extract_module_names(AllMods), lists:prefix("20", Module)]. + +extract_module_names(ModInfoList) -> + [Name || {Name, _, _} <- ModInfoList]. + +mark_applied(Version) -> + mnesia:dirty_write(#schema_migration{ + version = atom_to_list(Version), + applied_at = calendar:local_time() + }). + +unmark_applied(Version) -> + mnesia:dirty_delete({?TABLE, atom_to_list(Version)}). \ No newline at end of file diff --git a/src/migrations/20260501120000_base_schema.erl b/src/migrations/20260501120000_base_schema.erl new file mode 100644 index 0000000..3401443 --- /dev/null +++ b/src/migrations/20260501120000_base_schema.erl @@ -0,0 +1,6 @@ +-module('20260501120000_base_schema'). + +-export([up/0, down/0]). + +up() -> ok. +down() -> ok. \ No newline at end of file diff --git a/src/migrations/20260504150000_test_migration.erl b/src/migrations/20260504150000_test_migration.erl new file mode 100644 index 0000000..f0c4948 --- /dev/null +++ b/src/migrations/20260504150000_test_migration.erl @@ -0,0 +1,11 @@ +-module('20260504150000_test_migration'). + +-export([up/0, down/0]). + +up() -> + io:format("Test migration UP applied!~n"), + ok. + +down() -> + io:format("Test migration DOWN applied!~n"), + ok. \ No newline at end of file diff --git a/src/migrations/README.md b/src/migrations/README.md new file mode 100644 index 0000000..8a9dff9 --- /dev/null +++ b/src/migrations/README.md @@ -0,0 +1,23 @@ +# Миграции схемы данных EventHub + +## Применение миграций +При старте приложения автоматически выполняются все неприменённые миграции. +Для ручного запуска можно вызвать: +migration_engine:apply_pending(). + +## Создание новой миграции +1. Создайте файл в `priv/migrations/` с именем вида `YYYYMMDDHHMMSS_описание.erl`. +2. Реализуйте поведение `db_migration` (функции `up/0` и `down/0`). +3. При очередном запуске приложения миграция будет применена. + +## Откат миграций + migration_engine:rollback("20260501120000_base_schema"). + +## Плавающее обновление (rolling update) +1. Переведите узел в режим обслуживания. +2. Выполните бэкап: `mnesia:backup("backup_node.bak")`. +3. Обновите код приложения (git pull / rsync). +4. Перезапустите узел – миграции применятся автоматически. +5. Убедитесь в согласованности данных. +6. Верните узел в работу. +7. Повторите для остальных узлов. \ No newline at end of file