(* sx_persist_store — host durable-storage adapter for lib/persist.

   Production twin of `persist/serve` (lib/persist/durable.sx): it answers the
   same `persist/...` IO ops, but backs them with real on-disk storage so
   writes survive a process restart. Stateless-on-disk: every op reads/writes
   the filesystem directly, so a fresh process recovers state with no
   warm-up — the log on disk IS the state.

   On-disk layout under the root dir (default ./persist-data, or
   $SX_PERSIST_DIR), where <hex> is the lowercase hex encoding of the stream
   name or key:

     streams/<hex>.log   append-only, one SX-serialized event per line
     streams/<hex>.seq   per-stream monotonic high-water counter (int)
     kv/<hex>            one SX-serialized value per key
     blobs/<cid>         raw blob bytes, content-addressed

   Invariants honoured (see plans/persist-on-sx.md Blocker spec):
   1. last-seq is a per-stream monotonic counter stored in .seq, SEPARATE from
      the rows — it keeps climbing across truncate, so a compacted stream never
      reassigns a seq.
   2. append never renumbers — the event already carries its :seq (log.sx does
      last-seq+1); the host only bumps the high-water mark to max(hw, seq).
   3. read returns surviving events in append order with :seq intact.
   4. streams is the set of streams that ever had an append — keyed off the
      .seq file, which truncate never deletes, so it survives full compaction.
   5. values round-trip structurally via the SX serializer/parser.

   Every write is flushed and closed (or renamed into place) before the op
   returns, which is what survives a process restart. Nothing here calls
   fsync, so surviving a power loss or OS crash is not claimed. *)

open Sx_types

(* ---- root dir ---------------------------------------------------------- *)

let _root : string option ref = ref None

let set_root dir = _root := Some dir

(* Resolution order: an explicit [set_root], then $SX_PERSIST_DIR, then the
   relative directory persist-data. *)
let root_dir () =
  match !_root with
  | Some d -> d
  | None -> (
      match Sys.getenv_opt "SX_PERSIST_DIR" with
      | Some d -> d
      | None -> "persist-data")

(* ---- filesystem helpers ------------------------------------------------ *)

(* mkdir -p: create [dir] and any missing parents. A directory that already
   exists (including one created between the check and the mkdir) is not an
   error; any other mkdir failure propagates. *)
let rec ensure_dir dir =
  if dir = "" || dir = "." || dir = "/" || Sys.file_exists dir then ()
  else begin
    ensure_dir (Filename.dirname dir);
    (try Unix.mkdir dir 0o755 with Unix.Unix_error (Unix.EEXIST, _, _) -> ())
  end

let streams_dir () = Filename.concat (root_dir ()) "streams"

let kv_dir () = Filename.concat (root_dir ()) "kv"

let blobs_dir () = Filename.concat (root_dir ()) "blobs"

(* Whole-file read in binary mode. The channel is closed even when the read
   fails (e.g. the file shrank between measuring and reading it). *)
let read_file path =
  let ic = open_in_bin path in
  Fun.protect
    ~finally:(fun () -> close_in_noerr ic)
    (fun () -> really_input_string ic (in_channel_length ic))

(* Atomic write: write a temp file in the same dir, then rename it over the
   target, so a reader sees either the old or the new contents. If writing
   the temp file fails, it is closed and removed and the error re-raised. *)
let write_file_atomic path contents =
  ensure_dir (Filename.dirname path);
  let tmp = path ^ ".tmp" in
  let oc = open_out_bin tmp in
  (try
     output_string oc contents;
     close_out oc
   with e ->
     close_out_noerr oc;
     (try Sys.remove tmp with Sys_error _ -> ());
     raise e);
  Sys.rename tmp path

(* Append [line] plus a newline to [path], creating the file if needed. The
   explicit flush surfaces write errors; the channel is closed either way. *)
let append_line path line =
  ensure_dir (Filename.dirname path);
  let oc = open_out_gen [Open_append; Open_creat; Open_wronly] 0o644 path in
  Fun.protect
    ~finally:(fun () -> close_out_noerr oc)
    (fun () ->
      output_string oc line;
      output_char oc '\n';
      flush oc)

(* ---- name <-> filename (hex, reversible, fs-safe) ---------------------- *)

let hex_encode s =
  let b = Buffer.create (String.length s * 2) in
  String.iter
    (fun c -> Buffer.add_string b (Printf.sprintf "%02x" (Char.code c)))
    s;
  Buffer.contents b

