-module(log).
-export([open/2, open_disk/2, open_disk/3, append/2, tip/1, replay/3, entries/1, segments/1]).

%% Per-actor activity log: the canonical, chronologically ordered
%% record of everything an actor has emitted. Design §15.2 keeps it on
%% disk as numbered segment files. v1 began with an in-memory backend
%% (Step 3a) so the API and the seq-number machinery could settle
%% before on-disk persistence (Step 3b) and segment rotation
%% (Step 3c.a, this revision) were layered on top.
%%
%% On-disk layout:
%%   <BasePath>/<ActorId>-NNNNNN.log
%%
%% NNNNNN is the segment index zero-padded to 6 digits
%% (000000..999999), so the alphabetical order of file:list_dir
%% coincides with numeric order. A segment file is a concatenation of
%% frames; each frame is a 4-byte big-endian payload length followed by
%% the payload term_codec:encode(Activity).
%%
%% In-memory state (a property list):
%%   [{actor, ActorId},
%%    {base, BasePath},            %% binary | charlist
%%    {seq, NextSeq},              %% seq the next append will assign
%%    {entries, [Activity, ...]},  %% flat, append order, oldest first
%%    {persisted, true | false},   %% does append write through to disk?
%%    {seg_size, MaxBytes},        %% rotate once the active segment reaches this size
%%    {seg_lens, [N0, N1, ...]}]   %% entry count per segment, in order
%%
%% seg_size and seg_lens are only present in disk-backed states
%% (open_disk/2,3); open/2 leaves them out.
%%
%% seg_lens is the only bookkeeping needed to work out (a) which
%% segment holds a given seq, and (b) which slice of `entries` is the
%% active segment that append rewrites. Its last element is the active
%% segment's entry count.

%% open(ActorId, BasePath) -> {ok, State}
%% A purely in-memory log: nothing is read from or written to disk.
%% BasePath is only recorded, so an atom is accepted for back-compat
%% with the Step 3a tests that exercise just the API surface.
open(ActorId, BasePath) ->
    State = [{actor, ActorId},
             {base, BasePath},
             {seq, 0},
             {entries, []},
             {persisted, false}],
    {ok, State}.

%% open_disk(ActorId, BasePath) -> {ok, State} | {error, Reason}
%% Disk-backed log with a 1 GiB (1073741824-byte) segment size, which
%% in practice means no rotation. Use open_disk/3 with
%% [{segment_size, N}] to rotate at a smaller size.
open_disk(ActorId, BasePath) ->
    OneGiB = 1073741824,
    open_disk(ActorId, BasePath, [{segment_size, OneGiB}]).
%% open_disk(ActorId, BasePath, Opts) -> {ok, State} | {error, Reason}
%% Loads every existing segment for ActorId under BasePath and rebuilds
%% the in-memory state from them. Recognised option:
%% {segment_size, Bytes}, defaulting to 1073741824 (1 GiB). An error
%% from load_all_segments/2 is returned unchanged.
open_disk(ActorId, BasePath, Opts) ->
    MaxBytes = proplist_get(segment_size, Opts, 1073741824),
    case load_all_segments(ActorId, BasePath) of
        {error, _} = Err ->
            Err;
        {ok, PerSegment} ->
            %% PerSegment is [[Entry, ...], ...] in segment-index order,
            %% or [] when nothing is on disk yet.
            AllEntries = flatten_segs(PerSegment),
            {ok, [{actor, ActorId},
                  {base, BasePath},
                  {seq, length(AllEntries)},
                  {entries, AllEntries},
                  {persisted, true},
                  {seg_size, MaxBytes},
                  {seg_lens, initial_seg_lens(PerSegment)}]}
    end.

%% Per-segment entry counts for a freshly loaded log. There is always
%% at least one active segment, even when it is empty.
initial_seg_lens([]) -> [0];
initial_seg_lens(PerSegment) -> [length(S) || S <- PerSegment].

