This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/services/stubs.py
giles 6e438dbfdc Add upcoming_entries_for_container to CalendarService
New paginated query for upcoming confirmed entries across all calendars
belonging to a container (page). Used by the events page summary view.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 22:28:18 +00:00

304 lines
9.5 KiB
Python

"""No-op stub services for absent domains.
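When an app starts without a particular domain, it registers the stub
so that ``services.X.method()`` returns an empty/None result (or silently
does nothing) rather than crashing. A few operations that cannot be faked,
such as ``create_marketplace`` or ``create_actor``, raise ``RuntimeError``
instead.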
"""
from __future__ import annotations
from decimal import Decimal
from sqlalchemy.ext.asyncio import AsyncSession
from shared.contracts.dtos import (
PostDTO,
CalendarDTO,
CalendarEntryDTO,
TicketDTO,
MarketPlaceDTO,
ProductDTO,
CartItemDTO,
CartSummaryDTO,
ActorProfileDTO,
APActivityDTO,
APFollowerDTO,
)
class StubBlogService:
    """Placeholder blog service: every lookup comes back empty."""

    async def get_post_by_slug(
        self,
        session: AsyncSession,
        slug: str,
    ) -> PostDTO | None:
        """Always return ``None``; no post is ever found for ``slug``."""
        return None

    async def get_post_by_id(
        self,
        session: AsyncSession,
        id: int,
    ) -> PostDTO | None:
        """Always return ``None``; no post is ever found for ``id``."""
        return None

    async def get_posts_by_ids(
        self,
        session: AsyncSession,
        ids: list[int],
    ) -> list[PostDTO]:
        """Always return a new empty list, whatever ``ids`` contains."""
        return []

    async def search_posts(self, session, query, page=1, per_page=10):
        """Always return an empty list paired with ``0``."""
        return ([], 0)
class StubCalendarService:
    """Placeholder calendar service.

    Queries return empty containers, ``None``, ``False`` or ``0``; mutating
    calls do nothing and return ``None``.
    """

    async def calendars_for_container(
        self,
        session: AsyncSession,
        container_type: str,
        container_id: int,
    ) -> list[CalendarDTO]:
        """Always return an empty list of calendars."""
        return []

    async def pending_entries(
        self,
        session: AsyncSession,
        *,
        user_id: int | None,
        session_id: str | None,
    ) -> list[CalendarEntryDTO]:
        """Always return an empty list of entries."""
        return []

    async def entries_for_page(
        self,
        session: AsyncSession,
        page_id: int,
        *,
        user_id: int | None,
        session_id: str | None,
    ) -> list[CalendarEntryDTO]:
        """Always return an empty list of entries."""
        return []

    async def entry_by_id(
        self,
        session: AsyncSession,
        entry_id: int,
    ) -> CalendarEntryDTO | None:
        """Always return ``None``; no entry is ever found."""
        return None

    async def associated_entries(
        self,
        session: AsyncSession,
        content_type: str,
        content_id: int,
        page: int,
    ) -> tuple[list[CalendarEntryDTO], bool]:
        """Always return an empty list paired with ``False``."""
        return ([], False)

    async def toggle_entry_post(
        self,
        session: AsyncSession,
        entry_id: int,
        content_type: str,
        content_id: int,
    ) -> bool:
        """Always return ``False``; nothing is toggled."""
        return False

    async def adopt_entries_for_user(
        self,
        session: AsyncSession,
        user_id: int,
        session_id: str,
    ) -> None:
        """Do nothing."""

    async def claim_entries_for_order(
        self,
        session: AsyncSession,
        order_id: int,
        user_id: int | None,
        session_id: str | None,
        page_post_id: int | None,
    ) -> None:
        """Do nothing."""

    async def confirm_entries_for_order(
        self,
        session: AsyncSession,
        order_id: int,
        user_id: int | None,
        session_id: str | None,
    ) -> None:
        """Do nothing."""

    async def get_entries_for_order(
        self,
        session: AsyncSession,
        order_id: int,
    ) -> list[CalendarEntryDTO]:
        """Always return an empty list of entries."""
        return []

    async def user_tickets(
        self,
        session: AsyncSession,
        *,
        user_id: int,
    ) -> list[TicketDTO]:
        """Always return an empty list of tickets."""
        return []

    async def user_bookings(
        self,
        session: AsyncSession,
        *,
        user_id: int,
    ) -> list[CalendarEntryDTO]:
        """Always return an empty list of entries."""
        return []

    async def confirmed_entries_for_posts(
        self,
        session: AsyncSession,
        post_ids: list[int],
    ) -> dict[int, list[CalendarEntryDTO]]:
        """Always return an empty mapping, whatever ``post_ids`` contains."""
        return {}

    async def pending_tickets(
        self,
        session: AsyncSession,
        *,
        user_id: int | None,
        session_id: str | None,
    ) -> list[TicketDTO]:
        """Always return an empty list of tickets."""
        return []

    async def tickets_for_page(
        self,
        session: AsyncSession,
        page_id: int,
        *,
        user_id: int | None,
        session_id: str | None,
    ) -> list[TicketDTO]:
        """Always return an empty list of tickets."""
        return []

    async def claim_tickets_for_order(
        self,
        session: AsyncSession,
        order_id: int,
        user_id: int | None,
        session_id: str | None,
        page_post_id: int | None,
    ) -> None:
        """Do nothing."""

    async def confirm_tickets_for_order(
        self,
        session: AsyncSession,
        order_id: int,
    ) -> None:
        """Do nothing."""

    async def get_tickets_for_order(
        self,
        session: AsyncSession,
        order_id: int,
    ) -> list[TicketDTO]:
        """Always return an empty list of tickets."""
        return []

    async def adopt_tickets_for_user(
        self,
        session: AsyncSession,
        user_id: int,
        session_id: str,
    ) -> None:
        """Do nothing."""

    async def adjust_ticket_quantity(
        self,
        session,
        entry_id,
        count,
        *,
        user_id,
        session_id,
        ticket_type_id=None,
    ) -> int:
        """Always return ``0``; no quantity is adjusted."""
        return 0

    async def upcoming_entries_for_container(
        self,
        session,
        container_type,
        container_id,
        *,
        page=1,
        per_page=20,
    ):
        """Always return an empty list paired with ``False``."""
        return ([], False)

    async def entry_ids_for_content(self, session, content_type, content_id):
        """Always return a new empty set."""
        return set()

    async def visible_entries_for_period(
        self,
        session,
        calendar_id,
        period_start,
        period_end,
        *,
        user_id,
        is_admin,
        session_id,
    ):
        """Always return an empty list."""
        return []
class StubMarketService:
    """Placeholder market service.

    Lookups return empty results; creating a marketplace is refused with
    ``RuntimeError``.
    """

    async def marketplaces_for_container(
        self,
        session: AsyncSession,
        container_type: str,
        container_id: int,
    ) -> list[MarketPlaceDTO]:
        """Always return an empty list of marketplaces."""
        return []

    async def product_by_id(
        self,
        session: AsyncSession,
        product_id: int,
    ) -> ProductDTO | None:
        """Always return ``None``; no product is ever found."""
        return None

    async def create_marketplace(
        self,
        session: AsyncSession,
        container_type: str,
        container_id: int,
        name: str,
        slug: str,
    ) -> MarketPlaceDTO:
        """Refuse to create anything.

        Raises:
            RuntimeError: always, with the message
                ``"MarketService not available"``.
        """
        raise RuntimeError("MarketService not available")

    async def soft_delete_marketplace(
        self,
        session: AsyncSession,
        container_type: str,
        container_id: int,
        slug: str,
    ) -> bool:
        """Always return ``False``; nothing is deleted."""
        return False
class StubCartService:
    """Placeholder cart service: the cart is always empty."""

    async def cart_summary(
        self,
        session: AsyncSession,
        *,
        user_id: int | None,
        session_id: str | None,
        page_slug: str | None = None,
    ) -> CartSummaryDTO:
        """Return a ``CartSummaryDTO`` built with no arguments."""
        return CartSummaryDTO()

    async def cart_items(
        self,
        session: AsyncSession,
        *,
        user_id: int | None,
        session_id: str | None,
    ) -> list[CartItemDTO]:
        """Always return an empty list of cart items."""
        return []

    async def adopt_cart_for_user(
        self,
        session: AsyncSession,
        user_id: int,
        session_id: str,
    ) -> None:
        """Do nothing."""
class StubFederationService:
    """Federation stand-in for apps that do not own the federation domain.

    Queries return ``None``, ``False``, ``0`` or empty containers; several
    write operations raise ``RuntimeError`` and the rest do nothing.
    """

    async def get_actor_by_username(self, session, username):
        """Always return ``None``."""
        return None

    async def get_actor_by_user_id(self, session, user_id):
        """Always return ``None``."""
        return None

    async def create_actor(
        self,
        session,
        user_id,
        preferred_username,
        display_name=None,
        summary=None,
    ):
        """Refuse to create an actor.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError("FederationService not available")

    async def username_available(self, session, username):
        """Always return ``False``."""
        return False

    async def publish_activity(
        self,
        session,
        *,
        actor_user_id,
        activity_type,
        object_type,
        object_data,
        source_type=None,
        source_id=None,
    ):
        """Publish nothing and return ``None``."""
        return None

    async def get_activity(self, session, activity_id):
        """Always return ``None``."""
        return None

    async def get_outbox(self, session, username, page=1, per_page=20):
        """Always return an empty list paired with ``0``."""
        return ([], 0)

    async def get_activity_for_source(self, session, source_type, source_id):
        """Always return ``None``."""
        return None

    async def count_activities_for_source(
        self,
        session,
        source_type,
        source_id,
        *,
        activity_type,
    ):
        """Always return ``0``."""
        return 0

    async def get_followers(self, session, username):
        """Always return an empty list."""
        return []

    async def add_follower(
        self,
        session,
        username,
        follower_acct,
        follower_inbox,
        follower_actor_url,
        follower_public_key=None,
    ):
        """Refuse to add a follower.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError("FederationService not available")

    async def remove_follower(self, session, username, follower_acct):
        """Always return ``False``."""
        return False

    async def get_or_fetch_remote_actor(self, session, actor_url):
        """Always return ``None``."""
        return None

    async def search_remote_actor(self, session, acct):
        """Always return ``None``."""
        return None

    async def send_follow(self, session, local_username, remote_actor_url):
        """Refuse to send a follow.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError("FederationService not available")

    async def get_following(self, session, username, page=1, per_page=20):
        """Always return an empty list paired with ``0``."""
        return ([], 0)

    async def get_followers_paginated(
        self,
        session,
        username,
        page=1,
        per_page=20,
    ):
        """Always return an empty list paired with ``0``."""
        return ([], 0)

    async def accept_follow_response(
        self,
        session,
        local_username,
        remote_actor_url,
    ):
        """Do nothing."""

    async def unfollow(self, session, local_username, remote_actor_url):
        """Do nothing."""

    async def ingest_remote_post(
        self,
        session,
        remote_actor_id,
        activity_json,
        object_json,
    ):
        """Do nothing."""

    async def delete_remote_post(self, session, object_id):
        """Do nothing."""

    async def get_remote_post(self, session, object_id):
        """Always return ``None``."""
        return None

    async def get_home_timeline(
        self,
        session,
        actor_profile_id,
        before=None,
        limit=20,
    ):
        """Always return an empty list."""
        return []

    async def get_public_timeline(self, session, before=None, limit=20):
        """Always return an empty list."""
        return []

    async def get_actor_timeline(
        self,
        session,
        remote_actor_id,
        before=None,
        limit=20,
    ):
        """Always return an empty list."""
        return []

    async def create_local_post(
        self,
        session,
        actor_profile_id,
        content,
        visibility="public",
        in_reply_to=None,
    ):
        """Refuse to create a post.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError("FederationService not available")

    async def delete_local_post(self, session, actor_profile_id, post_id):
        """Refuse to delete a post.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError("FederationService not available")

    async def like_post(self, session, actor_profile_id, object_id, author_inbox):
        """Do nothing."""

    async def unlike_post(self, session, actor_profile_id, object_id, author_inbox):
        """Do nothing."""

    async def boost_post(self, session, actor_profile_id, object_id, author_inbox):
        """Do nothing."""

    async def unboost_post(self, session, actor_profile_id, object_id, author_inbox):
        """Do nothing."""

    async def get_notifications(
        self,
        session,
        actor_profile_id,
        before=None,
        limit=20,
    ):
        """Always return an empty list."""
        return []

    async def unread_notification_count(self, session, actor_profile_id):
        """Always return ``0``."""
        return 0

    async def mark_notifications_read(self, session, actor_profile_id):
        """Do nothing."""

    async def get_stats(self, session):
        """Return a new dict with every counter set to zero."""
        return dict.fromkeys(("actors", "activities", "followers"), 0)