Async & Framework Integrations

Integrate the sync SDK in Django, FastAPI, Flask, and asyncio runtimes (asyncio.to_thread for flush)

This page is split into two clearly separated sections:

  1. Shipped today — how to integrate the sync SDK in every common Python runtime, including runtimes that run an asyncio event loop.
  2. Planned (Phase 3 design intent) — the future async public API and framework-helper distributions that are not implemented yet.

Part 1 — Shipped: sync runtime integration

The Convert Python SDK is sync-first and framework-agnostic. The full evaluation, tracking, and config-fetch pipeline is synchronous. There is no AsyncCore, no AsyncContext, and no framework middleware in the current package.

How the queue works

context.track_conversion(...) is lightweight and synchronous. It deduplicates by (visitor_id, goal_id) and appends an event to an in-process queue. No network call happens on track_conversion. The queue is released (serialized and delivered over HTTPS) when one of these happens:

  1. Explicit flush: core.flush(). This is the canonical, deterministic control point and the recommended default for all runtimes.
  2. Batch-size release — the queue reaches SDKConfig.batch_size (default 10) and auto-releases.
  3. Periodic flush — opt-in only: set SDKConfig.auto_flush_interval_ms to start a daemonic background timer thread.
  4. atexit hook — opt-in, best-effort: register_atexit_flush(core) attempts a final flush on normal interpreter shutdown.

The default lifecycle is explicit-flush-only, which is safe in every runtime. Periodic flush is not the default because a timed flush silently drops events in short-lived runtimes (Lambda, CLI scripts) that exit before the timer fires.
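The release triggers described above can be modelled in a few lines. The following is a minimal sketch, not the SDK's queue: `SketchQueue`, `track`, and the `deliver` callback are hypothetical names, and the real implementation may differ in structure. It only illustrates deduplication by (visitor_id, goal_id), the batch-size trigger, and a single release path shared with the explicit flush.

```python
import threading
from typing import Callable, Dict, List, Set, Tuple


class SketchQueue:
    """Toy model of the release triggers; not the SDK's real queue."""

    def __init__(self, deliver: Callable[[List[Dict]], None], batch_size: int = 10) -> None:
        self._deliver = deliver          # stand-in for the HTTPS delivery step
        self._batch_size = batch_size
        self._lock = threading.Lock()
        self._events: List[Dict] = []
        self._seen: Set[Tuple[str, str]] = set()

    def track(self, visitor_id: str, goal_id: str) -> bool:
        """Enqueue one event; return False if it was a duplicate."""
        with self._lock:
            key = (visitor_id, goal_id)
            if key in self._seen:
                return False
            self._seen.add(key)
            self._events.append({"visitor_id": visitor_id, "goal_id": goal_id})
            should_release = len(self._events) >= self._batch_size
        if should_release:
            self.flush()                 # batch-size trigger
        return True

    def flush(self) -> int:
        """Single release path shared by every trigger; returns the batch size."""
        with self._lock:
            batch, self._events = self._events, []
        if batch:
            self._deliver(batch)
        return len(batch)


delivered: List[List[Dict]] = []
queue = SketchQueue(delivered.append, batch_size=3)
queue.track("v1", "purchase")
queue.track("v1", "purchase")   # duplicate key, ignored
queue.track("v2", "purchase")
queue.flush()                   # explicit flush releases the two queued events
```

Nothing is delivered until either the batch fills or flush() is called, which is why a short-lived process that never flushes loses whatever is still queued.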

Quick decision table

Runtime | Recommended strategy | Reason
AWS Lambda | Explicit core.flush() at handler end | Frozen between invocations; background timers don't fire reliably
Google Cloud Run | Explicit flush per request + optional SIGTERM handler | Request-scoped; SIGTERM precedes container shutdown
gunicorn (sync workers) | Periodic flush (auto_flush_interval_ms) or per-request flush | Long-lived workers; a daemonic timer amortizes delivery cost
uvicorn / hypercorn (ASGI) | Periodic flush or flush in the ASGI lifespan shutdown handler | Long-lived event loop; lifespan shutdown is a clean flush point
Celery workers | Flush at task end or periodic flush | Long-lived; per-task flush bounds delivery latency
CLI / batch scripts | Explicit core.flush() before exit, optionally with atexit | Short-lived; process exits as soon as work is done

AWS Lambda

Flush explicitly at the end of every handler invocation. Do not rely on a periodic timer or atexit — the execution environment is frozen between invocations and may be killed without firing them.

import os
from convert_sdk import Core, SDKConfig

core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()

def handler(event, context):
    ctx = core.create_context(event["visitor_id"])
    result = ctx.run_experience("checkout-experiment")
    ctx.track_conversion("purchase_completed", revenue=event["amount"])
    # Deliver before the runtime freezes or kills the environment.
    core.flush()
    return {"statusCode": 200, "variation": result.variation_key if result else None}

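Core is initialized outside the handler so the config is loaded once per cold start, not per invocation. Calling core.flush() inside the handler completes the delivery attempt before the handler returns, which is the last point at which code is certain to run before the Lambda runtime freezes the execution environment.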
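If a handler can raise before it reaches core.flush(), queued events from that invocation stay in memory and may never be delivered. One way to cover that path is a small decorator that flushes in a finally block. This is a sketch under assumptions: `flush_after` and `FakeCore` are hypothetical helpers, not part of the SDK, and the sketch assumes flush() itself does not raise.

```python
import functools
from typing import Any, Callable


def flush_after(core: Any) -> Callable:
    """Return a decorator that calls core.flush() after the wrapped handler."""
    def decorator(handler: Callable) -> Callable:
        @functools.wraps(handler)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return handler(*args, **kwargs)
            finally:
                # Runs on both the success path and the exception path.
                core.flush()
        return wrapper
    return decorator


class FakeCore:
    """Hypothetical stand-in so the sketch runs without the SDK installed."""

    def __init__(self) -> None:
        self.flush_calls = 0

    def flush(self) -> None:
        self.flush_calls += 1


fake_core = FakeCore()


@flush_after(fake_core)
def handler(event: dict, context: object) -> dict:
    if "visitor_id" not in event:
        raise KeyError("visitor_id")
    return {"statusCode": 200}
```

With a real Core in place of FakeCore, the decorator keeps the flush call out of the handler body while preserving the same ordering: work first, delivery last.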

Google Cloud Run

Flush per request. Optionally add a SIGTERM handler for a best-effort final flush when the container is scaled down. Cloud Run sends SIGTERM before termination; the SDK never registers signal handlers — opt in explicitly.

import os
import signal
from convert_sdk import Core, SDKConfig

core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()


def _handle_sigterm(signum, frame):
    core.flush()        # best-effort final delivery
    raise SystemExit(0)


signal.signal(signal.SIGTERM, _handle_sigterm)


def handle_checkout(visitor_id: str, amount: float) -> dict:
    ctx = core.create_context(visitor_id)
    ctx.track_conversion("purchase_completed", revenue=amount)
    core.flush()        # deterministic per-request delivery
    return {"ok": True}

gunicorn (sync workers)

Workers are long-lived, so an opt-in daemonic periodic flush amortizes delivery without requiring an explicit call on every request. The timer thread is daemonic and never blocks worker shutdown.

import os
from convert_sdk import Core, SDKConfig

# Flush the queue every 5 seconds in the background.
core = Core(
    SDKConfig(
        sdk_key=os.environ["CONVERT_SDK_KEY"],
        auto_flush_interval_ms=5000,
    )
).initialize()

Prefer a per-request core.flush() if you need delivery to be deterministic per response. Either way, call core.close() on worker shutdown to cancel the timer cleanly and release the httpx connection pool.
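For the per-request option, a thin WSGI wrapper is one possible place to put the flush. The sketch below uses only the WSGI calling convention; `FlushPerRequest` is a hypothetical name and no such middleware ships with the SDK. It buffers the response body so that tracking done while the body is produced is included, which means it is not suitable for streaming responses.

```python
from typing import Any, Callable, Iterable, List


class FlushPerRequest:
    """WSGI middleware sketch: flush the SDK queue once per request."""

    def __init__(self, app: Callable, core: Any) -> None:
        self._app = app
        self._core = core

    def __call__(self, environ: dict, start_response: Callable) -> Iterable[bytes]:
        result = self._app(environ, start_response)
        try:
            # Materialize the body so events tracked during rendering are queued
            # before the flush below runs.
            body: List[bytes] = list(result)
            return body
        finally:
            # Honour the WSGI contract for iterables that define close().
            close = getattr(result, "close", None)
            if close is not None:
                close()
            self._core.flush()


def demo_app(environ: dict, start_response: Callable) -> Iterable[bytes]:
    """Tiny WSGI app used only to exercise the middleware."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]
```

Wrapping the application object once at import time (for example `application = FlushPerRequest(application, core)`) keeps the flush out of individual views.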

uvicorn / hypercorn (ASGI)

The sync SDK is safe to call from ASGI applications — evaluation and queue operations are pure CPU work and do not block the event loop in any observable way. Use periodic flush or flush in the ASGI lifespan shutdown handler for a clean final delivery on app shutdown.

import os
from contextlib import asynccontextmanager
from convert_sdk import Core, SDKConfig

core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()


@asynccontextmanager
async def lifespan(app):
    yield                   # application is running
    core.flush()            # final delivery before shutdown
    core.close()            # cancel any periodic timer + close transport

core.flush() is a synchronous call that performs one blocking HTTP POST. For most workloads this is negligible in a shutdown path. If you need non-blocking flush in an async context today, use asyncio.to_thread(core.flush) — see Using the sync SDK from async code below.

Celery workers

Long-lived workers: flush at the end of each task for bounded delivery latency, or enable periodic flush.

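import os
from celery import Celery
from convert_sdk import Core, SDKConfig

# Illustrative app name; use your project's existing Celery app instead.
app = Celery("tasks")

core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()


@app.task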
def record_purchase(visitor_id: str, amount: float) -> None:
    ctx = core.create_context(visitor_id)
    ctx.track_conversion("purchase_completed", revenue=amount)
    core.flush()    # deliver before the task frame exits

CLI scripts and batch jobs

Short-lived processes: flush explicitly before exit. Optionally register the best-effort atexit hook as a safety net.

import os
from convert_sdk import Core, SDKConfig
from convert_sdk.tracking.flush import register_atexit_flush

core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
register_atexit_flush(core)   # best-effort final flush on normal exit

ctx = core.create_context("cli-user")
ctx.track_conversion("report_generated")
core.flush()    # explicit, deterministic delivery

atexit is best-effort. It does not fire under SIGKILL, some serverless runtimes, or hard crashes. Never rely on it as your only delivery path.
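To make "best-effort" concrete, here is roughly what an exit hook of this kind looks like when built on the standard library. This is not the SDK's register_atexit_flush implementation, which is not shown on this page; `register_best_effort_flush` is a hypothetical name used only for illustration.

```python
import atexit
import logging
from typing import Any, Callable

log = logging.getLogger("flush_sketch")


def register_best_effort_flush(core: Any) -> Callable[[], None]:
    """Register an exit hook that flushes once and swallows any error."""
    def _flush_on_exit() -> None:
        try:
            core.flush()
        except Exception:
            # An exit hook should not turn a clean shutdown into a traceback.
            log.debug("final flush failed", exc_info=True)

    atexit.register(_flush_on_exit)
    # Returned so callers can atexit.unregister() it, e.g. in tests.
    return _flush_on_exit
```

The interpreter only runs atexit callbacks on normal termination, so a hook like this cannot help when the process is killed by a signal it does not handle or exits through os._exit().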

Using the sync SDK from async code

The sync SDK can be called from async def functions today without any Phase 3 work. Bucketing, rule evaluation, and feature resolution are pure CPU computation with no I/O — they never block the event loop. Config refresh runs on its own daemonic thread and does not touch the event loop at all.

The only operation that blocks is core.flush(), which performs one HTTP POST. If you want non-blocking flush from a coroutine, offload it:

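import asyncio
from typing import Optional

from convert_sdk import Core, SDKConfig

# my_config is a placeholder for a config payload you have already loaded.
core = Core(SDKConfig(data=my_config)).initialize()


# Optional[str] rather than "str | None": the latter fails at import time on Python 3.9.
async def evaluate(visitor_id: str) -> Optional[str]: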
    ctx = core.create_context(visitor_id)
    result = ctx.run_experience("checkout-experiment")
    return result.variation_key if result else None


async def flush_async() -> None:
    # Offload the blocking HTTP POST to a thread pool worker.
    await asyncio.to_thread(core.flush)

asyncio.to_thread requires Python 3.9+, which is the SDK's minimum Python version, so this pattern is always available. No async SDK surface is needed for this use case — asyncio.to_thread is the documented and supported approach today.
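The effect of offloading can be observed without the SDK. In the sketch below, `SlowCore` is a hypothetical stand-in whose flush() sleeps to imitate a slow HTTP POST; the coroutine counts how many times the event loop got to run while the flush was in progress on a worker thread.

```python
import asyncio
import time


class SlowCore:
    """Hypothetical stand-in whose flush() blocks like a slow HTTP POST."""

    def flush(self) -> None:
        time.sleep(0.2)


async def flush_without_blocking(core: SlowCore) -> int:
    """Flush in a worker thread and count loop ticks that ran meanwhile."""
    ticks = 0
    flush_task = asyncio.create_task(asyncio.to_thread(core.flush))
    while not flush_task.done():
        await asyncio.sleep(0.01)   # other coroutines would be scheduled here
        ticks += 1
    await flush_task                # re-raises any exception from flush()
    return ticks


ticks = asyncio.run(flush_without_blocking(SlowCore()))
```

Calling core.flush() directly inside the coroutine would hold the loop for the full duration of the POST, so no ticks would be counted until it returned.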

What happens if you never flush?

A process that exits cleanly without flushing silently drops its queued events — no crash, no error. This is intentional: tracking must never destabilize your application. If delivery matters, use one of the flush strategies in the table above.

All release triggers (batch-size, explicit, timer, atexit) funnel through a single shared release path, so the serialized payload shape is identical regardless of how the flush was triggered.


Part 2 — Planned (Phase 3 design intent, not yet shipped)

Everything in this section is design intent for Phase 3 — not implemented. No symbols named AsyncCore, AsyncContext, AsyncTransport, or any framework helper (convert-sdk-django, convert-sdk-fastapi, convert-sdk-flask) exist in convert_sdk today. The code shapes below are illustrative of the planned API; they are not executable against the current package. Implementation requires explicit Phase 3 sign-off.

Why async is deferred

The MVP shipped sync-first for three reasons:

  1. The sync API covers the dominant Python backend shapes (Django request handlers, Flask views, Celery tasks, CLI scripts) without any asyncio entanglement.
  2. Adding async to the MVP would have doubled the public surface area to maintain and introduced event-loop concerns into a library that does not need them for correctness.
  3. The parity validation work (cross-SDK bucketing/rule/feature correctness) is more load-bearing than async support, and was prioritized first.

The architecture constraint was: the MVP must not be async-hostile. A forward-compatibility audit (see below) confirmed that no shipped module needs a destructive rewrite when async lands.

Sync–async coexistence model (planned)

When async lands, it lands as a parallel surface alongside the sync API — not as a replacement or migration target. Five rules govern the interaction:

  1. Async is a parallel API, not a replacement. Core / Context remain first-class. AsyncCore / AsyncContext are added alongside them. There is no deprecation of the sync surface.

  2. Shared synchronous evaluation core. evaluation/bucketing.py, evaluation/rules.py, evaluation/segments.py, evaluation/experiences.py, evaluation/features.py, and evaluation/entity_lookup.py are pure synchronous computation with no I/O. Both surfaces call these functions directly — there are no async versions of evaluation functions.

  3. Async transport adapter. The Transport Protocol keeps its sync definition. An AsyncTransport sibling Protocol is added with async def methods. HttpxAsyncTransport (backed by httpx.AsyncClient) is the planned bundled adapter.

  4. The tracking queue stays thread-safe. The existing threading.Lock-backed queue is reused unchanged. From async code it is reached via asyncio.to_thread() or behind an optional async wrapper.

  5. The SDK never owns the event loop. Async callers provide the loop context; the SDK never calls asyncio.run() internally. Config refresh remains a daemon thread (not an asyncio task) for sync callers.

The planned class structure (illustrative — not yet implemented):

                      evaluation/ (pure sync compute, shared)
                      bucketing, rules, features, segments
                           |               |
                     Core (sync)     AsyncCore (planned)
                     Context (sync)  AsyncContext (planned)
                           |               |
                   HttpxTransport   HttpxAsyncTransport (planned)
                   httpx.Client     httpx.AsyncClient
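The layering in the diagram can be illustrated with stand-ins. Everything below is hypothetical: the `send` method name, the `Sketch*` and `Recording*` classes, and the hash-based `pick_variation` function are assumptions made for the example, and `pick_variation` is not the SDK's bucketing algorithm. The point is only the shape: one pure function shared by both facades, with sibling Protocols for the I/O seam.

```python
import asyncio
import hashlib
from typing import List, Protocol


def pick_variation(visitor_id: str, experience_key: str) -> str:
    """Pure, I/O-free stand-in for the shared evaluation core."""
    digest = hashlib.sha256(f"{experience_key}:{visitor_id}".encode()).digest()
    return "variation-a" if digest[0] % 2 == 0 else "variation-b"


class Transport(Protocol):
    def send(self, payload: dict) -> None: ...        # hypothetical method name


class AsyncTransport(Protocol):
    async def send(self, payload: dict) -> None: ...  # async sibling, same shape


class RecordingTransport:
    def __init__(self) -> None:
        self.sent: List[dict] = []

    def send(self, payload: dict) -> None:
        self.sent.append(payload)


class RecordingAsyncTransport:
    def __init__(self) -> None:
        self.sent: List[dict] = []

    async def send(self, payload: dict) -> None:
        await asyncio.sleep(0)       # yield once, as real async I/O would
        self.sent.append(payload)


class SketchCore:
    def __init__(self, transport: Transport) -> None:
        self._transport = transport

    def run_experience(self, visitor_id: str, key: str) -> str:
        return pick_variation(visitor_id, key)

    def flush(self, payload: dict) -> None:
        self._transport.send(payload)


class SketchAsyncCore:
    def __init__(self, transport: AsyncTransport) -> None:
        self._transport = transport

    async def run_experience(self, visitor_id: str, key: str) -> str:
        return pick_variation(visitor_id, key)   # same pure function, no I/O

    async def flush(self, payload: dict) -> None:
        await self._transport.send(payload)


sync_transport = RecordingTransport()
async_transport = RecordingAsyncTransport()
sync_core = SketchCore(sync_transport)
async_core = SketchAsyncCore(async_transport)

sync_result = sync_core.run_experience("visitor-1", "checkout-experiment")
async_result = asyncio.run(async_core.run_experience("visitor-1", "checkout-experiment"))
```

Because both facades delegate to the same function, the two results are equal for any input, which is the property the parity fixtures are meant to enforce on the real surfaces.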

Forward-compatibility audit

Every module in the current SDK source tree was audited for sync-only assumptions that would block a future async wrapper. No module needs a destructive rewrite — the four I/O-touching seams gain async siblings, and the pure computation modules are reused verbatim.

Module | Status | Notes
evaluation/* | async-ready | Pure sync computation, no I/O. Reused as-is by both surfaces.
domain/config_snapshot.py, context_state.py, results.py | async-ready | Immutable dataclasses; safe from any thread or coroutine.
tracking/payloads.py, tracking/conversions.py, tracking/deduplication.py | async-ready | Pure serialization / dedup-key derivation; no I/O.
tracking/queue.py | async-ready | Already thread-safe via threading.Lock. Async callers reach it via asyncio.to_thread().
errors.py, events.py, config.py, version.py, logging.py | async-ready | Type/constant/exception definitions; no runtime I/O.
_internal/redaction.py | async-ready | Pure string helpers.
config_loader/normalizer.py, validators.py | async-ready | Pure transformation/validation; no I/O.
adapters/storage/in_memory.py | async-ready | Thread-safe; usable from async code via asyncio.to_thread().
adapters/events/in_process.py | async-ready | Sync fan-out; an async event bus is an optional sibling, not a precondition.
ports/transport.py | extend-with-sibling | Add AsyncTransport Protocol alongside Transport.
ports/storage.py | extend-with-sibling | Add AsyncDataStore Protocol alongside DataStore.
ports/event_bus.py | extend-with-sibling | Add AsyncEventBus Protocol if async-native delivery is wanted.
adapters/transport/httpx_transport.py | extend-with-sibling | Add HttpxAsyncTransport on httpx.AsyncClient; sync adapter untouched.
tracking/tracker.py | extend-with-sibling | Add AsyncTracker reusing the same queue/payload logic, awaiting AsyncTransport for delivery.
tracking/flush.py | extend-with-sibling | Sync daemon-timer periodic flush stays for sync callers; async callers get a scheduler based on the event loop's call_later().
config_loader/loader.py | extend-with-sibling | Add async loader awaiting AsyncTransport; the pure post-processing stages are shared.
config_loader/refresh.py | extend-with-sibling | Sync daemon-thread refresher stays; AsyncConfigRefresher uses an asyncio-task scheduler for the async surface.
core.py | extend-with-sibling | AsyncCore is a parallel facade reusing the same evaluation core and domain objects, swapping only I/O collaborators. Core is not rewritten.
context.py | extend-with-sibling | AsyncContext reuses the same pure evaluation functions; Context is not rewritten.
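As an illustration of the call_later-based scheduling mentioned for tracking/flush.py, the sketch below re-arms a timer on the caller's running loop after each run. `CallLaterScheduler` is a hypothetical class, not a planned SDK symbol, and the error handling is deliberately minimal.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class CallLaterScheduler:
    """Periodic job on the caller's loop; hypothetical, not SDK code."""

    def __init__(self, interval_s: float, job: Callable[[], Awaitable[None]]) -> None:
        self._interval_s = interval_s
        self._job = job
        self._handle: Optional[asyncio.TimerHandle] = None
        self._stopped = False

    def start(self) -> None:
        # Must be called from inside a running loop; the caller owns the loop.
        self._stopped = False
        self._arm()

    def _arm(self) -> None:
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._interval_s, self._fire)

    def _fire(self) -> None:
        # call_later callbacks are plain functions, so hand off to a task.
        asyncio.ensure_future(self._run_once())

    async def _run_once(self) -> None:
        try:
            await self._job()
        except Exception:
            pass                      # a real scheduler would log this
        finally:
            if not self._stopped:
                self._arm()           # schedule the next run

    def stop(self) -> None:
        self._stopped = True
        if self._handle is not None:
            self._handle.cancel()


async def demo() -> int:
    runs = 0

    async def job() -> None:
        nonlocal runs
        runs += 1

    scheduler = CallLaterScheduler(0.01, job)
    scheduler.start()
    await asyncio.sleep(0.1)
    scheduler.stop()
    return runs


run_count = asyncio.run(demo())
```

The scheduler never creates or runs a loop itself, which matches the rule that the SDK does not own the event loop.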

Planned public API shape (design intent only)

The planned async surface names are illustrative. The final naming choice (identical method names on AsyncCore/AsyncContext vs. a-prefix convention) is a Phase 3 sign-off decision.

# PLANNED — not yet implemented. AsyncCore and AsyncContext do not exist today.

import asyncio
from convert_sdk import SDKConfig  # SDKConfig is shared

# Planned import — symbol does not exist yet:
# from convert_sdk import AsyncCore

async def example():
    async with AsyncCore(SDKConfig(sdk_key="...")).initialize() as core:
        ctx = await core.create_context("visitor-1")
        result = await ctx.run_experience("checkout-experiment")
        await ctx.track_conversion("purchase_completed", revenue=49.99)
        await core.flush()

The sync names (run_experience, run_feature, track_conversion, flush, refresh_now, close) are retained on Core/Context. The async surface either uses the same names as coroutines on AsyncCore/AsyncContext or adds an a-prefix variant; the decision is deferred to Phase 3. aclose() is a deliberate exception that follows the established async naming convention (cf. the aclose() method on async generators and httpx.AsyncClient.aclose()).

Planned framework helpers (design intent only)

Framework helpers ship as separate distributions, not as part of convert-python-sdk. The core package remains usable with zero framework dependency. Uninstalling a helper does not remove core functionality.

Planned package | Framework | What it provides
convert-sdk-django | Django | Middleware, request-scoped Context, settings integration
convert-sdk-fastapi | FastAPI / Starlette | Dependency-injection helpers, request-scoped Context, lifespan wiring
convert-sdk-flask | Flask | Extension class, g-scoped Context

Whether these ship as separate repos or as in-tree namespace packages is a Phase 3 sign-off decision.

Each helper provides at minimum: request-scoped Context construction, dependency-injection / middleware wiring, and lifecycle wiring (initialize on startup, flush() / close() on shutdown mapped onto the framework's lifecycle hooks).

Deprecation policy: each helper pins a supported range of its upstream framework. When the framework makes a breaking change, the helper ships a new major version; the prior major receives a deprecation notice and a documented support window. The core SDK never takes on a framework version constraint as a result.

Cross-SDK parity coverage (binding constraint for async)

The parity vectors (bucketing, rule, feature, state) under tests/parity/ describe correctness contracts of the evaluation core. Because both sync and async surfaces share that core, the same vectors apply to both. When the async surface lands, the parity job must run the same fixtures through AsyncCore/AsyncContext and assert identical normalized outcomes. This is a hard requirement for Phase 3 — the parity discipline applies unchanged.

Open questions for Phase 3 sign-off

These points are deliberately left open. They require Phase 3 sign-off and must not be pre-empted by MVP or Phase 2 work:

  1. Async transport tactic: native HttpxAsyncTransport on httpx.AsyncClient, or asyncio.to_thread()-wrapping the sync transport at the AsyncCore boundary? The AsyncTransport Protocol shape is fixed either way.
  2. DataStore Protocol shape: a separate AsyncDataStore Protocol, or a dual-mode Protocol that accepts both sync and async adapters?
  3. Async event bus: reuse the sync EventBus (handlers schedule their own async work) or add a parallel AsyncEventBus?
  4. Async method naming: a-prefixed methods on a shared type vs. distinct AsyncCore/AsyncContext types with identically named coroutines.

Related pages

  • Initialization — shipped config refresh (RefreshConfig, auto_flush_interval_ms)
  • Extending — the Protocol-based extension model that carries forward into the async surface
  • Configuration: SDKConfig, TransportConfig, RefreshConfig fields