%% append(LogState, Activity) -> {ok, NewState, Seq}
%% Records Activity under the next sequence number and returns that
%% number. For disk-backed states, the segment file Activity lands in
%% is rewritten first; a failed write crashes on the `ok` match.
append(LogState, Activity) ->
    Seq = field(seq, LogState),
    Entries = field(entries, LogState),
    Persisted = (lookup(persisted, LogState) =:= true),
    Synced = persist_append(Persisted, LogState, Entries, Activity),
    WithEntry = replace_field(entries, Entries ++ [Activity], Synced),
    {ok, replace_field(seq, Seq + 1, WithEntry), Seq}.

%% Disk side of append/2. Picks the active (or a newly rotated) segment
%% for Activity, rewrites that segment's file and stores the updated
%% per-segment counts. In-memory states pass through untouched.
persist_append(false, LogState, _Entries, _Activity) ->
    LogState;
persist_append(true, LogState, Entries, Activity) ->
    {SegLens, ActiveIdx, ActiveEntries} =
        place_append(Entries, Activity,
                     field(seg_lens, LogState), field(seg_size, LogState)),
    Path = segment_path(field(actor, LogState), field(base, LogState), ActiveIdx),
    ok = write_segment(Path, ActiveEntries),
    replace_field(seg_lens, SegLens, LogState).

%% tip(LogState) -> NextSeq
%% The seq the next append will receive, which is also the number of
%% entries recorded so far.
tip(LogState) ->
    field(seq, LogState).

%% replay(LogState, InitAcc, Fun) -> Acc
%% Folds Fun(Activity, Seq, Acc) over all entries, oldest first, with
%% Seq counting up from 0.
replay(LogState, InitAcc, Fun) ->
    replay_loop(field(entries, LogState), 0, InitAcc, Fun).

%% entries(LogState) -> [Activity], oldest first.
entries(LogState) ->
    field(entries, LogState).

%% segments(LogState) -> [Count]
%% Debug accessor for the rotation tests: the entry count of each
%% segment in index order. In-memory states (open/2) carry no seg_lens
%% and report [].
segments(LogState) ->
    case lookup(seg_lens, LogState) of
        undefined -> [];
        Lens -> Lens
    end.

%% --- internals ---

%% Left fold over a list of entries, passing each one's 0-based seq.
replay_loop([], _Seq, Acc, _Fun) ->
    Acc;
replay_loop([Activity | Rest], Seq, Acc, Fun) ->
    replay_loop(Rest, Seq + 1, Fun(Activity, Seq, Acc), Fun).
%% place_append(OldEntries, Activity, SegLens, SegSize)
%%     -> {NewSegLens, ActiveIdx, ActiveEntries}
%% Decides whether Activity extends the current active segment or
%% starts a new one. Returns:
%%   - the updated per-segment counts;
%%   - the index of the segment Activity lands in;
%%   - that segment's complete entry list, which the caller (re)writes
%%     to disk.
%%
%% Rotation: a new segment is opened only when the active segment
%% already holds at least one entry AND its encoded size before this
%% append is already >= SegSize. Because the check uses the pre-append
%% size, each append writes exactly one segment file. A segment can
%% overshoot SegSize by its last entry, and an entry larger than SegSize
%% ends up alone in its own segment rather than causing repeated
%% rotation.
place_append(OldEntries, Activity, SegLens, SegSize) ->
    {Closed, ActiveLen} = split_last(SegLens),
    ActiveEntries = drop(sum(Closed), OldEntries),
    ActiveBytes = encoded_size(ActiveEntries),
    case ActiveLen >= 1 andalso ActiveBytes >= SegSize of
        true ->
            %% Rotate: Activity is the sole entry of a segment whose
            %% index is one past the current last one.
            {SegLens ++ [1], length(SegLens), [Activity]};
        false ->
            %% Stay: Activity extends the current active segment.
            {Closed ++ [ActiveLen + 1], length(Closed), ActiveEntries ++ [Activity]}
    end.

%% split_last(NonEmptyList) -> {AllButLast, Last}.
split_last([Only]) ->
    {[], Only};
split_last([Head | Tail]) ->
    {Init, Last} = split_last(Tail),
    {[Head | Init], Last}.

%% sum(Ints) -> their total (0 for []).
sum(Ints) ->
    sum_acc(Ints, 0).

sum_acc([], Total) -> Total;
sum_acc([N | Ns], Total) -> sum_acc(Ns, Total + N).

