; lib/blogimport/source.sx
; Live source adapter — Q-M4 RESOLVED: import via the blog INTERNAL-DATA QUERY
; surface (decoupled), not direct Postgres. Reuses the existing query contracts
; (blog/queries.sx: post-by-id/post-by-slug/posts-by-ids) and keeps the importer in
; the SX/host world (plans/migration/data-migration.md §7 recommended default).
;
; TRANSPORT SEAM (hexagonal, like every other subsystem): a `fetch-fn` port is
; INJECTED. Contract:
;   (fetch-fn query-name params-dict) -> response-data
; In production `fetch-fn` is the host's HMAC-signed fetch_data wrapper
; (GET /internal/data/{query}); in tests it's a mock. The importer never knows how
; the bytes arrive.
;
; RESPONSE CONTRACT (one published-post row), the blog `get-post-by-*` data handler:
;   {:uuid|:id :slug :title :status :visibility :tags :authors :lexical}
; :lexical is the Ghost body as a JSON STRING (the Post.lexical DB column) — parsed
; here with dream-json-parse into the SX dict shape blogimport/lex-blocks expects.
; (If a handler returns :lexical already-structured, it is used as-is.)
;
; REQUIRED BLOG-SIDE ADDITION (the one gap — draft in drafts/published-posts.sx):
; the migration needs a `published-posts` query that returns full published-post ROWS
; INCLUDING the raw `:lexical` body. The existing post-by-id/slug providers return a
; PostDTO that carries sx_content/html but NOT lexical (blog/services/__init__.py
; _post_to_dto), so they cannot feed the canonical lexical->blocks converter. One new
; provider (Python list_published_posts over list_posts(status="published"),
; blog/bp/blog/ghost_db.py:102) covers both enumeration AND bodies in one batch call.
; Mocked here against that contract; see drafts/ for the paste-ready blog-side change.

; JSON parser dependency, bound once under a blogimport/ name so every use in this
; file goes through a single alias.
(define blogimport/dep-json-parse dream-json-parse)

; --- lexical field -> SX dict (string from DB column, or already structured) -----
; lx: the row's :lexical value.
;   nil or ""      -> an empty document, {:root {:children (list)}}
;   any other str  -> the result of blogimport/dep-json-parse on it
;   anything else  -> returned unchanged (treated as already structured)
; The "" case is handled here because an empty string is not a valid JSON text, so
; it is mapped to the same empty document as nil instead of being handed to the
; parser.
(define blogimport/parse-lexical
  (fn (lx)
    (cond
      ((or (equal? lx nil) (equal? lx "")) {:root {:children (list)}})
      ((string? lx) (blogimport/dep-json-parse lx))
      (else lx))))

; --- service post-row -> importer `post` dict -----------------------------------
; row: one response row (see RESPONSE CONTRACT above).
; Returns a dict with exactly the keys :id :slug :title :status :visibility :tags
; :authors :lexical. :id prefers the row's :uuid and falls back to its :id; missing
; text fields default to "", missing :tags/:authors default to an empty list, and
; :lexical is normalised through blogimport/parse-lexical.
(define blogimport/parse-row
  (fn (row)
    {:id (or (get row :uuid) (get row :id))
     :slug (or (get row :slug) "")
     :title (or (get row :title) "")
     :status (or (get row :status) "")
     :visibility (or (get row :visibility) "")
     :tags (or (get row :tags) (list))
     :authors (or (get row :authors) (list))
     :lexical (blogimport/parse-lexical (get row :lexical))}))

; --- the published-post rows from the live source (one batch query) -------------
; fetch-fn: the injected transport port, called as (fetch-fn query-name params-dict).
; Returns whatever fetch-fn returns for the "published-posts" query with empty params.
(define blogimport/source-rows
  (fn (fetch-fn)
    (fetch-fn "published-posts" {})))

; --- all published posts as importer `post` dicts -------------------------------
; Maps blogimport/parse-row over the rows returned by blogimport/source-rows.
(define blogimport/source-posts
  (fn (fetch-fn)
    (map blogimport/parse-row (blogimport/source-rows fetch-fn))))

; --- end-to-end drivers ---------------------------------------------------------
; backfill = enumerate+fetch -> genesis-import (idempotent). Re-runnable as the
; one-way DB->persist sync (data-migration.md Strategy 1).
; b and at are passed straight through to blogimport/import-all! (defined outside
; this file); the posts come from blogimport/source-posts.
(define blogimport/backfill!
  (fn (b fetch-fn at)
    (blogimport/import-all! b (blogimport/source-posts fetch-fn) at)))

; partial backfill: client-side filter to a subset of ids (no extra blog query).
; ids is tested with contains? against each parsed post's :id. Because parse-row
; prefers :uuid over :id, ids must hold uuids whenever the rows carry :uuid.
(define blogimport/backfill-ids!
  (fn (b fetch-fn ids at)
    (blogimport/import-all!
      b
      (filter (fn (p) (contains? ids (get p :id)))
              (blogimport/source-posts fetch-fn))
      at)))

; sync-verify = fetch -> shadow-diff the persisted streams at rest.
; Fetches the current posts and hands them, with b, to blogimport/verify-all
; (defined outside this file); returns that call's result.
(define blogimport/sync-verify
  (fn (b fetch-fn)
    (blogimport/verify-all b (blogimport/source-posts fetch-fn))))