diff --git a/hosts/ocaml/bin/sx_server.ml b/hosts/ocaml/bin/sx_server.ml
index bbeb10c2..f2db0088 100644
--- a/hosts/ocaml/bin/sx_server.ml
+++ b/hosts/ocaml/bin/sx_server.ml
@@ -571,9 +571,12 @@ and cek_run_with_io state =
           Hashtbl.replace d "descent" (Number desc);
           Dict d
         | _ ->
-          let args = let a = Sx_runtime.get_val request (String "args") in
-            (match a with List l -> l | _ -> [a]) in
-          io_request op args
+          let argsv = Sx_runtime.get_val request (String "args") in
+          (match Sx_persist_store.handle_op op argsv with
+           | Some resp -> resp
+           | None ->
+             let args = (match argsv with List l -> l | _ -> [argsv]) in
+             io_request op args)
       in
       s := Sx_ref.cek_resume !s response;
       loop ()
@@ -1540,7 +1543,12 @@ let rec dispatch env cmd =
                | Some path -> load_library_file path | None -> ());
               Nil
             end
-          end else Nil (* non-import IO: resume with nil *) in
+          end else
+            (* durable-storage ops: service against on-disk store *)
+            let args = Sx_runtime.get_val request (String "args") in
+            (match Sx_persist_store.handle_op op args with
+             | Some resp -> resp
+             | None -> Nil (* non-import IO: resume with nil *)) in
         s := Sx_ref.cek_resume !s response
       done;
       Sx_ref.cek_value !s
@@ -3893,7 +3901,10 @@ let http_mode port =
         Dict d
       | "io-sleep" | "sleep" -> Nil
       | "import" -> Nil
