Code Examples
Complete, runnable examples for every public method on Core and Context
Complete, runnable examples for every public method on Core and Context.
Each example focuses on one method or concern — combine them freely. For config
field details, see Configuration; for type definitions, see
Type Hints.
Sections:
- Initialization
- Creating a visitor context
- Experience evaluation
- Feature evaluation
- Segments
- Conversion tracking
- Queue control and flush
- Lifecycle event subscription
- Diagnostics
- Graceful shutdown
Initialization
Direct config (no network)
from convert_sdk import Core, SDKConfig
core = Core(
SDKConfig(
data={
"account_id": "1001",
"project": {"id": "2002", "name": "Demo"},
"experiences": [],
"features": [],
"goals": [],
}
)
).initialize()
assert core.is_ready
core.close()SDK key (remote fetch)
Read the key from the environment — never hard-code credentials:
import os
from convert_sdk import Core, SDKConfig, TransportConfig
core = Core(
SDKConfig(
sdk_key=os.environ["CONVERT_SDK_KEY"],
environment="production",
transport=TransportConfig(timeout=5.0),
)
).initialize()
assert core.is_ready
core.close()Context-manager form
Use the context-manager form so close() is always called, even on error:
from convert_sdk import Core, SDKConfig
with Core(SDKConfig(data={"account_id": "1001", "project": {"id": "2"}, "experiences": [], "features": [], "goals": []})).initialize() as core:
context = core.create_context("visitor-001")
result = context.run_experience("checkout-experiment")
# core.close() called automatically on exitLong-running service with auto-refresh
import os
from convert_sdk import Core, SDKConfig, RefreshConfig, LifecycleEvent
core = Core(
SDKConfig(
sdk_key=os.environ["CONVERT_SDK_KEY"],
refresh=RefreshConfig(
interval_seconds=300.0, # refresh every 5 minutes
jitter_seconds=30.0, # avoid thundering herd across instances
backoff_factor=2.0, # exponential backoff on transient failures
backoff_max_seconds=600.0, # never back off longer than 10 minutes
),
)
).initialize()
# Subscribe to config-swap events (e.g. to bust a downstream cache):
core.on(LifecycleEvent.CONFIG_UPDATED, lambda payload, error=None: print("config swapped"))
# Trigger an immediate out-of-band refresh (fire-and-forget):
core.refresh_now()
core.close() # stops the background refresh threadAccessing the current snapshot
core.current_config returns the current immutable ConfigSnapshot, or None
if not yet initialized. It is safe to call from any thread:
snapshot = core.current_config
if snapshot is not None:
print(snapshot.project_id, snapshot.account_id)Creating a visitor context
core.create_context(visitor_id) returns a Context bound to the current
immutable snapshot. Attributes are copied defensively:
context = core.create_context(
"visitor-abc",
visitor_attributes={"country": "US", "plan": "pro"},
)
# Read stored attributes back:
print(context.visitor_id) # "visitor-abc"
print(dict(context.visitor_attributes)) # {"country": "US", "plan": "pro"}
print(dict(context.attributes)) # same as visitor_attributes (alias)
print(dict(context.default_segments)) # {} — segments are separateReuse the same Context within a request. Create a new one per request (or per
visitor interaction) to pick up the latest config.
Updating stored attributes
set_attributes merges new keys into the stored state and persists through the
configured DataStore. Later create_context calls for the same visitor
rehydrate the merged state:
context.set_attributes({"plan": "enterprise", "beta": True})
# context.visitor_attributes now includes all prior keys plus the update.Experience evaluation
Single experience
run_experience returns ExperienceResult | None. A None result is a normal,
non-exceptional outcome — audience miss, outside traffic allocation, or
experience not found:
from convert_sdk import ExperienceResult
result = context.run_experience("checkout-experiment")
if result is None:
pass # visitor did not qualify — not an error
else:
assert isinstance(result, ExperienceResult)
print(result.experience_key) # "checkout-experiment"
print(result.experience_id) # e.g. "100342"
print(result.variation_key) # e.g. "treatment" (may be None if unset in config)
print(result.variation_id) # e.g. "200512"
# result.variation is a read-only mapping of the raw variation configPer-call attribute overlay
attributes= is an ephemeral overlay for a single call. It does not mutate the
context's stored visitor_attributes:
# Stored attribute: country="US"
result = context.run_experience(
"geo-experiment",
attributes={"country": "DE"}, # this call only; context unchanged
)
assert context.visitor_attributes["country"] == "US"All applicable experiences
run_experiences() evaluates every active experience in the config and returns
a list of results for those the visitor qualifies for. The list is empty (not
None) when no experience resolves:
results = context.run_experiences()
for r in results:
print(r.experience_key, "->", r.variation_key)Both run_experience and run_experiences accept location_attributes= as a
keyword-only argument for location-rule qualification:
result = context.run_experience(
"landing-page-test",
location_attributes={"url": "/pricing"},
)Evaluate without reporting the bucketing event
Bucketing a visitor enqueues a bucketing/activation event so the visitor is
counted in the experiment's exposure denominator. Pass enable_tracking=False
(keyword-only, default True) to evaluate without reporting it — bucketing,
sticky persistence, audience-rule evaluation, and the internal BUCKETING
lifecycle event still fire; only the outbound network enqueue is skipped (useful
for consent-denied flows or previewing a variation before applying it):
variation = context.run_experience("homepage-test", enable_tracking=False)
context.run_experiences(enable_tracking=False)Python's tracking control is per-call only — there is no global tracking switch. Leave
enable_trackingat itsTruedefault for production traffic, or your experiment's exposure count goes to zero and you lose the conversion-rate denominator.
Feature evaluation
run_feature resolves a feature flag and its typed variables from the visitor's
selected variation. It returns FeatureResult | None — None when the feature
is not declared, the visitor is not bucketed into any variation carrying the
feature, or the feature is otherwise unavailable:
from convert_sdk import FeatureStatus
feature = context.run_feature("checkout-banner")
if feature is None:
pass # feature unavailable or visitor not bucketed into it
else:
assert feature.status is FeatureStatus.ENABLED
print(feature.feature_key) # "checkout-banner"
print(feature.feature_id) # the feature's config id
print(feature.experience_key) # which experience supplied the change
print(feature.variation_key) # the selected variation's key
# Variables are type-cast from the feature's declared variable types:
headline = feature.variables.get("headline") # str
enabled = feature.variables.get("enabled") # bool
max_items = feature.variables.get("max_items") # intAll applicable features
run_features() resolves every declared feature the visitor buckets into,
returning a list (empty if none):
for f in context.run_features():
print(f.feature_key, f.status.value, dict(f.variables))Both run_feature and run_features accept attributes= and
location_attributes= overlays, identical to the experience methods.
Segments
Default segments
Default segments feed reporting and conversion attribution. They are kept
strictly separate from visitor_attributes — do not mix them. Use
set_segments to associate default segments with the visitor:
context.set_segments({"customerType": "vip", "loyalty_tier": "gold"})
print(dict(context.default_segments))
# {"customerType": "vip", "loyalty_tier": "gold"}Like set_attributes, set_segments merges into the stored state and persists
through the DataStore. The next create_context for the same visitor
rehydrates both attributes and segments.
Custom segment evaluation
run_custom_segments evaluates named segment rules against the visitor and
records newly matched segment IDs in the visitor's default-segment state.
Returns a typed CustomSegmentsResult — never None, never raises on a miss:
from convert_sdk import CustomSegmentsResult
# rule_data is an ephemeral per-call overlay — never written back to
# visitor_attributes:
result = context.run_custom_segments(
["us-visitors", "premium-plan"],
rule_data={"country": "US", "plan": "pro"},
)
assert isinstance(result, CustomSegmentsResult)
# result.matched_segment_ids: tuple[str, ...] — IDs newly matched THIS call
# result.matched: bool convenience property
print(result.matched_segment_ids) # e.g. ("seg_us_abc",)
print(result.matched) # True
# A no-match is a typed empty result, not an exception:
miss = context.run_custom_segments(["us-visitors"], rule_data={"country": "DE"})
assert miss.matched is False
assert miss.matched_segment_ids == ()Already-matched segment IDs are not re-added on subsequent calls.
Conversion tracking
track_conversion is synchronous and lightweight — it resolves the goal,
deduplicates, and appends to the in-process queue. No network call happens.
It always returns a typed ConversionResult:
from convert_sdk import ConversionStatus
result = context.track_conversion("purchase_completed")
# Inspect the outcome without exceptions:
if result.status is ConversionStatus.QUEUED:
print("queued:", result.tracked) # True
elif result.status is ConversionStatus.DEDUPLICATED:
print("duplicate:", result.reason) # "deduplicated"
elif result.status is ConversionStatus.GOAL_NOT_FOUND:
print("missing:", result.reason) # "goal_not_found"ConversionStatus | tracked | reason | Meaning |
|---|---|---|---|
QUEUED | True | None | Goal resolved; event enqueued. |
DEDUPLICATED | False | "deduplicated" | Same (visitor, goal) already tracked; no second event. |
GOAL_NOT_FOUND | False | "goal_not_found" | Goal key absent from loaded config; typed non-exception outcome. |
Conversion with revenue
result = context.track_conversion(
"purchase_completed",
revenue=49.99,
)
assert result.tracked is TrueRevenue with custom conversion data
conversion_data values must be JSON primitives (int, float, str).
Objects, lists, and booleans raise ConversionDataError (programmer misuse):
result = context.track_conversion(
"purchase_completed",
revenue=149.00,
conversion_data={
"transaction_id": "txn-abc-123",
"items_count": 3,
},
)Allowing repeated transactions (force_multiple)
force_multiple)By default, deduplication is keyed by (visitor_id, goal_id) — a differing
revenue or conversion_data does not defeat it. Use force_multiple=True
to re-track an already-tracked goal (e.g. recording multiple revenue
transactions in one session):
# First purchase:
context.track_conversion("purchase_completed", revenue=49.99)
# Second purchase in the same session:
again = context.track_conversion(
"purchase_completed",
revenue=29.99,
conversion_data={"transaction_id": "txn-2"},
force_multiple=True, # overrides default-mode dedup
)
assert again.tracked is TrueAttribution context
A tracked conversion carries the visitor's attribution context at conversion
time: active default segments (set_segments) and active variation/bucketing
assignments. Call set_segments and run_experience/run_feature before
track_conversion so the conversion is attributed correctly.
Queue control and flush
track_conversion never sends to the network — it appends to an in-process
batch queue. Use core.flush() to deliver queued events explicitly:
context.track_conversion("purchase_completed", revenue=49.99)
context.track_conversion("add_to_cart")
core.flush() # delivers all queued events and clears the queue
core.flush() # safe no-op on an empty queue — no transport call, no errorBatch-size automatic release
Set batch_size so the queue self-releases when it fills. This bounds
in-process memory and latency without per-call flush() calls:
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(data=project_config, batch_size=25)).initialize()
# The queue releases automatically once 25 events accumulate.Periodic auto-flush (opt-in)
auto_flush_interval_ms starts a daemonic timer for long-lived server
processes. Leave as None for Lambda, cron jobs, and other short-lived
runtimes:
from convert_sdk import Core, SDKConfig
# Flush every 5 seconds in a long-lived background worker:
core = Core(SDKConfig(data=project_config, auto_flush_interval_ms=5000)).initialize()Recommended flush points by runtime
| Runtime | Recommended flush point |
|---|---|
| Django / Flask (WSGI) | Response middleware process_response() hook |
| FastAPI / Starlette (ASGI) | Background task or finally in the route handler |
| AWS Lambda | End of handler, before return |
| CLI / script | finally block after main logic |
| Long-running server | Context manager (with Core(...).initialize() as core:) |
Lifecycle event subscription
Register handlers with core.on(event, handler). Handlers receive
(payload, error=None) — error is non-None only when the release failed.
Handlers that raise are isolated and logged; they never affect evaluation or
delivery. Safe to call before initialize():
from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload, ConversionEventPayload
def on_queue_released(payload: QueueReleasedPayload, error=None) -> None:
if error is not None:
print(f"delivery failed: status={payload.status_code} retries={payload.retry_attempts}")
else:
print(
f"delivered batch: reason={payload.reason} "
f"events={payload.event_count} visitors={payload.visitor_count}"
)
def on_conversion(payload: ConversionEventPayload, error=None) -> None:
print(f"conversion queued: visitor={payload.visitor_id} goal={payload.goal_key}")
def on_config_updated(payload, error=None) -> None:
print(f"config swapped: project={payload['project_id']} entities={payload['entity_counts']}")
core.on(LifecycleEvent.API_QUEUE_RELEASED, on_queue_released)
core.on(LifecycleEvent.CONVERSION, on_conversion)
core.on(LifecycleEvent.CONFIG_UPDATED, on_config_updated)Available lifecycle events
| Event | When fired | Payload type |
|---|---|---|
LifecycleEvent.READY | initialize() succeeds | internal |
LifecycleEvent.CONFIG_UPDATED | Background refresh swaps in a new snapshot, or on first successful init | dict with account_id, project_id, entity_counts |
LifecycleEvent.CONVERSION | A conversion event is enqueued (not on dedup or miss) | ConversionEventPayload |
LifecycleEvent.API_QUEUE_RELEASED | The tracking queue is released (success or delivery failure) | QueueReleasedPayload |
LifecycleEvent.DATA_STORE_QUEUE_RELEASED | (JS parity; out of MVP scope) | — |
QueueReleasedPayload fields:
| Field | Type | Meaning |
|---|---|---|
reason | ReleaseReason | Why the release fired: EXPLICIT, BATCH_SIZE, TIMER, or ATEXIT |
batch_size | int | Number of events delivered in this batch |
visitor_count | int | Distinct visitor IDs in this batch |
event_count | int | Total event records delivered |
status_code | int | None | HTTP status on failure; None on success |
retry_attempts | int | None | Exhausted retry count; None or 0 if the adapter does not retry |
Import QueueReleasedPayload and ConversionEventPayload from
convert_sdk.events.
Diagnostics
When you need to know why an experience or feature did not resolve — without
raising — use the diagnose_* methods. They return typed diagnostic objects
and never raise on a normal miss:
from convert_sdk import DiagnosticReason
# Experience diagnostic:
diag = context.diagnose_experience("checkout-experiment")
print(diag.resolved) # bool — True only when RESOLVED
print(diag.reason) # DiagnosticReason enum, e.g. AUDIENCE_MISMATCH
print(diag.reason.value) # str wire value, e.g. "audience_mismatch"
print(diag.message) # human-readable description
print(dict(diag.details)) # allowlist-safe detail mapAll diagnose methods
# Feature — RESOLVED, FEATURE_NOT_FOUND, or FEATURE_NOT_IN_SELECTED_VARIATIONS:
feat_diag = context.diagnose_feature("checkout-banner")
# Goal — RESOLVED or GOAL_NOT_FOUND:
goal_diag = context.diagnose_goal("purchase_completed")
# Config-entity lookup — RESOLVED, ENTITY_NOT_FOUND, or PROJECT_MAPPING_REQUIRED:
entity_diag = context.diagnose_entity("experience", "checkout-experiment")DiagnosticReason values
| Value | String | Produced by |
|---|---|---|
RESOLVED | "resolved" | All diagnose_* — visitor qualified and the entity resolved |
AUDIENCE_MISMATCH | "audience_mismatch" | diagnose_experience — experience exists but visitor did not qualify |
EXPERIENCE_NOT_FOUND | "experience_not_found" | diagnose_experience — key not in config |
FEATURE_NOT_FOUND | "feature_not_found" | diagnose_feature — key not in config |
FEATURE_NOT_IN_SELECTED_VARIATIONS | "feature_not_in_selected_variations" | diagnose_feature — feature declared but visitor's variation does not carry it |
GOAL_NOT_FOUND | "goal_not_found" | diagnose_goal — key not in config |
ENTITY_NOT_FOUND | "entity_not_found" | diagnose_entity — key/id not found or wrong entity_type |
PROJECT_MAPPING_REQUIRED | "project_mapping_required" | diagnose_entity — loaded config has no project mapping |
Miss-path reasons are also emitted as structured log records through the
diagnostic logger ("convert_sdk" namespace). See Diagnostics
for the full log-record field reference.
Graceful shutdown
core.close() stops the background refresh thread (if running), cancels the
periodic-flush timer (if set), and closes the httpx transport if Core
created it. It does not perform a final flush:
# Explicit call:
core.flush() # deliver any remaining queued events first
core.close() # then release resources
# Or use the context-manager form (close is called automatically on exit):
with Core(SDKConfig(data=project_config)).initialize() as core:
...close() is idempotent. Calling it multiple times is safe.
What to read next
- Configuration — every option of every config dataclass
- Type Hints — dataclasses, Protocols, and enums for every result type
- Diagnostics — structured log fields, error codes, support workflow
- Extending — custom
Transport,DataStore, event-bus observers - Running Experiences — detailed experience-evaluation guide
- Running Features — feature-flag resolution and typed variables
- Tracking Conversions — deduplication, attribution, payload reference