Migrating from Raw REST
Map raw Convert HTTP/config/tracking flows onto the typed Python SDK surface
If you integrate with Convert today by calling the HTTP endpoints directly — fetching the config payload and POSTing tracking events — the Python SDK gives you the same outcomes behind a typed, higher-level surface, plus operational behaviors you would otherwise hand-roll. This guide maps your existing raw REST flows onto the SDK and shows what you stop having to maintain.
What the SDK replaces at a glance
| Concern | Raw REST today | SDK equivalent |
|---|---|---|
| Config fetch | GET /api/v1/config/{sdk_key} on startup or per-request | Core(SDKConfig(sdk_key=...)).initialize() — fetches and snapshots config once |
| Bucketing | Hand-rolled MurmurHash3 + allocation walk | context.run_experience(key) — deterministic, in-process, no network |
| Audience rule evaluation | Deferred to the platform or skipped | Local evaluation against the config snapshot per call |
| Conversion POST | Manual JSON construction + one POST per event | context.track_conversion(goal_key, ...) then core.flush() |
| Batching | Manual or absent | Automatic — configurable SDKConfig.batch_size (default 10) |
| Deduplication | Manual per-visitor state | Built-in (visitor_id, goal_id) dedup via DataStore |
| Retry logic | Manual | Re-call core.flush() after a TrackingDeliveryError |
By default the SDK fetches config once, at initialization, much like a startup-time REST call. It then holds an immutable config snapshot and evaluates locally on every request, so there is no per-evaluation network call.
Config retrieval: before and after
Raw REST:
import os

import httpx

sdk_key = os.environ["CONVERT_SDK_KEY"]
response = httpx.get(
    f"https://cdn-4.convertexperiments.com/api/v1/config/{sdk_key}",
    headers={"Accept": "application/json"},
)
response.raise_for_status()
project_config = response.json()
# ...now you parse project_config and implement bucketing yourself

SDK equivalent:
import os
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(sdk_key=os.environ["CONVERT_SDK_KEY"])).initialize()
# core.current_config holds the parsed snapshot; no manual JSON parsing

If you already hold a config payload (e.g. you fetch and cache it yourself), pass it as data= and skip the network entirely:
from convert_sdk import Core, SDKConfig
# existing_config is the dict you already parsed from your cache
core = Core(SDKConfig(data=existing_config)).initialize()

See Initialization for the full SDKConfig reference.
Bucketing: before and after
Raw REST integrations either skip bucketing (relying on server-side assignment) or reimplement the MurmurHash3 bucketing locally. The algorithm must match the SDK's exactly or you get different assignments.
Raw REST (manual bucketing — error-prone):
import mmh3  # third-party, not a Convert SDK dependency

def bucket_visitor(experience_id: str, visitor_id: str, seed: int = 9999) -> int:
    hash_input = f"{experience_id}{visitor_id}"
    hash_value = mmh3.hash(hash_input, seed) & 0xFFFF_FFFF
    return int((hash_value / 4_294_967_296) * 10_000)

def select_variation(variations: list, bucket_value: int):
    accumulated = 0.0
    for v in variations:
        accumulated += float(v["traffic_allocation"]) * 100
        if bucket_value < accumulated:
            return v["id"]
    return None

experience = next(
    e for e in project_config["experiences"] if e["key"] == "checkout-flow"
)
bucket = bucket_visitor(experience["id"], "visitor-abc123")
variation_id = select_variation(experience["variations"], bucket)

SDK equivalent:
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(data=project_config)).initialize()
context = core.create_context("visitor-abc123", visitor_attributes={"tier": "premium"})
result = context.run_experience("checkout-flow")
if result is not None:
    variation_id = result.variation_id
    variation_key = result.variation_key

The SDK ships a pure-Python MurmurHash3-32 implementation that encodes the input string to UTF-8 bytes before hashing (seed 9999). Its output is byte-identical to the JavaScript SDK (which uses new TextEncoder().encode()) and the PHP SDK (which uses unpack('C*')). No third-party hashing library is required or used.
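To make the encode-then-hash point concrete, here is a stdlib-only sketch of MurmurHash3 (x86, 32-bit) over explicit bytes. It is an illustration, not the SDK's source: the names murmur3_32 and bucket_value are hypothetical, and the bucket formula simply mirrors the manual example earlier in this section.

```python
def murmur3_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 x86 32-bit over raw bytes; returns an unsigned 32-bit int."""
    mask = 0xFFFFFFFF
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & mask

    def mix(k: int) -> int:
        k = (k * c1) & mask
        k = ((k << 15) | (k >> 17)) & mask  # rotate left by 15
        return (k * c2) & mask

    full = len(data) - (len(data) % 4)
    for i in range(0, full, 4):
        # Each 4-byte block is read little-endian and mixed into the state.
        h ^= mix(int.from_bytes(data[i:i + 4], "little"))
        h = ((h << 13) | (h >> 19)) & mask  # rotate left by 13
        h = (h * 5 + 0xE6546B64) & mask

    if full < len(data):
        # 1-3 trailing bytes are mixed in without the rotate/multiply-add step.
        h ^= mix(int.from_bytes(data[full:], "little"))

    # Finalization (avalanche).
    h ^= len(data) & mask
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & mask
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & mask
    h ^= h >> 16
    return h


def bucket_value(experience_id: str, visitor_id: str, seed: int = 9999) -> int:
    """Hypothetical helper: map a visitor to a bucket in [0, 10000)."""
    raw = f"{experience_id}{visitor_id}".encode("utf-8")  # encode first, then hash
    return int(murmur3_32(raw, seed) / 4_294_967_296 * 10_000)
```

Taking bytes rather than str keeps the encoding step visible. For ASCII-only visitor IDs every common encoding yields the same bytes, so mismatches between hand-rolled implementations tend to surface only once non-ASCII identifiers appear.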
To inspect the internal bucket value (e.g. for debugging or audit), use the diagnose_experience surface rather than the public result:
diag = context.diagnose_experience("checkout-flow")
print(diag.details)  # includes bucket_value when bucketing was attempted

Conversion tracking: before and after
Raw REST — one HTTP POST per conversion:
import os

import httpx

payload = {
    "source": "python-sdk",
    "enrichData": True,
    "accountId": project_config["account_id"],
    "projectId": project_config["project"]["id"],
    "visitors": [
        {
            "visitorId": "visitor-abc123",
            "events": [
                {
                    "eventType": "conversion",
                    "data": {
                        "goalId": "goal-1",
                        "goalData": [{"key": "revenue", "value": 49.99}],
                        "bucketingData": {"exp-checkout": "var-treatment"},
                    },
                }
            ],
        }
    ],
}
sdk_key = os.environ["CONVERT_SDK_KEY"]
project_id = project_config["project"]["id"]
httpx.post(
    f"https://{project_id}.metrics.convertexperiments.com/v1/track/{sdk_key}",
    json=payload,
    headers={
        "Content-Type": "application/json",
        # Required: the metrics endpoint bot filter uses this UA to identify
        # Convert SDK traffic and avoid silently discarding events.
        "User-Agent": "ConvertAgent/1.0",
    },
)

SDK equivalent (enqueue and flush):
from convert_sdk import ConversionStatus
result = context.track_conversion("purchase_completed", revenue=49.99)
# result.status is ConversionStatus.QUEUED — nothing sent yet
# result.tracked is True
core.flush()  # single delivery point: sends all queued events as one batched POST

track_conversion never performs network I/O; it appends to an in-process queue. The actual HTTP POST happens when the queue releases: via core.flush(), when batch_size events accumulate, when the optional auto_flush_interval_ms timer fires, or at process exit via atexit.
The SDK constructs the full wire payload automatically from the config snapshot and the visitor's active bucketing assignments. You no longer need to manually assemble bucketingData, accountId, projectId, or the conversion event envelope.
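The table at the top of this guide lists retry as "re-call core.flush() after a TrackingDeliveryError". A small backoff wrapper around that idea could look like the sketch below. flush_with_retry and its parameters are hypothetical names, not SDK API; the helper receives the flush callable and the exception type as arguments so it makes no assumption about import paths. It also assumes a failed flush leaves the events queued for the next call, which is worth confirming for your SDK version.

```python
import time
from typing import Callable, Type


def flush_with_retry(
    flush: Callable[[], None],
    retry_on: Type[Exception],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call flush() until it succeeds; return the number of attempts used.

    Re-raises the last error once `attempts` is exhausted.
    """
    for attempt in range(1, attempts + 1):
        try:
            flush()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be >= 1")
```

With the SDK this would presumably be invoked as flush_with_retry(core.flush, TrackingDeliveryError). The injectable sleep parameter exists only so tests can run without waiting.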
Operational improvements over raw REST
Batching
The SDK accumulates events and delivers them in configurable batches (default: 10 events per POST). Raw REST integrations typically send one HTTP request per conversion. Adjust via SDKConfig(batch_size=25).
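For intuition about what size-triggered release means, the following is a minimal sketch of a queue that sends once batch_size events have accumulated and on explicit flush. BatchingQueue is a hypothetical stand-in written for this guide, not the SDK's internal queue, and the send callable stands in for the HTTP POST.

```python
from typing import Callable, List


class BatchingQueue:
    """Hypothetical stand-in for an event queue that releases in batches."""

    def __init__(self, send: Callable[[List[dict]], None], batch_size: int = 10):
        self._send = send
        self._batch_size = batch_size
        self._pending: List[dict] = []

    def enqueue(self, event: dict) -> None:
        # No I/O here unless the size threshold is reached.
        self._pending.append(event)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._pending:
            return
        # Swap the list out first so new events can queue during the send.
        batch, self._pending = self._pending, []
        self._send(batch)
```

Enqueuing 25 events with batch_size=10 triggers two sends of 10 and leaves 5 pending until the next flush. This sketch drops a batch if send raises; a production queue has to decide whether to requeue it.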
Deduplication
The SDK prevents double-counting the same goal for the same visitor within a process lifetime. With raw REST you must track this state yourself. An already-seen (visitor_id, goal_id) pair returns ConversionResult(tracked=False, reason="deduplicated") rather than silently double-posting. Use force_multiple=True to allow intentional repeat conversions.
An unknown goal returns a typed ConversionResult(status=GOAL_NOT_FOUND) — it never raises an exception.
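If you currently track this state yourself, the behavior described above amounts to roughly the following. This is a sketch under stated assumptions: SketchResult and ConversionDeduper are hypothetical names, the in-memory set stands in for the DataStore, and the reason strings other than "deduplicated" are invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple


@dataclass(frozen=True)
class SketchResult:
    """Hypothetical result type; the SDK's ConversionResult has its own shape."""
    tracked: bool
    reason: Optional[str] = None


class ConversionDeduper:
    def __init__(self, known_goals: Set[str]):
        self._known_goals = known_goals
        self._seen: Set[Tuple[str, str]] = set()  # (visitor_id, goal_id) pairs

    def track(self, visitor_id: str, goal_id: str,
              force_multiple: bool = False) -> SketchResult:
        if goal_id not in self._known_goals:
            # Return a typed result instead of raising.
            return SketchResult(tracked=False, reason="goal_not_found")
        key = (visitor_id, goal_id)
        if key in self._seen and not force_multiple:
            return SketchResult(tracked=False, reason="deduplicated")
        self._seen.add(key)
        return SketchResult(tracked=True)
```

Because the seen-set lives in process memory, deduplication in this sketch resets on restart and is not shared across workers, which matches the "within a process lifetime" scope stated above.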
Local audience rule evaluation
Raw REST integrations typically either skip audience evaluation (sending all visitors to Convert for platform-side assignment) or re-evaluate rules in custom code that may drift from the platform. The SDK evaluates audience rules locally against the config snapshot on every run_experience() or run_feature() call — the same rule engine used by the JavaScript SDK, verified by shared parity fixtures.
Lifecycle observability
Subscribe to LifecycleEvent.API_QUEUE_RELEASED to observe delivery:
from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload
def on_released(payload: QueueReleasedPayload, error=None) -> None:
    if error is not None:
        print(f"Delivery failed: {error}")
    else:
        print(f"Delivered {payload.batch_size} events")

core.on(LifecycleEvent.API_QUEUE_RELEASED, on_released)

The payload carries batch_size, visitor_count, event_count, and, on failure, status_code and retry_attempts, all without exposing credentials or raw visitor attributes.
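In practice you may want counters rather than print statements. The sketch below aggregates delivery outcomes from a callback with the same (payload, error) shape. FakeReleasedPayload is a stand-in that only borrows the field names listed above, and DeliveryStats is a hypothetical helper; neither is part of the SDK.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeReleasedPayload:
    """Stand-in using the documented field names; not the SDK's class."""
    batch_size: int
    visitor_count: int
    event_count: int
    status_code: Optional[int] = None
    retry_attempts: int = 0


class DeliveryStats:
    """Accumulates delivery outcomes for export to a metrics system."""

    def __init__(self) -> None:
        self.delivered_events = 0
        self.failed_batches = 0
        self.last_status_code: Optional[int] = None

    def on_released(self, payload, error=None) -> None:
        if error is not None:
            self.failed_batches += 1
            self.last_status_code = payload.status_code
        else:
            self.delivered_events += payload.event_count
```

A bound method such as stats.on_released can be registered in the same way as the plain function in the example above.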
Privacy-safe diagnostics
The SDK redacts visitor IDs (hashed) and SDK keys from all diagnostic logs by construction. Raw REST integrations typically log raw credentials and identifiers. See Diagnostics.
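If some raw REST code has to stay in place during the migration, its log lines can be given similar treatment. The sketch below is an assumption-laden illustration: the SDK's actual hashing scheme is not described here, so truncated SHA-256 is simply one reasonable choice, and both function names are hypothetical.

```python
import hashlib


def redact_visitor_id(visitor_id: str, length: int = 12) -> str:
    """Replace a visitor ID with a stable, non-reversible token for logs."""
    digest = hashlib.sha256(visitor_id.encode("utf-8")).hexdigest()
    return f"visitor:{digest[:length]}"


def redact_sdk_key(text: str, sdk_key: str) -> str:
    """Mask every occurrence of the SDK key in a log line or URL."""
    return text.replace(sdk_key, "[REDACTED]") if sdk_key else text
```

The same visitor always maps to the same token, so log lines remain correlatable. Note that an unsalted hash of a guessable ID can be reversed by enumeration, so treat this as log hygiene rather than anonymization.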
Using your existing config fetch
If you already have a working config-fetching layer that you want to keep, pass the parsed payload as data= and let the SDK handle all evaluation and tracking. There is no requirement to use the SDK's built-in config transport:
from convert_sdk import Core, SDKConfig
# your_config_cache is whatever you already use to get the project config
config = your_config_cache.get()
core = Core(SDKConfig(data=config)).initialize()
context = core.create_context("visitor-abc123")
result = context.run_experience("my-experiment")

For long-lived processes that need periodic config refresh, the SDK's built-in opt-in auto-refresh replaces your manual polling loop:
from convert_sdk import Core, RefreshConfig, SDKConfig
core = Core(
    SDKConfig(
        sdk_key="your-sdk-key",
        refresh=RefreshConfig(interval_seconds=300.0),  # 5-minute refresh
    )
).initialize()

See Initialization § automatic config refresh.
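For the data= path, the your_config_cache object used earlier in this section is left undefined. One minimal shape it could take is a time-to-live cache around whatever fetch function you already have. TTLConfigCache is a hypothetical name, and the injectable clock exists only to make the expiry logic testable.

```python
import time
from typing import Any, Callable, Dict, Optional


class TTLConfigCache:
    """Hypothetical wrapper: refetch the config only after ttl_seconds."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[Dict[str, Any]] = None
        self._expires_at = 0.0

    def get(self) -> Dict[str, Any]:
        now = self._clock()
        if self._value is None or now >= self._expires_at:
            # Cache miss or expired entry: call the existing fetch layer.
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value
```

This sketch is not thread-safe and does not serve stale data when a refetch fails; both are decisions your existing fetching layer may already have made.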
Future async / framework support
The Python SDK is sync-first today. If your service is already async (FastAPI, asyncio workers), you can call the sync SDK from a coroutine via asyncio.to_thread() — evaluation is non-blocking compute, and config refresh runs on its own daemon thread. A native async surface and framework-specific helpers are planned for Phase 3 and not yet implemented. See Async and Framework Integrations for the design intent.
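The asyncio.to_thread() bridge mentioned above can be sketched without the SDK installed. Here evaluate_sync is a hypothetical stand-in for a synchronous call such as context.run_experience(); the pattern, not the function body, is the point. asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import time


def evaluate_sync(visitor_id: str) -> str:
    """Stand-in for a synchronous SDK call; sleeps briefly to simulate work."""
    time.sleep(0.01)
    return f"variation-for-{visitor_id}"


async def handle_request(visitor_id: str) -> str:
    # Run the sync call on a worker thread so the event loop stays free.
    return await asyncio.to_thread(evaluate_sync, visitor_id)


async def main() -> list:
    # Three concurrent "requests"; gather preserves argument order.
    return await asyncio.gather(*(handle_request(f"v{i}") for i in range(3)))
```

Running asyncio.run(main()) returns the three results in request order. Worker threads come from the event loop's default executor, so heavy concurrent use may call for sizing that pool explicitly.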
What to read next
- Initialization: sdk_key vs data modes, the full SDKConfig reference
- Code Examples: run_experience() / run_feature() / tracking side by side
- Diagnostics: diagnose_* helpers in place of manual log spelunking
- Async and Framework Integrations: bridging async code today; Phase 3 plan