-      | _ -> Nil);
+      | _ ->
+        (match Sx_persist_store.handle_op op args with
+         | Some resp -> resp
+         | None -> Nil));
   (* Response cache — path → full HTTP response string.
      Populated during pre-warm, serves cached responses in <0.1ms.
      Thread-safe: reads are lock-free (Hashtbl.find_opt is atomic for
diff --git a/hosts/ocaml/lib/sx_persist_store.ml b/hosts/ocaml/lib/sx_persist_store.ml
new file mode 100644
index 00000000..2a7086f7
--- /dev/null
+++ b/hosts/ocaml/lib/sx_persist_store.ml
@@ -0,0 +1,330 @@
+(* sx_persist_store — host durable-storage adapter for lib/persist.
+   Production twin of `persist/serve` (lib/persist/durable.sx): it answers the
+   same `persist/...` IO ops, but backs them with real on-disk storage so writes
+   survive a process restart. Stateless-on-disk: every op reads/writes the
+   filesystem directly, so a fresh process recovers state with no warm-up — the
+   log on disk IS the state.
+
+   On-disk layout under the root dir (default ./persist-data, or $SX_PERSIST_DIR):
+     streams/HEX.log   append-only, one SX-serialized event per line
+     streams/HEX.seq   per-stream monotonic high-water counter (int)
+     kv/HEX            one SX-serialized value per key
+   where HEX is hex_encode applied to the stream name or key.
+
+   Invariants honoured (see plans/persist-on-sx.md Blocker spec):
+   1. last-seq is a per-stream monotonic counter stored in HEX.seq, SEPARATE
+      from the rows — it keeps climbing across truncate, so a compacted stream
+      never reassigns a seq.
+   2. append never renumbers — the event already carries its :seq (log.sx does
+      last-seq+1); the host only bumps the high-water mark to max(hw, seq).
+   3. read returns surviving events in append order with :seq intact.
+   4. streams is the set of streams that ever had an append — keyed off the .seq
+      file, which truncate never deletes, so it survives full compaction.
+   5. values round-trip structurally via the SX serializer/parser. *)
+
+open Sx_types
+
+(* ---- root dir ---------------------------------------------------------- *)
+
+(* Explicit override installed by set_root; None means "use the default". *)
+let _root : string option ref = ref None
+
+(* Pin the storage root; takes precedence over $SX_PERSIST_DIR. *)
+let set_root dir = _root := Some dir
+
+(* Storage root: the set_root override, else $SX_PERSIST_DIR, else the
+   relative directory "persist-data". *)
+let root_dir () =
+  match !_root with
+  | Some d -> d
+  | None -> (try Sys.getenv "SX_PERSIST_DIR" with Not_found -> "persist-data")
+
+(* ---- filesystem helpers ------------------------------------------------ *)
+
+(* mkdir -p: create dir and any missing parents. EEXIST from mkdir is
+   tolerated, so a directory that appears in the meantime is not an error. *)
+let rec ensure_dir dir =
+  if dir = "" || dir = "." || dir = "/" || Sys.file_exists dir then ()
+  else begin
+    ensure_dir (Filename.dirname dir);
+    (try Unix.mkdir dir 0o755 with Unix.Unix_error (Unix.EEXIST, _, _) -> ())
+  end
+
+let streams_dir () = Filename.concat (root_dir ()) "streams"
+let kv_dir () = Filename.concat (root_dir ()) "kv"
+
+(* Whole contents of path. The channel is closed even when reading raises. *)
+let read_file path =
+  let ic = open_in_bin path in
+  Fun.protect
+    ~finally:(fun () -> close_in_noerr ic)
+    (fun () -> really_input_string ic (in_channel_length ic))
+
+(* Atomic write: temp file in the same dir then rename over the target.
+   If writing the temp file raises, the channel is closed and the partial
+   temp file is removed before re-raising, so path keeps its old contents. *)
+let write_file_atomic path contents =
+  ensure_dir (Filename.dirname path);
+  let tmp = path ^ ".tmp" in
+  let oc = open_out_bin tmp in
+  (try
+     output_string oc contents;
+     close_out oc (* flushes; a failed flush raises before the rename *)
+   with e ->
+     close_out_noerr oc;
+     (try Sys.remove tmp with Sys_error _ -> ());
+     raise e);
+  Sys.rename tmp path
+
+(* Append line plus '\n' to path, creating the file (mode 0o644) if missing.
+   The channel is closed even when a write raises. *)
+let append_line path line =
+  ensure_dir (Filename.dirname path);
+  let oc = open_out_gen [Open_append; Open_creat; Open_wronly] 0o644 path in
+  (try
+     output_string oc line;
+     output_char oc '\n';
+     close_out oc
+   with e ->
+     close_out_noerr oc;
+     raise e)
+
+(* ---- name <-> filename (hex, reversible, fs-safe) ---------------------- *)
+
+let hex_encode s =
+  let b = Buffer.create (String.length s * 2) in
+  String.iter (fun c -> Buffer.add_string b (Printf.sprintf "%02x" (Char.code c))) s;
+  Buffer.contents b
+
+(* Lenient decoder, kept unchanged for any existing caller: a trailing odd
+   character is ignored and a non-hex pair raises Failure (int_of_string). *)
+let hex_decode s =
+  let n = String.length s / 2 in
+  String.init n (fun i -> Char.chr (int_of_string ("0x" ^ String.sub s (i * 2) 2)))
+
+(* Total decoder used when listing directories: Some name only when s has
+   even length and is made solely of hex digits, None otherwise. Lets the
+   listing ops skip stray entries (e.g. a "HEX.tmp" left by an interrupted
+   write_file_atomic) instead of raising on them. *)
+let hex_decode_opt s =
+  let digit = function
+    | '0' .. '9' as c -> Some (Char.code c - Char.code '0')
+    | 'a' .. 'f' as c -> Some (Char.code c - Char.code 'a' + 10)
+    | 'A' .. 'F' as c -> Some (Char.code c - Char.code 'A' + 10)
+    | _ -> None
+  in
+  let len = String.length s in
+  if len mod 2 = 1 then None
+  else begin
+    let out = Bytes.create (len / 2) in
+    let rec fill i =
+      if i = len / 2 then Some (Bytes.to_string out)
+      else
+        match digit s.[2 * i], digit s.[(2 * i) + 1] with
+        | Some hi, Some lo ->
+          Bytes.set out i (Char.chr ((hi * 16) + lo));
+          fill (i + 1)
+        | _ -> None
+    in
+    fill 0
+  end
+
+let stream_log stream = Filename.concat (streams_dir ()) (hex_encode stream ^ ".log")
+let stream_seq stream = Filename.concat (streams_dir ()) (hex_encode stream ^ ".seq")
+let kv_path key = Filename.concat (kv_dir ()) (hex_encode key)
+
+(* ---- value <-> SX text (round-trips through Sx_parser) ----------------- *)
+
+(* Escape s for a double-quoted SX literal. '\n' and '\r' are escaped, so a
+   serialized string literal never contains a raw line break. *)
+let escape_str s =
+  let len = String.length s in
+  let buf = Buffer.create (len + 16) in
+  for i = 0 to len - 1 do
+    match s.[i] with
+    | '"' -> Buffer.add_string buf "\\\""
+    | '\\' -> Buffer.add_string buf "\\\\"
+    | '\n' -> Buffer.add_string buf "\\n"
+    | '\r' -> Buffer.add_string buf "\\r"
+    | '\t' -> Buffer.add_string buf "\\t"
+    | c -> Buffer.add_char buf c
+  done;
+  Buffer.contents buf
+
+(* Render a value as SX text. Lists become `(list ...)` and symbols
+   `(quote s)`; dict entries are sorted so the output is deterministic.
+   Any value not matched below is written as "nil". *)
+let rec serialize = function
+  | Nil -> "nil"
+  | Bool true -> "true"
+  | Bool false -> "false"
+  | Integer n -> string_of_int n
+  | Number n -> format_number n
+  | String s -> "\"" ^ escape_str s ^ "\""
+  | Symbol s -> "(quote " ^ s ^ ")"
+  | Keyword k -> ":" ^ k
+  | List items | ListRef { contents = items } ->
+    (* one concat over the rendered items instead of repeated `^` in a fold *)
+    "(list" ^ String.concat "" (List.map (fun v -> " " ^ serialize v) items) ^ ")"
+  | Dict d ->
+    let pairs = Hashtbl.fold (fun k v acc ->
+      (Printf.sprintf ":%s %s" k (serialize v)) :: acc) d [] in
+    "{" ^ String.concat " " (List.sort String.compare pairs) ^ "}"
+  | _ -> "nil"
+
+(* Parse one serialized value back. Empty / blank -> Nil. *)
+let rec deserialize line =
+  let line = String.trim line in
+  if line = "" then Nil
+  else match Sx_parser.parse_all line with
+    | v :: _ -> eval_quote_lists v
+    | [] -> Nil
+
+(* serialize emits lists as `(list ...)` and symbols as `(quote s)` so the
+   parser yields data, not a call — but the parser leaves those as AST. Walk
+   the parsed AST and collapse `(list ...)`/`(quote s)` back to values. *)
+and eval_quote_lists v =
+  match v with
+  | List (Symbol "quote" :: x :: []) -> x
+  | List (Symbol "list" :: rest) -> List (List.map eval_quote_lists rest)
+  | List items -> List (List.map eval_quote_lists items)
+  | ListRef { contents = items } -> List (List.map eval_quote_lists items)
+  | Dict d ->
+    let d' = Hashtbl.create (Hashtbl.length d) in
+    Hashtbl.iter (fun k v -> Hashtbl.replace d' k (eval_quote_lists v)) d;
+    Dict d'
+  | other -> other
+
+(* ---- seq counter ------------------------------------------------------- *)
+
+(* Current high-water mark for stream; 0 when the .seq file is missing,
+   unreadable, or does not hold an integer. *)
+let read_seq stream =
+  let p = stream_seq stream in
+  if not (Sys.file_exists p) then 0
+  else
+    match int_of_string_opt (String.trim (read_file p)) with
+    | Some n -> n
+    | None -> 0
+    | exception (Sys_error _ | End_of_file) -> 0
+
+let write_seq stream n = write_file_atomic (stream_seq stream) (string_of_int n)
+
+let value_to_int = function
+  | Integer n -> n
+  | Number n -> int_of_float n
+  | _ -> 0
+
+(* The "seq" entry of a Dict event as an int; 0 when absent or not a Dict. *)
+let event_seq ev =
+  match ev with
+  | Dict d -> (match Hashtbl.find_opt d "seq" with Some v -> value_to_int v | None -> 0)
+  | _ -> 0
+
+(* ---- ops --------------------------------------------------------------- *)
+
+let do_append stream ev =
+  ensure_dir (streams_dir ());
+  (* bump the monotonic high-water mark; create .seq on first append so the
+     stream shows up in `streams` and survives later truncation. *)
+  let hw = read_seq stream in
+  let s = event_seq ev in
+  write_seq stream (max hw s);
+  append_line (stream_log stream) (serialize ev)
+
+(* All events of stream in file order; an absent log reads as the empty list.
+   NOTE(review): a line Sx_parser.parse_all cannot parse (for instance a torn
+   final line after a crash mid-append) presumably raises out of here —
+   confirm the parser's failure mode and decide whether to skip such lines. *)
+let do_read stream =
+  let p = stream_log stream in
+  if not (Sys.file_exists p) then List []
+  else begin
+    let content = read_file p in
+    let lines = String.split_on_char '\n' content in
+    let evs = List.filter_map (fun l ->
+      if String.trim l = "" then None else Some (deserialize l)) lines in
+    List evs
+  end
+
+let do_last_seq stream = Number (float_of_int (read_seq stream))
+
+(* Decoded, sorted names of the entries in dir that end in suffix. Entries
+   whose stem is not valid hex are skipped rather than raising. *)
+let list_dir_suffix dir suffix =
+  if not (Sys.file_exists dir) then []
+  else
+    Array.to_list (Sys.readdir dir)
+    |> List.filter (fun f -> Filename.check_suffix f suffix)
+    |> List.filter_map (fun f -> hex_decode_opt (Filename.chop_suffix f suffix))
+    |> List.sort String.compare
+
+let do_streams () = List (List.map (fun s -> String s) (list_dir_suffix (streams_dir ()) ".seq"))
+
+(* drop events with seq <= n; the .seq high-water counter is untouched. *)
+let do_truncate stream n =
+  let p = stream_log stream in
+  if Sys.file_exists p then begin
+    let evs = match do_read stream with List l -> l | _ -> [] in
+    let kept = List.filter (fun ev -> event_seq ev > n) evs in
+    let body = String.concat "" (List.map (fun ev -> serialize ev ^ "\n") kept) in
+    write_file_atomic p body
+  end
+
+let do_kv_get key =
+  let p = kv_path key in
+  if Sys.file_exists p then deserialize (read_file p) else Nil
+
+let do_kv_put key v =
+  ensure_dir (kv_dir ());
+  write_file_atomic (kv_path key) (serialize v)
+
+(* Best-effort delete: a key that is absent or cannot be removed is ignored. *)
+let do_kv_delete key =
+  let p = kv_path key in
+  if Sys.file_exists p then (try Sys.remove p with Sys_error _ -> ())
+
+let do_kv_has key = Bool (Sys.file_exists (kv_path key))
+
+(* Sorted list of stored keys. Entries that are not valid hex (such as a
+   leftover "HEX.tmp") are skipped instead of raising. *)
+let do_kv_keys () =
+  if not (Sys.file_exists (kv_dir ())) then List []
+  else
+    List (
+      Array.to_list (Sys.readdir (kv_dir ()))
+      |> List.filter_map hex_decode_opt
+      |> List.sort String.compare
+      |> List.map (fun s -> String s))
+
+(* ---- dispatch ---------------------------------------------------------- *)
+
+let arglist = function
+  | List l | ListRef { contents = l } -> l
+  | Nil -> []
+  | v -> [v]
+
+(* Returns Some response if op is a persist op this store owns, None otherwise. *)
+let handle_op op args =
+  let a = arglist args in
+  let str = function String s -> s | v -> value_to_string v in
+  match op with
+  | "persist/append" ->
+    (match a with stream :: ev :: _ -> do_append (str stream) ev | _ -> ()); Some Nil
+  | "persist/read" ->
+    (match a with stream :: _ -> Some (do_read (str stream)) | _ -> Some (List []))
+  | "persist/last-seq" ->
+    (match a with stream :: _ -> Some (do_last_seq (str stream)) | _ -> Some (Number 0.0))
+  | "persist/streams" -> Some (do_streams ())
+  | "persist/truncate" ->
+    (match a with stream :: n :: _ -> do_truncate (str stream) (value_to_int n) | _ -> ()); Some Nil
+  | "persist/kv-get" ->
+    (match a with key :: _ -> Some (do_kv_get (str key)) | _ -> Some Nil)
+  | "persist/kv-put" ->
+    (match a with key :: v :: _ -> do_kv_put (str key) v | _ -> ()); Some Nil
+  | "persist/kv-delete" ->
+    (match a with key :: _ -> do_kv_delete (str key) | _ -> ()); Some Nil
+  | "persist/kv-has?" ->
+    (match a with key :: _ -> Some (do_kv_has (str key)) | _ -> Some (Bool false))
+  | "persist/kv-keys" -> Some (do_kv_keys ())
+  | _ -> None
diff --git a/hosts/ocaml/test/persist_durable_test.sh b/hosts/ocaml/test/persist_durable_test.sh
new file mode 100755
index 00000000..45b3fcbd
--- /dev/null
+++ b/hosts/ocaml/test/persist_durable_test.sh
@@ -0,0 +1,118 @@
+#!/usr/bin/env bash
+# hosts/ocaml/test/persist_durable_test.sh
+# Acceptance test for the host durable-storage adapter (Sx_persist_store).
+#
+# Exercises `persist/durable-backend` (REAL `perform`, not the mock) under the
+# WORKTREE-built sx_server.exe, and asserts:
+# 1. durable: writes land on disk and read back (the silent-data-loss repro
+# from plans/persist-on-sx.md now returns correct values).
+# 2. last-seq is monotonic across truncate (compaction never reassigns a seq).
+# 3. kv ops round-trip and delete.
+# 4. recovery: a REAL process restart (write, exit, fresh process, replay)
+# recovers state from disk.
+#
+# Run from repo root or anywhere; locates the worktree binary relative to itself.
+set -uo pipefail
+
+HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+ROOT="$(cd "$HERE/../../.." && pwd)" # repo/worktree root
+cd "$ROOT"
+
+SX="hosts/ocaml/_build/default/bin/sx_server.exe"
+if [ ! -x "$SX" ]; then
+  echo "ERROR: worktree binary not found at $SX — build it first:" >&2
+  echo " (cd hosts/ocaml && dune build bin/sx_server.exe)" >&2
+  exit 1
+fi
+
+DATADIR="$(mktemp -d)"
+trap 'rm -rf "$DATADIR"' EXIT
+
+PASS=0
+FAIL=0
+check() { # check