Code Examples

Complete, runnable examples for every public method on Core and Context

Complete, runnable examples for every public method on Core and Context. Each example focuses on one method or concern — combine them freely. For config field details, see Configuration; for type definitions, see Type Hints.

Sections:

  1. Initialization
  2. Creating a visitor context
  3. Experience evaluation
  4. Feature evaluation
  5. Segments
  6. Conversion tracking
  7. Queue control and flush
  8. Lifecycle event subscription
  9. Diagnostics
  10. Graceful shutdown

Initialization

Direct config (no network)

from convert_sdk import Core, SDKConfig

core = Core(
    SDKConfig(
        data={
            "account_id": "1001",
            "project": {"id": "2002", "name": "Demo"},
            "experiences": [],
            "features": [],
            "goals": [],
        }
    )
).initialize()

assert core.is_ready
core.close()

SDK key (remote fetch)

Read the key from the environment — never hard-code credentials:

import os
from convert_sdk import Core, SDKConfig, TransportConfig

core = Core(
    SDKConfig(
        sdk_key=os.environ["CONVERT_SDK_KEY"],
        environment="production",
        transport=TransportConfig(timeout=5.0),
    )
).initialize()

assert core.is_ready
core.close()

Context-manager form

Use the context-manager form so close() is always called, even on error:

from convert_sdk import Core, SDKConfig

with Core(SDKConfig(data={"account_id": "1001", "project": {"id": "2"}, "experiences": [], "features": [], "goals": []})).initialize() as core:
    context = core.create_context("visitor-001")
    result = context.run_experience("checkout-experiment")
# core.close() called automatically on exit

Long-running service with auto-refresh

import os
from convert_sdk import Core, SDKConfig, RefreshConfig, LifecycleEvent

core = Core(
    SDKConfig(
        sdk_key=os.environ["CONVERT_SDK_KEY"],
        refresh=RefreshConfig(
            interval_seconds=300.0,     # refresh every 5 minutes
            jitter_seconds=30.0,        # avoid thundering herd across instances
            backoff_factor=2.0,         # exponential backoff on transient failures
            backoff_max_seconds=600.0,  # never back off longer than 10 minutes
        ),
    )
).initialize()

# Subscribe to config-swap events (e.g. to bust a downstream cache):
core.on(LifecycleEvent.CONFIG_UPDATED, lambda payload, error=None: print("config swapped"))

# Trigger an immediate out-of-band refresh (fire-and-forget):
core.refresh_now()

core.close()   # stops the background refresh thread

Accessing the current snapshot

core.current_config returns the current immutable ConfigSnapshot, or None if not yet initialized. It is safe to call from any thread:

snapshot = core.current_config
if snapshot is not None:
    print(snapshot.project_id, snapshot.account_id)

Creating a visitor context

core.create_context(visitor_id) returns a Context bound to the current immutable snapshot. Attributes are copied defensively:

context = core.create_context(
    "visitor-abc",
    visitor_attributes={"country": "US", "plan": "pro"},
)

# Read stored attributes back:
print(context.visitor_id)                     # "visitor-abc"
print(dict(context.visitor_attributes))       # {"country": "US", "plan": "pro"}
print(dict(context.attributes))               # same as visitor_attributes (alias)
print(dict(context.default_segments))         # {} — segments are separate

Reuse the same Context within a request. Create a new one per request (or per visitor interaction) to pick up the latest config.

Updating stored attributes

set_attributes merges new keys into the stored state and persists through the configured DataStore. Later create_context calls for the same visitor rehydrate the merged state:

context.set_attributes({"plan": "enterprise", "beta": True})
# context.visitor_attributes now includes all prior keys plus the update.

Experience evaluation

Single experience

run_experience returns ExperienceResult | None. A None result is a normal, non-exceptional outcome — audience miss, outside traffic allocation, or experience not found:

from convert_sdk import ExperienceResult

result = context.run_experience("checkout-experiment")

if result is None:
    pass   # visitor did not qualify — not an error
else:
    assert isinstance(result, ExperienceResult)
    print(result.experience_key)   # "checkout-experiment"
    print(result.experience_id)    # e.g. "100342"
    print(result.variation_key)    # e.g. "treatment" (may be None if unset in config)
    print(result.variation_id)     # e.g. "200512"
    # result.variation is a read-only mapping of the raw variation config

