Extending the SDK

Swap the Transport, DataStore, and EventBus seams via the three runtime-checkable Protocols

The SDK exposes exactly three @runtime_checkable typing.Protocols as swappable I/O seams. Implementing the methods of a protocol is all that is required — no subclassing, no registration. Each has a default adapter the SDK wires automatically; you replace only what you need to replace.

SeamProtocolDefault adapterInjection point
HTTP transportconvert_sdk.ports.transport.Transporthttpx-backed HttpxTransportCore(config, transport=...) (keyword-only)
Visitor state storageconvert_sdk.ports.storage.DataStoreInMemoryDataStore (per-process)SDKConfig(data_store=...) (config field)
Lifecycle event busconvert_sdk.ports.event_bus.EventBusIn-process synchronous busInternal — observe via Core.on(...)

There is no logger Protocol. The logging seam is Python's stdlib logging — supply a logging.Logger via SDKConfig.logger. The SDK emits through it; it never adds handlers, sets levels, or calls logging.basicConfig().


Where each seam is injected

The two external seams have deliberately different injection points:

  • Transport — keyword-only argument on Core: Core(config, transport=my_transport)
  • DataStore — field on SDKConfig: SDKConfig(data=..., data_store=my_store)

The asymmetry is intentional: Core owns the transport lifecycle (it calls close() and uses it as a context manager); SDKConfig owns the storage declaration so it can be shared across multiple Core instances in the same process.


Custom Transport

Implement Transport to replace config fetching and tracking delivery. Common reasons: mTLS, proxy routing, stubbing in integration tests, or capturing tracking calls in test assertions.

Protocol requirements

from convert_sdk.ports.transport import Transport
from typing import Any, Dict

class MyTransport:
    def fetch_config(self, config: "SDKConfig") -> Dict[str, Any]:
        """Fetch and return the raw config payload.

        Raise convert_sdk.ConfigLoadError on any transport/HTTP/decode failure.
        """
        ...

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        """Deliver a serialized tracking-events batch over HTTPS.

        sdk_key is keyword-only. Raise a ConvertSDKError subclass on failure.
        """
        ...

    def close(self) -> None:
        """Release any held resources (e.g. HTTP connection pool)."""
        ...

    def __enter__(self) -> "Transport":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()

Transport is @runtime_checkable — the __enter__/__exit__ pair is required for isinstance(obj, Transport) to return True.

MethodSignatureNotes
fetch_config(config: SDKConfig) -> Dict[str, Any]Returns decoded JSON as dict
send_tracking(payload: Dict[str, Any], *, sdk_key: str) -> Nonesdk_key is keyword-only
close() -> NoneCalled on Core.close() and at context-manager exit
__enter__ / __exit__context-manager pairRequired for structural isinstance check

Complete example: in-memory stub transport

from typing import Any, Dict
from convert_sdk import Core, SDKConfig
from convert_sdk.ports.transport import Transport

class StubTransport:
    """Returns canned config and captures tracking payloads."""

    def __init__(self, config_payload: Dict[str, Any]) -> None:
        self._config = config_payload
        self.tracking_calls: list = []

    def fetch_config(self, config) -> Dict[str, Any]:
        return self._config

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self.tracking_calls.append(payload)

    def close(self) -> None:
        pass

    def __enter__(self) -> "StubTransport":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()

stub = StubTransport(config_payload={
    "account_id": "1001",
    "project": {"id": "2002", "name": "Test"},
    "experiences": [],
    "features": [],
    "goals": [],
})

# @runtime_checkable — duck-typed conformance check:
assert isinstance(stub, Transport)

# transport= is KEYWORD-ONLY on Core:
core = Core(SDKConfig(sdk_key="test-key"), transport=stub).initialize()
context = core.create_context("visitor-001")
context.track_conversion("purchase_completed")
core.flush()

assert stub.tracking_calls   # our stub handled the delivery
core.close()

