Extending the SDK
Swap the Transport, DataStore, and EventBus seams via the three runtime-checkable Protocols
The SDK exposes exactly three @runtime_checkable typing.Protocols as
swappable I/O seams. Implementing the methods of a protocol is all that is
required — no subclassing, no registration. Each has a default adapter the SDK
wires automatically; you replace only what you need to replace.
| Seam | Protocol | Default adapter | Injection point |
|---|---|---|---|
| HTTP transport | convert_sdk.ports.transport.Transport | httpx-backed HttpxTransport | Core(config, transport=...) (keyword-only) |
| Visitor state storage | convert_sdk.ports.storage.DataStore | InMemoryDataStore (per-process) | SDKConfig(data_store=...) (config field) |
| Lifecycle event bus | convert_sdk.ports.event_bus.EventBus | In-process synchronous bus | Internal — observe via Core.on(...) |
There is no logger Protocol. The logging seam is Python's stdlib logging: supply a logging.Logger via SDKConfig.logger. The SDK emits through it; it never adds handlers, sets levels, or calls logging.basicConfig().
Where each seam is injected
The two external seams have deliberately different injection points:
- Transport: keyword-only argument on Core: Core(config, transport=my_transport)
- DataStore: field on SDKConfig: SDKConfig(data=..., data_store=my_store)
The asymmetry is intentional: Core owns the transport lifecycle (it calls
close() and uses it as a context manager); SDKConfig owns the storage
declaration so it can be shared across multiple Core instances in the same
process.
Custom Transport
Implement Transport to replace config fetching and tracking delivery.
Common reasons: mTLS, proxy routing, stubbing in integration tests, or
capturing tracking calls in test assertions.
Protocol requirements
```python
from typing import Any, Dict

from convert_sdk.ports.transport import Transport


class MyTransport:
    def fetch_config(self, config: "SDKConfig") -> Dict[str, Any]:
        """Fetch and return the raw config payload.

        Raise convert_sdk.ConfigLoadError on any transport/HTTP/decode failure.
        """
        ...

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        """Deliver a serialized tracking-events batch over HTTPS.

        sdk_key is keyword-only. Raise a ConvertSDKError subclass on failure.
        """
        ...

    def close(self) -> None:
        """Release any held resources (e.g. HTTP connection pool)."""
        ...

    def __enter__(self) -> "Transport":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()
```

Transport is @runtime_checkable: the __enter__/__exit__ pair is
required for isinstance(obj, Transport) to return True.
| Method | Signature | Notes |
|---|---|---|
| fetch_config | (config: SDKConfig) -> Dict[str, Any] | Returns decoded JSON as dict |
| send_tracking | (payload: Dict[str, Any], *, sdk_key: str) -> None | sdk_key is keyword-only |
| close | () -> None | Called on Core.close() and at context-manager exit |
| __enter__ / __exit__ | context-manager pair | Required for structural isinstance check |
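The structural check can be tried without the SDK installed. The sketch below declares a local stand-in Protocol, TransportLike, that mirrors only the method names listed in the table; it is an assumption for illustration, not the SDK's own definition. It shows that a class lacking the context-manager pair fails the isinstance check, while adding the pair makes it pass.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class TransportLike(Protocol):
    """Hypothetical stand-in mirroring the method names in the table."""

    def fetch_config(self, config: Any) -> Dict[str, Any]: ...
    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None: ...
    def close(self) -> None: ...
    def __enter__(self) -> "TransportLike": ...
    def __exit__(self, *exc: Any) -> None: ...


class NoContextManager:
    """Has the three I/O methods but no __enter__/__exit__."""

    def fetch_config(self, config: Any) -> Dict[str, Any]:
        return {}

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        pass

    def close(self) -> None:
        pass


class Complete(NoContextManager):
    """Adds the context-manager pair, so the structural check passes."""

    def __enter__(self) -> "Complete":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()


incomplete_ok = isinstance(NoContextManager(), TransportLike)  # False
complete_ok = isinstance(Complete(), TransportLike)            # True
```

Neither class inherits from the Protocol; conformance is decided purely by which attributes are present.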
Complete example: in-memory stub transport
```python
from typing import Any, Dict

from convert_sdk import Core, SDKConfig
from convert_sdk.ports.transport import Transport


class StubTransport:
    """Returns canned config and captures tracking payloads."""

    def __init__(self, config_payload: Dict[str, Any]) -> None:
        self._config = config_payload
        self.tracking_calls: list = []

    def fetch_config(self, config) -> Dict[str, Any]:
        return self._config

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self.tracking_calls.append(payload)

    def close(self) -> None:
        pass

    def __enter__(self) -> "StubTransport":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()


stub = StubTransport(config_payload={
    "account_id": "1001",
    "project": {"id": "2002", "name": "Test"},
    "experiences": [],
    "features": [],
    "goals": [],
})

# @runtime_checkable: duck-typed conformance check
assert isinstance(stub, Transport)

# transport= is KEYWORD-ONLY on Core:
core = Core(SDKConfig(sdk_key="test-key"), transport=stub).initialize()
context = core.create_context("visitor-001")
context.track_conversion("purchase_completed")
core.flush()
assert stub.tracking_calls  # our stub handled the delivery
core.close()
```

Example: retrying transport wrapper
```python
import time
from typing import Any, Dict

from convert_sdk.ports.transport import Transport
from convert_sdk.errors import ConfigLoadError


class RetryingTransport:
    """Wraps another transport and retries fetch_config up to `max_retries` times."""

    def __init__(self, inner: Transport, max_retries: int = 3) -> None:
        self._inner = inner
        self._max_retries = max_retries

    def fetch_config(self, config) -> Dict[str, Any]:
        last_exc: Exception | None = None
        for attempt in range(self._max_retries):
            try:
                return self._inner.fetch_config(config)
            except ConfigLoadError as exc:
                last_exc = exc
                # Exponential backoff (1s, 2s, 4s, ...); no sleep after the final attempt.
                if attempt < self._max_retries - 1:
                    time.sleep(2 ** attempt)
        raise last_exc  # type: ignore[misc]

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self._inner.send_tracking(payload, sdk_key=sdk_key)

    def close(self) -> None:
        self._inner.close()

    def __enter__(self) -> "RetryingTransport":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()
```

Custom DataStore
Implement DataStore to persist visitor state and deduplication markers across
process restarts — for example, backed by Redis, Memcache, or a database. Inject
it through SDKConfig.data_store, not as a Core argument.
Protocol requirements
```python
from typing import Any, Optional

from convert_sdk import DataStore  # top-level export


class MyStore:
    def get(self, key: str) -> Any:
        """Return the value stored under key, or None if absent or expired."""
        ...

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store value. ttl is expiry in seconds; None means no expiry."""
        ...

    def has(self, key: str) -> bool:
        """True if key has a present, unexpired value."""
        ...

    def delete(self, key: str) -> None:
        """Remove key. Deleting an absent key is a safe no-op."""
        ...
```

DataStore is @runtime_checkable: isinstance(obj, DataStore) checks for
the four method names at runtime (not their signatures).
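Because the runtime check looks only at names, a passing isinstance does not guarantee a usable store. The sketch below illustrates this with a local stand-in Protocol, DataStoreLike, which copies the four signatures shown above; it is a hypothetical mirror, not the SDK's definition.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class DataStoreLike(Protocol):
    """Hypothetical stand-in with the four documented method names."""

    def get(self, key: str) -> Any: ...
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None: ...
    def has(self, key: str) -> bool: ...
    def delete(self, key: str) -> None: ...


class WrongSignatures:
    """Right names, wrong parameters: set() accepts no ttl."""

    def get(self, key):
        return None

    def set(self, key, value):
        pass

    def has(self, key):
        return False

    def delete(self, key):
        pass


store = WrongSignatures()

# True: only attribute names are inspected at runtime.
passes_isinstance = isinstance(store, DataStoreLike)

# The mismatch only surfaces when a caller actually passes ttl.
try:
    store.set("k", "v", ttl=60)
    ttl_call_failed = False
except TypeError:
    ttl_call_failed = True
```

A static type checker such as mypy or pyright compares full signatures and would flag WrongSignatures where isinstance does not.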
| Method | Signature | Notes |
|---|---|---|
| get | (key: str) -> Any | Returns None for absent or expired keys |
| set | (key: str, value: Any, ttl: Optional[int] = None) -> None | ttl in seconds; None = no expiry |
| has | (key: str) -> bool | Must return False for absent and expired keys |
| delete | (key: str) -> None | Idempotent: no-op on absent key |
Implementations must be safe to call from the synchronous tracking path and from worker threads (the SDK's flush timer runs on a daemon thread).
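One way to meet the thread-safety and expiry requirements is to guard a dict with a lock and evict expired entries lazily on access. The following is a minimal sketch under those assumptions; the class name LockedTTLStore and its injectable clock parameter are hypothetical and are not part of the SDK.

```python
import threading
import time
from typing import Any, Callable, Dict, Optional, Tuple


class LockedTTLStore:
    """Dict-backed store guarded by a lock, with optional per-key expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so tests can control time
        self._lock = threading.Lock()
        # key -> (value, absolute expiry time or None for "never")
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def _live_entry(self, key: str) -> Optional[Tuple[Any, Optional[float]]]:
        """Return the entry if present and unexpired. Caller must hold the lock."""
        entry = self._data.get(key)
        if entry is None:
            return None
        _, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[key]  # evict lazily on access
            return None
        return entry

    def get(self, key: str) -> Any:
        with self._lock:
            entry = self._live_entry(key)
            return None if entry is None else entry[0]

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        expires_at = None if ttl is None else self._clock() + ttl
        with self._lock:
            self._data[key] = (value, expires_at)

    def has(self, key: str) -> bool:
        with self._lock:
            return self._live_entry(key) is not None

    def delete(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)  # no-op when the key is absent
```

A single lock keeps the sketch simple; a store under heavy contention might prefer finer-grained locking or an external backend that handles expiry itself.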
Complete example: dict-backed store
```python
from typing import Any, Optional

from convert_sdk import Core, SDKConfig, DataStore


class DictStore:
    def __init__(self) -> None:
        self._data: dict = {}
        self.writes: int = 0

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        # ttl is accepted for protocol conformance but ignored by this simple store.
        self._data[key] = value
        self.writes += 1

    def has(self, key: str) -> bool:
        return key in self._data

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


store = DictStore()
assert isinstance(store, DataStore)

# my_config is a config payload dict defined elsewhere.
# data_store is a CONFIG FIELD, not a Core argument:
core = Core(SDKConfig(data=my_config, data_store=store)).initialize()
context = core.create_context("visitor-001")
context.set_segments({"loyalty_tier": "gold"})  # persists through the store
assert store.writes >= 1
core.close()
```

Example: Redis-backed store
```python
import json
from typing import Any, Optional

from convert_sdk import Core, SDKConfig


class RedisDataStore:
    def __init__(self, redis_client) -> None:
        self._redis = redis_client

    def get(self, key: str) -> Any:
        raw = self._redis.get(key)
        return json.loads(raw) if raw is not None else None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        serialized = json.dumps(value)
        if ttl is not None:
            # SETEX stores the value with an expiry in seconds.
            self._redis.setex(key, ttl, serialized)
        else:
            self._redis.set(key, serialized)

    def has(self, key: str) -> bool:
        return bool(self._redis.exists(key))

    def delete(self, key: str) -> None:
        self._redis.delete(key)


# my_config and my_redis (a connected Redis client) are defined elsewhere.
core = Core(
    SDKConfig(data=my_config, data_store=RedisDataStore(redis_client=my_redis))
).initialize()
```

The built-in InMemoryDataStore is thread-safe but process-local. With the
default store, deduplication markers reset on process restart. A shared Redis
store removes that isolation without any protocol change.
Namespaced storage keys
The SDK stores different types of state under distinct key prefixes so they never collide in the shared store:
- Visitor state (attributes + segments): keys built by convert_sdk.ports.storage.visitor_state_key(visitor_id) → 'state:["visitor-001"]'
- Deduplication markers: keys built by the tracking layer, prefixed "dedup:"
```python
from convert_sdk.ports.storage import visitor_state_key

key = visitor_state_key("visitor-001")
# key == 'state:["visitor-001"]'
```

See Persistent DataStore for the full lifecycle of reads and writes.
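The visitor-state key shown above looks like a prefix followed by a compact JSON array. The sketch below reproduces that shape with a hypothetical helper, namespaced_key, purely to illustrate why encoding the parts as JSON avoids collisions. How the SDK actually builds its keys, and what parts a "dedup:" key contains, is not specified here; the dedup parts used below are made up.

```python
import json


def namespaced_key(prefix: str, *parts: str) -> str:
    """Hypothetical helper: '<prefix>:' followed by the parts as a compact JSON array."""
    return f"{prefix}:" + json.dumps(list(parts), separators=(",", ":"))


state_key = namespaced_key("state", "visitor-001")

# JSON quoting keeps part boundaries unambiguous even when a part contains ':'.
# A naive ":".join(...) would map both of these to "dedup:a:b:c".
key_one = namespaced_key("dedup", "a", "b:c")
key_two = namespaced_key("dedup", "a:b", "c")
keys_are_distinct = key_one != key_two
```

When a custom store shares a Redis instance with other applications, adding an application-level prefix inside the store is a separate concern and can be layered on top of whatever keys the SDK passes in.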
Logging seam (no Protocol)
There is no Logger Protocol — the seam is Python's stdlib logging. Pass
your own logging.Logger via SDKConfig.logger to route SDK output through
your application's infrastructure:
```python
import logging

from convert_sdk import Core, SDKConfig

my_logger = logging.getLogger("myapp.convert")
core = Core(SDKConfig(data=my_config, logger=my_logger)).initialize()
```

To suppress SDK output below WARNING in production:

```python
logging.getLogger("convert_sdk").setLevel(logging.WARNING)
```

The SDK never adds handlers, sets levels, or calls logging.basicConfig();
that is always the application's responsibility.
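The division of responsibility can be seen with the stdlib alone. In this sketch the application configures a handler and level on its own "myapp" logger, and a child logger (the kind you might pass as SDKConfig.logger) only emits. The log messages are invented for illustration and the in-memory stream stands in for a real destination.

```python
import io
import logging

# Application-owned configuration: handler, formatter, and level live here.
stream = io.StringIO()  # stands in for a file, stderr, or a log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

app_logger = logging.getLogger("myapp")
app_logger.addHandler(handler)
app_logger.setLevel(logging.INFO)

# Child logger with no handlers or level of its own; it inherits from "myapp".
sdk_logger = logging.getLogger("myapp.convert")

# Simulate a library that emits but never configures logging.
sdk_logger.debug("config cache hit")          # dropped: below the inherited INFO level
sdk_logger.warning("tracking batch retried")  # propagates to the "myapp" handler

captured = stream.getvalue()
```

Because the child logger carries no configuration, changing the level or destination in one place on "myapp" changes what the emitting code produces.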
Observing lifecycle events (EventBus)
You do not implement the EventBus Protocol directly. The SDK owns the bus.
You observe it by registering handlers through Core.on:
```python
from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload, ConversionEventPayload


def on_conversion(payload: ConversionEventPayload, error=None) -> None:
    print(f"conversion: goal={payload.goal_key} visitor={payload.visitor_id}")


def on_released(payload: QueueReleasedPayload, error=None) -> None:
    if error is not None:
        print(f"delivery failed: status={payload.status_code}")
    else:
        print(f"released {payload.batch_size} events, reason={payload.reason.value}")


core.on(LifecycleEvent.CONVERSION, on_conversion)
core.on(LifecycleEvent.API_QUEUE_RELEASED, on_released)
```

Handlers receive (payload, error=None), where error is a BaseException | None.
A handler that raises is isolated, logged, and swallowed; it cannot break
delivery or other handlers.
EventBus Protocol (for custom bus implementations)
If you need to replace the bus (advanced — most integrations only need
Core.on):
```python
from typing import Any, Optional

from convert_sdk.ports.event_bus import EventBus, EventHandler
from convert_sdk.events import LifecycleEvent


class MyEventBus:
    def on(self, event: LifecycleEvent, handler: EventHandler) -> None:
        ...

    def emit(
        self,
        event: LifecycleEvent,
        payload: Any,
        error: Optional[BaseException] = None,
    ) -> None:
        ...
```

Note: EventBus has exactly two methods, on and emit. There is no
off (handler deregistration is not part of the MVP protocol surface).
See Event System for the full lifecycle event reference.
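To make the two-method surface and the handler-isolation behaviour concrete, here is a minimal synchronous bus written against stand-ins. SimpleBus and DemoEvent are hypothetical names for this sketch; it is not the SDK's in-process bus, only one plausible way to satisfy the same contract.

```python
import logging
from collections import defaultdict
from enum import Enum
from typing import Any, Callable, DefaultDict, List, Optional

log = logging.getLogger("example.bus")

Handler = Callable[..., None]


class DemoEvent(Enum):
    """Hypothetical stand-in for LifecycleEvent."""

    CONVERSION = "conversion"


class SimpleBus:
    """Synchronous bus with only on() and emit(); no deregistration."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Enum, List[Handler]] = defaultdict(list)

    def on(self, event: Enum, handler: Handler) -> None:
        self._handlers[event].append(handler)

    def emit(self, event: Enum, payload: Any, error: Optional[BaseException] = None) -> None:
        # Iterate over a copy so a handler may register more handlers safely.
        for handler in list(self._handlers[event]):
            try:
                handler(payload, error)
            except Exception:
                # Isolate the failure: log it and keep calling the remaining handlers.
                log.exception("handler for %s raised; continuing", event)


seen: List[Any] = []


def broken(payload: Any, error: Optional[BaseException] = None) -> None:
    raise RuntimeError("boom")


def recorder(payload: Any, error: Optional[BaseException] = None) -> None:
    seen.append(payload)


bus = SimpleBus()
bus.on(DemoEvent.CONVERSION, broken)    # registered first, always raises
bus.on(DemoEvent.CONVERSION, recorder)  # still runs after the failure
bus.emit(DemoEvent.CONVERSION, {"goal_key": "purchase_completed"})
```

The try/except wraps each handler call individually, which is what lets a later handler run even though an earlier one raised.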
Testing with custom adapters
The most common use of the extension points in tests: a stub transport with
canned config payloads avoids network calls; a fresh InMemoryDataStore per
test isolates deduplication state.
```python
from typing import Any, Dict

from convert_sdk import Core, SDKConfig, InMemoryDataStore


class CannedTransport:
    def __init__(self, payload: Dict[str, Any]) -> None:
        self._payload = payload
        self.tracking_calls: list = []

    def fetch_config(self, config) -> Dict[str, Any]:
        return self._payload

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self.tracking_calls.append(payload)

    def close(self) -> None:
        pass

    def __enter__(self) -> "CannedTransport":
        return self

    def __exit__(self, *exc: Any) -> None:
        pass


def make_test_core(config_payload: Dict[str, Any]) -> Core:
    # A fresh InMemoryDataStore per call keeps deduplication state isolated per test.
    return Core(
        SDKConfig(sdk_key="test-key", data_store=InMemoryDataStore()),
        transport=CannedTransport(config_payload),
    ).initialize()
```

Future async support
These Protocol seams are the foundation for the planned Phase 3 async surface.
When async lands, each seam gains an async sibling (e.g. AsyncTransport,
AsyncDataStore) rather than being re-shaped — existing sync adapters keep
working unchanged. See the async design intent (not implemented in the current release).
What to read next
- Type Hints: Protocol method signatures and field reference
- Initialization: SDKConfig, TransportConfig, RefreshConfig
- Diagnostics: diagnostic logging and SDKConfig.logger
- Persistent DataStore: full lifecycle of DataStore reads and writes
- Event System: lifecycle event payload reference