Per-call attribute overlay

attributes= is an ephemeral overlay for a single call. It does not mutate the context's stored visitor_attributes:

# Stored attribute: country="US"
result = context.run_experience(
    "geo-experiment",
    attributes={"country": "DE"},   # this call only; context unchanged
)
assert context.visitor_attributes["country"] == "US"

All applicable experiences

run_experiences() evaluates every active experience in the config and returns a list of results for those the visitor qualifies for. The list is empty (not None) when no experience resolves:

results = context.run_experiences()
for r in results:
    print(r.experience_key, "->", r.variation_key)

Both run_experience and run_experiences accept location_attributes= as a keyword-only argument for location-rule qualification:

result = context.run_experience(
    "landing-page-test",
    location_attributes={"url": "/pricing"},
)

Evaluate without reporting the bucketing event

Bucketing a visitor enqueues a bucketing/activation event so the visitor is counted in the experiment's exposure denominator. Pass enable_tracking=False (keyword-only, default True) to evaluate without reporting it — bucketing, sticky persistence, audience-rule evaluation, and the internal BUCKETING lifecycle event still fire; only the outbound network enqueue is skipped (useful for consent-denied flows or previewing a variation before applying it):

variation = context.run_experience("homepage-test", enable_tracking=False)
context.run_experiences(enable_tracking=False)

Python's tracking control is per-call only — there is no global tracking switch. Leave enable_tracking at its True default for production traffic, or your experiment's exposure count goes to zero and you lose the conversion-rate denominator.


Feature evaluation

run_feature resolves a feature flag and its typed variables from the visitor's selected variation. It returns FeatureResult | NoneNone when the feature is not declared, the visitor is not bucketed into any variation carrying the feature, or the feature is otherwise unavailable:

from convert_sdk import FeatureStatus

feature = context.run_feature("checkout-banner")

if feature is None:
    pass   # feature unavailable or visitor not bucketed into it
else:
    assert feature.status is FeatureStatus.ENABLED
    print(feature.feature_key)     # "checkout-banner"
    print(feature.feature_id)      # the feature's config id
    print(feature.experience_key)  # which experience supplied the change
    print(feature.variation_key)   # the selected variation's key

    # Variables are type-cast from the feature's declared variable types:
    headline = feature.variables.get("headline")   # str
    enabled  = feature.variables.get("enabled")    # bool
    max_items = feature.variables.get("max_items") # int

All applicable features

run_features() resolves every declared feature the visitor buckets into, returning a list (empty if none):

for f in context.run_features():
    print(f.feature_key, f.status.value, dict(f.variables))

Both run_feature and run_features accept attributes= and location_attributes= overlays, identical to the experience methods.


Segments

Default segments

Default segments feed reporting and conversion attribution. They are kept strictly separate from visitor_attributes — do not mix them. Use set_segments to associate default segments with the visitor:

context.set_segments({"customerType": "vip", "loyalty_tier": "gold"})
print(dict(context.default_segments))
# {"customerType": "vip", "loyalty_tier": "gold"}

Like set_attributes, set_segments merges into the stored state and persists through the DataStore. The next create_context for the same visitor rehydrates both attributes and segments.

Custom segment evaluation

run_custom_segments evaluates named segment rules against the visitor and records newly matched segment IDs in the visitor's default-segment state. Returns a typed CustomSegmentsResult — never None, never raises on a miss:

from convert_sdk import CustomSegmentsResult

# rule_data is an ephemeral per-call overlay — never written back to
# visitor_attributes:
result = context.run_custom_segments(
    ["us-visitors", "premium-plan"],
    rule_data={"country": "US", "plan": "pro"},
)

assert isinstance(result, CustomSegmentsResult)
# result.matched_segment_ids: tuple[str, ...] — IDs newly matched THIS call
# result.matched: bool convenience property
print(result.matched_segment_ids)   # e.g. ("seg_us_abc",)
print(result.matched)               # True

# A no-match is a typed empty result, not an exception:
miss = context.run_custom_segments(["us-visitors"], rule_data={"country": "DE"})
assert miss.matched is False
assert miss.matched_segment_ids == ()

Already-matched segment IDs are not re-added on subsequent calls.


Conversion tracking

