Обновление схемы данных без потерь #17
This commit is contained in:
@@ -241,4 +241,9 @@
|
||||
entity_id :: binary(),
|
||||
value :: integer() | {integer(), integer()},
|
||||
timestamp :: calendar:datetime()
|
||||
}).
|
||||
|
||||
-record(schema_migration, {
|
||||
version :: string(),
|
||||
applied_at :: calendar:datetime()
|
||||
}).
|
||||
@@ -19,7 +19,7 @@
|
||||
review, report, banned_word,
|
||||
ticket, subscription,
|
||||
admin_audit, notification,
|
||||
stats
|
||||
stats, schema_migration
|
||||
]).
|
||||
|
||||
-define(DISC_TABLES, ?TABLES -- [session, admin_session]).
|
||||
@@ -61,6 +61,8 @@ handle_call(init_tables, _From, State) ->
|
||||
ok = create_indices(),
|
||||
ok = stats_collector:subscribe(),
|
||||
ok = start_cleanup_timer(),
|
||||
ok = migration_engine:init_migrations_table(),
|
||||
_ = migration_engine:apply_pending(),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call({add_nodes, Nodes}, _From, State) ->
|
||||
@@ -215,6 +217,7 @@ table_opts(subscription) -> [{disc_copies, [node()]}, {attributes, record_info(f
|
||||
table_opts(admin_audit) -> [{disc_copies, [node()]}, {attributes, record_info(fields, admin_audit)}];
|
||||
table_opts(notification) -> [{disc_copies, [node()]}, {attributes, record_info(fields, notification)}];
|
||||
table_opts(stats) -> [{disc_copies, [node()]}, {attributes, record_info(fields, stats)}];
|
||||
table_opts(schema_migration) -> [{disc_copies, [node()]}, {attributes, record_info(fields, schema_migration)}];
|
||||
table_opts(session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, session)}];
|
||||
table_opts(admin_session) -> [{ram_copies, [node()]}, {attributes, record_info(fields, admin_session)}].
|
||||
|
||||
|
||||
@@ -32,6 +32,12 @@ init([]) ->
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [archive_manager]}
|
||||
modules => [archive_manager]},
|
||||
#{id => migration_engine,
|
||||
start => {migration_engine, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [migration_engine]}
|
||||
],
|
||||
{ok, {SupFlags, Children}}.
|
||||
120
src/infra/migration_engine.erl
Normal file
120
src/infra/migration_engine.erl
Normal file
@@ -0,0 +1,120 @@
|
||||
-module(migration_engine).
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("records.hrl").
|
||||
|
||||
%% API
|
||||
-export([start_link/0, init_migrations_table/0, apply_pending/0,
|
||||
rollback/1, status/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-define(TABLE, schema_migrations).
|
||||
|
||||
%% ------------------------------
|
||||
%% API
|
||||
%% ------------------------------
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init_migrations_table() ->
|
||||
gen_server:call(?MODULE, init_table).
|
||||
|
||||
apply_pending() ->
|
||||
gen_server:call(?MODULE, apply_pending).
|
||||
|
||||
rollback(Version) ->
|
||||
gen_server:call(?MODULE, {rollback, Version}).
|
||||
|
||||
status() ->
|
||||
gen_server:call(?MODULE, status).
|
||||
|
||||
%% ------------------------------
|
||||
%% gen_server callbacks
|
||||
%% ------------------------------
|
||||
|
||||
init([]) ->
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call(init_table, _From, State) ->
|
||||
case lists:member(?TABLE, mnesia:system_info(tables)) of
|
||||
true -> ok;
|
||||
false ->
|
||||
mnesia:create_table(?TABLE, [
|
||||
{disc_copies, [node()]},
|
||||
{attributes, record_info(fields, schema_migration)},
|
||||
{type, set}
|
||||
])
|
||||
end,
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call(apply_pending, _From, State) ->
|
||||
Result = do_apply_pending(),
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call({rollback, Version}, _From, State) ->
|
||||
Result = do_rollback(Version),
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call(status, _From, State) ->
|
||||
Applied = applied_versions(),
|
||||
Pending = pending_versions() -- Applied,
|
||||
{reply, #{applied => lists:map(fun atom_to_list/1, Applied),
|
||||
pending => lists:map(fun atom_to_list/1, Pending)}, State}.
|
||||
|
||||
handle_cast(_Msg, State) -> {noreply, State}.
|
||||
handle_info(_Msg, State) -> {noreply, State}.
|
||||
terminate(_Reason, _State) -> ok.
|
||||
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
||||
|
||||
%% ------------------------------
|
||||
%% Внутренняя логика
|
||||
%% ------------------------------
|
||||
|
||||
do_apply_pending() ->
|
||||
Pending = pending_versions() -- applied_versions(),
|
||||
lists:foreach(fun(Module) -> code:ensure_loaded(Module) end, Pending),
|
||||
lists:foldl(fun(Version, Acc) ->
|
||||
try Version:up() of
|
||||
_ -> mark_applied(Version), Acc
|
||||
catch _:Reason ->
|
||||
[{error, atom_to_list(Version), Reason} | Acc]
|
||||
end
|
||||
end, [], Pending).
|
||||
|
||||
do_rollback(TargetStr) ->
|
||||
Target = list_to_atom(TargetStr),
|
||||
Applied = applied_versions(),
|
||||
ToRollback = lists:sort(fun(A,B) -> A > B end,
|
||||
[V || V <- Applied, V > Target]),
|
||||
lists:foreach(fun(Version) ->
|
||||
code:ensure_loaded(Version),
|
||||
try Version:down() of
|
||||
_ -> unmark_applied(Version)
|
||||
catch _:Reason ->
|
||||
throw({rollback_failed, atom_to_list(Version), Reason})
|
||||
end
|
||||
end, ToRollback).
|
||||
|
||||
applied_versions() ->
|
||||
[list_to_atom(V) || #schema_migration{version = V} <-
|
||||
mnesia:dirty_match_object(#schema_migration{_ = '_'})].
|
||||
|
||||
pending_versions() ->
|
||||
AllMods = code:all_available(),
|
||||
[list_to_atom(Module) || Module <- extract_module_names(AllMods), lists:prefix("20", Module)].
|
||||
|
||||
extract_module_names(ModInfoList) ->
|
||||
[Name || {Name, _, _} <- ModInfoList].
|
||||
|
||||
mark_applied(Version) ->
|
||||
mnesia:dirty_write(#schema_migration{
|
||||
version = atom_to_list(Version),
|
||||
applied_at = calendar:local_time()
|
||||
}).
|
||||
|
||||
unmark_applied(Version) ->
|
||||
mnesia:dirty_delete({?TABLE, atom_to_list(Version)}).
|
||||
6
src/migrations/20260501120000_base_schema.erl
Normal file
6
src/migrations/20260501120000_base_schema.erl
Normal file
@@ -0,0 +1,6 @@
|
||||
-module('20260501120000_base_schema').
|
||||
|
||||
-export([up/0, down/0]).
|
||||
|
||||
up() -> ok.
|
||||
down() -> ok.
|
||||
11
src/migrations/20260504150000_test_migration.erl
Normal file
11
src/migrations/20260504150000_test_migration.erl
Normal file
@@ -0,0 +1,11 @@
|
||||
-module('20260504150000_test_migration').
|
||||
|
||||
-export([up/0, down/0]).
|
||||
|
||||
up() ->
|
||||
io:format("Test migration UP applied!~n"),
|
||||
ok.
|
||||
|
||||
down() ->
|
||||
io:format("Test migration DOWN applied!~n"),
|
||||
ok.
|
||||
23
src/migrations/README.md
Normal file
23
src/migrations/README.md
Normal file
@@ -0,0 +1,23 @@
|
||||
# Миграции схемы данных EventHub
|
||||
|
||||
## Применение миграций
|
||||
При старте приложения автоматически выполняются все неприменённые миграции.
|
||||
Для ручного запуска можно вызвать:
|
||||
migration_engine:apply_pending().
|
||||
|
||||
## Создание новой миграции
|
||||
1. Создайте файл в `priv/migrations/` с именем вида `YYYYMMDDHHMMSS_описание.erl`.
|
||||
2. Реализуйте поведение `db_migration` (функции `up/0` и `down/0`).
|
||||
3. При очередном запуске приложения миграция будет применена.
|
||||
|
||||
## Откат миграций
|
||||
migration_engine:rollback("20260501120000_base_schema").
|
||||
|
||||
## Плавающее обновление (rolling update)
|
||||
1. Переведите узел в режим обслуживания.
|
||||
2. Выполните бэкап: `mnesia:backup("backup_node.bak")`.
|
||||
3. Обновите код приложения (git pull / rsync).
|
||||
4. Перезапустите узел – миграции применятся автоматически.
|
||||
5. Убедитесь в согласованности данных.
|
||||
6. Верните узел в работу.
|
||||
7. Повторите для остальных узлов.
|
||||
Reference in New Issue
Block a user