from __future__ import annotations import path_setup # noqa: F401 # adds shared_lib to sys.path from pathlib import Path from quart import g, abort, render_template, make_response from jinja2 import FileSystemLoader, ChoiceLoader from sqlalchemy import select from shared.infrastructure.factory import create_base_app from shared.config import config from bp import register_market_bp async def market_context() -> dict: """ Market app context processor. - menu_items: direct DB query - cart/cart_count/cart_total: direct DB (market owns CartItem/Product) """ from shared.infrastructure.context import base_context from shared.services.navigation import get_navigation_tree from shared.infrastructure.cart_identity import current_cart_identity from shared.models.market import CartItem from sqlalchemy.orm import selectinload ctx = await base_context() ctx["menu_items"] = await get_navigation_tree(g.s) # Market owns CartItem/Product — query directly for template compat ident = current_cart_identity() filters = [CartItem.deleted_at.is_(None)] if ident["user_id"]: filters.append(CartItem.user_id == ident["user_id"]) elif ident["session_id"]: filters.append(CartItem.session_id == ident["session_id"]) else: ctx["cart"] = [] ctx["cart_count"] = 0 ctx["cart_total"] = 0 return ctx result = await g.s.execute( select(CartItem).where(*filters).options(selectinload(CartItem.product)) ) cart_items = list(result.scalars().all()) ctx["cart"] = cart_items ctx["cart_count"] = sum(ci.quantity for ci in cart_items) ctx["cart_total"] = float(sum( (ci.product.special_price or ci.product.regular_price or 0) * ci.quantity for ci in cart_items if ci.product )) return ctx def create_app() -> "Quart": from models.market_place import MarketPlace from shared.services.registry import services from services import register_domain_services app = create_base_app( "market", context_fn=market_context, domain_services_fn=register_domain_services, ) # App-specific templates override shared templates app_templates = str(Path(__file__).resolve().parent / "templates") app.jinja_loader = ChoiceLoader([ FileSystemLoader(app_templates), app.jinja_loader, ]) # Market blueprint nested under post slug: /// app.register_blueprint( register_market_bp( url_prefix="/", title=config()["coop_title"], ), url_prefix="//", ) # --- Auto-inject page_slug and market_slug into url_for() calls --- @app.url_value_preprocessor def pull_slugs(endpoint, values): if values: if "page_slug" in values: g.post_slug = values.pop("page_slug") if "market_slug" in values: g.market_slug = values.pop("market_slug") @app.url_defaults def inject_slugs(endpoint, values): for attr, param in [("post_slug", "page_slug"), ("market_slug", "market_slug")]: val = g.get(attr) if val and param not in values: if app.url_map.is_endpoint_expecting(endpoint, param): values[param] = val # --- Load post and market data --- @app.before_request async def hydrate_market(): post_slug = getattr(g, "post_slug", None) market_slug = getattr(g, "market_slug", None) if not post_slug or not market_slug: return # Load post by slug via blog service post = await services.blog.get_post_by_slug(g.s, post_slug) if not post: abort(404) g.post_data = { "post": { "id": post.id, "title": post.title, "slug": post.slug, "feature_image": post.feature_image, "html": post.html, "status": post.status, "visibility": post.visibility, "is_page": post.is_page, }, } # Load market scoped to post (container pattern) market = ( await g.s.execute( select(MarketPlace).where( MarketPlace.slug == market_slug, MarketPlace.container_type == "page", MarketPlace.container_id == post.id, MarketPlace.deleted_at.is_(None), ) ) ).scalar_one_or_none() if not market: abort(404) g.market = market @app.context_processor async def inject_post(): post_data = getattr(g, "post_data", None) if not post_data: return {} return {**post_data} # --- Root route: market listing --- @app.get("/") async def markets_listing(): result = await g.s.execute( select(MarketPlace) .where(MarketPlace.deleted_at.is_(None), MarketPlace.container_type == "page") .order_by(MarketPlace.name) ) all_markets = result.scalars().all() # Resolve page posts via blog service post_ids = list({m.container_id for m in all_markets}) posts_by_id = { p.id: p for p in await services.blog.get_posts_by_ids(g.s, post_ids) } markets = [] for market in all_markets: market.page = posts_by_id.get(market.container_id) markets.append(market) html = await render_template( "_types/market/markets_listing.html", markets=markets, ) return await make_response(html) return app app = create_app()