track_conversion is synchronous and lightweight — it resolves the goal, deduplicates, and appends to the in-process queue. No network call happens. It always returns a typed ConversionResult:

from convert_sdk import ConversionStatus

result = context.track_conversion("purchase_completed")

# Inspect the outcome without exceptions:
if result.status is ConversionStatus.QUEUED:
    print("queued:", result.tracked)    # True
elif result.status is ConversionStatus.DEDUPLICATED:
    print("duplicate:", result.reason)  # "deduplicated"
elif result.status is ConversionStatus.GOAL_NOT_FOUND:
    print("missing:", result.reason)    # "goal_not_found"
ConversionStatustrackedreasonMeaning
QUEUEDTrueNoneGoal resolved; event enqueued.
DEDUPLICATEDFalse"deduplicated"Same (visitor, goal) already tracked; no second event.
GOAL_NOT_FOUNDFalse"goal_not_found"Goal key absent from loaded config; typed non-exception outcome.

Conversion with revenue

result = context.track_conversion(
    "purchase_completed",
    revenue=49.99,
)
assert result.tracked is True

Revenue with custom conversion data

conversion_data values must be JSON primitives (int, float, str). Objects, lists, and booleans raise ConversionDataError (programmer misuse):

result = context.track_conversion(
    "purchase_completed",
    revenue=149.00,
    conversion_data={
        "transaction_id": "txn-abc-123",
        "items_count": 3,
    },
)

Allowing repeated transactions (force_multiple)

By default, deduplication is keyed by (visitor_id, goal_id) — a differing revenue or conversion_data does not defeat it. Use force_multiple=True to re-track an already-tracked goal (e.g. recording multiple revenue transactions in one session):

# First purchase:
context.track_conversion("purchase_completed", revenue=49.99)

# Second purchase in the same session:
again = context.track_conversion(
    "purchase_completed",
    revenue=29.99,
    conversion_data={"transaction_id": "txn-2"},
    force_multiple=True,    # overrides default-mode dedup
)
assert again.tracked is True

Attribution context

A tracked conversion carries the visitor's attribution context at conversion time: active default segments (set_segments) and active variation/bucketing assignments. Call set_segments and run_experience/run_feature before track_conversion so the conversion is attributed correctly.


Queue control and flush

track_conversion never sends to the network — it appends to an in-process batch queue. Use core.flush() to deliver queued events explicitly:

context.track_conversion("purchase_completed", revenue=49.99)
context.track_conversion("add_to_cart")

core.flush()   # delivers all queued events and clears the queue
core.flush()   # safe no-op on an empty queue — no transport call, no error

Batch-size automatic release

Set batch_size so the queue self-releases when it fills. This bounds in-process memory and latency without per-call flush() calls:

from convert_sdk import Core, SDKConfig

core = Core(SDKConfig(data=project_config, batch_size=25)).initialize()
# The queue releases automatically once 25 events accumulate.

Periodic auto-flush (opt-in)

auto_flush_interval_ms starts a daemonic timer for long-lived server processes. Leave as None for Lambda, cron jobs, and other short-lived runtimes:

from convert_sdk import Core, SDKConfig

# Flush every 5 seconds in a long-lived background worker:
core = Core(SDKConfig(data=project_config, auto_flush_interval_ms=5000)).initialize()

Recommended flush points by runtime

RuntimeRecommended flush point
Django / Flask (WSGI)Response middleware process_response() hook
FastAPI / Starlette (ASGI)Background task or finally in the route handler
AWS LambdaEnd of handler, before return
CLI / scriptfinally block after main logic
Long-running serverContext manager (with Core(...).initialize() as core:)

Lifecycle event subscription

Register handlers with core.on(event, handler). Handlers receive (payload, error=None)error is non-None only when the release failed. Handlers that raise are isolated and logged; they never affect evaluation or delivery. Safe to call before initialize():

from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload, ConversionEventPayload

def on_queue_released(payload: QueueReleasedPayload, error=None) -> None:
    if error is not None:
        print(f"delivery failed: status={payload.status_code} retries={payload.retry_attempts}")
    else:
        print(
            f"delivered batch: reason={payload.reason} "
            f"events={payload.event_count} visitors={payload.visitor_count}"
        )

def on_conversion(payload: ConversionEventPayload, error=None) -> None:
    print(f"conversion queued: visitor={payload.visitor_id} goal={payload.goal_key}")