(* Value of one hex digit (either case), or None. *)
let hex_digit_value c =
  match c with
  | '0' .. '9' -> Some (Char.code c - Char.code '0')
  | 'a' .. 'f' -> Some (Char.code c - Char.code 'a' + 10)
  | 'A' .. 'F' -> Some (Char.code c - Char.code 'A' + 10)
  | _ -> None

(* Inverse of [hex_encode]. None unless [s] is an even-length string made
   only of hex digits, which lets directory listings skip files this module
   did not name (e.g. a leftover .tmp file from an interrupted atomic write)
   instead of failing on them. *)
let hex_decode_opt s =
  let len = String.length s in
  if len mod 2 <> 0 then None
  else begin
    let buf = Buffer.create (len / 2) in
    let rec go i =
      if i >= len then Some (Buffer.contents buf)
      else
        match (hex_digit_value s.[i], hex_digit_value s.[i + 1]) with
        | Some hi, Some lo ->
            Buffer.add_char buf (Char.chr ((hi lsl 4) lor lo));
            go (i + 2)
        | _ -> None
    in
    go 0
  end

(* Raising variant of [hex_decode_opt].
   @raise Invalid_argument if [s] is not a valid hex name. *)
let hex_decode s =
  match hex_decode_opt s with
  | Some decoded -> decoded
  | None -> invalid_arg ("Sx_persist_store.hex_decode: not a hex name: " ^ s)

let stream_log stream =
  Filename.concat (streams_dir ()) (hex_encode stream ^ ".log")

let stream_seq stream =
  Filename.concat (streams_dir ()) (hex_encode stream ^ ".seq")

let kv_path key = Filename.concat (kv_dir ()) (hex_encode key)

(* ---- value <-> SX text (round-trips through Sx_parser) ----------------- *)

(* Escape [s] for the body of a double-quoted SX string literal. Newline, CR
   and tab become escape sequences, so a serialized event never spans more
   than one log line. *)
let escape_str s =
  let len = String.length s in
  let buf = Buffer.create (len + 16) in
  for i = 0 to len - 1 do
    match s.[i] with
    | '"' -> Buffer.add_string buf "\\\""
    | '\\' -> Buffer.add_string buf "\\\\"
    | '\n' -> Buffer.add_string buf "\\n"
    | '\r' -> Buffer.add_string buf "\\r"
    | '\t' -> Buffer.add_string buf "\\t"
    | c -> Buffer.add_char buf c
  done;
  Buffer.contents buf

(* Encode a value as one line of SX text that [deserialize] reads back.
   Lists are written as (list ...) and symbols as (quote s) so parsing yields
   data, not a call. Dict pairs are sorted so the output is deterministic.
   Any value without a text form falls through to nil. *)
let rec serialize = function
  | Nil -> "nil"
  | Bool true -> "true"
  | Bool false -> "false"
  | Integer n -> string_of_int n
  | Number n -> format_number n
  | String s -> "\"" ^ escape_str s ^ "\""
  | Symbol s -> "(quote " ^ s ^ ")"
  | Keyword k -> ":" ^ k
  | List items | ListRef { contents = items } ->
      (* Single concat instead of a left fold of (^), which was quadratic. *)
      "(list" ^ String.concat "" (List.map (fun v -> " " ^ serialize v) items)
      ^ ")"
  | Dict d ->
      let pairs =
        Hashtbl.fold
          (fun k v acc -> Printf.sprintf ":%s %s" k (serialize v) :: acc)
          d []
      in
      "{" ^ String.concat " " (List.sort String.compare pairs) ^ "}"
  | _ -> "nil"

(* Parse one serialized value back. Empty / blank -> Nil. *)
let rec deserialize line =
  let line = String.trim line in
  if line = "" then Nil
  else
    match Sx_parser.parse_all line with
    | v :: _ -> eval_quote_lists v
    | [] -> Nil

(* serialize emits lists as `(list ...)` and symbols as `(quote s)` so the
   parser yields data, not a call — but the parser leaves those as AST. Walk
   the parsed AST and collapse `(list ...)`/`(quote s)` back to values. *)