%% drop(N, List) -> List without its first N elements; [] when List
%% runs out first.
drop(0, List) -> List;
drop(_, []) -> [];
drop(N, [_ | Tail]) -> drop(N - 1, Tail).

%% flatten_segs(Segments) -> every entry of every segment, in order.
%% Used by open_disk to stitch per-segment loads into one history.
%% Hand-written because this port registers lists:append/2 but not
%% lists:append/1.
flatten_segs([]) ->
    [];
flatten_segs([Segment | More]) ->
    Segment ++ flatten_segs(More).
%% encoded_size(Entries) -> Bytes
%% Number of bytes Entries occupy once framed into a segment file. Each
%% entry contributes a 4-byte length header plus its term_codec
%% encoding. The sizes are summed entry by entry instead of building the
%% whole framed segment as one binary just to measure it; place_append
%% calls this on every append.
encoded_size(Entries) ->
    encoded_size_acc(Entries, 0).

encoded_size_acc([], Bytes) ->
    Bytes;
encoded_size_acc([E | Rest], Bytes) ->
    encoded_size_acc(Rest, Bytes + 4 + byte_size(term_codec:encode(E))).

%% load_all_segments(ActorId, BasePath)
%%     -> {ok, [[Entry, ...]]} | {error, Reason}
%% Reads every segment file under BasePath that belongs to ActorId.
%% The outer list is in segment-index order. It is [] when the
%% directory is missing (enoent) or holds no segments for this actor.
%% Any other list_dir failure becomes {error, {read, R}*}; errors from
%% reading individual segments are passed through.
%% (*R is the reason returned by file:list_dir.)
load_all_segments(ActorId, BasePath) ->
    %% Per the fed-prims contract, list_dir returns {ok, [Binary]} with
    %% the entry names already sorted.
    case file:list_dir(base_chars(BasePath)) of
        {ok, Names} ->
            %% String literals are not charlists in this port, so the
            %% prefix and suffix are spelled out as char-code lists.
            Prefix = atom_to_list(ActorId) ++ [$-],
            Suffix = [$., $l, $o, $g],
            Indices = collect_segment_indices(Names, Prefix, Suffix),
            read_segments_in_order(Indices, ActorId, BasePath, []);
        {error, enoent} ->
            {ok, []};
        {error, R} ->
            {error, {read, R}}
    end.

%% Segment indices of the directory entries that belong to this actor,
%% in listing order. Names that do not parse are skipped.
collect_segment_indices([], _, _) ->
    [];
collect_segment_indices([Name | Rest], Prefix, Suffix) ->
    case parse_segment_name(Name, Prefix, Suffix) of
        {ok, N} -> [N | collect_segment_indices(Rest, Prefix, Suffix)];
        not_ours -> collect_segment_indices(Rest, Prefix, Suffix)
    end.

%% parse_segment_name(Name, Prefix, Suffix) -> {ok, Index} | not_ours
%% Name may be a binary or a charlist. It matches when it is
%% Prefix ++ Digits ++ Suffix with at least one decimal digit.
parse_segment_name(NameBin, Prefix, Suffix) when is_binary(NameBin) ->
    parse_segment_name(binary_to_list(NameBin), Prefix, Suffix);
parse_segment_name(Name, Prefix, Suffix) ->
    case strip_prefix(Name, Prefix) of
        {ok, Rest} -> parse_index(strip_suffix(Rest, Suffix));
        not_ours -> not_ours
    end.

%% Turns the middle part of a segment name into its integer index.
parse_index({ok, Digits}) ->
    case is_all_digits(Digits) of
        true -> {ok, list_to_integer(Digits)};
        false -> not_ours
    end;
parse_index(not_ours) ->
    not_ours.

%% strip_prefix(Str, Prefix) -> {ok, Rest} | not_ours
strip_prefix(Str, []) ->
    {ok, Str};
strip_prefix([C | Rest], [P | PRest]) ->
    case C =:= P of
        true -> strip_prefix(Rest, PRest);
        false -> not_ours
    end;
strip_prefix(_, _) ->
    not_ours.