Example: retrying transport wrapper

import time
from typing import Any, Dict
from convert_sdk.ports.transport import Transport
from convert_sdk.errors import ConfigLoadError

class RetryingTransport:
    """Wraps another transport and retries fetch_config up to `max_retries` times."""

    def __init__(self, inner: Transport, max_retries: int = 3) -> None:
        self._inner = inner
        self._max_retries = max_retries

    def fetch_config(self, config) -> Dict[str, Any]:
        last_exc: Exception | None = None
        for attempt in range(self._max_retries):
            try:
                return self._inner.fetch_config(config)
            except ConfigLoadError as exc:
                last_exc = exc
                if attempt < self._max_retries - 1:
                    time.sleep(2 ** attempt)
        raise last_exc  # type: ignore[misc]

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self._inner.send_tracking(payload, sdk_key=sdk_key)

    def close(self) -> None:
        self._inner.close()

    def __enter__(self) -> "RetryingTransport":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()

Custom DataStore

Implement DataStore to persist visitor state and deduplication markers across process restarts — for example, backed by Redis, Memcache, or a database. Inject it through SDKConfig.data_store, not as a Core argument.

Protocol requirements

from convert_sdk import DataStore   # top-level export
from typing import Any, Optional

class MyStore:
    def get(self, key: str) -> Any:
        """Return the value stored under key, or None if absent or expired."""
        ...

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store value. ttl is expiry in seconds; None means no expiry."""
        ...

    def has(self, key: str) -> bool:
        """True if key has a present, unexpired value."""
        ...

    def delete(self, key: str) -> None:
        """Remove key. Deleting an absent key is a safe no-op."""
        ...

DataStore is @runtime_checkableisinstance(obj, DataStore) checks for the four method names at runtime (not their signatures).

MethodSignatureNotes
get(key: str) -> AnyReturns None for absent or expired keys
set(key: str, value: Any, ttl: Optional[int] = None) -> Nonettl in seconds; None = no expiry
has(key: str) -> boolMust return False for absent and expired keys
delete(key: str) -> NoneIdempotent — no-op on absent key

Implementations must be safe to call from the synchronous tracking path and from worker threads (the SDK's flush timer runs on a daemon thread).

Complete example: dict-backed store

from typing import Any, Optional
from convert_sdk import Core, SDKConfig, DataStore

class DictStore:
    def __init__(self) -> None:
        self._data: dict = {}
        self.writes: int = 0

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        self._data[key] = value
        self.writes += 1

    def has(self, key: str) -> bool:
        return key in self._data

    def delete(self, key: str) -> None:
        self._data.pop(key, None)

store = DictStore()
assert isinstance(store, DataStore)

# data_store is a CONFIG FIELD, not a Core argument:
core = Core(SDKConfig(data=my_config, data_store=store)).initialize()
context = core.create_context("visitor-001")
context.set_segments({"loyalty_tier": "gold"})  # persists through the store

assert store.writes >= 1
core.close()

Example: Redis-backed store

import json
from typing import Any, Optional
from convert_sdk import Core, SDKConfig

class RedisDataStore:
    def __init__(self, redis_client) -> None:
        self._redis = redis_client

    def get(self, key: str) -> Any:
        raw = self._redis.get(key)
        return json.loads(raw) if raw is not None else None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        serialized = json.dumps(value)
        if ttl is not None:
            self._redis.setex(key, ttl, serialized)
        else:
            self._redis.set(key, serialized)

    def has(self, key: str) -> bool:
        return bool(self._redis.exists(key))

    def delete(self, key: str) -> None:
        self._redis.delete(key)

core = Core(
    SDKConfig(data=my_config, data_store=RedisDataStore(redis_client=my_redis))
).initialize()

The built-in InMemoryDataStore is thread-safe but process-local. With the default store, deduplication markers reset on process restart. A shared Redis store removes that isolation without any protocol change.

Namespaced storage keys

The SDK stores different types of state under distinct key prefixes so they never collide in the shared store:

  • Visitor state (attributes + segments): keys built by convert_sdk.ports.storage.visitor_state_key(visitor_id)"state:[\"visitor-001\"]"
  • Deduplication markers: keys built by the tracking layer, prefixed "dedup:"
from convert_sdk.ports.storage import visitor_state_key

key = visitor_state_key("visitor-001")
# key == 'state:["visitor-001"]'

See Persistent DataStore for the full lifecycle of reads and writes.


Logging seam (no Protocol)

There is no Logger Protocol — the seam is Python's stdlib logging. Pass your own logging.Logger via SDKConfig.logger to route SDK output through your application's infrastructure:

import logging
from convert_sdk import Core, SDKConfig

my_logger = logging.getLogger("myapp.convert")
core = Core(SDKConfig(data=my_config, logger=my_logger)).initialize()

To silence all SDK output in production:

logging.getLogger("convert_sdk").setLevel(logging.WARNING)

The SDK never adds handlers, sets levels, or calls logging.basicConfig() — that is always the application's responsibility.


Observing lifecycle events (EventBus)

You do not implement the EventBus Protocol directly. The SDK owns the bus. You observe it by registering handlers through Core.on:

from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload, ConversionEventPayload

def on_conversion(payload: ConversionEventPayload, error=None) -> None:
    print(f"conversion: goal={payload.goal_key} visitor={payload.visitor_id}")

def on_released(payload: QueueReleasedPayload, error=None) -> None:
    if error is not None:
        print(f"delivery failed: status={payload.status_code}")
    else:
        print(f"released {payload.batch_size} events, reason={payload.reason.value}")

core.on(LifecycleEvent.CONVERSION, on_conversion)
core.on(LifecycleEvent.API_QUEUE_RELEASED, on_released)

Handlers receive (payload, error=None)error is a BaseException | None. A handler that raises is isolated, logged, and swallowed — it cannot break delivery or other handlers.

EventBus Protocol (for custom bus implementations)

If you need to replace the bus (advanced — most integrations only need Core.on):

from convert_sdk.ports.event_bus import EventBus, EventHandler
from convert_sdk.events import LifecycleEvent
from typing import Any, Optional

class MyEventBus:
    def on(self, event: LifecycleEvent, handler: EventHandler) -> None:
        ...

    def emit(
        self,
        event: LifecycleEvent,
        payload: Any,
        error: Optional[BaseException] = None,
    ) -> None:
        ...

Note: EventBus has exactly two methods — on and emit. There is no off (handler deregistration is not part of the MVP protocol surface).

See Event System for the full lifecycle event reference.


Testing with custom adapters

The most common use of the extension points in tests: a stub transport with canned config payloads avoids network calls; a fresh InMemoryDataStore per test isolates deduplication state.

from typing import Any, Dict
from convert_sdk import Core, SDKConfig, InMemoryDataStore

class CannedTransport:
    def __init__(self, payload: Dict[str, Any]) -> None:
        self._payload = payload
        self.tracking_calls: list = []

    def fetch_config(self, config) -> Dict[str, Any]:
        return self._payload

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self.tracking_calls.append(payload)

    def close(self) -> None:
        pass

    def __enter__(self) -> "CannedTransport":
        return self

    def __exit__(self, *exc: Any) -> None:
        pass


def make_test_core(config_payload: Dict[str, Any]) -> Core:
    return Core(
        SDKConfig(sdk_key="test-key", data_store=InMemoryDataStore()),
        transport=CannedTransport(config_payload),
    ).initialize()

Future async support

These Protocol seams are the foundation for the planned Phase 3 async surface. When async lands, each seam gains an async sibling (e.g. AsyncTransport, AsyncDataStore) rather than being re-shaped — existing sync adapters keep working unchanged. See the async design intent (not implemented in the current release).


What to read next