from __future__ import annotations

import path_setup  # noqa: F401  # adds shared/ to sys.path

from pathlib import Path

from quart import g, abort, request
from jinja2 import FileSystemLoader, ChoiceLoader
from sqlalchemy import select

from shared.infrastructure.factory import create_base_app
from shared.config import config

from bp import register_market_bp, register_all_markets, register_page_markets, register_fragments


async def market_context() -> dict:
    """Build the template context for the market app.

    Starts from the shared base context and adds:

    - ``nav_tree_html``: the "nav-tree" fragment fetched from the blog app.
    - ``menu_items``: navigation tree loaded via ``get_navigation_tree``;
      used by ``_nav.html`` as a fallback when the fragment fetch fails.
    - ``cart_count`` / ``cart_total``: taken from the cart service summary,
      with the calendar figures added in.
    - ``cart``: non-deleted ``CartItem`` rows for the current identity, with
      ``.product`` eagerly loaded because templates read that relationship.
    """
    from shared.infrastructure.context import base_context
    from shared.services.navigation import get_navigation_tree
    from shared.services.registry import services
    from shared.infrastructure.cart_identity import current_cart_identity
    from shared.infrastructure.fragments import fetch_fragment
    from shared.models.market import CartItem
    from sqlalchemy.orm import selectinload

    ctx = await base_context()

    fragment_params = {"app_name": "market", "path": request.path}
    ctx["nav_tree_html"] = await fetch_fragment(
        "blog", "nav-tree", params=fragment_params,
    )
    # Always loaded so _nav.html still has data if the fragment fetch failed.
    ctx["menu_items"] = await get_navigation_tree(g.s)

    ident = current_cart_identity()
    user_id = ident["user_id"]
    session_id = ident["session_id"]

    # Count/total come from the cart service (same path as blog/events apps).
    summary = await services.cart.cart_summary(
        g.s, user_id=user_id, session_id=session_id,
    )
    item_count = summary.count + summary.calendar_count
    grand_total = summary.total + summary.calendar_total
    ctx["cart_count"] = item_count
    ctx["cart_total"] = float(grand_total)

    # Choose how the cart rows are scoped: user first, then anonymous session.
    if user_id is not None:
        owner_clause = CartItem.user_id == user_id
    elif session_id is not None:
        owner_clause = CartItem.session_id == session_id
    else:
        # No identity at all: nothing to query.
        ctx["cart"] = []
        return ctx

    stmt = (
        select(CartItem)
        .where(CartItem.deleted_at.is_(None), owner_clause)
        .options(selectinload(CartItem.product))
    )
    rows = await g.s.execute(stmt)
    ctx["cart"] = list(rows.scalars().all())
    return ctx


def create_app() -> "Quart":
    """Create and configure the market application.

    Builds the base app, layers the app-local template directory in front of
    the existing Jinja loader, registers the market blueprints, and installs
    the URL slug hooks, the per-request post/market loader and the post
    context processor.
    """
    from models.market_place import MarketPlace
    from shared.services.registry import services
    from services import register_domain_services

    app = create_base_app(
        "market",
        context_fn=market_context,
        domain_services_fn=register_domain_services,
    )

    # App-local templates are searched first, so they override shared ones.
    template_dir = Path(__file__).resolve().parent / "templates"
    app.jinja_loader = ChoiceLoader([
        FileSystemLoader(str(template_dir)),
        app.jinja_loader,
    ])

    # NOTE(review): the original route comments for the blueprints below read
    # "/", "//" and "///", which looks like "<...>" placeholders were lost.
    # pull_slugs() expects "slug", "page_slug" and "market_slug" URL values —
    # confirm the url_prefix strings against the deployed routes.

    # All markets: global view across all pages.
    all_markets_bp = register_all_markets()
    app.register_blueprint(all_markets_bp, url_prefix="/")

    # Page markets: markets for a single page.
    page_markets_bp = register_page_markets()
    app.register_blueprint(page_markets_bp, url_prefix="/")

    # Market blueprint nested under the post slug.
    market_bp = register_market_bp(
        url_prefix="/",
        title=config()["market_title"],
    )
    app.register_blueprint(market_bp, url_prefix="//")

    app.register_blueprint(register_fragments())

    # --- Auto-inject slugs into url_for() calls ---

    @app.url_value_preprocessor
    def pull_slugs(endpoint, values):
        """Move slug URL values onto ``g`` so views do not receive them."""
        if not values:
            return
        # page_markets uses "slug"; the market blueprint uses "page_slug".
        # Both land on g.post_slug, with "page_slug" applied last.
        for key in ("slug", "page_slug"):
            if key in values:
                g.post_slug = values.pop(key)
        if "market_slug" in values:
            g.market_slug = values.pop("market_slug")

    @app.url_defaults
    def inject_slugs(endpoint, values):
        """Fill in slug values from ``g`` for endpoints that expect them."""

        def _endpoint_wants(param):
            # Only fill params the caller omitted and the endpoint accepts.
            return param not in values and app.url_map.is_endpoint_expecting(
                endpoint, param
            )

        current_post_slug = g.get("post_slug")
        if current_post_slug:
            for param in ("slug", "page_slug"):
                if _endpoint_wants(param):
                    values[param] = current_post_slug

        current_market_slug = g.get("market_slug")
        if current_market_slug and _endpoint_wants("market_slug"):
            values["market_slug"] = current_market_slug

    # --- Load post and market data ---

    @app.before_request
    async def hydrate_market():
        """Load the post (and market, when addressed) for this request.

        Does nothing when no post slug was captured. Aborts with 404 when the
        post, or a requested market under that post, cannot be found.
        """
        post_slug = getattr(g, "post_slug", None)
        if not post_slug:
            return

        # Look the post up by slug through the blog service.
        post = await services.blog.get_post_by_slug(g.s, post_slug)
        if not post:
            abort(404)

        exposed_fields = (
            "id",
            "title",
            "slug",
            "feature_image",
            "html",
            "status",
            "visibility",
            "is_page",
        )
        g.post_data = {
            "post": {name: getattr(post, name) for name in exposed_fields},
        }

        # The market is only resolved when the URL carried a market slug.
        market_slug = getattr(g, "market_slug", None)
        if not market_slug:
            return

        market_stmt = select(MarketPlace).where(
            MarketPlace.slug == market_slug,
            MarketPlace.container_type == "page",
            MarketPlace.container_id == post.id,
            MarketPlace.deleted_at.is_(None),
        )
        market_result = await g.s.execute(market_stmt)
        market = market_result.scalar_one_or_none()
        if not market:
            abort(404)
        g.market = market

    @app.context_processor
    async def inject_post():
        """Expose the loaded post data (if any) to templates."""
        post_data = getattr(g, "post_data", None)
        return dict(post_data) if post_data else {}

    return app


app = create_app()