Development Preview · PR #2178 · 33265e7 · built
Skip to content

API Layer

Litestar REST + WebSocket API: controllers, authentication, guards, and channels.

App

app

Litestar application factory.

Creates and configures the Litestar application with all controllers, middleware, exception handlers, plugins, and lifecycle hooks (startup/shutdown).

create_app

create_app(*, config=None, clock=None, overrides=None, _skip_lifecycle_shutdown=False)

Create and configure the Litestar application.

Parameters:

Name Type Description Default
config RootConfig | None

Root company configuration.

None
clock Clock | None

Optional clock seam threaded into the construction phase so a test can drive a deterministic boot (app_state.clock + startup_time); defaults to SystemClock when not supplied.

None
overrides AppOverrides | None

Optional dependency injections (chiefly tests / bespoke wiring); any field left unset is auto-wired from config and the environment. An injected double always wins over the auto-wired one.

None
_skip_lifecycle_shutdown bool

Test-only flag. When True the app is built with an empty on_shutdown list so a shared-app fixture can reuse it across lifespans without tearing down the task engine, message bus, and persistence. Never use in production: shutdown hooks perform critical cleanup.

False

Returns:

Type Description
Litestar

Configured Litestar application.

Source code in src/synthorg/api/app.py
def create_app(
    *,
    config: RootConfig | None = None,
    clock: Clock | None = None,
    overrides: AppOverrides | None = None,
    _skip_lifecycle_shutdown: bool = False,
) -> Litestar:
    """Build the fully wired Litestar application.

    The factory runs in a fixed order: logging bootstrap, persistence
    resolution, construction of services and state, route discovery,
    lifespan-hook assembly, and finally the Litestar build itself.

    Args:
        config: Root company configuration. When omitted, a ``RootConfig``
            with ``company_name="default"`` is used.
        clock: Optional clock seam threaded into the construction phase so a
            test can drive a deterministic boot (``app_state.clock`` +
            ``startup_time``); defaults to ``SystemClock`` when not supplied.
        overrides: Optional dependency injections (chiefly tests / bespoke
            wiring); any field left unset is auto-wired from config and the
            environment. An injected double always wins over the auto-wired
            one.
        _skip_lifecycle_shutdown: Test-only flag. When ``True`` the app is
            built with an empty ``on_shutdown`` list so a shared-app fixture
            can reuse it across lifespans without tearing down the task
            engine, message bus, and persistence. Never use in production:
            shutdown hooks perform critical cleanup.

    Returns:
        Configured Litestar application.

    Raises:
        Exception: Whatever the logging bootstrap raised, re-raised unchanged
            after a diagnostic line has been written to stderr.
    """
    injected = overrides or AppOverrides()
    root_config = config or RootConfig(company_name="default")

    # Logging comes first: the structured pipeline has to be live before
    # auto-wiring, persistence, or the bus emit anything, so their output
    # reaches the configured sinks.  SYNTHORG_LOG_DIR is honoured here for
    # the Docker log directory override.
    try:
        root_config = _bootstrap_app_logging(root_config)
    except Exception as exc:
        # The logging pipeline is unavailable at this point, so stderr is
        # the only channel left for reporting the failure.
        failure_notice = (
            f"CRITICAL: Failed to initialise logging pipeline: {safe_error_description(exc)}. "  # noqa: E501
            "Check SYNTHORG_LOG_DIR, SYNTHORG_LOG_LEVEL, and the "
            "'logging' section of your config file."
        )
        print(failure_notice, file=sys.stderr, flush=True)  # noqa: T201
        raise

    api_settings = root_config.api

    # Persistence and artifact storage are auto-wired from the CLI-provided
    # env vars unless the caller injected them; the raw env values are kept
    # on the returned object for downstream wiring.
    boot_state = resolve_boot_persistence(
        persistence=injected.persistence,
        artifact_storage=injected.artifact_storage,
    )

    # Construct every persistence-independent service, compose and populate
    # each feature's state slice (via ``run_construction_wiring``), and hand
    # back the collaborators needed for route assembly, the lifespan hooks,
    # and the Litestar build.
    services = build_construction_services(
        effective_config=root_config,
        api_config=api_settings,
        overrides=injected,
        boot=boot_state,
        clock=clock,
    )
    state = services.app_state

    # Routes are discovered, not listed: each feature manifest contributes
    # controllers (api-mounted vs root-mounted) and websocket handlers, and
    # every ControllerRegistration predicate is evaluated against the built
    # AppState so a disabled or unwired subsystem registers nothing (404).
    api_handlers, root_handlers = collect_route_handlers(state)
    versioned_router = Router(
        path=api_settings.api_prefix,
        route_handlers=api_handlers,
        guards=[require_password_changed],
    )

    startup_hooks, shutdown_hooks = assemble_lifespan_hooks(
        state,
        persistence=boot_state.persistence,
        message_bus=services.message_bus,
        bridge=services.bridge,
        settings_dispatcher=services.settings_dispatcher,
        task_engine=services.task_engine,
        meeting_scheduler=services.meeting_scheduler,
        backup_service=services.backup_service,
        approval_timeout_scheduler=services.approval_timeout_scheduler,
        should_auto_wire_settings=services.should_auto_wire_settings,
        effective_config=root_config,
        connection_catalog=services.connection_catalog,
        provider_registry=services.provider_registry,
        cost_tracker=services.cost_tracker,
        approval_store=services.approval_store,
        performance_tracker=services.performance_tracker,
        notification_dispatcher=services.notification_dispatcher,
    )

    # Test-only escape hatch: discard the assembled shutdown hooks entirely.
    shutdown_hooks = [] if _skip_lifecycle_shutdown else shutdown_hooks

    return build_litestar(
        state,
        api_config=api_settings,
        api_router=versioned_router,
        root_handlers=root_handlers,
        middleware=services.middleware,
        plugins=services.plugins,
        startup=startup_hooks,
        shutdown=shutdown_hooks,
        skip_lifecycle_shutdown=_skip_lifecycle_shutdown,
    )

Config

config

API configuration models.

Frozen Pydantic models for CORS, rate limiting, server, authentication, and the top-level ApiConfig that aggregates them all.

CorsConfig pydantic-model

Bases: BaseModel

CORS configuration for the API.

Attributes:

Name Type Description
allowed_origins tuple[str, ...]

Origins permitted to make cross-origin requests.

allow_methods tuple[str, ...]

HTTP methods permitted in cross-origin requests.

allow_headers tuple[str, ...]

Headers permitted in cross-origin requests.

allow_credentials bool

Whether credentials (cookies, auth) are allowed in cross-origin requests.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_wildcard_credentials

allowed_origins pydantic-field

allowed_origins = ()

Origins permitted to make cross-origin requests

allow_methods pydantic-field

allow_methods = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS')

HTTP methods permitted in cross-origin requests

allow_headers pydantic-field

allow_headers = ('Content-Type', 'Authorization', 'X-CSRF-Token')

Headers permitted in cross-origin requests

allow_credentials pydantic-field

allow_credentials = True

Whether credentials (cookies) are allowed

RateLimitTimeUnit

Bases: StrEnum

Valid time windows for rate limiting.

RateLimitConfig pydantic-model

Bases: BaseModel

API rate limiting configuration.

Three tiers stacked around the auth middleware:

  • IP floor (outermost, un-gated): keyed by client IP, applies to every request -- including ones the auth middleware rejects with 401. Guards against flood attacks that burn auth-validation cycles on protected endpoints with forged tokens.
  • Unauthenticated (middle, only when scope["user"] is None): keyed by client IP, aggressive cap on brute-force against login/setup/logout.
  • Authenticated (innermost, only when scope["user"] is set): keyed by user ID, generous cap for normal dashboard use.

Keying authenticated limits by user ID instead of IP prevents multi-user deployments behind a shared gateway or NAT from collectively exhausting a single per-IP budget.

Attributes:

Name Type Description
floor_max_requests int

Maximum total requests per time window (by IP) across the whole API. Catches traffic that auth_middleware rejects before the unauth tier sees it.

unauth_max_requests int

Maximum unauthenticated requests per time window (by IP).

auth_max_requests int

Maximum authenticated requests per time window (by user ID).

time_unit RateLimitTimeUnit

Time window (second, minute, hour, day).

exclude_paths tuple[str, ...]

Paths excluded from rate limiting.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_floor_above_user_tiers
  • _apply_mirrors
  • _reject_legacy_max_requests

floor_max_requests pydantic-field

floor_max_requests = 10000

Maximum total requests per time window (by IP) across the whole API, including requests rejected by the auth middleware. Defense-in-depth against floods of invalid auth attempts on protected endpoints. The floor wraps both user-gated tiers in the middleware stack, so it must be >= auth_max_requests AND >= unauth_max_requests -- a lower floor would silently cap either the authenticated per-user budget or the unauthenticated per-IP budget below its documented value (especially behind a shared NAT where many users share one IP). Enforced by the _validate_floor_above_user_tiers validator.

unauth_max_requests pydantic-field

unauth_max_requests = 20

Maximum unauthenticated requests per time window (by IP)

auth_max_requests pydantic-field

auth_max_requests = 6000

Maximum authenticated requests per time window (by user ID)

time_unit pydantic-field

time_unit = MINUTE

Time window (second, minute, hour, day)

exclude_paths pydantic-field

exclude_paths = ('/api/v1/healthz', '/api/v1/readyz')

Paths excluded from rate limiting

max_rpm_default pydantic-field

max_rpm_default = 60

Fallback requests-per-minute applied to per-connection coordinators when the catalog does not provide a limiter (mirrors the api.max_rpm_default setting; restart required)

ServerConfig pydantic-model

Bases: BaseModel

Uvicorn server configuration.

Host, port, TLS paths, trusted-proxy list, and the compression / request-size limits are resolved at boot via synthorg.settings.bootstrap_resolver.resolve_init_value() against the api.* registry entries rather than carried on this model. Only the worker-process / auto-reload / WebSocket-ping knobs that uvicorn needs at construction time live here.

Attributes:

Name Type Description
reload bool

Enable auto-reload for development.

workers int

Number of worker processes.

ws_ping_interval float

WebSocket ping interval in seconds (0 to disable).

ws_ping_timeout float

WebSocket pong timeout in seconds.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

reload pydantic-field

reload = False

Enable auto-reload for development

workers pydantic-field

workers = 1

Number of worker processes

ws_ping_interval pydantic-field

ws_ping_interval = 20.0

WebSocket ping interval in seconds (0 to disable)

ws_ping_timeout pydantic-field

ws_ping_timeout = 20.0

WebSocket pong timeout in seconds

ApiConfig pydantic-model

Bases: BaseModel

Top-level API configuration aggregating all sub-configs.

Attributes:

Name Type Description
cors CorsConfig

CORS configuration.

rate_limit RateLimitConfig

Global three-tier rate limiting configuration (IP floor un-gated, unauthenticated by IP, authenticated by user ID).

rate_limiter_enabled bool

Master kill switch for the three-tier global rate limiter. Mirrors the api.rate_limiter_enabled registry entry (read_only_post_init=True): the boot-time resolver in api/app.py reads SYNTHORG_API_RATE_LIMITER_ENABLED and falls through to the registered default (env > code default per the Cat-2 precedence model).

per_op_rate_limit PerOpRateLimitConfig

Per-operation throttling configuration (layered on top of the global three-tier limiter).

per_op_concurrency PerOpConcurrencyConfig

Per-operation inflight concurrency capping (layered on top of the sliding-window per-op limiter; caps simultaneous long-running requests per operation per subject).

server ServerConfig

Uvicorn server configuration.

auth AuthConfig

Authentication configuration.

api_prefix NotBlankStr

URL prefix for all API routes.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors

cors pydantic-field

cors

CORS configuration

rate_limit pydantic-field

rate_limit

Global three-tier rate limiting configuration: un-gated IP floor, unauthenticated by IP, authenticated by user ID

rate_limiter_enabled pydantic-field

rate_limiter_enabled = True

Master kill switch for the three-tier global rate limiter. Mirrors the api.rate_limiter_enabled registry entry (read_only_post_init=True): the boot-time resolver in api/app.py reads SYNTHORG_API_RATE_LIMITER_ENABLED and falls through to the registered default (env > code default per the Cat-2 precedence model).

per_op_rate_limit pydantic-field

per_op_rate_limit

Per-operation throttling (layered on the global limiter)

per_op_concurrency pydantic-field

per_op_concurrency

Per-operation inflight concurrency capping (layered on the sliding-window per-op limiter; caps simultaneous long-running requests per (operation, subject))

server pydantic-field

server

Uvicorn server configuration

auth pydantic-field

auth

Authentication configuration

api_prefix pydantic-field

api_prefix = '/api/v1'

URL prefix for all API routes

DTOs

dto

Request/response DTOs and envelope models.

Response envelopes wrap all API responses in a consistent structure. Request DTOs define write-operation payloads (separate from domain models because they omit server-generated fields).

ErrorDetail pydantic-model

Bases: BaseModel

Structured error metadata (RFC 9457).

Self-contained so agents can parse it without referencing the parent envelope.

Attributes:

Name Type Description
detail NotBlankStr

Human-readable occurrence-specific explanation.

error_code ErrorCode

Machine-readable error code (by convention, 4-digit category-grouped; see ErrorCode).

error_category ErrorCategory

High-level error category.

retryable bool

Whether the client should retry the request.

retry_after int | None

Seconds to wait before retrying (None when not applicable).

instance NotBlankStr

Request correlation ID for log tracing.

title NotBlankStr

Static per-category title (e.g. "Authentication Error").

type NotBlankStr

Documentation URI for the error category.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_retry_after_consistency

retry_after pydantic-field

retry_after = None

Seconds to wait before retrying (null when not applicable).

ProblemDetail pydantic-model

Bases: BaseModel

Bare RFC 9457 application/problem+json response body.

Returned when the client sends Accept: application/problem+json.

Attributes:

Name Type Description
type NotBlankStr

Documentation URI for the error category.

title NotBlankStr

Static per-category title.

status int

HTTP status code.

detail NotBlankStr

Human-readable occurrence-specific explanation.

instance NotBlankStr

Request correlation ID for log tracing.

error_code ErrorCode

Machine-readable 4-digit error code.

error_category ErrorCategory

High-level error category.

retryable bool

Whether the client should retry the request.

retry_after int | None

Seconds to wait before retrying (None when not applicable).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_retry_after_consistency

retry_after pydantic-field

retry_after = None

Seconds to wait before retrying (null when not applicable).

ApiResponse pydantic-model

Bases: BaseModel

Standard API response envelope.

Attributes:

Name Type Description
data T | None

Response payload (None on error).

error str | None

Operator-facing error message (None on success). Must be either a static, generic, non-secret-bearing string or the output of safe_error_description(exc) (from synthorg.observability import safe_error_description). Never use raw str(exc): the value is serialised over HTTP, and exception messages from httpx.HTTPStatusError, psycopg.Error, or OAuth providers can leak credential material.

error_detail ErrorDetail | None

Structured error metadata (None on success).

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

  • data (T | None)
  • error (str | None)
  • error_detail (ErrorDetail | None)

Validators:

  • _validate_error_detail_consistency

success property

success

Whether the request succeeded (derived from error).

Returns:

Type Description
bool

True if the request succeeded, False otherwise.

PaginationMeta pydantic-model

Bases: BaseModel

Pagination metadata for list responses.

Cursor-based: clients receive an opaque next_cursor and walk forward until has_more is False.

Attributes:

Name Type Description
limit int

Maximum items per page.

next_cursor str | None

Opaque cursor for the next page (None on the final page).

has_more bool

Whether more items follow the current page.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_cursor_consistency

limit pydantic-field

limit

Maximum items per page

next_cursor pydantic-field

next_cursor = None

Opaque cursor for the next page (null on final page)

has_more pydantic-field

has_more = False

Whether more items follow the current page

PaginatedResponse pydantic-model

Bases: BaseModel

Paginated API response envelope.

Attributes:

Name Type Description
data tuple[T, ...]

Page of items.

error str | None

Error message (None on success).

error_detail ErrorDetail | None

Structured error metadata (None on success).

pagination PaginationMeta

Pagination metadata.

degraded_sources tuple[NotBlankStr, ...]

Data sources that failed gracefully, resulting in partial data. Empty when all sources responded normally.

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_error_detail_consistency

degraded_sources pydantic-field

degraded_sources = ()

Data sources that failed gracefully (partial data)

success property

success

Whether the request succeeded (derived from error).

Returns:

Type Description
bool

True if the request succeeded, False otherwise.

CreateArtifactRequest pydantic-model

Bases: BaseModel

Payload for creating a new artifact.

Attributes:

Name Type Description
type ArtifactType

Artifact type (code, tests, documentation).

path NotBlankStr

Logical file/directory path of the artifact.

task_id NotBlankStr

ID of the originating task.

created_by NotBlankStr

Agent ID of the creator.

description str

Human-readable description.

content_type str

MIME content type (empty if no content stored).

project_id NotBlankStr | None

Optional project ID to link the artifact to.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

type pydantic-field

type

Artifact category (code, tests, documentation).

path pydantic-field

path

File path or artifact identifier within the workspace.

task_id pydantic-field

task_id

Originating task identifier.

created_by pydantic-field

created_by

Agent identifier of the artifact creator.

description pydantic-field

description = ''

Human-readable artifact description.

content_type pydantic-field

content_type = ''

MIME type of the artifact content (empty when no content is stored).

project_id pydantic-field

project_id = None

Optional project identifier to link the artifact to.

CreateProjectRequest pydantic-model

Bases: BaseModel

Payload for creating a new project.

Attributes:

Name Type Description
name NotBlankStr

Project display name.

description str

Detailed project description.

team tuple[NotBlankStr, ...]

Agent IDs assigned to the project.

lead NotBlankStr | None

Agent ID of the project lead.

deadline str | None

Optional deadline (ISO 8601 string).

budget float

Total budget in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_request

CreateTaskRequest pydantic-model

Bases: BaseModel

Payload for creating a new task.

Attributes:

Name Type Description
title NotBlankStr

Short task title.

description NotBlankStr

Detailed task description.

type TaskType

Task work type.

priority Priority

Task priority level.

project NotBlankStr

Project ID.

created_by NotBlankStr

Agent name of the creator.

assigned_to NotBlankStr | None

Optional assignee agent ID.

estimated_complexity Complexity

Complexity estimate.

budget_limit float

Maximum spend in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

UpdateTaskRequest pydantic-model

Bases: BaseModel

Payload for updating task fields.

All fields are optional -- only provided fields are updated.

Attributes:

Name Type Description
title NotBlankStr | None

New title.

description NotBlankStr | None

New description.

priority Priority | None

New priority.

assigned_to NotBlankStr | None

New assignee.

budget_limit float | None

New budget limit.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard

TransitionTaskRequest pydantic-model

Bases: BaseModel

Payload for a task status transition.

Attributes:

Name Type Description
target_status TaskStatus

The desired target status.

assigned_to NotBlankStr | None

Optional assignee override for the transition.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

target_status pydantic-field

target_status

Desired target status

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard

RegisterExperimentVariantRequest pydantic-model

Bases: BaseModel

Payload for registering an A/B experiment variant.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

variant pydantic-field

variant

Variant name within the experiment

weight pydantic-field

weight

Relative selection weight

description pydantic-field

description = ''

Operator notes

AssignExperimentRequest pydantic-model

Bases: BaseModel

Payload for requesting a deterministic variant assignment.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

subject_id pydantic-field

subject_id

Subject identifier (agent id, user id, project id, ...)

ExecuteTaskRequest pydantic-model

Bases: BaseModel

Payload for the worker-callable POST /tasks/{id}/execute endpoint.

Mirrors the TaskClaim envelope fields the worker carries so the backend's WorkerExecutionService has the same provenance the dispatcher captured when it built the claim. The endpoint only needs the status pair and the dedup key; the task body is read server-side via the task repository.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

previous_status pydantic-field

previous_status = None

Task status before the triggering transition

new_status pydantic-field

new_status

Task status that triggered the dispatch (typically 'assigned' or 'ready')

idempotency_key pydantic-field

idempotency_key

Per-dispatch idempotency key; backend dedups duplicate executions

TaskBoardSubmissionResponse pydantic-model

Bases: BaseModel

Acknowledgement envelope for POST /tasks (HTTP 202 Accepted).

The board hands the filing to the work-entry adapter; the adapter drives the pipeline spine in a detached background coroutine. The spine creates the task during its intake phase, so this response carries the correlation id rather than a task id: the board UI correlates the eventual task.created WS event by this id.

Attributes:

Name Type Description
correlation_id NotBlankStr

End-to-end trace id stamped onto the work item.

title NotBlankStr

Title submitted by the user (echoed for UX confirmation).

project NotBlankStr

Project the task was filed into.

status Literal['submitted']

Always "submitted" at this point; included so the UI can switch off a single enum field rather than inferring from HTTP status.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

correlation_id pydantic-field

correlation_id

End-to-end trace id stamped onto the work item

title pydantic-field

title

Title submitted by the user

project pydantic-field

project

Project the task was filed into

status pydantic-field

status = 'submitted'

Always 'submitted' for the 202 ack

CancelTaskRequest pydantic-model

Bases: BaseModel

Payload for cancelling a task.

Attributes:

Name Type Description
reason NotBlankStr

Reason for cancellation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

reason pydantic-field

reason

Reason for cancellation

CreateApprovalRequest pydantic-model

Bases: BaseModel

Payload for creating a new approval item.

Attributes:

Name Type Description
action_type NotBlankStr

Kind of action requiring approval (category:action format).

title NotBlankStr

Short summary.

description NotBlankStr

Detailed explanation.

risk_level ApprovalRiskLevel

Assessed risk level.

ttl_seconds int | None

Optional time-to-live in seconds (min 60, max 604800 = 7 days).

task_id NotBlankStr | None

Optional associated task.

metadata dict[str, str]

Additional key-value pairs.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_action_type_format → action_type
  • _validate_metadata_bounds

action_type pydantic-field

action_type

Kind of action requiring approval in category:action format.

title pydantic-field

title

Short human-readable summary of the approval.

description pydantic-field

description

Detailed explanation of the action and why it requires approval.

risk_level pydantic-field

risk_level

Assessed risk level for the action.

ttl_seconds pydantic-field

ttl_seconds = None

Optional time-to-live in seconds before the approval auto-expires (minimum 60, maximum 604800 = 7 days).

task_id pydantic-field

task_id = None

Optional associated task identifier.

ApproveRequest pydantic-model

Bases: BaseModel

Payload for approving an approval item.

Attributes:

Name Type Description
comment NotBlankStr | None

Optional comment explaining the approval.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

RejectRequest pydantic-model

Bases: BaseModel

Payload for rejecting an approval item.

Attributes:

Name Type Description
reason NotBlankStr

Mandatory reason for rejection.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

CoordinateTaskRequest pydantic-model

Bases: BaseModel

Payload for triggering multi-agent coordination on a task.

Attributes:

Name Type Description
agent_names tuple[NotBlankStr, ...] | None

Agent names to coordinate with (None = all active). When provided, must be non-empty and unique.

max_subtasks int

Maximum subtasks for decomposition.

max_concurrency_per_wave int | None

Override for max concurrency per wave.

fail_fast bool | None

Override for fail-fast behaviour (None = use section config default).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

  • agent_names (tuple[NotBlankStr, ...] | None)
  • max_subtasks (int)
  • max_concurrency_per_wave (int | None)
  • fail_fast (bool | None)

Validators:

  • _validate_unique_agent_names

agent_names pydantic-field

agent_names = None

Agent names to coordinate with (None = all active)

CoordinationPhaseResponse pydantic-model

Bases: BaseModel

Response model for a single coordination phase.

Attributes:

Name Type Description
phase NotBlankStr

Phase name.

success bool

Whether the phase completed successfully.

duration_seconds float

Wall-clock duration of the phase.

error NotBlankStr | None

Error description if the phase failed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_success_error_consistency

CoordinationResultResponse pydantic-model

Bases: BaseModel

Response model for a complete coordination run.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

topology NotBlankStr

Resolved coordination topology.

total_duration_seconds float

Total wall-clock duration.

total_cost float

Total cost across all waves.

phases tuple[CoordinationPhaseResponse, ...]

Phase results in execution order.

wave_count int

Number of execution waves.

is_success bool

Whether all phases succeeded (computed).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

currency pydantic-field

currency = DEFAULT_CURRENCY

ISO 4217 currency code

is_success property

is_success

True when every phase completed successfully.

Returns:

Type Description
bool

True if every phase completed successfully, False otherwise.

RollbackAgentIdentityRequest pydantic-model

Bases: BaseModel

Request body for rolling back an agent identity to a previous version.

Attributes:

Name Type Description
target_version int

Snapshot version number to restore content from (monotonic counter in the agent_identity_versions table).

reason NotBlankStr | None

Optional human-readable justification recorded alongside the evolution event for audit purposes.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

target_version pydantic-field

target_version

Snapshot version to rollback to

reason pydantic-field

reason = None

Optional rollback justification for the audit trail

Errors

The error taxonomy and exception classes live in synthorg.core:

  • synthorg.core.error_taxonomy -- ErrorCategory, ErrorCode, RFC 9457 helpers
  • synthorg.core.domain_errors -- DomainError base + concrete subclasses (NotFoundError, ConflictError, ValidationError, ...)
  • synthorg.core.persistence_errors -- PersistenceError hierarchy

Guards

guards

Route guards for access control.

Guards read the authenticated user identity from connection.user (populated by the auth middleware) and check role-based permissions.

The require_roles factory creates guards for arbitrary role sets. Pre-built constants cover common patterns:

require_ceo              -- CEO only
require_ceo_or_manager   -- CEO or Manager
require_approval_roles   -- CEO, Manager, or Board Member

require_ceo module-attribute

require_ceo = require_roles(CEO)

Guard allowing only the CEO role.

require_ceo_or_manager module-attribute

require_ceo_or_manager = require_roles(CEO, MANAGER)

Guard allowing CEO or Manager roles.

require_approval_roles module-attribute

require_approval_roles = require_roles(CEO, MANAGER, BOARD_MEMBER)

Guard allowing roles that can approve or reject actions.

has_write_role

has_write_role(role)

Return True if the role grants write access.

Use this for inline role checks instead of importing _WRITE_ROLES directly. The write set includes CEO, Manager, and Pair Programmer.

Returns:

Type Description
bool

True if the role grants write access, False otherwise.

Source code in src/synthorg/api/guards.py
def has_write_role(role: HumanRole) -> bool:
    """Report whether *role* belongs to the write-capable role set.

    Prefer this helper for inline role checks over importing
    ``_WRITE_ROLES`` directly.  The write set includes CEO, Manager,
    and Pair Programmer.

    Args:
        role: The human role to test.

    Returns:
        ``True`` when *role* is a member of ``_WRITE_ROLES``,
        ``False`` otherwise.
    """
    can_write = role in _WRITE_ROLES
    return can_write

require_write_access

require_write_access(connection, _)

Guard that allows only write-capable human roles.

Checks connection.user.role for ceo, manager, or pair_programmer. Board members are excluded (they may only observe and approve). The system role is intentionally excluded -- use require_roles() with the desired roles for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_write_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Reject connections whose user does not hold a write-capable role.

    The role read from the connection must be one of ``ceo``,
    ``manager``, or ``pair_programmer``.  Board members are excluded
    (they may only observe and approve), and so is the ``system``
    role -- use ``require_roles()`` with the desired roles for
    endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    caller_role = _get_role(connection)
    if caller_role in _WRITE_ROLES:
        return

    # Denials are logged before raising so rejected requests stay
    # visible in the structured-log stream.
    logger.warning(
        API_GUARD_DENIED,
        guard="require_write_access",
        role=caller_role,
        path=str(connection.url.path),
    )
    raise PermissionDeniedException(detail="Write access denied")

require_read_access

require_read_access(connection, _)

Guard that allows all human roles (excludes SYSTEM).

Checks connection.user.role for any human role including observer and board_member. The internal system role is excluded -- use require_roles() for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_read_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Admit every human role and reject everything else (incl. SYSTEM).

    Any human role passes, including ``observer`` and
    ``board_member``.  The internal ``system`` role is excluded --
    use ``require_roles()`` for endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    caller_role = _get_role(connection)
    if caller_role in _READ_ROLES:
        return

    # Denials are logged before raising so rejected requests stay
    # visible in the structured-log stream.
    logger.warning(
        API_GUARD_DENIED,
        guard="require_read_access",
        role=caller_role,
        path=str(connection.url.path),
    )
    raise PermissionDeniedException(detail="Read access denied")

require_roles

require_roles(*roles)

Create a guard that allows only the specified roles.

Parameters:

Name Type Description Default
*roles HumanRole

One or more HumanRole members to permit.

()

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Raises:

Type Description
ValueError

If no roles are provided.

Source code in src/synthorg/api/guards.py
def require_roles(
    *roles: HumanRole,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Build a guard that admits only the given roles.

    Args:
        *roles: One or more ``HumanRole`` members to permit.

    Returns:
        A guard function compatible with Litestar's guard protocol.

    Raises:
        ValueError: If no roles are provided.
    """
    if not roles:
        msg = "require_roles() requires at least one role"
        raise ValueError(msg)

    permitted = frozenset(roles)
    # Sorting the role values gives the guard a stable, order-independent
    # name regardless of the argument order used by the caller.
    role_list = ",".join(sorted(member.value for member in permitted))
    guard_name = f"require_roles({role_list})"

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        """Deny the connection unless its role is in the permitted set.

        Raises:
            PermissionDeniedException: If the connection's role is not
                one of the roles the guard was built with.
        """
        caller_role = _get_role(connection)
        if caller_role in permitted:
            return
        logger.warning(
            API_GUARD_DENIED,
            guard=guard_name,
            role=caller_role,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Access denied")

    # Expose the permitted roles in the function's name so it is
    # identifiable when inspected or printed.
    guard.__name__ = guard.__qualname__ = guard_name
    return guard

require_org_mutation

require_org_mutation(department_param=None)

Guard factory for org config mutations.

Access is granted if the user has one of:

  • OrgRole.OWNER -- always allowed
  • OrgRole.EDITOR -- always allowed
  • OrgRole.DEPARTMENT_ADMIN -- allowed only when the target department (read from the path parameter named department_param) is in the user's scoped_departments

If the user has no org_roles (empty tuple), falls back to the existing HumanRole write-access check so legacy installations without organisation-level roles still resolve.

Parameters:

Name Type Description Default
department_param str | None

Path parameter name containing the target department (e.g. "name"). None skips department scope checking (company-level endpoints).

None

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Raises:

Type Description
PermissionDeniedException

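Raised by the returned guard (not by the factory itself) when the user lacks a sufficient org role, is a department admin outside their scoped departments, or -- in the legacy fallback -- lacks a write-capable HumanRole.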

Source code in src/synthorg/api/guards.py
def require_org_mutation(
    department_param: str | None = None,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Build a guard that authorises organisation-config mutations.

    The returned guard grants access when the user holds one of:

    - ``OrgRole.OWNER`` -- always allowed
    - ``OrgRole.EDITOR`` -- always allowed
    - ``OrgRole.DEPARTMENT_ADMIN`` -- allowed only when the target
      department (read from the path parameter named
      *department_param*) is in the user's ``scoped_departments``;
      the comparison is case-insensitive

    Users with no ``org_roles`` at all (empty tuple) are checked
    against the existing ``HumanRole`` write-access set instead, so
    legacy installations without organisation-level roles still
    resolve.

    Args:
        department_param: Path parameter name containing the target
            department (e.g. ``"name"``).  ``None`` skips department
            scope checking (company-level endpoints), which means
            department admins are always denied.

    Returns:
        A guard function compatible with Litestar's guard protocol.

    Raises:
        PermissionDeniedException: Raised by the returned guard (not by
            this factory) on every denial path.
    """

    def denial(
        connection: ASGIConnection,  # type: ignore[type-arg]
        reason: str,
        detail: str,
        **fields: object,
    ) -> PermissionDeniedException:
        """Log a denial and return the exception for the guard to raise."""
        logger.warning(
            API_GUARD_DENIED,
            guard=f"require_org_mutation({reason})",
            **fields,
            path=str(connection.url.path),
        )
        return PermissionDeniedException(detail=detail)

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        """Authorise the connection for an org-config mutation.

        Raises:
            PermissionDeniedException: If none of the org-role (or
                legacy ``HumanRole``) rules grant access.
        """
        org_roles = _get_org_roles(connection)

        # Backward compat: with no org_roles set, decide on HumanRole.
        if not org_roles:
            legacy_role = _get_role(connection)
            if legacy_role not in _WRITE_ROLES:
                raise denial(
                    connection,
                    "fallback",
                    "Write access denied",
                    role=legacy_role,
                )
            return

        # Owner and editor are unrestricted.
        if any(
            unrestricted in org_roles
            for unrestricted in (_ORG_ROLE_OWNER, _ORG_ROLE_EDITOR)
        ):
            return

        # Anything that is not a department admin at this point is a
        # viewer or an unrecognised role.
        if _ORG_ROLE_DEPARTMENT_ADMIN not in org_roles:
            raise denial(
                connection,
                "insufficient_org_role",
                "Org mutation access denied",
                org_roles=org_roles,
            )

        # Company-level endpoint -- a department admin cannot modify it.
        if department_param is None:
            raise denial(
                connection,
                "dept_admin_no_scope",
                "Department admins cannot modify company-level settings",
            )

        # Department admin: the target must be inside the user's scope.
        target_dept = connection.path_params.get(department_param, "")
        scoped = _get_scoped_departments(connection)
        wanted = target_dept.lower()
        if any(dept.lower() == wanted for dept in scoped):
            return
        raise denial(
            connection,
            "dept_admin_out_of_scope",
            f"Department admin access denied for {target_dept!r}",
            target_department=target_dept,
            scoped_departments=scoped,
        )

    guard.__name__ = "require_org_mutation"
    guard.__qualname__ = "require_org_mutation"
    return guard

Middleware

middleware

Request middleware and before-send hooks.

Provides ASGI middleware for request logging, and a before_send hook that injects security headers (CSP, CORP, HSTS, Cache-Control, etc.) into every HTTP response -- including exception-handler and unmatched-route (404/405) responses.

Why before_send instead of ASGI middleware? Litestar's before_send hook wraps the ASGI send callback at the outermost layer (before the middleware stack), so it fires for all responses. By contrast, user-defined ASGI middleware only runs for matched routes -- 404 and 405 responses from the router bypass it.

RequestLoggingMiddleware

RequestLoggingMiddleware(app)

ASGI middleware that logs request start and completion.

Uses time.perf_counter() for high-resolution duration measurement. Only logs HTTP requests (non-HTTP scopes like WebSocket and lifespan are passed through without logging).

Each HTTP request is also wrapped in an OpenTelemetry span (http.request) carrying OTel-semconv attributes (http.request.method, http.route, http.response.status_code) plus the synthorg.correlation_id so distributed traces line up with the structured-log stream. When no tracer provider is configured (default), get_tracer returns a no-op tracer and the span is essentially free.

Source code in src/synthorg/api/middleware.py
def __init__(self, app: ASGIApp) -> None:
    """Store the wrapped ASGI application.

    Args:
        app: The downstream ASGI application that ``__call__``
            forwards every scope to.
    """
    self.app = app

__call__ async

__call__(scope, receive, send)

Process an ASGI request, logging start and completion.

Source code in src/synthorg/api/middleware.py
async def __call__(
    self,
    scope: Scope,
    receive: Receive,
    send: Send,
) -> None:
    """Process an ASGI request, logging start and completion.

    Non-HTTP scopes are forwarded to the wrapped app untouched.  For
    HTTP scopes a correlation ID is generated and bound, the request
    runs inside an ``http.request`` span, and the completion log and
    request metric are emitted from a ``finally`` block so they fire
    whether the wrapped app returns or raises.

    Args:
        scope: ASGI connection scope.
        receive: ASGI receive callable, passed through unchanged.
        send: ASGI send callable; wrapped so the response status code
            can be observed.

    Raises:
        Exception: Whatever the wrapped app raises is re-raised after
            the exception type and scrubbed message are recorded on
            the span.
    """
    if scope["type"] != ScopeType.HTTP:
        await self.app(scope, receive, send)
        return

    request: Request[Any, Any, Any] = Request(scope)
    method = request.method
    path = str(request.url.path)

    correlation_id = generate_correlation_id()
    bind_correlation_id(request_id=correlation_id)
    _log_request_started(method, path)
    start = time.perf_counter()

    # Stays ``None`` if the app never emits ``http.response.start``
    # (for example when it raises before responding).
    status_code: int | None = None
    original_send = send

    async def capture_send(message: Any) -> None:
        """Record the response status code, then forward *message*."""
        nonlocal status_code
        if (
            isinstance(message, dict)
            and message.get("type") == "http.response.start"
        ):
            raw_status = message.get("status")
            if raw_status is None:
                logger.warning(
                    API_ASGI_MISSING_STATUS,
                    type=message.get("type"),
                )
                # A response-start message without a status is recorded
                # as a server error rather than left unset.
                status_code = 500
            else:
                status_code = raw_status
        await original_send(message)  # pyright: ignore[reportArgumentType]

    # Automatic exception recording is disabled on the span; the
    # ``except`` branch below sets the exception attributes by hand.
    with _tracer.start_as_current_span(
        "http.request",
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        span.set_attribute("http.request.method", method)
        span.set_attribute("synthorg.correlation_id", correlation_id)
        try:
            await self.app(scope, receive, capture_send)
        except Exception as exc:
            # NOTE(review): presumably re-raises unrecoverable errors
            # straight away -- confirm against ``reraise_critical``.
            reraise_critical(exc)
            # OTel's ``record_exception`` would serialise the full
            # traceback (including frame locals) into the span,
            # bypassing the structlog secret-log redaction the
            # rest of the codebase relies on. To keep the OTLP
            # transport on the same redaction posture as the
            # structlog sink, set OTel-semconv exception
            # attributes directly using the scrubbed description
            # and skip the traceback emission. See
            # ``docs/reference/sec-prompt-safety.md`` for the
            # transport-level redaction policy.
            span.set_attribute("exception.type", type(exc).__name__)
            span.set_attribute(
                "exception.message",
                safe_error_description(exc),
            )
            span.set_status(Status(StatusCode.ERROR, type(exc).__name__))
            raise
        finally:
            # Runs on success and failure alike: finish the span
            # attributes, emit the completion log and metric, then
            # clear the correlation IDs bound at the start.
            span.set_attribute("http.route", _resolve_route_template(scope))
            if status_code is not None:
                span.set_attribute("http.response.status_code", status_code)
                # A 5xx response marks the span as an error even when
                # no exception propagated out of the app.
                if status_code >= 500:  # noqa: PLR2004
                    span.set_status(Status(StatusCode.ERROR))
            elapsed_sec = time.perf_counter() - start
            duration_ms = round(elapsed_sec * 1000, 2)
            _log_request_completion(method, path, status_code, duration_ms)
            _record_request_metric(scope, method, status_code, elapsed_sec)
            clear_correlation_ids()

build_docs_csp

build_docs_csp(origins)

Build the relaxed Scalar UI CSP from a list of trusted origins.

Origins are applied uniformly to script-src, style-src, img-src, font-src and connect-src so operators can swap the public Scalar hosts for an internally-mirrored CDN with a single configuration change.

An empty origins list raises ValueError rather than emit a malformed CSP with trailing whitespace before each ;. CSP parsers tolerate the trailing space but operators reading the header back would see an obviously broken policy; the ApiBridgeConfig validator is the right place to enforce non-empty (currently only validates pattern), so callers pass through the bridge-config-validated tuple.

Parameters:

Name Type Description Default
origins Sequence[str]

Origin URLs that Scalar UI assets and proxy requests may target. Must be non-empty. Each entry must already be a valid origin (scheme + host); ApiBridgeConfig performs the per-entry validation.

required

Returns:

Type Description
str

A CSP header value safe to assign to Content-Security-Policy for /docs/ responses.

Raises:

Type Description
ValueError

If origins is empty.

Source code in src/synthorg/api/middleware.py
def build_docs_csp(origins: Sequence[str]) -> str:
    """Assemble the relaxed Scalar UI CSP for a set of trusted origins.

    Every origin is appended to ``script-src``, ``style-src``,
    ``img-src``, ``font-src`` and ``connect-src`` alike, so operators
    can swap the public Scalar hosts for an internally-mirrored CDN
    with a single configuration change.

    An empty *origins* sequence is rejected with ``ValueError`` instead
    of producing a policy with trailing whitespace before each ``;``.
    CSP parsers tolerate that whitespace, but the header would look
    broken to an operator reading it back.  The ``ApiBridgeConfig``
    validator is the right place to enforce non-empty (it currently
    only validates the pattern), so callers pass through the
    bridge-config-validated tuple.

    Args:
        origins: Origin URLs that Scalar UI assets and proxy requests
            may target. Must be non-empty. Each entry must already be
            a valid origin (scheme + host); ``ApiBridgeConfig``
            performs the per-entry validation.

    Returns:
        A CSP header value safe to assign to
        ``Content-Security-Policy`` for ``/docs/`` responses.

    Raises:
        ValueError: If *origins* is empty.
    """
    if not origins:
        msg = "build_docs_csp requires at least one trusted origin"
        raise ValueError(msg)

    trusted = " ".join(origins)
    # One entry per directive; the header is these joined with "; ".
    directives = (
        "default-src 'self'",
        f"script-src 'self' 'unsafe-inline' {trusted}",
        f"style-src 'self' 'unsafe-inline' {trusted}",
        f"img-src 'self' data: {trusted}",
        f"font-src 'self' data: {trusted}",
        f"connect-src 'self' {trusted}",
        "object-src 'none'",
        "base-uri 'self'",
        "frame-ancestors 'none'",
    )
    return "; ".join(directives)

set_docs_csp_origins

set_docs_csp_origins(origins)

Replace the docs CSP value with one built from origins.

Called once at app startup after resolving api.csp_docs_external_origins through the settings service. Reset to the default list with _DOCS_CSP_DEFAULT_ORIGINS for test isolation.

Calling this outside startup creates a brief eventual-consistency window for in-flight HTTP responses, since the docs before_send hook reads the global at request time. The api.csp_docs_external_origins setting is marked restart_required=True precisely to keep this single-writer.

Source code in src/synthorg/api/middleware.py
def set_docs_csp_origins(origins: Sequence[str]) -> None:
    """Rebuild the module-level docs CSP from *origins* and install it.

    Intended to be called once at app startup, after
    ``api.csp_docs_external_origins`` has been resolved through the
    settings service.  Tests restore the default by calling this same
    setter with ``_DOCS_CSP_DEFAULT_ORIGINS``.

    Calling this outside startup creates a brief eventual-consistency
    window for in-flight HTTP responses, since the docs ``before_send``
    hook reads the global at request time. The
    ``api.csp_docs_external_origins`` setting is marked
    ``restart_required=True`` precisely to keep this single-writer.

    Args:
        origins: Trusted origins forwarded to ``build_docs_csp``.
    """
    global _DOCS_CSP  # noqa: PLW0603 -- single-writer startup hook; tests reset via the same setter
    # Build first so a rejected origin list leaves the current policy
    # untouched.
    rebuilt_policy = build_docs_csp(origins)
    _DOCS_CSP = rebuilt_policy
    logger.info(
        SETTINGS_VALUE_RESOLVED,
        namespace="api",
        key="csp_docs_external_origins",
        origins_count=len(origins),
    )

security_headers_hook async

security_headers_hook(message, scope)

Inject security headers into every HTTP response.

Registered as a Litestar before_send hook so it fires for all HTTP responses -- successful, exception-handler, and router-level 404/405.

Adds static security headers (CORP, HSTS, X-Content-Type-Options, etc.) and path-aware Content-Security-Policy (strict for API, relaxed for /docs/ to allow Scalar UI resources) and Cache-Control (no-store for API, public, max-age=300 for /docs/ since it serves public, non-user-specific content).

Uses __setitem__ (not add) so that if any handler or middleware already set a header, the known-good value overwrites it rather than creating a duplicate.

Parameters:

Name Type Description Default
message Message

ASGI message dict (only http.response.start is processed).

required
scope Scope

ASGI connection scope.

required
Source code in src/synthorg/api/middleware.py
async def security_headers_hook(message: Message, scope: Scope) -> None:
    """Stamp security headers onto every outgoing HTTP response.

    Registered as a Litestar ``before_send`` hook so it fires for
    **all** HTTP responses -- successful, exception-handler, and
    router-level 404/405.

    Two groups of headers are written:

    - the static set in ``_SECURITY_HEADERS`` (CORP, HSTS,
      X-Content-Type-Options, etc.);
    - path-aware headers: a strict Content-Security-Policy for the
      API versus a relaxed one for ``/docs/`` (Scalar UI resources),
      plus a relaxed COOP and a cacheable Cache-Control on ``/docs/``
      since it serves public, non-user-specific content.

    Headers are assigned with ``__setitem__`` (not ``add``) so that a
    value already set by a handler or middleware is overwritten with
    the known-good value rather than duplicated.

    Args:
        message: ASGI message dict (only ``http.response.start``
            is processed).
        scope: ASGI connection scope.
    """
    if (
        scope.get("type") != ScopeType.HTTP
        or message.get("type") != "http.response.start"
    ):
        return

    headers = MutableScopeHeaders.from_message(message)

    # Static security headers -- overwrite to prevent duplicates.
    for header_name, header_value in _SECURITY_HEADERS.items():
        headers[header_name] = header_value

    request_path: str = scope.get("path", "")
    serves_docs = request_path == "/docs" or request_path.startswith("/docs/")

    if not serves_docs:
        headers["Content-Security-Policy"] = _API_CSP
        headers["Pragma"] = _API_PRAGMA
        return

    headers["Content-Security-Policy"] = _DOCS_CSP
    # Relax COOP for /docs -- Scalar UI may open cross-origin popups
    # for OAuth/API proxy features via proxy.scalar.com.
    # same-origin-allow-popups lets the page open popups but blocks
    # cross-origin pages from retaining an opener reference, which
    # prevents XS-Leak side-channel attacks via window.opener.
    headers["Cross-Origin-Opener-Policy"] = "same-origin-allow-popups"
    # Allow brief caching for docs -- public, non-user-specific content.
    headers["Cache-Control"] = _DOCS_CACHE_CONTROL
    # Defense-in-depth: even if an upstream layer set Pragma, clear it
    # on /docs so the no-cache hint can never leak onto cacheable
    # assets.
    with suppress(KeyError):
        del headers["Pragma"]

Pagination

pagination

Cursor-based pagination helpers.

In-memory helper :func:paginate_cursor slices a tuple and produces a signed cursor so controllers backed by in-memory collections (config lists, bus channel names, approval-store filtered views) can return the same envelope shape as repo-backed endpoints.

The cursor layer is opaque offset encoding today. Repositories that need seek-based paging (append-only tables) decode the opaque cursor into a composite (created_at, id) seek tuple internally -- the wire format stays the same.

CursorLimit module-attribute

CursorLimit = Annotated[
    int,
    QueryParameter(
        ge=1,
        le=MAX_LIMIT,
        description=f"Page size (default {DEFAULT_LIMIT}, max {MAX_LIMIT})",
    ),
]

Query-parameter type for the page size (1-MAX_LIMIT).

HTTP-boundary only: the bounds are enforced by Litestar's QueryParameter metadata at request parsing. Do not reuse this alias for in-process validation, where the constraint would silently not apply.

CursorParam module-attribute

CursorParam = Annotated[
    str | None,
    QueryParameter(
        max_length=512,
        description="Opaque pagination cursor returned by the previous page",
    ),
]

Query-parameter type for the opaque cursor (max 512 chars).

HTTP-boundary only: the max_length is enforced by Litestar's QueryParameter metadata at request parsing, not by the type itself. Do not reuse this alias for in-process validation.

InvalidCursorError

InvalidCursorError(message=None)

Bases: ValidationError

Raised when a cursor token is malformed, tampered, or unsigned.

Renders as HTTP 422 Unprocessable Entity with a structured ErrorDetail (error_category=validation, error_code=VALIDATION_ERROR) via the centralised RFC 9457 dispatch.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with *message* or the class default.

    Args:
        message: Human-readable error text.  When ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    super().__init__(message or self.default_message)

cursor_secret_of

cursor_secret_of(app_state)

Return the wired pagination cursor secret, or raise 503.

The opaque-pagination HMAC secret lives on the api-core state slice (wired once by create_app). Pagination call sites read it through this accessor and pass it as secret= to the cursor encode / decode helpers, so the slice lookup is centralised here rather than repeated at every controller.

Parameters:

Name Type Description Default
app_state AppStateSliceMixin

The application state (any slice-reader).

required

Returns:

Type Description
CursorSecret

The wired cursor secret.

Raises:

Type Description
ServiceUnavailableError

When the secret is not yet wired.

Source code in src/synthorg/api/pagination.py
def cursor_secret_of(app_state: AppStateSliceMixin) -> CursorSecret:
    """Fetch the pagination cursor secret from the api-core state slice.

    The opaque-pagination HMAC secret lives on the api-core state slice
    (wired once by ``create_app``).  Pagination call sites go through
    this accessor and hand the result to the cursor encode / decode
    helpers as ``secret=``, which keeps the slice lookup in one place
    instead of repeating it in every controller.

    Args:
        app_state: The application state (any slice-reader).

    Returns:
        The wired cursor secret.

    Raises:
        ServiceUnavailableError: When the secret is not yet wired.
    """
    api_core = app_state.slice(ApiCoreStateSlice)
    return require_service(api_core.cursor_secret, "Cursor Secret")

paginate_cursor

paginate_cursor(items, *, limit, cursor, secret)

Slice a tuple and produce cursor-based pagination metadata.

Clamps limit to [1, MAX_LIMIT]. A missing cursor starts at offset 0. Invalid / tampered cursors raise InvalidCursorError, a ValidationError subclass that the centralised error dispatch renders as HTTP 422 (see InvalidCursorError above).

Parameters:

Name Type Description Default
items tuple[T, ...]

Full collection to paginate (must be already ordered).

required
limit int

Maximum items to return on this page.

required
cursor str | None

Opaque cursor from the previous page, or None for the first page.

required
secret CursorSecret

HMAC secret used to sign / verify cursors.

required

Returns:

Type Description
tuple[tuple[T, ...], PaginationMeta]

Tuple of (page_items, pagination_meta).

Raises:

Type Description
InvalidCursorError

If cursor is malformed, tampered, or signed by a different secret.

Source code in src/synthorg/api/pagination.py
def paginate_cursor[T](
    items: tuple[T, ...],
    *,
    limit: int,
    cursor: str | None,
    secret: CursorSecret,
) -> tuple[tuple[T, ...], PaginationMeta]:
    """Slice a tuple and produce cursor-based pagination metadata.

    Clamps ``limit`` to ``[1, MAX_LIMIT]``. A missing cursor starts at
    offset 0. Invalid / tampered cursors raise :class:`InvalidCursorError`
    which controllers should surface as HTTP 400.

    Args:
        items: Full collection to paginate (must be already ordered).
        limit: Maximum items to return on this page.
        cursor: Opaque cursor from the previous page, or ``None`` for
            the first page.
        secret: HMAC secret used to sign / verify cursors.

    Returns:
        Tuple of (page_items, pagination_meta).

    Raises:
        InvalidCursorError: If ``cursor`` is malformed, tampered, or
            signed by a different secret.
    """
    if cursor is None:
        offset = 0
    else:
        try:
            offset = decode_cursor(cursor, secret=secret)
        except InvalidCursorError:
            # Malformed / tampered / foreign-secret cursors raise here;
            # log before re-raising so 400s from decode failures are
            # observable in production alongside the truncation branch
            # below.  The cursor itself is NOT logged -- it's attacker-
            # controlled input and may carry secret fragments from
            # tampering attempts.
            logger.warning(
                API_CURSOR_INVALID,
                reason="cursor_decode_failed",
            )
            raise
    effective_limit = max(1, min(limit, MAX_LIMIT))
    # Out-of-bounds cursors are rejected explicitly.  The cursor is
    # HMAC-signed so a client cannot forge one past the true end;
    # reaching this branch means the collection shrunk between
    # issuing the cursor and walking it (e.g. deletions) -- returning
    # an empty page would silently hide the truncation from callers
    # that rely on ``has_more`` progressing consistently.  The
    # comparison is ``>=`` because ``has_more`` is False whenever
    # ``next_offset == len(items)``, so no valid cursor is ever issued
    # pointing exactly at the collection end -- reaching that position
    # is the unambiguous truncation signal.
    if offset and offset >= len(items):
        # Truncation is an operator-visible event: the collection
        # shrank between cursor issuance and replay, and silently
        # returning an empty page would hide that from monitoring.
        logger.warning(
            API_CURSOR_INVALID,
            reason="cursor_past_end",
            offset=offset,
            collection_length=len(items),
        )
        msg = "cursor points past the end of the collection"
        raise InvalidCursorError(msg)
    page = items[offset : offset + effective_limit]
    next_offset = offset + effective_limit
    has_more = next_offset < len(items)
    next_cursor = encode_cursor(next_offset, secret=secret) if has_more else None
    meta = PaginationMeta(
        limit=effective_limit,
        next_cursor=next_cursor,
        has_more=has_more,
    )
    return page, meta

encode_repo_seek_meta

encode_repo_seek_meta(
    *, offset, page_len, total, limit, secret, reject_stale_cursor=True
)

Build PaginationMeta for controllers that push limit+offset into the repo.

Centralizes the has_more snapshot-drift guard so the next pagination bug cannot regress across every version-history controller one at a time. An empty or short page (page_len == 0 or offset + page_len == offset) cannot advance the cursor past the current offset, so the guard refuses to emit a cursor that would loop the client on the same page when count_versions disagrees with list_versions.

Parameters:

Name Type Description Default
offset int

The decoded cursor offset the current page started at.

required
page_len int

The number of repo rows consumed (len(repo_rows), not the filtered slice) -- the cursor must advance by consumed rows so filtered pages do not replay already-read rows on the next request.

required
total int

The repo's reported total row count. Drives the has_more check.

required
limit int

The page size requested.

required
secret CursorSecret

HMAC secret used to sign the next_cursor.

required
reject_stale_cursor bool

When True (the default), a decoded offset == total raises :class:InvalidCursorError (mirrors the paginate_cursor helper). Set to False only when the caller genuinely tolerates a cursor landing exactly on the current end of an append-only repo. offset > total is ALWAYS rejected regardless of this flag -- a cursor past the repo end is never legitimate (the HMAC signature would have come from a larger snapshot) and silently returning a terminal page would hide the truncation from monitoring.

True

Returns:

Type Description
PaginationMeta

PaginationMeta with the has_more / next_cursor fields filled in, safe to wrap in PaginatedResponse.

Raises:

Type Description
InvalidCursorError

When the cursor's decoded offset is past the repo end. offset > total always raises; offset == total (with offset > 0) raises unless reject_stale_cursor=False.

Source code in src/synthorg/api/pagination.py
def encode_repo_seek_meta(  # noqa: PLR0913 -- every arg tracks a distinct pagination input
    *,
    offset: int,
    page_len: int,
    total: int,
    limit: int,
    secret: CursorSecret,
    reject_stale_cursor: bool = True,
) -> PaginationMeta:
    """Assemble ``PaginationMeta`` for a page the repo sliced by limit+offset.

    This is the single home of the ``has_more`` snapshot-drift guard
    shared by the version-history controllers.  A follow-up cursor is
    only emitted when the page actually consumed rows *and* the
    advanced offset is still short of ``total``; a page with
    ``page_len == 0`` cannot move the cursor, so no cursor is issued
    and the client is never sent back to the same page when
    ``count_versions`` and ``list_versions`` disagree.

    Args:
        offset: Decoded cursor offset at which the current page began.
        page_len: Count of repo rows consumed (``len(repo_rows)``, not
            the post-filter slice).  The cursor advances by consumed
            rows so a filtered page does not replay rows it already
            read.
        total: Total row count reported by the repo; the upper bound
            used by the ``has_more`` check.
        limit: Requested page size, echoed into the returned meta.
        secret: HMAC secret used to sign ``next_cursor``.
        reject_stale_cursor: With the default ``True``, a non-zero
            ``offset`` equal to ``total`` raises
            :class:`InvalidCursorError` (mirroring the
            ``paginate_cursor`` helper).  Pass ``False`` only when a
            cursor sitting exactly on the current end of an
            append-only repo is acceptable.  The flag never relaxes
            the ``offset > total`` rejection.

    Returns:
        ``PaginationMeta`` carrying ``limit``, ``has_more`` and
        ``next_cursor`` (``None`` when there is no further page),
        ready to wrap in ``PaginatedResponse``.

    Raises:
        InvalidCursorError: When a non-zero ``offset`` is greater than
            ``total``, or equals ``total`` while
            ``reject_stale_cursor`` is ``True``.
    """
    # A non-zero offset at or beyond the repo end means the collection
    # shrank between cursor issuance and replay (deletions, filters).
    # Answering with ``has_more=False`` would hide that truncation
    # from monitoring and leave the client on an unrecoverable empty
    # page, so the condition is raised for callers to surface as
    # HTTP 400.  The boundary case (``offset == total``) is classified
    # separately from the past-end case so ``reject_stale_cursor=False``
    # relaxes only the boundary.  ``offset == 0`` is never rejected.
    if offset:
        rejection: str | None
        if offset > total:
            rejection = "cursor_past_end"
        elif reject_stale_cursor and offset == total:
            rejection = "cursor_at_end"
        else:
            rejection = None
        if rejection is not None:
            logger.warning(
                API_CURSOR_INVALID,
                reason=rejection,
                offset=offset,
                total=total,
            )
            msg = "cursor points past the end of the collection"
            raise InvalidCursorError(msg)

    next_offset = offset + page_len
    # More rows exist only if this page made progress and the advanced
    # offset has not yet reached the reported total.
    has_more = page_len > 0 and offset < next_offset < total
    if has_more:
        next_cursor = encode_cursor(next_offset, secret=secret)
    else:
        next_cursor = None
    return PaginationMeta(limit=limit, next_cursor=next_cursor, has_more=has_more)

encode_countless_seek_meta

encode_countless_seek_meta(*, offset, fetched_rows, limit, secret)

Build PaginationMeta for repos that skip the COUNT(*) round-trip.

Counterpart to :func:encode_repo_seek_meta for endpoints that use the fetch limit+1, detect overflow pattern instead of issuing a separate count query. The caller fetches up to limit + 1 rows from the backing store; this helper uses the overflow to drive has_more and ensures PaginationMeta.total stays None so clients know the count is unknown (and must derive display counts from data.length per the frontend contract in web/CLAUDE.md).

Parameters:

Name Type Description Default
offset int

The decoded cursor offset the current page started at.

required
fetched_rows int

The number of rows the repo returned when asked for limit + 1 (cap inclusive; the caller is responsible for slicing the excess before handing to PaginatedResponse).

required
limit int

The page size requested.

required
secret CursorSecret

HMAC secret used to sign the next_cursor.

required

Returns:

Type Description
PaginationMeta

PaginationMeta with total=None and the has_more / next_cursor fields derived from overflow.

Raises:

Type Description
InvalidCursorError

When offset > 0 and fetched_rows == 0. Under the limit + 1 contract a server-issued cursor always points at a row that existed when the previous page responded, so an empty follow-up page signals truncation (rows disappeared between requests); silently returning a terminal page would hide that from monitoring.

Source code in src/synthorg/api/pagination.py
def encode_countless_seek_meta(
    *,
    offset: int,
    fetched_rows: int,
    limit: int,
    secret: CursorSecret,
) -> PaginationMeta:
    """Assemble ``PaginationMeta`` when no ``COUNT(*)`` query is issued.

    Sibling of :func:`encode_repo_seek_meta` for endpoints following
    the ``fetch limit+1, detect overflow`` pattern.  The caller asks
    the backing store for up to ``limit + 1`` rows and reports how
    many came back; an overflow row (more than ``limit`` fetched) is
    what signals a further page.  No ``total`` is passed to
    ``PaginationMeta``, so ``PaginationMeta.total`` stays ``None`` and
    clients know the count is unknown (display counts come from
    ``data.length`` per the frontend contract in ``web/CLAUDE.md``).

    Args:
        offset: Decoded cursor offset at which the current page began.
        fetched_rows: Number of rows the repo returned for the
            ``limit + 1`` request (cap inclusive).  Trimming the
            overflow row before building ``PaginatedResponse`` is the
            caller's job.
        limit: Requested page size; also the step by which the next
            cursor advances.
        secret: HMAC secret used to sign ``next_cursor``.

    Returns:
        ``PaginationMeta`` with ``has_more`` / ``next_cursor`` derived
        from the overflow check and ``total`` left unset.

    Raises:
        InvalidCursorError: When ``offset > 0`` and
            ``fetched_rows == 0``.  A server-issued cursor under the
            ``limit + 1`` contract points at a row that existed when
            the previous page was served, so an empty follow-up page
            means rows vanished in between; returning a quiet terminal
            page would hide that from monitoring.
    """
    if fetched_rows == 0 and offset > 0:
        logger.warning(
            API_CURSOR_INVALID,
            reason="cursor_past_end",
            offset=offset,
        )
        msg = "cursor points past the end of the collection"
        raise InvalidCursorError(msg)

    # The extra (limit + 1)-th row is the only evidence of a next page.
    has_more = fetched_rows > limit
    if has_more:
        next_cursor = encode_cursor(offset + limit, secret=secret)
    else:
        next_cursor = None
    return PaginationMeta(limit=limit, next_cursor=next_cursor, has_more=has_more)

encode_keyset_meta

encode_keyset_meta(*, next_after_key, has_more, limit, secret)

Build PaginationMeta for a keyset-paginated read.

Keyset pagination is stable under concurrent inserts and deletes: the cursor encodes the sort key of the last row returned, and the next page reads WHERE sort_key > after_key. Out-of-bounds cursors degrade gracefully -- a cursor pointing past the current end of the collection just returns an empty page (rather than the offset-pagination InvalidCursorError for offset > total) because keyset reads cannot tell whether a cursor is "stale" or just pointing at a row that has been deleted.

Parameters:

Name Type Description Default
next_after_key str | None

Sort key of the last row on the page that was just returned, or None when there is no next page. has_more=True requires a non-None key.

required
has_more bool

Whether the caller observed an overflow row when fetching limit + 1 rows. Drives next_cursor emission.

required
limit int

Page size requested.

required
secret CursorSecret

HMAC secret used to sign the next_cursor.

required

Returns:

Type Description
PaginationMeta

PaginationMeta ready to wrap in PaginatedResponse.

Raises:

Type Description
ValueError

If has_more=True but next_after_key is None.

Source code in src/synthorg/api/pagination.py
def encode_keyset_meta(
    *,
    next_after_key: str | None,
    has_more: bool,
    limit: int,
    secret: CursorSecret,
) -> PaginationMeta:
    """Assemble ``PaginationMeta`` for a keyset-paginated read.

    With keyset pagination the cursor carries the sort key of the
    last row returned and the following page reads
    ``WHERE sort_key > after_key``, which keeps paging stable under
    concurrent inserts and deletes.  A cursor beyond the current end
    of the collection simply yields an empty page -- unlike offset
    pagination, which raises ``InvalidCursorError`` for
    ``offset > total`` -- because a keyset read cannot distinguish a
    "stale" cursor from one whose row has been deleted.

    Args:
        next_after_key: Sort key of the last row on the page just
            returned, or ``None`` when no next page exists.  Must be
            a non-empty string whenever ``has_more`` is ``True``.
        has_more: Whether the caller saw an overflow row when fetching
            ``limit + 1`` rows; decides whether ``next_cursor`` is
            emitted.
        limit: Requested page size.
        secret: HMAC secret used to sign ``next_cursor``.

    Returns:
        ``PaginationMeta`` ready to wrap in ``PaginatedResponse``;
        ``next_cursor`` is ``None`` when ``has_more`` is ``False``.

    Raises:
        ValueError: If ``has_more=True`` but ``next_after_key`` is
            ``None`` (or an empty string).
    """
    if has_more and next_after_key:
        next_cursor = encode_keyset_cursor(next_after_key, secret=secret)
    elif has_more:
        # A further page was promised but there is no key to resume from.
        msg = "keyset pagination: has_more=True requires next_after_key"
        raise ValueError(msg)
    else:
        next_cursor = None
    return PaginationMeta(limit=limit, next_cursor=next_cursor, has_more=has_more)

WebSocket Models

ws_models

WebSocket event models for real-time feeds.

Defines event types and the WsEvent payload that is serialised to JSON and pushed to WebSocket subscribers.

WsEventType

Bases: StrEnum

Types of real-time WebSocket events.

WsEvent pydantic-model

Bases: BaseModel

A real-time event pushed over WebSocket.

Callers must not mutate the payload dict after construction; the dict is a mutable reference inside a frozen model.

Attributes:

Name Type Description
version int

Wire-protocol version. Clients MUST ignore events whose version they do not understand. Bump only when introducing a breaking change to WsEvent -- coordinate with the WS_PROTOCOL_VERSION constant in web/src/utils/constants.ts.

event_type WsEventType

Classification of the event.

channel NotBlankStr

Target channel name.

timestamp AwareDatetime

When the event occurred.

payload dict[str, object]

Event-specific data.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _deep_copy_payload
  • _validate_payload_shape

version pydantic-field

version = WS_PROTOCOL_VERSION

WS wire-protocol version (clients ignore unknown)

event_type pydantic-field

event_type

Event classification

channel pydantic-field

channel

Target channel name

timestamp pydantic-field

timestamp

When the event occurred

payload pydantic-field

payload

Event-specific data

Auth

The auth domain types (AuthConfig, User, ApiKey, AuthenticatedUser, OrgRole, HumanRole, Session, RefreshRecord) live under synthorg.core.auth; the HTTP-coupled service, middleware, and request-scoped user binding live in synthorg.api.auth.

AuthContextMiddleware (in synthorg.api.auth.context) runs immediately after ApiAuthMiddleware and binds the authenticated user into a per-asyncio-Task ContextVar, so controllers and audit helpers read the user via no-argument accessors (get_authenticated_user_id, get_authenticated_user, audit_actor_from_context) without threading a Request.

service

Authentication service -- password hashing, JWT ops, API key hashing.

SecretNotConfiguredError

SecretNotConfiguredError(message=None)

Bases: ServiceUnavailableError

Raised when the JWT secret is required but not configured.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with *message* or the class default text.

    Args:
        message: Optional explicit message.  When ``None`` or empty,
            ``self.default_message`` is passed to the base class
            instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

RefreshRotation pydantic-model

Bases: BaseModel

Result of a successful refresh-token rotation.

The controller turns this into the session/csrf/refresh cookies and emits the post-persistence SECURITY_AUTH_REFRESH_CONSUMED audit event. session_id is the original session id (the access token rotated in place), not a freshly minted one.

Config:

  • frozen: True
  • extra: forbid

Fields:

  • token (str)
  • expires_in (int)
  • session_id (str)
  • user (User)

AuthService

AuthService(config)

Immutable authentication operations.

Owns the cryptographic primitives behind login: Argon2id password hashing and verification, JWT mint and decode, HMAC-SHA256 API key hashing, secure API key generation, and refresh-token persistence through the auth-domain boundary.

Parameters:

Name Type Description Default
config AuthConfig

Authentication configuration (carries JWT secret).

required

Async vs sync. Methods follow a single rule: an operation is declared async only when it touches an event-loop boundary -- either offloading CPU-bound work via :func:asyncio.to_thread, or awaiting a repository write. Everything else stays sync.

  • :meth:hash_password_async and :meth:verify_password_async are async because Argon2id is CPU-bound (3 time-cost iterations over 64MiB of memory by default); :func:asyncio.to_thread keeps a single login from stalling every concurrent request waiting on the loop.
  • :meth:persist_refresh_token is async because it awaits a repository write through the auth-domain boundary.
  • :meth:create_token, :meth:decode_token, :meth:hash_api_key, and :meth:generate_api_key are sync: each is either pure CPU with bounded sub-millisecond cost (HMAC, secrets.token_urlsafe) or an in-process JWT codec call with no I/O.

Thread-safety. Instances are safe to share across the request-handler pool without external locking. After :meth:__init__, the only state held is _config: AuthConfig -- itself a Pydantic frozen=True model. The module-global :class:argon2.PasswordHasher is configured once at import and treated as a deployment-wide concern (Argon2 parameter selection is not per-request); the underlying argon2 and jwt libraries are stateless and thread-safe.

Out of scope. This service does not implement token revocation (the auth middleware enforces that by checking pwd_sig on every request), session storage (handled by the refresh-token repository), or SYSTEM-role token minting (rejected by :meth:create_token; SYSTEM tokens are minted by the Go CLI with :data:SYSTEM_ISSUER / :data:SYSTEM_AUDIENCE).

Source code in src/synthorg/api/auth/service.py
def __init__(self, config: AuthConfig) -> None:
    """Store the authentication configuration on the instance.

    Args:
        config: Authentication configuration, kept as ``self._config``
            and read by the token mint / decode methods.
    """
    self._config = config

hash_password_async async

hash_password_async(password)

Hash a password with Argon2id off the event loop.

Argon2id is CPU-bound; asyncio.to_thread defers the work to the default thread pool so a single login request cannot stall every concurrent request waiting on the loop.

Parameters:

Name Type Description Default
password str

Plaintext password.

required

Returns:

Type Description
str

Argon2id hash string.

Source code in src/synthorg/api/auth/service.py
async def hash_password_async(self, password: str) -> str:
    """Compute the Argon2id hash of *password* in a worker thread.

    Hashing is CPU-bound, so the call is handed to
    ``asyncio.to_thread`` (the default thread pool) rather than run
    on the event loop, where one login could stall every other
    request waiting on the loop.

    Args:
        password: Plaintext password.

    Returns:
        Argon2id hash string.
    """
    hash_in_worker = asyncio.to_thread(_hasher.hash, password)
    return await hash_in_worker

verify_password_async async

verify_password_async(password, password_hash)

Verify a password against an Argon2id hash off the event loop.

Parameters:

Name Type Description Default
password str

Plaintext password to check.

required
password_hash str

Stored Argon2id hash.

required

Returns:

Type Description
bool

True if the password matches.

Raises:

Type Description
VerificationError

On non-mismatch verification failures (e.g. unsupported parameters).

InvalidHashError

If the stored hash is corrupted or malformed (data integrity issue).

Source code in src/synthorg/api/auth/service.py
async def verify_password_async(
    self,
    password: str,
    password_hash: str,
) -> bool:
    """Check *password* against a stored Argon2id hash in a worker thread.

    A plain mismatch is an expected outcome and is reported as
    ``False``.  Any other verification failure is logged under
    ``SECURITY_AUTH_FAILED`` and re-raised so the caller can tell a
    wrong password apart from a broken hash record.

    Args:
        password: Plaintext password to check.
        password_hash: Stored Argon2id hash.

    Returns:
        ``True`` if the password matches, ``False`` on a mismatch.

    Raises:
        argon2.exceptions.VerificationError: On non-mismatch
            verification failures (e.g. unsupported parameters).
        argon2.exceptions.InvalidHashError: If the stored hash
            is corrupted or malformed (data integrity issue).
    """
    try:
        matched = await asyncio.to_thread(_hasher.verify, password_hash, password)
    except argon2.exceptions.VerifyMismatchError:
        # Must be handled before the broader VerificationError clause
        # below so a wrong password never reaches the warning path.
        return False
    except argon2.exceptions.VerificationError as err:
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="hash_verification_error",
            error_type=type(err).__name__,
            error=safe_error_description(err),
        )
        raise
    except argon2.exceptions.InvalidHashError as err:
        log_exception_redacted(
            logger,
            SECURITY_AUTH_FAILED,
            err,
            reason="invalid_hash_data_corruption",
        )
        raise
    else:
        return matched

create_token

create_token(user, *, session_id=None)

Create a JWT for the given human user.

The token includes a pwd_sig claim -- a 16-character truncated SHA-256 of the stored password hash. This is plain SHA-256, not HMAC -- the password hash is already a high-entropy Argon2id output, and the claim is protected by the JWT signature. The auth middleware validates this claim on every request so that tokens issued before a password change are automatically rejected.

A jti (JWT ID) claim is included for per-token session tracking and revocation.

SYSTEM-role tokens are minted by the Go CLI with :data:SYSTEM_ISSUER / :data:SYSTEM_AUDIENCE -- never by this method. Calling create_token with a SYSTEM user would mint a token bearing :data:USER_ISSUER / :data:USER_AUDIENCE, which the middleware's _resolve_jwt_user immediately rejects (per-role iss/aud enforcement). We fail-fast with ValueError here so a future caller that accidentally passes a SYSTEM user surfaces the problem at mint time, not at the next request.

The claim shape is built through :class:JwtClaims so the encode-side payload is statically typed and the decode-side boundary helper validates against the same model.

Parameters:

Name Type Description Default
user User

Authenticated human user.

required
session_id str | None

Reuse this session id (jti) instead of minting a fresh one. Refresh-token rotation passes the consumed record's session id so the access token rotates within the existing session rather than spawning a new one (which would orphan the old session and saturate max_concurrent_sessions).

None

Returns:

Type Description
tuple[str, int, str]

Tuple of (encoded JWT, expiry seconds, session ID).

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

ValueError

If user has the SYSTEM role -- mint via the CLI's system-token path instead.

Source code in src/synthorg/api/auth/service.py
def create_token(
    self,
    user: User,
    *,
    session_id: str | None = None,
) -> tuple[str, int, str]:
    """Mint a signed JWT for a **human** user.

    Claims of note:

    * ``pwd_sig`` -- the first 16 hex characters of the SHA-256 of
      the stored password hash.  Plain SHA-256 (not HMAC) is used
      because the input is already a high-entropy Argon2id output
      and the claim is covered by the JWT signature.  The auth
      middleware checks it on every request, so tokens issued before
      a password change stop being accepted.
    * ``jti`` -- the session id, used for per-token session tracking
      and revocation.

    SYSTEM-role users are refused up front.  Their tokens are minted
    by the Go CLI with :data:`SYSTEM_ISSUER` /
    :data:`SYSTEM_AUDIENCE`; a token minted here would carry
    :data:`USER_ISSUER` / :data:`USER_AUDIENCE` and be rejected by
    the middleware's ``_resolve_jwt_user`` (per-role iss/aud
    enforcement).  Raising ``ValueError`` surfaces the mistake at
    mint time instead of at the next request.

    The payload is built through :class:`JwtClaims`, so the encode
    side is statically typed and shares its model with the
    decode-side boundary helper.

    Args:
        user: Authenticated human user.
        session_id: Existing session id (``jti``) to reuse instead of
            generating a new one.  Refresh-token rotation passes the
            consumed record's session id so the access token rotates
            *within* that session rather than creating another one
            (which would orphan the old session and saturate
            ``max_concurrent_sessions``).

    Returns:
        Tuple of (encoded JWT, expiry seconds, session ID).

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
        ValueError: If *user* has the SYSTEM role -- mint via
            the CLI's system-token path instead.
    """
    if user.role is HumanRole.SYSTEM:
        msg = (
            "create_token cannot mint SYSTEM-role tokens; "
            "system tokens are issued by the CLI with "
            "SYSTEM_ISSUER / SYSTEM_AUDIENCE"
        )
        raise ValueError(msg)

    signing_secret = self._require_secret("create_token")
    issued_at = datetime.now(UTC)
    lifetime_seconds = self._config.jwt_expiry_minutes * 60
    if session_id is None:
        # No session to continue: start a fresh one.
        session_id = uuid.uuid4().hex
    password_digest = hashlib.sha256(user.password_hash.encode()).hexdigest()
    expires_at = issued_at + timedelta(seconds=lifetime_seconds)

    claims = JwtClaims(
        iss=USER_ISSUER,
        aud=USER_AUDIENCE,
        sub=user.id,
        jti=session_id,
        iat=int(issued_at.timestamp()),
        exp=int(expires_at.timestamp()),
        username=user.username,
        role=user.role,
        must_change_password=user.must_change_password,
        pwd_sig=password_digest[:16],
    )
    encoded = jwt.encode(
        claims.model_dump(mode="json"),
        signing_secret,
        algorithm=self._config.jwt_algorithm,
    )
    return encoded, lifetime_seconds, session_id

decode_token

decode_token(token)

Decode and validate a JWT into a typed claim set.

Issuer (iss) and audience (aud) verification is intentionally deferred to the auth middleware's _resolve_jwt_user: the canonical pair differs by role (synthorg-cli / synthorg-backend for CLI-minted SYSTEM tokens vs. synthorg-api / synthorg-api for API-minted user tokens), and the middleware loads the user record before deciding which pair to enforce. Both claims are require-listed here so a missing claim fails decode rather than reaching the middleware as None.

After PyJWT validates the signature and required claims, the raw payload is routed through :func:synthorg.api.boundary.parse_typed so a malformed claim set (extra keys, type mismatch, iat >= exp) is rejected at the boundary with a structured api.boundary.validation_failed log instead of slipping through and surprising a downstream attribute access.

Parameters:

Name Type Description Default
token str

Encoded JWT string.

required

Returns:

Type Description
JwtClaims

Validated JwtClaims instance.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

InvalidTokenError

If the token signature, expiry, or required claim set is invalid.

ValidationError

If the decoded claim set does not conform to :class:JwtClaims (extra keys, wrong types, or violated invariants).

Source code in src/synthorg/api/auth/service.py
def decode_token(self, token: str) -> JwtClaims:
    """Decode a JWT and return its claims as a validated model.

    Decoding happens in two stages:

    1. PyJWT checks the signature and that every claim in the
       ``require`` list is present.  Issuer (``iss``) and audience
       (``aud``) *values* are deliberately not verified here: the
       canonical pair depends on the role (``synthorg-cli`` /
       ``synthorg-backend`` for CLI-minted SYSTEM tokens versus
       ``synthorg-api`` / ``synthorg-api`` for API-minted user
       tokens), and the auth middleware's ``_resolve_jwt_user``
       loads the user record before choosing which pair to enforce.
       Both claims are still ``require``-listed so a missing one
       fails decode instead of reaching the middleware as ``None``.
    2. The raw payload goes through
       :func:`synthorg.api.boundary.parse_typed`, so a malformed
       claim set (extra keys, type mismatch, ``iat >= exp``) is
       rejected at the boundary with a structured
       ``api.boundary.validation_failed`` log rather than surprising
       a later attribute access.

    Args:
        token: Encoded JWT string.

    Returns:
        Validated :class:`JwtClaims` instance.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
        jwt.InvalidTokenError: If the token signature, expiry,
            or required claim set is invalid.
        ValidationError: If the decoded claim set does not
            conform to :class:`JwtClaims` (extra keys, wrong
            types, or violated invariants).
    """
    signing_secret = self._require_secret("decode_token")
    required_claims = ["exp", "iat", "sub", "jti", "iss", "aud"]
    payload = jwt.decode(
        token,
        signing_secret,
        algorithms=[self._config.jwt_algorithm],
        options={
            "require": required_claims,
            # Presence is enforced above; value checks are role-specific
            # and left to the auth middleware.
            "verify_aud": False,
            "verify_iss": False,
        },
    )
    return parse_typed("jwt", payload, JwtClaims)

persist_refresh_token async

persist_refresh_token(store, *, token_hash, session_id, user_id, expires_at)

Persist a refresh token through the auth-domain boundary.

Centralises the refresh-store write + audit log so callers (notably make_session_cookies) do not reach into app_state._refresh_store directly. The repo handle is passed in rather than held by the service so this stays compatible with the existing AuthService construction (no constructor change required).

Parameters:

Name Type Description Default
store object

The :class:RefreshTokenRepository instance to write through. Typed as object to keep this module free of persistence-layer imports.

required
token_hash str

HMAC-SHA256 hex digest of the raw refresh token.

required
session_id str

Session identifier.

required
user_id str

User identifier.

required
expires_at datetime

Refresh token expiry (UTC).

required

Raises:

Type Description
QueryError

If the underlying repo write fails.

Source code in src/synthorg/api/auth/service.py
async def persist_refresh_token(
    self,
    store: object,
    *,
    token_hash: str,
    session_id: str,
    user_id: str,
    expires_at: datetime,
) -> None:
    """Persist a refresh token through the auth-domain boundary.

    Centralises the refresh-store write + audit log so callers
    (notably ``make_session_cookies``) do not reach into
    ``app_state._refresh_store`` directly.  The repo handle is
    passed in rather than held by the service so this stays
    compatible with the existing AuthService construction (no
    constructor change required).

    The ``SECURITY_AUTH_REFRESH_CREATED`` event is logged only after
    ``store.create`` has returned, so a failed write produces no
    "created" audit entry.  The token hash itself is not logged.

    Args:
        store: The :class:`RefreshTokenRepository` instance to
            write through.  Typed as ``object`` to keep this
            module free of persistence-layer imports; it must
            expose an awaitable ``create`` accepting the keyword
            arguments below.
        token_hash: HMAC-SHA256 hex digest of the raw refresh token.
        session_id: Session identifier.
        user_id: User identifier.
        expires_at: Refresh token expiry (UTC).

    Raises:
        QueryError: If the underlying repo write fails.
    """
    # ``store`` is deliberately untyped (see Args), hence the ignore.
    await store.create(  # type: ignore[attr-defined]
        token_hash=token_hash,
        session_id=session_id,
        user_id=user_id,
        expires_at=expires_at,
    )
    # Reached only when the write above did not raise.
    logger.info(
        SECURITY_AUTH_REFRESH_CREATED,
        session_id=session_id,
        user_id=user_id,
    )

rotate_refresh_token async

rotate_refresh_token(*, raw_refresh_token, refresh_store, users, is_session_revoked)

Single-use refresh rotation: consume, validate, re-mint.

The reject matrix lives here (not the controller) so it is unit-testable without the full app: a missing / replayed / expired refresh token or a revoked session emits SECURITY_AUTH_REFRESH_REJECTED (typed reason) and raises :class:RefreshTokenInvalidError (HTTP 401, code 1005). The success path re-mints the access token within the consumed record's session so rotation does not orphan the session or saturate max_concurrent_sessions.

SECURITY_AUTH_REFRESH_CONSUMED is emitted by the caller AFTER the rotated refresh row is persisted (state-transition events log after the write), so it is intentionally not emitted here.

Parameters:

Name Type Description Default
raw_refresh_token str

The opaque refresh cookie value.

required
refresh_store RefreshTokenRepository

Repository providing single-use consume (CAS + replay + session-revocation).

required
users UserRepository

User repository for the post-consume owner lookup.

required
is_session_revoked Callable[[str], bool] | None

Predicate passed into consume so a revoked session rejects rotation.

required

Returns:

Type Description
RefreshRotation

A RefreshRotation with the new access token and the preserved session id.

Raises:

Type Description
RefreshTokenInvalidError

For any reject path (missing cookie, consume rejection, or owner deleted between issuance and rotation).

Source code in src/synthorg/api/auth/service.py
async def rotate_refresh_token(
    self,
    *,
    raw_refresh_token: str,
    refresh_store: RefreshTokenRepository,
    users: UserRepository,
    is_session_revoked: Callable[[str], bool] | None,
) -> RefreshRotation:
    """Consume a refresh token exactly once and mint a new access token.

    Every reject decision is taken here rather than in the controller,
    which keeps the whole matrix unit-testable without booting the app.
    A missing, replayed, or expired refresh token -- or one belonging to
    a revoked session -- logs ``SECURITY_AUTH_REFRESH_REJECTED`` with a
    typed ``reason`` and raises :class:`RefreshTokenInvalidError`
    (HTTP 401, code 1005).

    On success the access token is re-minted *inside the session of the
    consumed record*, so a rotation neither orphans that session nor
    counts against ``max_concurrent_sessions``.

    ``SECURITY_AUTH_REFRESH_CONSUMED`` is deliberately NOT logged here:
    the caller emits it once the rotated refresh row has been persisted,
    because state-transition events are logged after the write.

    Args:
        raw_refresh_token: Opaque value read from the refresh cookie.
        refresh_store: Repository whose ``consume`` performs the
            single-use check (CAS + replay + session-revocation).
        users: Repository used to look up the token owner after the
            token has been consumed.
        is_session_revoked: Predicate forwarded to ``consume`` so that
            rotation is refused for a revoked session.

    Returns:
        A :class:`RefreshRotation` holding the new access token, its
        lifetime, the preserved session id, and the owning user.

    Raises:
        RefreshTokenInvalidError: On every reject path -- empty cookie,
            a rejection reported by ``consume``, or an owner that was
            deleted between issuance and rotation.
    """
    if not raw_refresh_token:
        logger.warning(SECURITY_AUTH_REFRESH_REJECTED, reason="cookie_missing")
        raise RefreshTokenInvalidError

    # Only the keyed hash of the cookie value is handed to the store.
    consumed = await refresh_store.consume(
        self.hash_api_key(raw_refresh_token),
        is_session_revoked=is_session_revoked,
    )

    rejection = consumed.reject_reason
    if rejection is not None:
        logger.warning(SECURITY_AUTH_REFRESH_REJECTED, reason=rejection.value)
        raise RefreshTokenInvalidError

    consumed_row = consumed.record
    if consumed_row is None:
        # RefreshConsumeOutcome validates that exactly one of
        # record / reject_reason is populated, and reject_reason was
        # None just above, so this branch should never run. It is
        # still an explicit check (an `assert` would vanish under
        # `python -O`) so that the security path fails closed if a
        # later change ever breaks that invariant.
        logger.warning(
            SECURITY_AUTH_REFRESH_REJECTED,
            reason="consume_outcome_invariant_violation",
        )
        raise RefreshTokenInvalidError

    owner_id = consumed_row.user_id
    owner = await users.get(owner_id)
    if owner is None:
        # The row has already been flagged as used and cannot be
        # un-consumed; refuse so a deleted owner cannot rotate.
        logger.warning(
            SECURITY_AUTH_REFRESH_REJECTED,
            reason="user_not_found_after_consume",
            user_id=owner_id,
        )
        raise RefreshTokenInvalidError

    # Re-mint within the consumed record's session (see docstring).
    access_token, lifetime, minted_session_id = self.create_token(
        owner,
        session_id=consumed_row.session_id,
    )
    return RefreshRotation(
        token=access_token,
        expires_in=lifetime,
        session_id=minted_session_id,
        user=owner,
    )

hash_api_key

hash_api_key(raw_key)

Compute HMAC-SHA256 hex digest of a raw API key.

Uses the server-side JWT secret as the HMAC key so that an attacker with read access to stored hashes cannot brute-force API keys offline.

Parameters:

Name Type Description Default
raw_key str

The plaintext API key.

required

Returns:

Type Description
str

Lowercase hex digest.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

Source code in src/synthorg/api/auth/service.py
def hash_api_key(self, raw_key: str) -> str:
    """Return the HMAC-SHA256 hex digest of a plaintext API key.

    The HMAC is keyed with the server-side JWT secret, so read access
    to the stored hashes alone is not enough to brute-force API keys
    offline.

    Args:
        raw_key: The plaintext API key.

    Returns:
        Lowercase hex digest.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
    """
    # Resolve the secret first so a missing secret fails before any
    # hashing work is attempted.
    hmac_key = self._require_secret("hash_api_key").encode()
    message = raw_key.encode()
    mac = hmac.digest(hmac_key, message, "sha256")
    return mac.hex()

generate_api_key staticmethod

generate_api_key()

Generate a cryptographically secure API key.

Returns:

Type Description
str

URL-safe base64 string sized by security.auth_token_bytes (default 32 bytes / 43 base64 chars).

Source code in src/synthorg/api/auth/service.py
@staticmethod
def generate_api_key() -> str:
    """Create a new API key from a cryptographically secure source.

    Returns:
        URL-safe base64 string whose size is governed by
        ``security.auth_token_bytes`` (default 32 bytes / 43 base64
        chars).
    """
    entropy_bytes = get_auth_token_bytes()
    return secrets.token_urlsafe(entropy_bytes)

middleware

JWT + API key authentication middleware.

ApiAuthMiddleware

Bases: AbstractAuthenticationMiddleware

Authenticate requests via cookie, JWT header, or API key.

Authentication priority:

  1. Session cookie: HttpOnly cookie set by login/setup. Primary auth path for browser sessions.
  2. Authorization header: Bearer <token>. Tokens with dots are JWTs (system user CLI tokens). Tokens without dots are API keys (HMAC-SHA256 lookup).

Requires auth_service and a persistence backend to be available on app.state["app_state"].

authenticate_request async

authenticate_request(connection)

Validate the session cookie or Authorization header.

Tries the session cookie first. Falls back to the Authorization header for API keys and system user JWTs.

Parameters:

Name Type Description Default
connection ASGIConnection[Any, Any, Any, Any]

Incoming ASGI connection.

required

Returns:

Type Description
AuthenticationResult

AuthenticationResult with AuthenticatedUser.

Raises:

Type Description
NotAuthorizedException

If authentication fails.

Source code in src/synthorg/api/auth/middleware.py
@override
async def authenticate_request(
    self,
    connection: ASGIConnection[Any, Any, Any, Any],
) -> AuthenticationResult:
    """Authenticate a request from its session cookie or Authorization header.

    The session cookie is attempted first. If it is absent or does not
    authenticate, the ``Authorization`` header is used instead: a bearer
    token containing a dot is treated as a JWT, any other bearer token
    as an API key.

    Args:
        connection: Incoming ASGI connection.

    Returns:
        AuthenticationResult with AuthenticatedUser.

    Raises:
        NotAuthorizedException: If authentication fails.
    """
    state = connection.app.state["app_state"]
    service: AuthService = auth_service_of(state)
    request_path = str(connection.url.path)

    # Step 1: session cookie (primary path for browser sessions).
    cookie_value = connection.cookies.get(_get_cookie_name(state))
    if cookie_value:
        # Only a dotted value is attempted as a JWT.
        if "." in cookie_value:
            cookie_user = await _try_jwt_auth(
                cookie_value,
                service,
                state,
                request_path,
            )
            if cookie_user is not None:
                logger.debug(
                    API_AUTH_COOKIE_USED,
                    user_id=cookie_user.user_id,
                    path=request_path,
                )
                return AuthenticationResult(user=cookie_user, auth=cookie_value)
        # Reaching here means a cookie was sent but did not authenticate.
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="cookie_jwt_invalid",
            path=request_path,
        )

    # Step 2: Authorization header (API keys, system user).
    header_value = connection.headers.get("authorization")
    if not header_value:
        if cookie_value:
            # The failure was already logged above as cookie_jwt_invalid.
            raise NotAuthorizedException(
                detail="Invalid session cookie",
            )
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="missing_authentication",
            path=request_path,
        )
        raise NotAuthorizedException(
            detail="Missing authentication",
        )

    bearer = extract_bearer_token(header_value)
    if bearer is None:
        logger.warning(
            SECURITY_AUTH_FAILED,
            reason="invalid_scheme",
            path=request_path,
        )
        raise NotAuthorizedException(
            detail="Invalid authorization scheme",
        )

    # Dotted tokens go down the JWT path, everything else is an API key.
    if "." in bearer:
        authenticate, failure_detail = _try_jwt_auth, "Invalid JWT token"
    else:
        authenticate, failure_detail = _try_api_key_auth, "Invalid credentials"

    principal = await authenticate(bearer, service, state, request_path)
    if principal is None:
        raise NotAuthorizedException(detail=failure_detail)
    return AuthenticationResult(user=principal, auth=bearer)

create_auth_middleware_class

create_auth_middleware_class(auth_config)

Create a middleware class with excluded paths baked in.

Litestar's AbstractAuthenticationMiddleware.__init__ takes exclude as a parameter (default None). We create a subclass whose __init__ forwards the configured exclude list to super().__init__.

The middleware is restricted to ScopeType.HTTP only; WebSocket connections use ticket-based auth handled entirely inside the WS handler (see controllers/ws.py).

Parameters:

Name Type Description Default
auth_config AuthConfig

Auth configuration with exclude_paths.

required

Returns:

Type Description
type[ApiAuthMiddleware]

Middleware class ready for use in the Litestar middleware stack.

Source code in src/synthorg/api/auth/middleware.py
def create_auth_middleware_class(
    auth_config: AuthConfig,
) -> type[ApiAuthMiddleware]:
    """Build an auth middleware subclass with the exclude list pre-bound.

    ``exclude`` is a parameter of Litestar's
    ``AbstractAuthenticationMiddleware.__init__`` (default ``None``),
    so the configured paths are captured in a subclass whose
    ``__init__`` passes them on to ``super().__init__``.

    The resulting middleware only handles ``ScopeType.HTTP``;
    WebSocket connections authenticate with tickets entirely inside
    the WS handler (see ``controllers/ws.py``).

    Args:
        auth_config: Auth configuration with exclude_paths.

    Returns:
        Middleware class ready for use in the Litestar middleware stack.
    """
    # An empty or unset exclude_paths is forwarded as None, not as [].
    excluded = None
    if auth_config.exclude_paths:
        excluded = list(auth_config.exclude_paths)

    class ConfiguredAuthMiddleware(ApiAuthMiddleware):
        """Auth middleware with pre-configured exclude paths."""

        def __init__(self, app: Any) -> None:
            super().__init__(app, exclude=excluded, scopes={ScopeType.HTTP})

    return ConfiguredAuthMiddleware

context

Request-scoped binding for the authenticated user.

The auth middleware (:class:synthorg.api.auth.middleware.ApiAuthMiddleware) populates connection.scope["user"] with an :class:~synthorg.core.auth.models.AuthenticatedUser after authentication. :class:AuthContextMiddleware runs immediately after auth and binds that user into the per-:class:asyncio.Task :class:~contextvars.ContextVar defined here. Controllers and request-coupled helpers then read the authenticated user via :func:get_authenticated_user_id / :func:get_authenticated_user without threading a Request argument.

Reading the var while no user is bound raises :class:AuthContextMissingError (a 500): this surfaces middleware misconfiguration loudly instead of masking it as "api".

WebSocket scopes use ticket-based authentication (synthorg.api.controllers.ws) and are not handled by this module; :class:AuthContextMiddleware is restricted to HTTP scopes.

AuthContextMissingError

AuthContextMissingError(message=None)

Bases: DomainError

Read attempted on the auth ContextVar with no user bound.

Surfacing this as a 500 is intentional: the auth middleware runs before any controller, so by the time a controller (or helper invoked from one) calls :func:get_authenticated_user_id the var must be set. An unset read is therefore a server bug -- exclude_paths misconfiguration, a helper invoked outside the request lifecycle, or :class:AuthContextMiddleware missing from the middleware stack -- not a client error.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, defaulting the text when none is given.

    Args:
        message: Error text. When ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

AuthContextMiddleware

Bases: ASGIMiddleware

Bind scope["user"] into the per-task ContextVar.

Runs immediately after :class:~synthorg.api.auth.middleware.ApiAuthMiddleware so authenticated handlers, downstream middleware, and helpers can read the user via :func:get_authenticated_user_id without threading a Request. Excluded paths (where ApiAuthMiddleware skipped) leave the var at its default None; helpers reading it raise :class:AuthContextMissingError, which is the desired behaviour for endpoints that should never have reached a user-coupled helper without authentication.

HTTP-only: WebSocket scopes use ticket-based authentication and are bypassed by the scopes filter on the base class.

handle async

handle(scope, receive, send, next_app)

Bind scope["user"] for the duration of the inner dispatch.

Source code in src/synthorg/api/auth/context.py
@override
async def handle(
    self,
    scope: Scope,
    receive: Receive,
    send: Send,
    next_app: ASGIApp,
) -> None:
    """Bind ``scope["user"]`` for the duration of the inner dispatch.

    An :class:`AuthenticatedUser` found in the scope is bound to the
    auth ContextVar and to the actor scope while ``next_app`` runs.
    Anything else binds ``None`` and clears the actor scope; a
    non-``None`` value of another type is additionally logged and
    removed from the scope. The previous ContextVar binding is
    restored when the dispatch finishes, whether or not it raised.
    """
    raw_user: Any = scope.get("user")
    principal: AuthenticatedUser | None = None
    if isinstance(raw_user, AuthenticatedUser):
        principal = raw_user
        logger.debug(
            API_AUTH_CONTEXT_BOUND,
            user_id=raw_user.user_id,
            path=scope.get("path", ""),
        )
    elif raw_user is not None:
        # A missing scope.user is legitimate on excluded paths. A
        # value of some other type, however, means a downstream
        # middleware mutated it or auth was reordered -- a wiring
        # bug the operator must see.
        logger.warning(
            API_AUTH_CONTEXT_SKIPPED,
            scope_user_type=type(raw_user).__name__,
            path=scope.get("path", ""),
        )
        # Bring the request scope in line with the ContextVar so
        # layers that read scope["user"] directly (rate-limit
        # identifiers, anonymous-tier gate, etc.) observe the same
        # unauthenticated state as get_authenticated_user*().
        # Otherwise a foreign principal would stay visible to those
        # gates while the accessors raise AuthContextMissingError.
        scope["user"] = None

    # The var is set on every path (to None when skipped) so that a
    # context inherited from an outer task cannot leak a stale
    # principal into helpers; the unconditional reset below puts the
    # prior binding back.
    reset_token = _authenticated_user.set(principal)
    try:
        if principal is None:
            # No principal resolved: drop any actor inherited from
            # an outer context so decision leaves don't attribute
            # ``decided_by`` to a stale identity.
            attribution = actor_scope_cleared()
        else:
            # Bind the actor so decision leaves resolve
            # ``decided_by`` through ``current_actor()`` rather than
            # each caller threading it. ``actor_id`` is the
            # immutable user id; ``label`` is the human-readable
            # username recorded in audit rows.
            attribution = actor_scope(
                ActorIdentity(
                    actor_id=principal.user_id,
                    kind=ActorKind.HUMAN,
                    label=principal.username,
                ),
            )
        with attribution:
            await next_app(scope, receive, send)
    finally:
        _authenticated_user.reset(reset_token)

get_authenticated_user

get_authenticated_user()

Return the user bound to the active request's ContextVar.

Raises:

Type Description
AuthContextMissingError

When called outside an authenticated request scope.

Returns:

Type Description
AuthenticatedUser

AuthenticatedUser instance.

Source code in src/synthorg/api/auth/context.py
def get_authenticated_user() -> AuthenticatedUser:
    """Fetch the user currently bound to the auth ContextVar.

    Returns:
        ``AuthenticatedUser`` instance.

    Raises:
        AuthContextMissingError: When called outside an authenticated
            request scope.
    """
    bound = _authenticated_user.get()
    if bound is not None:
        return bound

    # Nothing bound is a server-side wiring bug: an excluded path
    # routed into a request-coupled helper, AuthContextMiddleware
    # absent from the stack, or a helper called outside a request.
    # The 500 envelope alone tells operators nothing, so leave a
    # structured breadcrumb before raising.
    logger.warning(API_AUTH_CONTEXT_MISSING, caller="get_authenticated_user")
    raise AuthContextMissingError

get_authenticated_user_id

get_authenticated_user_id()

Return the user_id of the user bound to the current request.

Raises:

Type Description
AuthContextMissingError

When called outside an authenticated request scope.

Returns:

Type Description
str

The user_id of the authenticated user bound to the current request.

Source code in src/synthorg/api/auth/context.py
def get_authenticated_user_id() -> str:
    """Return the ``user_id`` of the user bound to the current request.

    Delegates the lookup to :func:`get_authenticated_user`.

    Returns:
        The ``user_id`` attribute of the bound user.

    Raises:
        AuthContextMissingError: When called outside an authenticated
            request scope.
    """
    current_user = get_authenticated_user()
    return current_user.user_id

authenticated_user_scope async

authenticated_user_scope(user)

Bind user to the auth ContextVar for the duration of the block.

Production binding is performed by :class:AuthContextMiddleware. This helper exists for tests, background tasks, and any caller that needs to invoke a request-coupled helper outside the HTTP request path. Mirrors :func:synthorg.providers.cost_recording.cost_recording_scope -- token-based reset for exception safety, restoring whatever was active before.

Example (background task that calls a request-coupled helper)::

async def _background_audit(user: AuthenticatedUser) -> None:
    async with authenticated_user_scope(user):
        # audit_actor_from_context() now returns this user's
        # ProviderAuditActor without raising.
        actor = audit_actor_from_context()
        ...
Source code in src/synthorg/api/auth/context.py
@asynccontextmanager
async def authenticated_user_scope(
    user: AuthenticatedUser,
) -> AsyncIterator[None]:
    """Temporarily bind ``user`` to the auth ContextVar.

    In production the binding is done by :class:`AuthContextMiddleware`.
    This context manager is for tests, background tasks, and any other
    caller that must run a request-coupled helper outside the HTTP
    request path. It mirrors
    :func:`synthorg.providers.cost_recording.cost_recording_scope`: the
    reset is token-based, so it is exception-safe and restores whatever
    binding was active before the block.

    Example (background task that calls a request-coupled helper)::

        async def _background_audit(user: AuthenticatedUser) -> None:
            async with authenticated_user_scope(user):
                # audit_actor_from_context() now returns this user's
                # ProviderAuditActor without raising.
                actor = audit_actor_from_context()
                ...
    """
    prior_binding = _authenticated_user.set(user)
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions raised inside the block.
        _authenticated_user.reset(prior_binding)