Async & Framework Integrations
Integrate the sync SDK in Django, FastAPI, Flask, and asyncio runtimes (asyncio.to_thread for flush)
This page is split into two clearly separated sections:
- Shipped today — how to integrate the sync SDK in every common Python runtime, including runtimes that run an asyncio event loop.
- Planned (Phase 3 design intent) — the future async public API and framework-helper distributions that are not implemented yet.
Part 1 — Shipped: sync runtime integration
The Convert Python SDK is sync-first and framework-agnostic. The full
evaluation, tracking, and config-fetch pipeline is synchronous. There is no
AsyncCore, no AsyncContext, and no framework middleware in the current
package.
How the queue works
context.track_conversion(...) is lightweight and synchronous. It deduplicates
by (visitor_id, goal_id) and appends an event to an in-process queue. No
network call happens on track_conversion. The queue is released (serialized
and delivered over HTTPS) when one of these happens:
- Explicit flush —
core.flush(). This is the canonical, deterministic control point and the recommended default for all runtimes. - Batch-size release — the queue reaches
SDKConfig.batch_size(default10) and auto-releases. - Periodic flush — opt-in only: set
SDKConfig.auto_flush_interval_msto start a daemonic background timer thread. atexithook — opt-in, best-effort:register_atexit_flush(core)attempts a final flush on normal interpreter shutdown.
The default lifecycle is explicit-flush-only, which is safe in every runtime. Periodic flush is not the default because a timed flush silently drops events in short-lived runtimes (Lambda, CLI scripts) that exit before the timer fires.
Quick decision table
| Runtime | Recommended strategy | Reason |
|---|---|---|
| AWS Lambda | Explicit core.flush() at handler end | Frozen between invocations; background timers don't fire reliably |
| Google Cloud Run | Explicit flush per request + optional SIGTERM handler | Request-scoped; SIGTERM precedes container shutdown |
| gunicorn (sync workers) | Periodic flush (auto_flush_interval_ms) or per-request flush | Long-lived workers; a daemonic timer amortizes delivery cost |
| uvicorn / hypercorn (ASGI) | Periodic flush or flush in the ASGI lifespan shutdown handler | Long-lived event loop; lifespan shutdown is a clean flush point |
| Celery workers | Flush at task end or periodic flush | Long-lived; per-task flush bounds delivery latency |
| CLI / batch scripts | Explicit core.flush() before exit, optionally with atexit | Short-lived; process exits as soon as work is done |
AWS Lambda
Flush explicitly at the end of every handler invocation. Do not rely on a
periodic timer or atexit — the execution environment is frozen between
invocations and may be killed without firing them.
import os
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
def handler(event, context):
ctx = core.create_context(event["visitor_id"])
result = ctx.run_experience("checkout-experiment")
ctx.track_conversion("purchase_completed", revenue=event["amount"])
# Deliver before the runtime freezes or kills the environment.
core.flush()
return {"statusCode": 200, "variation": result.variation_key if result else None}Core is initialized outside the handler so the config is loaded once per
cold start, not per invocation. core.flush() inside the handler guarantees
delivery before the Lambda runtime checkpoints the container.
Google Cloud Run
Flush per request. Optionally add a SIGTERM handler for a best-effort final flush when the container is scaled down. Cloud Run sends SIGTERM before termination; the SDK never registers signal handlers — opt in explicitly.
import os
import signal
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
def _handle_sigterm(signum, frame):
core.flush() # best-effort final delivery
raise SystemExit(0)
signal.signal(signal.SIGTERM, _handle_sigterm)
def handle_checkout(visitor_id: str, amount: float) -> dict:
ctx = core.create_context(visitor_id)
ctx.track_conversion("purchase_completed", revenue=amount)
core.flush() # deterministic per-request delivery
return {"ok": True}gunicorn (sync workers)
Workers are long-lived, so an opt-in daemonic periodic flush amortizes delivery without requiring an explicit call on every request. The timer thread is daemonic and never blocks worker shutdown.
import os
from convert_sdk import Core, SDKConfig
# Flush the queue every 5 seconds in the background.
core = Core(
SDKConfig(
sdk_key=os.environ["CONVERT_SDK_KEY"],
auto_flush_interval_ms=5000,
)
).initialize()Prefer a per-request core.flush() if you need delivery to be deterministic
per response. Either way, call core.close() on worker shutdown to cancel the
timer cleanly and release the httpx connection pool.
uvicorn / hypercorn (ASGI)
The sync SDK is safe to call from ASGI applications — evaluation and queue operations are pure CPU work and do not block the event loop in any observable way. Use periodic flush or flush in the ASGI lifespan shutdown handler for a clean final delivery on app shutdown.
import os
from contextlib import asynccontextmanager
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
@asynccontextmanager
async def lifespan(app):
yield # application is running
core.flush() # final delivery before shutdown
core.close() # cancel any periodic timer + close transportcore.flush() is a synchronous call that performs one blocking HTTP POST. For
most workloads this is negligible in a shutdown path. If you need non-blocking
flush in an async context today, use asyncio.to_thread(core.flush) — see
Using the sync SDK from async code below.
Celery workers
Long-lived workers: flush at the end of each task for bounded delivery latency, or enable periodic flush.
import os
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
@app.task
def record_purchase(visitor_id: str, amount: float) -> None:
ctx = core.create_context(visitor_id)
ctx.track_conversion("purchase_completed", revenue=amount)
core.flush() # deliver before the task frame exitsCLI scripts and batch jobs
Short-lived processes: flush explicitly before exit. Optionally register the
best-effort atexit hook as a safety net.
import os
from convert_sdk import Core, SDKConfig
from convert_sdk.tracking.flush import register_atexit_flush
core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
register_atexit_flush(core) # best-effort final flush on normal exit
ctx = core.create_context("cli-user")
ctx.track_conversion("report_generated")
core.flush() # explicit, deterministic deliveryatexit is best-effort. It does not fire under SIGKILL, some serverless
runtimes, or hard crashes. Never rely on it as your only delivery path.
Using the sync SDK from async code
The sync SDK can be called from async def functions today without any Phase 3
work. Bucketing, rule evaluation, and feature resolution are pure CPU
computation with no I/O — they never block the event loop. Config refresh
runs on its own daemonic thread and does not touch the event loop at all.
The only operation that blocks is core.flush(), which performs one HTTP POST.
If you want non-blocking flush from a coroutine, offload it:
import asyncio
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(data=my_config)).initialize()
async def evaluate(visitor_id: str) -> str | None:
ctx = core.create_context(visitor_id)
result = ctx.run_experience("checkout-experiment")
return result.variation_key if result else None
async def flush_async() -> None:
# Offload the blocking HTTP POST to a thread pool worker.
await asyncio.to_thread(core.flush)asyncio.to_thread requires Python 3.9+, which is the SDK's minimum Python
version, so this pattern is always available. No async SDK surface is needed
for this use case — asyncio.to_thread is the documented and supported
approach today.
What happens if you never flush?
A process that exits cleanly without flushing silently drops its queued events — no crash, no error. This is intentional: tracking must never destabilize your application. If delivery matters, use one of the flush strategies in the table above.
All release triggers (batch-size, explicit, timer, atexit) funnel through a
single shared release path, so the serialized payload shape is identical
regardless of how the flush was triggered.
Part 2 — Planned (Phase 3 design intent, not yet shipped)
Everything in this section is design intent for Phase 3 — not implemented. No symbols named
AsyncCore,AsyncContext,AsyncTransport, or any framework helper (convert-sdk-django,convert-sdk-fastapi,convert-sdk-flask) exist inconvert_sdktoday. The code shapes below are illustrative of the planned API; they are not executable against the current package. Implementation requires explicit Phase 3 sign-off.
Why async is deferred
The MVP shipped sync-first for three reasons:
- The sync API covers the dominant Python backend shapes (Django request handlers, Flask views, Celery tasks, CLI scripts) without any asyncio entanglement.
- Adding async to the MVP would have doubled the public surface area to maintain and introduced event-loop concerns into a library that does not need them for correctness.
- The parity validation work (cross-SDK bucketing/rule/feature correctness) is more load-bearing than async support, and was prioritized first.
The architecture constraint was: the MVP must not be async-hostile. A forward-compatibility audit (see below) confirmed that no shipped module needs a destructive rewrite when async lands.
Sync–async coexistence model (planned)
When async lands, it lands as a parallel surface alongside the sync API — not as a replacement or migration target. Five rules govern the interaction:
-
Async is a parallel API, not a replacement.
Core/Contextremain first-class.AsyncCore/AsyncContextare added alongside them. There is no deprecation of the sync surface. -
Shared synchronous evaluation core.
evaluation/bucketing.py,evaluation/rules.py,evaluation/segments.py,evaluation/experiences.py,evaluation/features.py, andevaluation/entity_lookup.pyare pure synchronous computation with no I/O. Both surfaces call these functions directly — there are noasyncversions of evaluation functions. -
Async transport adapter. The
TransportProtocol keeps its sync definition. AnAsyncTransportsibling Protocol is added withasync defmethods.HttpxAsyncTransport(backed byhttpx.AsyncClient) is the planned bundled adapter. -
The tracking queue stays thread-safe. The existing
threading.Lock- backed queue is reused unchanged. From async code it is reached viaasyncio.to_thread()or behind an optional async wrapper. -
The SDK never owns the event loop. Async callers provide the loop context; the SDK never calls
asyncio.run()internally. Config refresh remains a daemon thread (not an asyncio task) for sync callers.
The planned class structure (illustrative — not yet implemented):
evaluation/ (pure sync compute, shared)
bucketing, rules, features, segments
| |
Core (sync) AsyncCore (planned)
Context (sync) AsyncContext (planned)
| |
HttpxTransport HttpxAsyncTransport (planned)
httpx.Client httpx.AsyncClient
Forward-compatibility audit
Every module in the current SDK source tree was audited for sync-only assumptions that would block a future async wrapper. No module needs a destructive rewrite — the four I/O-touching seams gain async siblings, and the pure computation modules are reused verbatim.
| Module | Status | Notes |
|---|---|---|
evaluation/* | async-ready | Pure sync computation, no I/O. Reused as-is by both surfaces. |
domain/config_snapshot.py, context_state.py, results.py | async-ready | Immutable dataclasses; safe from any thread or coroutine. |
tracking/payloads.py, tracking/conversions.py, tracking/deduplication.py | async-ready | Pure serialization / dedup-key derivation; no I/O. |
tracking/queue.py | async-ready | Already thread-safe via threading.Lock. Async callers reach it via asyncio.to_thread(). |
errors.py, events.py, config.py, version.py, logging.py | async-ready | Type/constant/exception definitions; no runtime I/O. |
_internal/redaction.py | async-ready | Pure string helpers. |
config_loader/normalizer.py, validators.py | async-ready | Pure transformation/validation; no I/O. |
adapters/storage/in_memory.py | async-ready | Thread-safe; usable from async code via asyncio.to_thread(). |
adapters/events/in_process.py | async-ready | Sync fan-out; an async event bus is an optional sibling, not a precondition. |
ports/transport.py | extend-with-sibling | Add AsyncTransport Protocol alongside Transport. |
ports/storage.py | extend-with-sibling | Add AsyncDataStore Protocol alongside DataStore. |
ports/event_bus.py | extend-with-sibling | Add AsyncEventBus Protocol if async-native delivery is wanted. |
adapters/transport/httpx_transport.py | extend-with-sibling | Add HttpxAsyncTransport on httpx.AsyncClient; sync adapter untouched. |
tracking/tracker.py | extend-with-sibling | Add AsyncTracker reusing the same queue/payload logic, awaiting AsyncTransport for delivery. |
tracking/flush.py | extend-with-sibling | Sync daemon-timer refresher stays for sync callers; async callers get an asyncio.call_later-based scheduler. |
config_loader/loader.py | extend-with-sibling | Add async loader awaiting AsyncTransport; the pure post-processing stages are shared. |
config_loader/refresh.py | extend-with-sibling | Sync daemon-thread refresher stays; AsyncConfigRefresher uses an asyncio-task scheduler for the async surface. |
core.py | extend-with-sibling | AsyncCore is a parallel facade reusing the same evaluation core and domain objects, swapping only I/O collaborators. Core is not rewritten. |
context.py | extend-with-sibling | AsyncContext reuses the same pure evaluation functions; Context is not rewritten. |
Planned public API shape (design intent only)
The planned async surface names are illustrative. The final naming choice
(identical method names on AsyncCore/AsyncContext vs. a-prefix convention)
is a Phase 3 sign-off decision.
# PLANNED — not yet implemented. AsyncCore and AsyncContext do not exist today.
import asyncio
from convert_sdk import SDKConfig # SDKConfig is shared
# Planned import — symbol does not exist yet:
# from convert_sdk import AsyncCore
async def example():
async with AsyncCore(SDKConfig(sdk_key="...")).initialize() as core:
ctx = await core.create_context("visitor-1")
result = await ctx.run_experience("checkout-experiment")
await ctx.track_conversion("purchase_completed", revenue=49.99)
await core.flush()The sync names — run_experience, run_feature, track_conversion, flush,
refresh_now, close — are retained on Core/Context. The async surface
uses the same names as coroutines on AsyncCore/AsyncContext, or adds an
a-prefix variant — the decision is deferred to Phase 3. aclose() is a
deliberate exception following the asyncio convention (cf.
asyncio.StreamWriter.aclose).
Planned framework helpers (design intent only)
Framework helpers ship as separate distributions, not as part of
convert-python-sdk. The core package remains usable with zero framework
dependency. Uninstalling a helper does not remove core functionality.
| Planned package | Framework | What it provides |
|---|---|---|
convert-sdk-django | Django | Middleware, request-scoped Context, settings integration |
convert-sdk-fastapi | FastAPI / Starlette | Dependency-injection helpers, request-scoped Context, lifespan wiring |
convert-sdk-flask | Flask | Extension class, g-scoped Context |
Whether these ship as separate repos or as in-tree namespace packages is a Phase 3 sign-off decision.
Each helper provides at minimum: request-scoped Context construction,
dependency-injection / middleware wiring, and lifecycle wiring
(initialize on startup, flush() / close() on shutdown mapped onto the
framework's lifecycle hooks).
Deprecation policy: each helper pins a supported range of its upstream framework. When the framework makes a breaking change, the helper ships a new major version; the prior major receives a deprecation notice and a documented support window. The core SDK never takes on a framework version constraint as a result.
Cross-SDK parity coverage (binding constraint for async)
The parity vectors (bucketing, rule, feature, state) under tests/parity/
describe correctness contracts of the evaluation core. Because both sync and
async surfaces share that core, the same vectors apply to both. When the async
surface lands, the parity job must run the same fixtures through
AsyncCore/AsyncContext and assert identical normalized outcomes. This is a
hard requirement for Phase 3 — the parity discipline applies unchanged.
Open questions for Phase 3 sign-off
These points are deliberately left open. They require Phase 3 sign-off and must not be pre-empted by MVP or Phase 2 work:
- Async transport tactic: native
HttpxAsyncTransportonhttpx.AsyncClient, orasyncio.to_thread()-wrapping the sync transport at theAsyncCoreboundary? TheAsyncTransportProtocol shape is fixed either way. DataStoreProtocol shape: a separateAsyncDataStoreProtocol, or a dual-mode Protocol that accepts both sync and async adapters?- Async event bus: reuse the sync
EventBus(handlers schedule their own async work) or add a parallelAsyncEventBus? - Async method naming:
a-prefixed methods on a shared type vs. distinctAsyncCore/AsyncContexttypes with identically named coroutines.
Related pages
- Initialization — shipped config refresh (
RefreshConfig,auto_flush_interval_ms) - Extending — the Protocol-based extension model that carries forward into the async surface
- Configuration —
SDKConfig,TransportConfig,RefreshConfigfields