def on_config_updated(payload, error=None) -> None:
    print(f"config swapped: project={payload['project_id']} entities={payload['entity_counts']}")

core.on(LifecycleEvent.API_QUEUE_RELEASED, on_queue_released)
core.on(LifecycleEvent.CONVERSION, on_conversion)
core.on(LifecycleEvent.CONFIG_UPDATED, on_config_updated)

Available lifecycle events

EventWhen firedPayload type
LifecycleEvent.READYinitialize() succeedsinternal
LifecycleEvent.CONFIG_UPDATEDBackground refresh swaps in a new snapshot, or on first successful initdict with account_id, project_id, entity_counts
LifecycleEvent.CONVERSIONA conversion event is enqueued (not on dedup or miss)ConversionEventPayload
LifecycleEvent.API_QUEUE_RELEASEDThe tracking queue is released (success or delivery failure)QueueReleasedPayload
LifecycleEvent.DATA_STORE_QUEUE_RELEASED(JS parity; out of MVP scope)

QueueReleasedPayload fields:

FieldTypeMeaning
reasonReleaseReasonWhy the release fired: EXPLICIT, BATCH_SIZE, TIMER, or ATEXIT
batch_sizeintNumber of events delivered in this batch
visitor_countintDistinct visitor IDs in this batch
event_countintTotal event records delivered
status_codeint | NoneHTTP status on failure; None on success
retry_attemptsint | NoneExhausted retry count; None or 0 if the adapter does not retry

Import QueueReleasedPayload and ConversionEventPayload from convert_sdk.events.


Diagnostics

When you need to know why an experience or feature did not resolve — without raising — use the diagnose_* methods. They return typed diagnostic objects and never raise on a normal miss:

from convert_sdk import DiagnosticReason

# Experience diagnostic:
diag = context.diagnose_experience("checkout-experiment")
print(diag.resolved)           # bool — True only when RESOLVED
print(diag.reason)             # DiagnosticReason enum, e.g. AUDIENCE_MISMATCH
print(diag.reason.value)       # str wire value, e.g. "audience_mismatch"
print(diag.message)            # human-readable description
print(dict(diag.details))      # allowlist-safe detail map

All diagnose methods

# Feature — RESOLVED, FEATURE_NOT_FOUND, or FEATURE_NOT_IN_SELECTED_VARIATIONS:
feat_diag = context.diagnose_feature("checkout-banner")

# Goal — RESOLVED or GOAL_NOT_FOUND:
goal_diag = context.diagnose_goal("purchase_completed")

# Config-entity lookup — RESOLVED, ENTITY_NOT_FOUND, or PROJECT_MAPPING_REQUIRED:
entity_diag = context.diagnose_entity("experience", "checkout-experiment")

DiagnosticReason values

ValueStringProduced by
RESOLVED"resolved"All diagnose_* — visitor qualified and the entity resolved
AUDIENCE_MISMATCH"audience_mismatch"diagnose_experience — experience exists but visitor did not qualify
EXPERIENCE_NOT_FOUND"experience_not_found"diagnose_experience — key not in config
FEATURE_NOT_FOUND"feature_not_found"diagnose_feature — key not in config
FEATURE_NOT_IN_SELECTED_VARIATIONS"feature_not_in_selected_variations"diagnose_feature — feature declared but visitor's variation does not carry it
GOAL_NOT_FOUND"goal_not_found"diagnose_goal — key not in config
ENTITY_NOT_FOUND"entity_not_found"diagnose_entity — key/id not found or wrong entity_type
PROJECT_MAPPING_REQUIRED"project_mapping_required"diagnose_entity — loaded config has no project mapping

Miss-path reasons are also emitted as structured log records through the diagnostic logger ("convert_sdk" namespace). See Diagnostics for the full log-record field reference.


Graceful shutdown

core.close() stops the background refresh thread (if running), cancels the periodic-flush timer (if set), and closes the httpx transport if Core created it. It does not perform a final flush:

# Explicit call:
core.flush()   # deliver any remaining queued events first
core.close()   # then release resources

# Or use the context-manager form (close is called automatically on exit):
with Core(SDKConfig(data=project_config)).initialize() as core:
    ...

close() is idempotent. Calling it multiple times is safe.


What to read next