fed-sx-m1: Step 3b on-disk log — open_disk/2 + write-through append/2 + length-framed segments; 12/12 log_disk tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 29s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 29s
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
-module(log).
|
||||
-export([open/2, append/2, tip/1, replay/3, entries/1]).
|
||||
-export([open/2, open_disk/2, append/2, tip/1, replay/3, entries/1]).
|
||||
|
||||
%% Per-actor activity log — the canonical record of everything an
|
||||
%% actor has emitted, in chronological order. Per design §15.2 this
|
||||
@@ -36,9 +36,112 @@ open(ActorId, BasePath) ->
|
||||
append(LogState, Activity) ->
|
||||
Seq = field(seq, LogState),
|
||||
Entries = field(entries, LogState),
|
||||
NewEntries = Entries ++ [Activity],
|
||||
NewState = replace_field(seq, Seq + 1,
|
||||
replace_field(entries, Entries ++ [Activity], LogState)),
|
||||
{ok, NewState, Seq}.
|
||||
replace_field(entries, NewEntries, LogState)),
|
||||
case persisted_path(LogState) of
|
||||
{persisted, Path} ->
|
||||
ok = write_segment(Path, NewEntries),
|
||||
{ok, NewState, Seq};
|
||||
not_persisted ->
|
||||
{ok, NewState, Seq}
|
||||
end.
|
||||
|
||||
%% open_disk/2 — disk-backed variant of open. Reads any existing
|
||||
%% segment file under BasePath, replays entries into memory state,
|
||||
%% and tags the state {persisted, true} so future append/2 calls
|
||||
%% write through. BasePath must be a binary or charlist (real path),
|
||||
%% not an atom — the in-memory open/2 still accepts atoms for tests.
|
||||
%%
|
||||
%% Segment format (per frame): 4-byte big-endian length + that many
|
||||
%% bytes of term_codec:encode(Activity). Whole file is the concat of
|
||||
%% all frames in append order; no header.
|
||||
%%
|
||||
%% Returns {ok, LogState} on success, {error, {corrupt, Reason}} if
|
||||
%% the segment is truncated/garbled, {error, {read, Reason}} on other
|
||||
%% file errors. Missing file is treated as an empty fresh log.
|
||||
open_disk(ActorId, BasePath) ->
|
||||
Path = segment_path(ActorId, BasePath),
|
||||
case try_read_segment(Path) of
|
||||
{ok, Entries} ->
|
||||
State = [{actor, ActorId}, {base, BasePath},
|
||||
{seq, length(Entries)},
|
||||
{entries, Entries},
|
||||
{persisted, true},
|
||||
{path, Path}],
|
||||
{ok, State};
|
||||
{error, _} = E ->
|
||||
E
|
||||
end.
|
||||
|
||||
persisted_path(LogState) ->
|
||||
case lookup(persisted, LogState) of
|
||||
true ->
|
||||
case lookup(path, LogState) of
|
||||
undefined -> not_persisted;
|
||||
P -> {persisted, P}
|
||||
end;
|
||||
_ -> not_persisted
|
||||
end.
|
||||
|
||||
%% segment_path/2 — returns the segment file path as a charlist (list
|
||||
%% of int char codes). BasePath may be a binary OR a charlist; we
|
||||
%% normalize to charlist via binary_to_list so the result is purely
|
||||
%% cons-based — this works around an iolist-walker quirk in
|
||||
%% er-source-to-string that surfaces when list_to_binary nests binaries
|
||||
%% built from charlists. file:read_file accepts charlists fine.
|
||||
segment_path(ActorId, BasePath) ->
|
||||
base_chars(BasePath) ++ [$/] ++ atom_to_list(ActorId)
|
||||
++ [$., $l, $o, $g].
|
||||
|
||||
base_chars(B) when is_binary(B) -> binary_to_list(B);
|
||||
base_chars(L) when is_list(L) -> L.
|
||||
|
||||
write_segment(Path, Entries) ->
|
||||
Frames = [frame(term_codec:encode(E)) || E <- Entries],
|
||||
file:write_file(Path, list_to_binary(Frames)).
|
||||
|
||||
%% frame/1 — prepend 4-byte big-endian length to Payload.
|
||||
frame(Payload) when is_binary(Payload) ->
|
||||
L = byte_size(Payload),
|
||||
B3 = (L div 16777216) rem 256,
|
||||
B2 = (L div 65536) rem 256,
|
||||
B1 = (L div 256) rem 256,
|
||||
B0 = L rem 256,
|
||||
[B3, B2, B1, B0, Payload].
|
||||
|
||||
try_read_segment(Path) ->
|
||||
case file:read_file(Path) of
|
||||
{ok, Bin} ->
|
||||
try {ok, decode_frames(binary_to_list(Bin), [])}
|
||||
catch
|
||||
throw:Reason -> {error, {corrupt, Reason}};
|
||||
error:Reason -> {error, {corrupt, Reason}}
|
||||
end;
|
||||
{error, enoent} ->
|
||||
{ok, []};
|
||||
{error, R} ->
|
||||
{error, {read, R}}
|
||||
end.
|
||||
|
||||
decode_frames([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
decode_frames([B3, B2, B1, B0 | Rest], Acc) ->
|
||||
Len = B3 * 16777216 + B2 * 65536 + B1 * 256 + B0,
|
||||
{Payload, Rest2} = take_n(Len, Rest),
|
||||
case term_codec:decode(list_to_binary(Payload)) of
|
||||
{ok, Term, _} -> decode_frames(Rest2, [Term | Acc]);
|
||||
{error, R} -> throw({decode, R})
|
||||
end;
|
||||
decode_frames(_, _) ->
|
||||
throw(truncated_header).
|
||||
|
||||
take_n(0, R) -> {[], R};
|
||||
take_n(N, [H | T]) ->
|
||||
{Hs, Tl} = take_n(N - 1, T),
|
||||
{[H | Hs], Tl};
|
||||
take_n(_, []) ->
|
||||
throw(truncated_body).
|
||||
|
||||
tip(LogState) ->
|
||||
field(seq, LogState).
|
||||
@@ -58,6 +161,12 @@ field(K, [{K, V} | _]) -> V;
|
||||
field(K, [_ | Rest]) -> field(K, Rest);
|
||||
field(_, []) -> erlang:error(badkey).
|
||||
|
||||
%% lookup/2 — like field but returns `undefined` for missing key
|
||||
%% (used by persisted_path/1 which probes optional state fields).
|
||||
lookup(K, [{K, V} | _]) -> V;
|
||||
lookup(K, [_ | Rest]) -> lookup(K, Rest);
|
||||
lookup(_, []) -> undefined.
|
||||
|
||||
replace_field(K, V, []) -> [{K, V}];
|
||||
replace_field(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
|
||||
replace_field(K, V, [P | Rest]) -> [P | replace_field(K, V, Rest)].
|
||||
|
||||
Reference in New Issue
Block a user