Files
rose-ash/next/kernel/log.erl
giles 897449cb35
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 21s
fed-sx-m1: Step 3c.a segment rotation — log:open_disk/3, <ActorId>-NNNNNN.log filename, threshold-driven rotation; 10/10 log_rotate tests
`next/kernel/log.erl` rewritten around a `seg_lens :: [N0, N1, ...]` per-segment entry-count list + a `seg_size` byte threshold. Filename
scheme moved from `<ActorId>.log` to `<ActorId>-NNNNNN.log` (6-digit zero-padded) so `file:list_dir`'s alphabetical sort coincides
with numeric order.

`open_disk/3(ActorId, BasePath, [{segment_size, N}])` opts a caller into a smaller rotation threshold; `open_disk/2` keeps a 1 GiB
default that effectively never rotates (preserves Step 3b acceptance — log_disk.sh unchanged in behaviour).

Rotation rule in `place_append/4`: if the active segment's pre-append encoded size is already >= threshold AND it holds at least one
entry, the new activity opens a fresh segment; otherwise it extends the current active segment. A single huge entry that exceeds
the threshold stays alone — never rotated recursively.

On reopen, `load_all_segments` lists the dir, filters `<ActorId>-NNNNNN.log`, sorts numerically (insertion sort — `lists:sort/1`
isn't registered in this port, only `lists:append/2`/`lists:reverse/1`/`lists:filter/2`/etc.), reads each via `try_read_segment`,
and concatenates the entries to rebuild flat `entries` + `seg_lens`.

Erlang-port gotchas worked around during this iteration:
(a) String literals like `"foo"` in this port are NOT charlists — `[H|T] = "foo"` badmatches and `length("foo")` errors as "not a
    proper list". `parse_segment_name` builds prefix/suffix from `atom_to_list/1` + explicit `[$-]` / `[$., $l, $o, $g]` cons.
(b) Cross-arg variable repetition (`strip_prefix([C | Rest], [C | PRest])`) was rewritten to explicit `case C =:= P` for robustness.
(c) `Pattern = Binding` syntax in a case clause (`[_|_] = Lst when length(Lst) > 1 -> ...`) errors as "unsupported pattern type
    'match'" — replaced with `Lst when is_list(Lst), length(Lst) > 1`.

Tests:
- new `next/tests/log_rotate.sh` (10 cases): no-opt single-seg-after-3, rotation-fires-on-threshold, rotated-chronological,
  reopen-rebuilds-history, reopen-rebuilds-same-seg-shape, huge-single-entry-stays-1-seg, append-after-huge-keeps-order,
  tip-monotonic-across-rotations.
- `next/tests/log_disk.sh` updated to the new filename (`corrupted-000000.log`); stays 12/12.
- Erlang conformance 761/761 unchanged (log.erl is in next/, not lib/erlang/).

3c.a ticked in plans/fed-sx-milestone-1.md; 3c.b (gen_server-mediated concurrent appends) is the next iteration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-05 07:40:48 +00:00

363 lines
13 KiB
Erlang

-module(log).
-export([open/2, open_disk/2, open_disk/3,
append/2, tip/1, replay/3, entries/1,
segments/1]).
%% Per-actor activity log — the canonical record of everything an
%% actor has emitted, in chronological order. Per design §15.2 this
%% lives on disk as numbered segment files; v1 started with an
%% in-memory backend (Step 3a) so the API + seq-number machinery
%% could be locked down before on-disk persistence (Step 3b) and
%% segment rotation (Step 3c.a — this revision).
%%
%% On-disk layout:
%% <BasePath>/<ActorId>-NNNNNN.log
%%
%% NNNNNN is a 6-digit zero-padded segment index (000000..999999) so
%% file:list_dir's alphabetical ordering coincides with numeric. Each
%% segment file is the concat of length-prefixed frames; each frame
%% is `<<Len:32/big>>` + `term_codec:encode(Activity)`.
%%
%% In-memory state (a property list):
%% [{actor, ActorId},
%% {base, BasePath}, %% binary | charlist
%% {seq, NextSeq}, %% next seq the log will assign
%% {entries, [Activity, ...]}, %% flat, append order, oldest first
%% {persisted, true|false}, %% does append write through?
%% {seg_size, MaxBytes}, %% rotate once the non-empty active segment is >= this
%% {seg_lens, [N0, N1, ...]}] %% entry count per segment in order
%%
%% `seg_lens` is the sole bookkeeping needed to compute (a) which
%% segment any given seq lives in, and (b) which slice of `entries`
%% is the active segment's contents to rewrite on append. The last
%% element is the active segment's length.
%% open/2 — build a purely in-memory log for ActorId.
%%
%% The returned state carries {persisted, false}, so append/2 only
%% updates the proplist and never touches the filesystem. BasePath is
%% stored as given and not interpreted on this path, which is why atoms
%% are accepted (back-compat with the Step 3a tests that only exercise
%% the API surface). The state has no seg_size / seg_lens keys;
%% segments/1 reports [] for it.
%%
%% Returns {ok, State} with seq 0 and no entries.
open(ActorId, BasePath) ->
    Identity = [{actor, ActorId}, {base, BasePath}],
    Fresh = [{seq, 0}, {entries, []}, {persisted, false}],
    {ok, Identity ++ Fresh}.
%% open_disk/2,3 — open a disk-backed log, loading any segments that
%% already exist for ActorId under BasePath.
%%
%% open_disk/2 uses the default rotation threshold of 1073741824 bytes
%% (1 GiB), i.e. effectively no rotation. open_disk/3 takes a proplist
%% of options; only {segment_size, Bytes} is consulted, falling back to
%% the same 1 GiB default when absent.
%%
%% Returns {ok, State} with {persisted, true}, the flat entry history,
%% seq = number of loaded entries, and seg_lens = entry count of each
%% loaded segment (or [0] when nothing is on disk, so there is always
%% an active segment). Any {error, Reason} from load_all_segments/2 is
%% returned unchanged.
%%
%% NOTE(review): seg_lens is positional — place_append/4 derives the
%% file index to write from a segment's position in this list — so this
%% assumes the indices found on disk are contiguous starting at 0.
open_disk(ActorId, BasePath) ->
    %% No explicit option: open_disk/3 applies the 1 GiB default.
    open_disk(ActorId, BasePath, []).
open_disk(ActorId, BasePath, Opts) ->
    Threshold = proplist_get(segment_size, Opts, 1073741824),
    case load_all_segments(ActorId, BasePath) of
        {error, Reason} ->
            {error, Reason};
        {ok, PerSegment} ->
            %% PerSegment :: [[Entry, ...]] in segment-index order.
            History = flatten_segs(PerSegment),
            {ok, [{actor, ActorId}, {base, BasePath},
                  {seq, length(History)},
                  {entries, History},
                  {persisted, true},
                  {seg_size, Threshold},
                  {seg_lens, segment_counts(PerSegment)}]}
    end.
%% segment_counts/1 — entry count per loaded segment; [0] when no
%% segment was loaded so the log always has one (empty) active segment.
segment_counts([]) -> [0];
segment_counts(PerSegment) -> [length(S) || S <- PerSegment].
%% append/2 — record Activity as the next entry of the log.
%%
%% Returns {ok, NewState, Seq} where Seq is the sequence number given
%% to Activity (the log's seq before the call) and NewState has seq
%% advanced by one and Activity added at the end of entries.
%%
%% When the state is {persisted, true}, place_append/4 picks the
%% segment the entry belongs to and that segment's file is written
%% with its complete entry list before the new state is returned. A
%% failed write is not reported as a value: the `ok =` match crashes
%% with badmatch. For any other persisted value only the in-memory
%% state changes.
append(LogState, Activity) ->
    Seq = field(seq, LogState),
    Entries = field(entries, LogState),
    %% The seq/entries update is common to both backends.
    Advanced = replace_field(seq, Seq + 1,
                   replace_field(entries, Entries ++ [Activity], LogState)),
    case lookup(persisted, LogState) of
        true ->
            {SegLens1, ActiveIdx, ActiveSlice} =
                place_append(Entries, Activity,
                             field(seg_lens, LogState),
                             field(seg_size, LogState)),
            SegFile = segment_path(field(actor, LogState),
                                   field(base, LogState),
                                   ActiveIdx),
            ok = write_segment(SegFile, ActiveSlice),
            {ok, replace_field(seg_lens, SegLens1, Advanced), Seq};
        _ ->
            {ok, Advanced, Seq}
    end.
%% tip/1 — the value stored under `seq`: the sequence number the next
%% append/2 will assign. Raises error:badkey if the state has no seq.
tip(LogState) ->
    NextSeq = field(seq, LogState),
    NextSeq.
%% replay/3 — fold Fun over every entry, oldest first.
%%
%% Fun is called as Fun(Activity, Seq, Acc) with Seq counting up from 0
%% and must return the next accumulator; the final accumulator is the
%% result (InitAcc when the log is empty).
replay(LogState, InitAcc, Fun) ->
    replay_loop(field(entries, LogState), 0, InitAcc, Fun).
%% entries/1 — the flat list of recorded activities, oldest first.
%% Raises error:badkey if the state has no entries key.
entries(LogState) ->
    Recorded = field(entries, LogState),
    Recorded.
%% segments/1 — debug accessor for the in-memory seg_lens (entry count
%% per segment, in index order). Rotation tests use it to assert that
%% a rotation took place. A state without seg_lens (e.g. one built by
%% open/2) yields [].
segments(LogState) ->
    segments_or_empty(lookup(seg_lens, LogState)).
%% segments_or_empty/1 — map lookup/2's `undefined` miss to [].
segments_or_empty(undefined) -> [];
segments_or_empty(Lens) -> Lens.
%% --- internals ---
%% replay_loop/4 — worker for replay/3: applies Step to each entry
%% together with its running sequence number, threading the
%% accumulator through, and returns the accumulator once the entry
%% list is exhausted.
replay_loop([], _, Result, _) ->
    Result;
replay_loop([Entry | Later], SeqNo, Sofar, Step) ->
    Folded = Step(Entry, SeqNo, Sofar),
    replay_loop(Later, SeqNo + 1, Folded, Step).
%% place_append/4 — decide where a new Activity lands.
%%
%% Given the entries recorded so far, the per-segment entry counts and
%% the byte threshold, returns {NewSegLens, ActiveIdx, ActiveEntries}:
%% the updated counts, the index of the segment that must be written,
%% and that segment's complete entry list (the slice to (re)write).
%%
%% Rotation rule: the active segment is the last element of SegLens.
%% If its encoded size BEFORE this append is already >= SegSize and it
%% holds at least one entry, Activity opens a fresh segment on its own;
%% otherwise Activity extends the active segment. Because an empty
%% segment never rotates, a single entry larger than the threshold
%% simply lives alone — rotation never recurses.
%%
%% Deciding on the pre-append size means exactly one segment file has
%% to be written per append.
place_append(OldEntries, Activity, SegLens, SegSize) ->
    {Sealed, ActiveCount} = split_last(SegLens),
    %% Everything after the sealed segments' entries is the active slice.
    ActiveSlice = drop(sum(Sealed), OldEntries),
    case (encoded_size(ActiveSlice) >= SegSize) andalso (ActiveCount >= 1) of
        false ->
            %% Stay: the active segment grows by one entry.
            {Sealed ++ [ActiveCount + 1],
             length(Sealed),
             ActiveSlice ++ [Activity]};
        true ->
            %% Rotate: Activity becomes the only entry of a new segment.
            {SegLens ++ [1],
             length(SegLens),
             [Activity]}
    end.
%% split_last/1 — split a non-empty list into {AllButLast, Last}.
%% There is no clause for [], so an empty list fails with
%% function_clause.
split_last([First | Others]) ->
    split_last_(Others, First, []).
%% split_last_/3 — walk the list keeping the most recent element as the
%% candidate "last" and collecting earlier ones in reverse.
split_last_([], Last, RevInit) ->
    {lists:reverse(RevInit), Last};
split_last_([Next | Others], Prev, RevInit) ->
    split_last_(Others, Next, [Prev | RevInit]).
%% sum/1 — total of a list of numbers; 0 for the empty list.
sum(Counts) ->
    sum_(Counts, 0).
%% sum_/2 — accumulator loop behind sum/1.
sum_([N | More], Total) ->
    sum_(More, Total + N);
sum_([], Total) ->
    Total.
%% drop/2 — List without its first Count elements. Asking for more
%% elements than the list holds yields [].
drop(0, Remaining) ->
    Remaining;
drop(Count, [_ | Tail]) ->
    drop(Count - 1, Tail);
drop(_, []) ->
    [].
%% flatten_segs/1 — concatenate a list of segments (each a list of
%% entries) into one flat list, preserving order. open_disk/3 uses it
%% to assemble the activity history from the per-segment loads. Kept
%% local because, per the original author, lists:append/1 isn't
%% registered in this port — only lists:append/2.
flatten_segs(Segments) ->
    flatten_segs_(lists:reverse(Segments), []).
%% flatten_segs_/2 — prepend segments last-to-first so each `++` runs
%% once per segment against the already-joined tail.
flatten_segs_([], Joined) ->
    Joined;
flatten_segs_([Seg | Earlier], Joined) ->
    flatten_segs_(Earlier, Seg ++ Joined).
%% encoded_size/1 — number of bytes Entries occupy once each one is
%% encoded with term_codec:encode/1 and wrapped by frame/1, i.e. the
%% size of the binary write_segment/2 would produce for them.
encoded_size(Entries) ->
    Framed = [frame(term_codec:encode(E)) || E <- Entries],
    byte_size(list_to_binary(Framed)).
%% load_all_segments/2 — read every segment file under BasePath that
%% belongs to ActorId.
%%
%% Returns {ok, [[Entry, ...]]} with the outer list in ascending
%% segment-index order. The result is {ok, []} when the directory does
%% not exist (enoent) or holds no matching file. Any other list_dir
%% failure becomes {error, {read, Reason}}; errors from reading an
%% individual segment are passed through from read_segments_in_order/4.
load_all_segments(ActorId, BasePath) ->
    %% Per the fed-prims contract noted by the original author, list_dir
    %% yields {ok, [Binary]} with names in sorted order.
    case file:list_dir(base_chars(BasePath)) of
        {error, enoent} ->
            {ok, []};
        {error, Reason} ->
            {error, {read, Reason}};
        {ok, Names} ->
            %% Erlang string literals are NOT charlists in this port,
            %% so the "<ActorId>-" prefix and ".log" suffix are spelled
            %% as explicit char-code lists.
            Mine = collect_segment_indices(Names,
                                           atom_to_list(ActorId) ++ [$-],
                                           [$., $l, $o, $g]),
            read_segments_in_order(Mine, ActorId, BasePath, [])
    end.
%% collect_segment_indices/3 — segment indices of every name in Names
%% that parse_segment_name/3 recognises for the given Prefix/Suffix, in
%% the same order as Names. Names that are not ours are skipped.
collect_segment_indices(Names, Prefix, Suffix) ->
    collect_segment_indices_(Names, Prefix, Suffix, []).
%% collect_segment_indices_/4 — accumulator loop; Found is in reverse.
collect_segment_indices_([], _, _, Found) ->
    lists:reverse(Found);
collect_segment_indices_([Name | Others], Prefix, Suffix, Found) ->
    case parse_segment_name(Name, Prefix, Suffix) of
        not_ours ->
            collect_segment_indices_(Others, Prefix, Suffix, Found);
        {ok, Index} ->
            collect_segment_indices_(Others, Prefix, Suffix, [Index | Found])
    end.
%% parse_segment_name/3 — recognise a directory entry as one of our
%% segment files and extract its index.
%%
%% Name may be a binary or a charlist; Prefix and Suffix are charlists
%% ("<ActorId>-" and ".log" at the call site). Returns {ok, Index} when
%% Name is Prefix ++ Digits ++ Suffix and Digits is the canonical
%% spelling of Index, otherwise not_ours.
%%
%% "Canonical" means Digits is exactly what pad_int(Index, 6) produces,
%% which is how segment_path/3 names files. The loader re-derives each
%% path from the index, so a non-canonical spelling ("5", "0000005")
%% could never be read under its own name; worse, it would alias the
%% canonical file's index and make that segment load twice. Such names
%% are therefore treated as not ours.
parse_segment_name(NameBin, Prefix, Suffix) when is_binary(NameBin) ->
    parse_segment_name(binary_to_list(NameBin), Prefix, Suffix);
parse_segment_name(Name, Prefix, Suffix) ->
    case strip_prefix(Name, Prefix) of
        {ok, Rest} ->
            case strip_suffix(Rest, Suffix) of
                {ok, Digits} -> canonical_index(Digits);
                not_ours -> not_ours
            end;
        not_ours ->
            not_ours
    end.
%% canonical_index/1 — {ok, N} when Digits is a non-empty run of
%% decimal digits that round-trips through pad_int(N, 6); not_ours
%% otherwise.
canonical_index(Digits) ->
    case is_all_digits(Digits) of
        true ->
            Index = list_to_integer(Digits),
            case pad_int(Index, 6) =:= Digits of
                true -> {ok, Index};
                false -> not_ours
            end;
        false ->
            not_ours
    end.
%% strip_prefix/2 — {ok, Remainder} when the charlist starts with
%% Prefix (an empty Prefix always matches), not_ours otherwise.
%% Characters are compared with an explicit `=:=` rather than by
%% repeating a variable across the two argument patterns.
strip_prefix(Remainder, []) ->
    {ok, Remainder};
strip_prefix([Ch | Chars], [Want | Wanted]) ->
    case Ch =:= Want of
        false -> not_ours;
        true -> strip_prefix(Chars, Wanted)
    end;
strip_prefix(_, _) ->
    not_ours.
%% strip_suffix/2 — {ok, Head} when Str ends with Suffix, where Head is
%% Str without that ending; not_ours when Str is shorter than Suffix or
%% ends differently.
strip_suffix(Str, Suffix) ->
    %% Number of leading characters that would remain after removal.
    Keep = length(Str) - length(Suffix),
    case Keep >= 0 of
        false ->
            not_ours;
        true ->
            case drop(Keep, Str) =:= Suffix of
                true -> {ok, take_n_pl(Keep, Str)};
                false -> not_ours
            end
    end.
%% take_n_pl/2 — the first Count elements of a list, or the whole list
%% when it is shorter than Count. Unlike take_n/2 it never throws on a
%% short list.
take_n_pl(Count, List) ->
    take_n_pl_(Count, List, []).
%% take_n_pl_/3 — accumulator loop; Taken is in reverse.
take_n_pl_(0, _, Taken) ->
    lists:reverse(Taken);
take_n_pl_(_, [], Taken) ->
    lists:reverse(Taken);
take_n_pl_(Count, [Item | Others], Taken) ->
    take_n_pl_(Count - 1, Others, [Item | Taken]).
%% is_all_digits/1 — true only for a NON-empty list made up entirely of
%% the characters $0..$9; the empty list (and anything else) is false.
is_all_digits([Ch | Chars]) ->
    all_digits([Ch | Chars]);
is_all_digits(_) ->
    false.
%% all_digits/1 — true when every element is a decimal digit; unlike
%% is_all_digits/1 the empty list counts as true.
all_digits([]) ->
    true;
all_digits([Ch | Chars]) ->
    case is_digit(Ch) of
        true -> all_digits(Chars);
        false -> false
    end;
all_digits(_) ->
    false.
%% is_digit/1 — true for a character code in $0..$9.
is_digit(Ch) when Ch >= $0, Ch =< $9 -> true;
is_digit(_) -> false.
%% read_segments_in_order/4 — load the segments named by Indices in
%% ascending numeric order, prepending onto Acc via read_each/4.
%%
%% The indices are sorted here rather than trusting directory order:
%% alphabetical order only coincides with numeric order while every
%% index fits the 6-digit zero padding (pad_int/2 leaves longer numbers
%% unpadded).
read_segments_in_order(Indices, ActorId, BasePath, Acc) ->
    read_each(isort(Indices), ActorId, BasePath, Acc).
%% read_each/4 — read the segment file for each index in turn.
%%
%% Returns {ok, Segments} with one entry list per index, in the order
%% the indices were given (Loaded is built in reverse and flipped at
%% the end). The first segment that fails to load stops the scan and
%% its {error, Reason} is returned as-is.
read_each([], _, _, Loaded) ->
    {ok, lists:reverse(Loaded)};
read_each([Idx | Pending], ActorId, BasePath, Loaded) ->
    case try_read_segment(segment_path(ActorId, BasePath, Idx)) of
        {error, Reason} ->
            {error, Reason};
        {ok, Entries} ->
            read_each(Pending, ActorId, BasePath, [Entries | Loaded])
    end.
%% isort/1 — insertion sort, ascending, for a small list of integers.
%% Elements are inserted starting from the END of the input, one at a
%% time, into an already-sorted accumulator.
isort(Items) ->
    isort_(lists:reverse(Items), []).
%% isort_/2 — feed each pending item into the sorted accumulator.
isort_([], Sorted) ->
    Sorted;
isort_([Item | Pending], Sorted) ->
    isort_(Pending, insert(Item, Sorted)).
%% insert/2 — place X into an ascending list, before the first element
%% that is not smaller than it.
insert(X, []) ->
    [X];
insert(X, [Y | Rest]) ->
    case X =< Y of
        true -> [X, Y | Rest];
        false -> [Y | insert(X, Rest)]
    end.
%% segment_path/3 — charlist path of the Idx'th segment file:
%% <BasePath>/<ActorId>-NNNNNN.log, with the index zero-padded to six
%% digits. Every literal character is written as a char code because
%% string literals are not charlists in this port.
segment_path(ActorId, BasePath, Idx) ->
    Dir = base_chars(BasePath),
    FileName = atom_to_list(ActorId) ++ [$-]
               ++ pad_int(Idx, 6) ++ [$., $l, $o, $g],
    Dir ++ [$/] ++ FileName.
%% base_chars/1 — normalise a base path to a charlist. Lists are
%% returned untouched and binaries are converted byte-for-byte; any
%% other term (an atom, say) has no clause and fails with
%% function_clause.
base_chars(Chars) when is_list(Chars) ->
    Chars;
base_chars(Bin) when is_binary(Bin) ->
    binary_to_list(Bin).
%% pad_int/2 — decimal charlist of N, left-padded with $0 up to Width
%% characters. Numbers already Width digits or longer are returned
%% unpadded (never truncated).
pad_int(N, Width) ->
    pad_left(integer_to_list(N), Width).
%% pad_left/2 — prepend $0 to Cs until it is at least Width long.
pad_left(Cs, Width) ->
    prepend_zeros(Width - length(Cs), Cs).
%% prepend_zeros/2 — add one $0 per outstanding position.
prepend_zeros(Missing, Cs) ->
    case Missing > 0 of
        true -> prepend_zeros(Missing - 1, [$0 | Cs]);
        false -> Cs
    end.
%% write_segment/2 — write Entries to the file at Path as a
%% concatenation of length-prefixed frames (frame/1 around
%% term_codec:encode/1 of each entry). Returns whatever
%% file:write_file/2 returns.
write_segment(Path, Entries) ->
    Bytes = list_to_binary([frame(term_codec:encode(E)) || E <- Entries]),
    file:write_file(Path, Bytes).
%% frame/1 — wrap a binary payload in a frame: four length bytes in
%% big-endian order followed by the payload itself. The result is an
%% iolist ([B3, B2, B1, B0, Payload]), not a binary; callers flatten it
%% with list_to_binary/1. Each length byte is taken modulo 256, so only
%% the low 32 bits of the size are represented.
frame(Payload) when is_binary(Payload) ->
    Len = byte_size(Payload),
    [byte_at(Len, 16777216),
     byte_at(Len, 65536),
     byte_at(Len, 256),
     byte_at(Len, 1),
     Payload].
%% byte_at/2 — the byte of Len whose place value is Weight (a power of
%% 256).
byte_at(Len, Weight) ->
    (Len div Weight) rem 256.
%% try_read_segment/1 — load and decode one segment file.
%%
%% Returns {ok, Entries} on success and {ok, []} when the file does not
%% exist (enoent). Other read failures become {error, {read, Reason}}.
%% Anything thrown or raised while decoding the contents is reported as
%% {error, {corrupt, Reason}}.
try_read_segment(Path) ->
    case file:read_file(Path) of
        {error, enoent} ->
            {ok, []};
        {error, Reason} ->
            {error, {read, Reason}};
        {ok, Bin} ->
            decode_segment(Bin)
    end.
%% decode_segment/1 — decode a segment's bytes, converting decode
%% throws and runtime errors into {error, {corrupt, Why}}.
decode_segment(Bin) ->
    try {ok, decode_frames(binary_to_list(Bin), [])}
    catch
        throw:Why -> {error, {corrupt, Why}};
        error:Why -> {error, {corrupt, Why}}
    end.
%% decode_frames/2 — decode a segment given as a list of bytes.
%%
%% Each frame is four big-endian length bytes followed by that many
%% payload bytes, which are handed to term_codec:decode/1. Returns the
%% decoded terms in file order. Throws:
%%   truncated_header — fewer than four bytes remain where a frame
%%                      should start;
%%   truncated_body   — (from take_n/2) the payload is cut short;
%%   {decode, Reason} — term_codec:decode/1 rejected the payload.
decode_frames([], Decoded) ->
    lists:reverse(Decoded);
decode_frames([B3, B2, B1, B0 | Body], Decoded) ->
    Len = ((B3 * 256 + B2) * 256 + B1) * 256 + B0,
    {Payload, Remaining} = take_n(Len, Body),
    case term_codec:decode(list_to_binary(Payload)) of
        {error, Why} ->
            throw({decode, Why});
        {ok, Term, _} ->
            decode_frames(Remaining, [Term | Decoded])
    end;
decode_frames(_, _) ->
    throw(truncated_header).
%% take_n/2 — split off exactly Count leading elements, returning
%% {Taken, Rest}. Throws truncated_body when the list runs out before
%% Count elements have been taken.
take_n(Count, List) ->
    take_n_(Count, List, []).
%% take_n_/3 — accumulator loop; Taken is in reverse.
take_n_(0, Rest, Taken) ->
    {lists:reverse(Taken), Rest};
take_n_(_, [], _) ->
    throw(truncated_body);
take_n_(Count, [Item | Others], Taken) ->
    take_n_(Count - 1, Others, [Item | Taken]).
%% --- proplist helpers ---
%% field/2 — value of the first {Key, Value} pair in the proplist.
%% The key is REQUIRED: a miss raises error:badkey. Elements that are
%% not 2-tuples are skipped.
field(_, []) ->
    erlang:error(badkey);
field(Key, [{Name, Value} | Others]) ->
    case Name =:= Key of
        true -> Value;
        false -> field(Key, Others)
    end;
field(Key, [_ | Others]) ->
    field(Key, Others).
%% lookup/2 — value of the first {Key, Value} pair in the proplist, or
%% the atom `undefined` when the key is absent. Elements that are not
%% 2-tuples are skipped.
lookup(_, []) ->
    undefined;
lookup(Key, [{Name, Value} | Others]) ->
    case Name =:= Key of
        true -> Value;
        false -> lookup(Key, Others)
    end;
lookup(Key, [_ | Others]) ->
    lookup(Key, Others).
%% replace_field/3 — proplist with Key bound to Value. The first pair
%% for Key is replaced in place (its position is kept); when the key is
%% absent the new pair is added at the end.
replace_field(Key, Value, []) ->
    [{Key, Value}];
replace_field(Key, Value, [{Name, Old} | Others]) ->
    case Name =:= Key of
        true -> [{Key, Value} | Others];
        false -> [{Name, Old} | replace_field(Key, Value, Others)]
    end;
replace_field(Key, Value, [Other | Others]) ->
    [Other | replace_field(Key, Value, Others)].
%% proplist_get/3 — value of the first {Key, Value} pair in the option
%% list, or Default when the key is absent. Elements that are not
%% 2-tuples are skipped.
proplist_get(_, [], Default) ->
    Default;
proplist_get(Key, [{Name, Value} | Others], Default) ->
    case Name =:= Key of
        true -> Value;
        false -> proplist_get(Key, Others, Default)
    end;
proplist_get(Key, [_ | Others], Default) ->
    proplist_get(Key, Others, Default).