%% strip_suffix(Str, Suffix) -> {ok, Head} | not_ours
strip_suffix(Str, Suffix) ->
    SL = length(Str),
    XL = length(Suffix),
    case SL >= XL of
        true ->
            Head = take_n_pl(SL - XL, Str),
            Tail = drop(SL - XL, Str),
            case Tail =:= Suffix of
                true -> {ok, Head};
                false -> not_ours
            end;
        false ->
            not_ours
    end.
%% take_n_pl(N, List) -> the first N elements of List, or all of List
%% when it is shorter.
take_n_pl(0, _) -> [];
take_n_pl(_, []) -> [];
take_n_pl(N, [H | T]) -> [H | take_n_pl(N - 1, T)].

%% is_all_digits(Chars) -> true when Chars is a non-empty run of $0..$9.
is_all_digits([]) -> false;
is_all_digits(Chars) -> all_digits(Chars).

all_digits([]) -> true;
all_digits([C | Rest]) when C >= $0, C =< $9 -> all_digits(Rest);
all_digits(_) -> false.

%% read_segments_in_order(Indices, ActorId, BasePath, Acc)
%%     -> {ok, [[Entry, ...]]} | {error, Reason}
%% Reads each distinct segment index in ascending numeric order.
%% fed-prims returns list_dir names alphabetically sorted, which matches
%% numeric order for 6-digit padded names. However, the name parser
%% also accepts legacy unpadded names (e.g. <ActorId>-7.log). The
%% indices are therefore sorted here and also de-duplicated, so that a
%% padded file and a legacy file for the same index are not both
%% loaded, which would duplicate that segment's entries.
%%
%% NOTE(review): seg_lens is positional. append writes the active
%% segment at index length(seg_lens) - 1, so if the on-disk indices have
%% gaps, that index will not match the file the segment was loaded
%% from. Consider rejecting or renumbering gapped sets.
read_segments_in_order(Indices, ActorId, BasePath, Acc) ->
    Sorted = dedup_sorted(isort(Indices)),
    read_each(Sorted, ActorId, BasePath, Acc).

%% Collapses runs of equal elements in an already-sorted list.
dedup_sorted([X, Y | Rest]) when X =:= Y -> dedup_sorted([Y | Rest]);
dedup_sorted([X | Rest]) -> [X | dedup_sorted(Rest)];
dedup_sorted([]) -> [].

read_each([], _, _, Acc) ->
    {ok, lists:reverse(Acc)};
read_each([Idx | Rest], ActorId, BasePath, Acc) ->
    case read_segment_file(ActorId, BasePath, Idx) of
        {ok, Entries} -> read_each(Rest, ActorId, BasePath, [Entries | Acc]);
        {error, _} = E -> E
    end.

%% Reads segment Idx from its canonical zero-padded path.
%% try_read_segment reports a missing file as {ok, []}, and append never
%% writes an empty segment. So an empty result here means the index was
%% probably listed under a legacy unpadded name; read that instead.
read_segment_file(ActorId, BasePath, Idx) ->
    case try_read_segment(segment_path(ActorId, BasePath, Idx)) of
        {ok, []} -> try_read_segment(legacy_segment_path(ActorId, BasePath, Idx));
        Other -> Other
    end.

%% Tiny insertion sort over a small list of integers (keeps duplicates).
isort([]) -> [];
isort([H | T]) -> insert(H, isort(T)).

insert(X, []) -> [X];
insert(X, [Y | Rest]) when X =< Y -> [X, Y | Rest];
insert(X, [Y | Rest]) -> [Y | insert(X, Rest)].

%% segment_path/3: charlist path of the Idx'th segment file,
%% <BasePath>/<ActorId>-NNNNNN.log.
segment_path(ActorId, BasePath, Idx) ->
    base_chars(BasePath) ++ [$/] ++ atom_to_list(ActorId) ++ [$-]
        ++ pad_int(Idx, 6) ++ [$., $l, $o, $g].

%% Legacy, unpadded form of segment_path/3: <BasePath>/<ActorId>-N.log.
legacy_segment_path(ActorId, BasePath, Idx) ->
    base_chars(BasePath) ++ [$/] ++ atom_to_list(ActorId) ++ [$-]
        ++ integer_to_list(Idx) ++ [$., $l, $o, $g].

