Python SDK.
A thin wrapper around the HTTP API. The constructor takes base_url, bearer, and tenant;
the client auto-generates idempotency keys for retries, automatically backs off on 429 and 5xx responses, and raises a typed exception per status class.
The async variant mirrors the synchronous surface exactly.
pip install originchain · python >= 3.10 · httpx · stable surface · semver-tracked
from originchain import OriginChain
db = OriginChain(
    base_url="https://acme.originchain.ai",
    bearer="oc_live_...",
    tenant="01HW7G5...",
    timeout=30.0,  # per-request timeout, in seconds
    max_retries=3,  # 429 / 5xx, retried with exponential backoff + jitter
)

from originchain import OriginChain

# Reads OC_BASE_URL, OC_BEARER, OC_TENANT.
db = OriginChain.from_env()

from originchain import OriginChain

# Context-manager form: the client is released when the block exits.
with OriginChain.from_env() as db:
    rows = db.query({"Scan": {"schema": "trading.orders"}})

db.schemas
Register a TOML manifest, list schema ids, and fetch the raw TOML.
# TOML manifest source (read from disk or composed inline).
# Read it inside `with` so the file handle is closed promptly; TOML files
# are UTF-8 by spec, so don't rely on the platform's default encoding.
with open("trading.orders.toml", encoding="utf-8") as f:
    toml_src = f.read()
resp = db.schemas.register(toml_src)
# -> {"id": "trading.orders", "version": 1}

ids = db.schemas.list()
# -> ["trading.orders", "trading.users", ...]

# Returns the raw TOML text.
toml_src = db.schemas.get("trading.orders")
print(toml_src)

db.rows
Typed CRUD over a registered schema. put_batch splits an iterable (for example, a generator) into chunks,
each written as one atomic WAL frame; every chunk gets a derived idempotency key, so partial retries don't double-write.
# Single-row upsert. Idempotency-Key auto-generated if you don't pass one.
import uuid
resp = db.rows.put(
"trading.orders",
{ "order_id": "o-0001", "symbol": "AAPL", "qty": 100 },
expect_insert=False, # set True to skip the prior-state read
idempotency_key=str(uuid.uuid4()), # safe to retry
)
# -> {"ok": True, "lsn": {"segment": 4, "offset": 8421007}}row = db.rows.get("trading.orders", "o-0001")
# -> {"order_id": "o-0001", "symbol": "AAPL", "qty": 100, "_oc_row_version": 1}# Chunks the iterable into batches of `chunk` rows. Each chunk is
# ONE atomic WAL frame. A per-chunk idempotency key is derived from
# `idempotency_key` so partial retries don't double-count.
def gen_orders():
for i in range(50_000):
yield { "order_id": f"o-{i}", "symbol": "AAPL", "qty": 100 }
inserted = db.rows.put_batch(
"trading.orders",
gen_orders(),
chunk=2_000,
idempotency_key="bulk-2026-05-01",
)
print(f"{inserted} rows accepted") db.ask · db.query
Two paths into the executor. db.ask compiles a natural-language
query into a Plan; db.query takes a hand-authored Plan tree
directly.
# Compiled to a Plan via the rule grammar (and optionally Bedrock).
res = db.ask(
    "last 10 orders where status is pending",
    schemas=["trading.orders"],
)
for row in res["rows"]:
    print(row)

# Hand-authored Plan trees skip the NL layer entirely.
# Built bottom-up here: Scan -> Filter -> Limit.
scan = {"Scan": {"schema": "trading.orders"}}
pending = {
    "Filter": {
        "predicate": {"Eq": ["status", "pending"]},
        "child": scan,
    }
}
plan = {"Limit": {"n": 10, "child": pending}}
rows = db.query(plan)

AsyncOriginChain
Same surface, but asyncio-friendly. The async client owns its own httpx.AsyncClient,
so it composes cleanly with FastAPI / Starlette workers. Always await db.aclose() on shutdown to release it.
import asyncio
from originchain import AsyncOriginChain
async def main():
db = AsyncOriginChain(
base_url="https://acme.originchain.ai",
bearer="oc_live_...",
tenant="01HW7G5...",
)
try:
await db.schemas.list()
await db.rows.put("trading.orders",
{ "order_id": "o-1", "symbol": "AAPL", "qty": 100 })
finally:
await db.aclose()
asyncio.run(main()) Typed errors
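Every HTTP failure surfaces as a subclass of OCError.
Catch the specific class to decide retryability without inspecting status codes. (OCReplicationDegraded, listed below, is not a failure: it is a warning attached to a successful 2xx write.)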
| Exception | Status | Retry? | Meaning |
|---|---|---|---|
| OCAuthError | 401 / 403 | no | Bearer missing or invalid, or not scoped to this tenant. |
| OCNotFoundError | 404 | no | Resource doesn't exist. |
| OCValidationError | 400 | no | Body / params malformed. |
| OCRateLimitedError | 429 | yes (after retry_after seconds) | Per-bearer rate-limit bucket drained. |
| OCServerError | 5xx | if idempotent | Server-side failure. |
| OCReplicationDegraded | 2xx (warning) | n/a | Write committed, but the follower lagged. Promote to an alert if RPO=0 matters. |
import time

from originchain import (
    OriginChain,
    OCAuthError, OCNotFoundError, OCRateLimitedError,
    OCValidationError, OCServerError,
)

db = OriginChain.from_env()
row = {"qty": "not-an-int"}
try:
    db.rows.put("trading.orders", row)
except OCValidationError as e:
    # 400 — DON'T retry. Body is malformed.
    print("client bug:", e, e.body)
except OCAuthError:
    # 401 / 403 — DON'T retry. Bearer is wrong or out of scope.
    raise
except OCNotFoundError:
    # 404 — schema or row doesn't exist.
    raise
except OCRateLimitedError as e:
    # 429 — wait `e.retry_after` seconds, then retry the same write.
    # Sleeping without re-issuing the call would silently drop the write.
    time.sleep(e.retry_after)
    db.rows.put("trading.orders", row)
except OCServerError:
    # 5xx — server-side. Retry with backoff if idempotent.
    raise

import warnings
from originchain import OCReplicationDegraded
# OCReplicationDegraded is a Warning category: it is issued through the
# `warnings` machinery rather than raised, because the write committed
# durably on the leader; only the follower didn't ack within the
# sync-timeout window. The filter below promotes it to a raised exception
# so you can alert on it; ignore it instead if RPO=0 isn't a contract for
# the workload.
warnings.simplefilter("error", OCReplicationDegraded)
The client retries up to max_retries (default 3) on 429 and 5xx responses with exponential backoff
(0.25 s, 0.5 s, 1 s — jitter capped at 4 s). When the server returns a Retry-After header, that value wins.
Mutating writes are safe to retry only when an Idempotency-Key is set: the client auto-derives a per-chunk key
in put_batch, but for a single-row put you should pass one yourself.
Raw HTTP escape hatch
The SQL, vector, full-text, graph, watch, and migrations endpoints are reached via the underlying db._client
(an httpx.Client); typed wrappers are landing in upcoming releases. The auth header and base URL are already wired
into that client, so calls only need a path relative to the base URL, as in the examples below.
The HTTP shapes are fully documented on the API reference page; you can copy the JSON bodies from there directly.
# /v1/sql doesn't have a typed Python method yet — drop into
# the underlying httpx.Client to call it directly.
sql_body = {
    "sql": "SELECT order_id, symbol FROM trading.orders WHERE status = 'pending' LIMIT 10",
}
resp = db._client.post(f"/v1/tenants/{db.tenant}/sql", json=sql_body)
resp.raise_for_status()  # fail loudly on 4xx / 5xx before parsing
data = resp.json()
# -> {"kind": "select", "rows": [...]}

# Upsert
db._client.post(
    f"/v1/tenants/{db.tenant}/vector/embeddings/put",
    json={
        "id": "doc-1",
        # Placeholder: `...` is not JSON-serializable — pass the full 384-float vector.
        "embedding": [0.011, -0.443, ...],
        "dim": 384,
        "metric": "cosine",
        "metadata": {"lang": "en"},
    },
).raise_for_status()

# Top-k with filter
resp = db._client.post(
    f"/v1/tenants/{db.tenant}/vector/embeddings/topk",
    json={
        # Placeholder: `...` is not JSON-serializable — pass the full 384-float vector.
        "query": [0.012, -0.439, ...],
        "k": 10,
        "dim": 384,
        "filter": {"lang": "en"},
        "mode": "hnsw",
    },
)
# Check the status before parsing so an error body is never mistaken for hits.
resp.raise_for_status()
hits = resp.json()

# Index
db._client.post(
    f"/v1/tenants/{db.tenant}/fts/articles/body",
    json={"doc_id": "art-001", "text": "OriginChain ships managed substrate-grade storage."},
).raise_for_status()

# BM25 search
resp = db._client.get(
    f"/v1/tenants/{db.tenant}/fts/articles/body",
    params={"q": "managed storage", "mode": "bm25", "k": 10},
)
# Check the status before parsing so an error body is never mistaken for hits.
resp.raise_for_status()
hits = resp.json()
# -> [{"doc_id": "art-001", "score": 7.42}, ...]

# One-hop forward neighbours
resp = db._client.get(
    f"/v1/tenants/{db.tenant}/graph/social.users/neighbors",
    params={"rel": "follows", "pk": "alice"},
)
resp.raise_for_status()  # don't parse an error body as a neighbour list
out = resp.json()
# -> ["bob", "carol", "dave"]

# Weighted shortest path
import json

# Edge weights, keyed by "src|dst" pairs as in this example; sent as a
# JSON-encoded query parameter.
weights = {"NYC|CHI": 2.0, "CHI|DEN": 1.5, "DEN|SFO": 1.2}
resp = db._client.get(
    f"/v1/tenants/{db.tenant}/graph/road.cities/dijkstra",
    params={
        "rel": "connects",
        "src": "NYC",
        "dst": "SFO",
        "weights_json": json.dumps(weights),
    },
)
resp.raise_for_status()
cost = resp.json()
# -> {"cost": 4.7}

# Server-Sent Events stream — no typed iterator yet.
# Stream through the underlying httpx.Client so its auth header and base URL
# are applied for you, instead of rebuilding them by hand.
with db._client.stream(
    "GET",
    f"/v1/tenants/{db.tenant}/watch",
    params={"schemas": "trading.orders"},
    headers={"Accept": "text/event-stream"},
    timeout=None,  # SSE connections idle indefinitely
) as resp:
    resp.raise_for_status()  # surface auth / tenant errors before reading
    for line in resp.iter_lines():
        if line.startswith("data:"):
            # Per the SSE spec the space after "data:" is optional; strip at
            # most one so lines sent as "data:{...}" are handled too.
            print(line.removeprefix("data:").removeprefix(" "))