Files
rose-ash/hosts/ocaml/lib/sx_persist_store.ml
giles c6c2cebf98
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 46s
host-persist: durable storage adapter for persist/* ops + acceptance
Sx_persist_store services every persist/* IO op against on-disk storage
(append-only log + separate monotonic .seq high-water + per-key kv files,
SX-serialized). Wired into the (eval) suspension loop, cek_run_with_io
bridge, and in-process _cek_io_resolver. Data-loss repro now (3 3 3).
New persist_durable_test.sh: durable + monotonic-seq + streams + kv +
real process restart all green (5/5).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 22:32:16 +00:00

261 lines
9.4 KiB
OCaml

(* sx_persist_store — host durable-storage adapter for lib/persist.
Production twin of `persist/serve` (lib/persist/durable.sx): it answers the
same `persist/...` IO ops, but backs them with real on-disk storage so writes
survive a process restart. No state is cached in memory: every op reads/writes
the filesystem directly, so a fresh process recovers state with no warm-up — the
log on disk IS the state.
On-disk layout under the root dir (default ./persist-data, or $SX_PERSIST_DIR):
streams/<hex(stream)>.log append-only, one SX-serialized event per line
streams/<hex(stream)>.seq per-stream monotonic high-water counter (int)
kv/<hex(key)> one SX-serialized value per key
Invariants honoured (see plans/persist-on-sx.md Blocker spec):
1. last-seq is a per-stream monotonic counter stored in .seq, SEPARATE from
the rows — it keeps climbing across truncate, so a compacted stream never
reassigns a seq.
2. append never renumbers — the event already carries its :seq (log.sx does
last-seq+1); the host only bumps the high-water mark to max(hw, seq).
3. read returns surviving events in append order with :seq intact.
4. streams is the set of streams that ever had an append — keyed off the .seq
file, which truncate never deletes, so it survives full compaction.
5. values round-trip structurally via the SX serializer/parser. *)
open Sx_types
(* ---- root dir ---------------------------------------------------------- *)
(* Explicit root override; [None] until [set_root] is called. *)
let _root : string option ref = ref None

(* Pin the storage root to [dir] for the rest of the process. *)
let set_root dir = _root := Some dir

(* Resolve the storage root: the explicit override if one was set, otherwise
   the SX_PERSIST_DIR environment variable, otherwise the relative directory
   persist-data. *)
let root_dir () =
  match !_root with
  | Some explicit -> explicit
  | None ->
    Option.value (Sys.getenv_opt "SX_PERSIST_DIR") ~default:"persist-data"
(* ---- filesystem helpers ------------------------------------------------ *)
(* Create [dir] and any missing ancestors (mkdir -p). The empty path, the
   current directory, the filesystem root and any path that already exists are
   left alone. An EEXIST from mkdir is ignored, so losing a creation race to
   another writer is not an error; other Unix errors propagate. *)
let rec ensure_dir dir =
  match dir with
  | "" | "." | "/" -> ()
  | _ when Sys.file_exists dir -> ()
  | _ ->
    (* Parents first, then this level. *)
    ensure_dir (Filename.dirname dir);
    (match Unix.mkdir dir 0o755 with
     | () -> ()
     | exception Unix.Unix_error (Unix.EEXIST, _, _) -> ())
(* Directory holding the per-stream .log and .seq files. *)
let streams_dir () =
  let root = root_dir () in
  Filename.concat root "streams"

(* Directory holding one file per kv key. *)
let kv_dir () =
  let root = root_dir () in
  Filename.concat root "kv"
(* Read the whole of [path] as a binary string.
   The channel is closed on every exit path, including when measuring or
   reading the file raises, so a failed read does not leak a descriptor.
   Raises [Sys_error] if the file cannot be opened, and [End_of_file] if it
   yields fewer bytes than its reported length. *)
let read_file path =
  let ic = open_in_bin path in
  Fun.protect
    ~finally:(fun () -> close_in_noerr ic)
    (fun () -> really_input_string ic (in_channel_length ic))
(* Atomic write: put [contents] in a temp file next to [path] (same name plus
   .tmp) and rename it over the target, so a reader sees either the old or the
   new contents. The parent directory is created if missing.
   If writing or closing the temp file fails, the channel is closed, the
   partial temp file is removed on a best-effort basis, and the original
   exception is re-raised; the target is left untouched.
   NOTE(review): no fsync is issued before the rename, so durability across a
   power loss is not established here. Confirm whether callers need it. *)
let write_file_atomic path contents =
  ensure_dir (Filename.dirname path);
  let tmp = path ^ ".tmp" in
  let oc = open_out_bin tmp in
  (try
     output_string oc contents;
     (* close_out flushes, and reports a failed flush by raising. *)
     close_out oc
   with e ->
     close_out_noerr oc;
     (try Sys.remove tmp with Sys_error _ -> ());
     raise e);
  Sys.rename tmp path
(* Append [line] followed by a newline to [path], creating the file (mode
   0o644) and its parent directory when missing.
   The channel is closed on the failure path too, so a failed write does not
   leak a descriptor. The normal path still uses [close_out], so a flush error
   is reported to the caller rather than swallowed. *)
let append_line path line =
  ensure_dir (Filename.dirname path);
  let oc = open_out_gen [Open_append; Open_creat; Open_wronly] 0o644 path in
  match
    output_string oc line;
    output_char oc '\n';
    close_out oc
  with
  | () -> ()
  | exception e ->
    close_out_noerr oc;
    raise e
(* ---- name <-> filename (hex, reversible, fs-safe) ---------------------- *)
(* Encode every byte of [s] as two lowercase hex digits, high nibble first.
   The result is twice as long as the input and only uses 0-9 and a-f. *)
let hex_encode s =
  let digits = "0123456789abcdef" in
  String.init (2 * String.length s) (fun i ->
    let byte = Char.code s.[i / 2] in
    (* Even output positions carry the high nibble, odd ones the low. *)
    digits.[if i land 1 = 0 then byte lsr 4 else byte land 0x0f])
(* Inverse of [hex_encode]: turn each pair of hex digits back into one byte.
   A trailing unpaired character is ignored. Raises [Failure] (from
   [int_of_string]) when a pair is not valid hex. *)
let hex_decode s =
  let pairs = String.length s / 2 in
  let out = Bytes.create pairs in
  for i = 0 to pairs - 1 do
    let digits = String.sub s (2 * i) 2 in
    Bytes.set out i (Char.chr (int_of_string ("0x" ^ digits)))
  done;
  Bytes.to_string out
(* Path of the append-only event log for [stream]. *)
let stream_log stream =
  let file = hex_encode stream ^ ".log" in
  Filename.concat (streams_dir ()) file

(* Path of the high-water counter file for [stream]. *)
let stream_seq stream =
  let file = hex_encode stream ^ ".seq" in
  Filename.concat (streams_dir ()) file

(* Path of the value file for kv [key]. *)
let kv_path key =
  let file = hex_encode key in
  Filename.concat (kv_dir ()) file
(* ---- value <-> SX text (round-trips through Sx_parser) ----------------- *)
(* Escape [s] for use inside a double-quoted SX string literal: the double
   quote, backslash, newline, carriage return and tab become backslash
   sequences; every other character is copied unchanged. *)
let escape_str s =
  let buf = Buffer.create (String.length s + 16) in
  let emit = function
    | '"' -> Buffer.add_string buf "\\\""
    | '\\' -> Buffer.add_string buf "\\\\"
    | '\n' -> Buffer.add_string buf "\\n"
    | '\r' -> Buffer.add_string buf "\\r"
    | '\t' -> Buffer.add_string buf "\\t"
    | plain -> Buffer.add_char buf plain
  in
  String.iter emit s;
  Buffer.contents buf
(* Render a value as SX text.
   - Symbols are written as (quote s) and lists as (list ...).
   - Dict entries are written as :key value pairs, sorted by their rendered
     text so the output does not depend on hash-table iteration order.
   - Any constructor not listed below is written as nil, so it is not
     preserved.
   List elements are accumulated in a Buffer: the previous fold over string
   concatenation re-copied the growing prefix for every element. *)
let rec serialize = function
  | Nil -> "nil"
  | Bool true -> "true"
  | Bool false -> "false"
  | Integer n -> string_of_int n
  | Number n -> format_number n
  | String s -> "\"" ^ escape_str s ^ "\""
  | Symbol s -> "(quote " ^ s ^ ")"
  | Keyword k -> ":" ^ k
  | List items | ListRef { contents = items } ->
    let buf = Buffer.create 64 in
    Buffer.add_string buf "(list";
    List.iter (fun v ->
      Buffer.add_char buf ' ';
      Buffer.add_string buf (serialize v)) items;
    Buffer.add_char buf ')';
    Buffer.contents buf
  | Dict d ->
    let pairs = Hashtbl.fold (fun k v acc ->
      (Printf.sprintf ":%s %s" k (serialize v)) :: acc) d [] in
    "{" ^ String.concat " " (List.sort String.compare pairs) ^ "}"
  | _ -> "nil"
(* serialize writes lists as (list ...) and symbols as (quote s), and the
   parser hands those back as plain AST. This walk collapses them into values:
   a two-element (quote x) form becomes x as-is, a (list ...) form becomes a
   List of its converted elements, any other List or ListRef becomes a List of
   converted elements, and a Dict is copied into a fresh table with converted
   values. Everything else is returned unchanged. *)
let rec eval_quote_lists v =
  let rebuild items = List (List.map eval_quote_lists items) in
  match v with
  | List [Symbol "quote"; quoted] -> quoted
  | List (Symbol "list" :: elems) -> rebuild elems
  | List items | ListRef { contents = items } -> rebuild items
  | Dict src ->
    let dst = Hashtbl.create (Hashtbl.length src) in
    Hashtbl.iter (fun key value ->
      Hashtbl.replace dst key (eval_quote_lists value)) src;
    Dict dst
  | other -> other

(* Parse one serialized value back. Blank input, or input the parser returns
   no forms for, gives Nil; otherwise only the first parsed form is used. *)
let deserialize line =
  match String.trim line with
  | "" -> Nil
  | text ->
    (match Sx_parser.parse_all text with
     | [] -> Nil
     | first :: _ -> eval_quote_lists first)
(* ---- seq counter ------------------------------------------------------- *)
(* Current high-water mark for [stream]: the integer stored in its .seq file.
   A missing file counts as 0, and so does any failure to read or parse it. *)
let read_seq stream =
  let path = stream_seq stream in
  if not (Sys.file_exists path) then 0
  else
    match int_of_string (String.trim (read_file path)) with
    | n -> n
    | exception _ -> 0
(* Store [n] as the high-water mark for [stream], replacing the .seq file
   through [write_file_atomic]. *)
let write_seq stream n =
  let path = stream_seq stream in
  write_file_atomic path (string_of_int n)
(* Coerce a value to an int: an Integer as-is, a Number converted with
   [int_of_float], anything else 0. *)
let value_to_int v =
  match v with
  | Integer i -> i
  | Number f -> int_of_float f
  | _ -> 0
(* Sequence number carried by an event: the seq entry of a Dict, coerced with
   [value_to_int]. Events that are not a Dict, or have no seq entry, give 0. *)
let event_seq = function
  | Dict fields ->
    Option.fold ~none:0 ~some:value_to_int (Hashtbl.find_opt fields "seq")
  | _ -> 0
(* ---- ops --------------------------------------------------------------- *)
(* Append [ev] to [stream]. The event is written exactly as given; only the
   stored high-water mark is raised, to the larger of its current value and
   the seq the event carries. *)
let do_append stream ev =
  ensure_dir (streams_dir ());
  (* The .seq file is written on every append, the first one included, so the
     stream is listed by [do_streams] from then on. The counter is updated
     before the log line is written. *)
  let high_water = max (read_seq stream) (event_seq ev) in
  write_seq stream high_water;
  append_line (stream_log stream) (serialize ev)
(* All events stored for [stream], in file order, as an SX List. A stream
   with no log file gives an empty List; blank lines are skipped. *)
let do_read stream =
  let path = stream_log stream in
  if Sys.file_exists path then
    let events =
      read_file path
      |> String.split_on_char '\n'
      |> List.filter_map (fun raw ->
           match String.trim raw with
           | "" -> None
           | _ -> Some (deserialize raw))
    in
    List events
  else List []
(* High-water mark of [stream] as an SX Number (0 for an unknown stream). *)
let do_last_seq stream =
  let high_water = read_seq stream in
  Number (float_of_int high_water)
(* Names encoded by the files in [dir] that end in [suffix]: the suffix is
   stripped, the remainder hex-decoded, and the results sorted. A missing
   directory gives the empty list. *)
let list_dir_suffix dir suffix =
  if Sys.file_exists dir then
    Sys.readdir dir
    |> Array.to_list
    |> List.filter_map (fun entry ->
         if Filename.check_suffix entry suffix
         then Some (hex_decode (Filename.chop_suffix entry suffix))
         else None)
    |> List.sort String.compare
  else []
(* Sorted names of every stream that has a .seq file, as an SX List of
   Strings. *)
let do_streams () =
  let names = list_dir_suffix (streams_dir ()) ".seq" in
  List (List.map (fun name -> String name) names)
(* Drop every event of [stream] whose seq is <= [n] by rewriting the log with
   only the survivors, in their original order. The .seq high-water counter is
   not touched, and a stream without a log file is left alone. *)
let do_truncate stream n =
  let path = stream_log stream in
  if Sys.file_exists path then begin
    let events = match do_read stream with List l -> l | _ -> [] in
    let body = Buffer.create 256 in
    List.iter (fun ev ->
      if event_seq ev > n then begin
        Buffer.add_string body (serialize ev);
        Buffer.add_char body '\n'
      end) events;
    write_file_atomic path (Buffer.contents body)
  end
(* Value stored under [key], or Nil when the key has no file. *)
let do_kv_get key =
  let path = kv_path key in
  match Sys.file_exists path with
  | true -> deserialize (read_file path)
  | false -> Nil
(* Store [v] under [key], replacing any previous value through
   [write_file_atomic]. *)
let do_kv_put key v =
  ensure_dir (kv_dir ());
  let payload = serialize v in
  write_file_atomic (kv_path key) payload
(* Remove the value stored under [key], if any. Deleting an absent key is a
   no-op. Removal stays best-effort: a [Sys_error] from [Sys.remove] (for
   example the file vanished between the check and the removal) is ignored.
   Only [Sys_error] is caught, so unrelated exceptions are no longer
   swallowed. *)
let do_kv_delete key =
  let p = kv_path key in
  if Sys.file_exists p then (try Sys.remove p with Sys_error _ -> ())
(* Whether a value file exists for [key], as an SX Bool. *)
let do_kv_has key =
  let present = Sys.file_exists (kv_path key) in
  Bool present
(* Sorted list of every stored key, as an SX List of Strings. A missing kv
   directory gives an empty List.
   Directory entries whose name does not hex-decode are skipped rather than
   failing the whole listing. [write_file_atomic] stages values in a sibling
   file named after the key plus .tmp, so an interrupted or in-flight write can
   leave such an entry in this directory; it is not a key. *)
let do_kv_keys () =
  let dir = kv_dir () in
  if not (Sys.file_exists dir) then List []
  else
    let decode_opt name =
      match hex_decode name with
      | key -> Some key
      | exception Failure _ -> None
    in
    List (
      Array.to_list (Sys.readdir dir)
      |> List.filter_map decode_opt
      |> List.sort String.compare
      |> List.map (fun s -> String s))
(* ---- dispatch ---------------------------------------------------------- *)
(* Normalise an argument value to an OCaml list: Nil is no arguments, a List
   or ListRef is its elements, and any other value is a single argument. *)
let arglist v =
  match v with
  | Nil -> []
  | List items | ListRef { contents = items } -> items
  | single -> [single]
(* Returns Some response if [op] is a persist op this store owns, None
   otherwise. [args] is normalised through [arglist] first. A call with too
   few arguments is not an error: mutating ops do nothing and answer Some Nil,
   queries answer an empty default. Stream names and keys that are not String
   values are converted with [value_to_string]. *)
let handle_op op args =
  let name_of = function String s -> s | other -> value_to_string other in
  match op, arglist args with
  | "persist/append", stream :: ev :: _ ->
    do_append (name_of stream) ev;
    Some Nil
  | "persist/append", _ -> Some Nil
  | "persist/read", stream :: _ -> Some (do_read (name_of stream))
  | "persist/read", _ -> Some (List [])
  | "persist/last-seq", stream :: _ -> Some (do_last_seq (name_of stream))
  | "persist/last-seq", _ -> Some (Number 0.0)
  | "persist/streams", _ -> Some (do_streams ())
  | "persist/truncate", stream :: n :: _ ->
    do_truncate (name_of stream) (value_to_int n);
    Some Nil
  | "persist/truncate", _ -> Some Nil
  | "persist/kv-get", key :: _ -> Some (do_kv_get (name_of key))
  | "persist/kv-get", _ -> Some Nil
  | "persist/kv-put", key :: v :: _ ->
    do_kv_put (name_of key) v;
    Some Nil
  | "persist/kv-put", _ -> Some Nil
  | "persist/kv-delete", key :: _ ->
    do_kv_delete (name_of key);
    Some Nil
  | "persist/kv-delete", _ -> Some Nil
  | "persist/kv-has?", key :: _ -> Some (do_kv_has (name_of key))
  | "persist/kv-has?", _ -> Some (Bool false)
  | "persist/kv-keys", _ -> Some (do_kv_keys ())
  | _ -> None