and eval_quote_lists v =
  match v with
  | List [ Symbol "quote"; x ] -> x
  | List (Symbol "list" :: rest) -> List (List.map eval_quote_lists rest)
  | List items -> List (List.map eval_quote_lists items)
  | ListRef { contents = items } -> List (List.map eval_quote_lists items)
  | Dict d ->
      let d' = Hashtbl.create (Hashtbl.length d) in
      Hashtbl.iter (fun k v -> Hashtbl.replace d' k (eval_quote_lists v)) d;
      Dict d'
  | other -> other

(* ---- seq counter ------------------------------------------------------- *)

(* High-water mark for [stream]; 0 when the .seq file is missing, unreadable
   or does not hold an integer. *)
let read_seq stream =
  let p = stream_seq stream in
  if not (Sys.file_exists p) then 0
  else
    match int_of_string (String.trim (read_file p)) with
    | n -> n
    | exception (Failure _ | Sys_error _ | End_of_file) -> 0

let write_seq stream n = write_file_atomic (stream_seq stream) (string_of_int n)

(* Integer view of a numeric value; floats truncate, anything else is 0. *)
let value_to_int = function
  | Integer n -> n
  | Number n -> int_of_float n
  | _ -> 0

(* The :seq field of an event dict, or 0 if absent / not a dict. *)
let event_seq ev =
  match ev with
  | Dict d -> (
      match Hashtbl.find_opt d "seq" with
      | Some v -> value_to_int v
      | None -> 0)
  | _ -> 0

(* ---- ops --------------------------------------------------------------- *)

let do_append stream ev =
  ensure_dir (streams_dir ());
  (* Bump the monotonic high-water mark before writing the row. Creating .seq
     on first append is what makes the stream show up in `streams` and
     survive later truncation. Because .seq is written first, a crash between
     the two writes can leave a gap in seqs but never lets one be reissued. *)
  let hw = read_seq stream in
  write_seq stream (max hw (event_seq ev));
  append_line (stream_log stream) (serialize ev)

(* All events of [stream] in append order; blank lines are skipped. *)
let do_read stream =
  let p = stream_log stream in
  if not (Sys.file_exists p) then List []
  else
    List
      (read_file p
      |> String.split_on_char '\n'
      |> List.filter_map (fun l ->
             if String.trim l = "" then None else Some (deserialize l)))

let do_last_seq stream = Number (float_of_int (read_seq stream))

(* Decoded names of the files in [dir] ending in [suffix], sorted. Entries
   whose stem is not a valid hex name are ignored rather than fatal. *)
let list_dir_suffix dir suffix =
  if not (Sys.file_exists dir) then []
  else
    Sys.readdir dir
    |> Array.to_list
    |> List.filter_map (fun f ->
           if Filename.check_suffix f suffix then
             hex_decode_opt (Filename.chop_suffix f suffix)
           else None)
    |> List.sort String.compare

let do_streams () =
  List (List.map (fun s -> String s) (list_dir_suffix (streams_dir ()) ".seq"))

(* drop events with seq <= n; the .seq high-water counter is untouched. *)
let do_truncate stream n =
  let p = stream_log stream in
  if Sys.file_exists p then begin
    let evs = match do_read stream with List l -> l | _ -> [] in
    let kept = List.filter (fun ev -> event_seq ev > n) evs in
    let body = String.concat "" (List.map (fun ev -> serialize ev ^ "\n") kept) in
    write_file_atomic p body
  end

let do_kv_get key =
  let p = kv_path key in
  if Sys.file_exists p then deserialize (read_file p) else Nil

let do_kv_put key v =
  ensure_dir (kv_dir ());
  write_file_atomic (kv_path key) (serialize v)

(* Deleting a missing key is a no-op; a failed remove is ignored. *)
let do_kv_delete key =
  let p = kv_path key in
  if Sys.file_exists p then (try Sys.remove p with Sys_error _ -> ())

let do_kv_has key = Bool (Sys.file_exists (kv_path key))

(* All stored keys, sorted. Files in kv/ that are not hex names (such as a
   leftover .tmp from an interrupted put) are skipped instead of failing the
   whole listing. *)
let do_kv_keys () =
  let dir = kv_dir () in
  if not (Sys.file_exists dir) then List []
  else
    List
      (Sys.readdir dir
      |> Array.to_list
      |> List.filter_map hex_decode_opt
      |> List.sort String.compare
      |> List.map (fun s -> String s))

(* ---- blob store (content-addressed) ------------------------------------ *)

(* Same pattern as the persist ops, but a SEPARATE adapter: large objects
   live in a content-addressed directory keyed
by a CIDv1 (raw codec, sha2-256). persist only ever stores the returned ref
   ({:cid :size :mime}), never bytes. blob/put is idempotent — identical bytes
   hash to the same cid + same file. *)

let codec_raw = 0x55

(* Content id for [bytes]: CIDv1 with the raw codec over a sha2-256
   multihash. *)
let blob_cid bytes =
  let digest = Sx_cid.unhex (Sx_sha2.sha256_hex bytes) in
  Sx_cid.cidv1 codec_raw (Sx_cid.multihash Sx_cid.mh_sha2_256 digest)

let blob_path cid = Filename.concat (blobs_dir ()) cid

(* blob/get and blob/has? take the cid straight from the SX caller, and
   [blob_path] splices it into a filesystem path. Reject anything that could
   resolve outside the blobs dir: the empty string, . and .., and any string
   containing a slash, a backslash or a NUL byte.
   NOTE(review): this assumes Sx_cid.cidv1 emits a base32/base58-style string
   with no slash (true for the common multibase encodings, not for multibase
   base64) — confirm against Sx_cid. *)
let safe_cid cid =
  cid <> ""
  && cid <> "."
  && cid <> ".."
  && (not (String.contains cid '/'))
  && (not (String.contains cid '\\'))
  && not (String.contains cid '\000')

(* Store [bytes] under their cid and return the cid as a String. Existing
   files are not rewritten, since the same cid means the same bytes. *)
let do_blob_put bytes =
  let cid = blob_cid bytes in
  let p = blob_path cid in
  if not (Sys.file_exists p) then write_file_atomic p bytes;
  String cid

(* Blob bytes for [cid], or Nil if absent or [cid] is not a safe name. *)
let do_blob_get cid =
  if not (safe_cid cid) then Nil
  else
    let p = blob_path cid in
    if Sys.file_exists p then String (read_file p) else Nil

let do_blob_has cid = Bool (safe_cid cid && Sys.file_exists (blob_path cid))

(* ---- dispatch ---------------------------------------------------------- *)

(* Normalise an op's args to an OCaml list: list values unwrap, Nil is no
   args, and any other single value is a one-element list. *)
let arglist = function
  | List l | ListRef { contents = l } -> l
  | Nil -> []
  | v -> [ v ]

(* Returns Some response if op is a persist op this store owns, None
   otherwise. Missing arguments yield the op's neutral response (Nil, an
   empty list, 0 or false) rather than an error. *)