base_chars(B) when is_binary(B) -> binary_to_list(B);
base_chars(L) when is_list(L) -> L.

%% Zero-pad an integer to Width digits as a charlist.
pad_int(N, Width) ->
    pad_left(integer_to_list(N), Width).

pad_left(Cs, Width) ->
    case length(Cs) >= Width of
        true -> Cs;
        false -> pad_left([$0 | Cs], Width)
    end.

%% Overwrites Path with Entries, each framed; returns the result of
%% file:write_file/2.
write_segment(Path, Entries) ->
    Frames = [frame(term_codec:encode(E)) || E <- Entries],
    file:write_file(Path, list_to_binary(Frames)).

%% frame/1 — prepend 4-byte big-endian length to Payload.
%% The header is the payload length as 4 big-endian bytes. A payload of
%% 4 GiB or more cannot be described by that header; raise
%% {frame_too_large, Len} rather than silently write a wrapped length,
%% which would corrupt the segment.
frame(Payload) when is_binary(Payload) ->
    Len = byte_size(Payload),
    case Len < 4294967296 of
        true ->
            [(Len div 16777216) rem 256,
             (Len div 65536) rem 256,
             (Len div 256) rem 256,
             Len rem 256,
             Payload];
        false ->
            erlang:error({frame_too_large, Len})
    end.

%% try_read_segment(Path) -> {ok, [Entry]} | {error, Reason}
%% A missing file reads as {ok, []}. Undecodable content becomes
%% {error, {corrupt, Why}}; any other read failure becomes
%% {error, {read, R}}.
try_read_segment(Path) ->
    case file:read_file(Path) of
        {ok, Bin} ->
            try
                {ok, decode_frames(binary_to_list(Bin), [])}
            catch
                throw:Reason -> {error, {corrupt, Reason}};
                error:Reason -> {error, {corrupt, Reason}}
            end;
        {error, enoent} ->
            {ok, []};
        {error, R} ->
            {error, {read, R}}
    end.

%% decode_frames(Bytes, Acc) -> [Term]
%% Decodes length-prefixed frames from a byte list. Throws
%% truncated_header, truncated_body or {decode, R} on malformed input.
decode_frames([], Acc) ->
    lists:reverse(Acc);
decode_frames([B3, B2, B1, B0 | Rest], Acc) ->
    Len = B3 * 16777216 + B2 * 65536 + B1 * 256 + B0,
    {Payload, Rest2} = take_n(Len, Rest),
    case term_codec:decode(list_to_binary(Payload)) of
        {ok, Term, _} -> decode_frames(Rest2, [Term | Acc]);
        {error, R} -> throw({decode, R})
    end;
decode_frames(_, _) ->
    throw(truncated_header).

%% take_n(N, List) -> {FirstN, Rest}; throws truncated_body when List
%% has fewer than N elements. Tail-recursive, so splitting a large
%% payload does not build a call stack as deep as the payload.
take_n(N, List) ->
    take_n(N, List, []).

take_n(0, Rest, Acc) -> {lists:reverse(Acc), Rest};
take_n(N, [H | T], Acc) -> take_n(N - 1, T, [H | Acc]);
take_n(_, [], _) -> throw(truncated_body).

%% --- proplist helpers ---

%% field(K, List) -> V for the first {K, V}; raises badkey when absent.
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> erlang:error(badkey).

%% lookup(K, List) -> V for the first {K, V}, or undefined when absent.
lookup(K, [{K, V} | _]) -> V;
lookup(K, [_ | Rest]) -> lookup(K, Rest);
lookup(_, []) -> undefined.

%% replace_field(K, V, List): replaces the first {K, _} in place, or
%% appends {K, V} when K is absent.
replace_field(K, V, []) -> [{K, V}];
replace_field(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
replace_field(K, V, [P | Rest]) -> [P | replace_field(K, V, Rest)].

%% proplist_get(K, List, Default) -> V for the first {K, V}, else Default.
proplist_get(K, [{K, V} | _], _) -> V;
proplist_get(K, [_ | Rest], Default) -> proplist_get(K, Rest, Default);
proplist_get(_, [], Default) -> Default.