let handle_op op args =
  let a = arglist args in
  let str = function String s -> s | v -> value_to_string v in
  match op with
  | "persist/append" ->
      (match a with stream :: ev :: _ -> do_append (str stream) ev | _ -> ());
      Some Nil
  | "persist/read" -> (
      match a with
      | stream :: _ -> Some (do_read (str stream))
      | _ -> Some (List []))
  | "persist/last-seq" -> (
      match a with
      | stream :: _ -> Some (do_last_seq (str stream))
      | _ -> Some (Number 0.0))
  | "persist/streams" -> Some (do_streams ())
  | "persist/truncate" ->
      (match a with
      | stream :: n :: _ -> do_truncate (str stream) (value_to_int n)
      | _ -> ());
      Some Nil
  | "persist/kv-get" -> (
      match a with key :: _ -> Some (do_kv_get (str key)) | _ -> Some Nil)
  | "persist/kv-put" ->
      (match a with key :: v :: _ -> do_kv_put (str key) v | _ -> ());
      Some Nil
  | "persist/kv-delete" ->
      (match a with key :: _ -> do_kv_delete (str key) | _ -> ());
      Some Nil
  | "persist/kv-has?" -> (
      match a with
      | key :: _ -> Some (do_kv_has (str key))
      | _ -> Some (Bool false))
  | "persist/kv-keys" -> Some (do_kv_keys ())
  | "blob/put" -> (
      match a with bytes :: _ -> Some (do_blob_put (str bytes)) | _ -> Some Nil)
  | "blob/get" -> (
      match a with cid :: _ -> Some (do_blob_get (str cid)) | _ -> Some Nil)
  | "blob/has?" -> (
      match a with
      | cid :: _ -> Some (do_blob_has (str cid))
      | _ -> Some (Bool false))
  | _ -> None