PostgreSQL ingest connector.
The PostgreSQL ingest connector pulls rows from an existing Postgres source into your OriginChain tenant in one HTTP call. No separate ETL service, no queue, no glue code. Configure the source, declare the table mappings, POST — the engine handles the rest.
Quickstart.
curl -X POST "https://acme.ap-south-1.db.originchain.ai/v1/tenants/$T/ingest/postgres/sync" \
-H "Authorization: Bearer $OC_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"source": {
"host": "pg.internal",
"port": 5432,
"database": "production",
"user": "readonly",
"password": "***",
"sslmode": "require"
},
"mappings": [
{
"source_table": "public.orders",
"target_schema": "orders",
"primary_key": ["id"],
"columns": ["id", "customer_id", "amount", "status", "created_at"]
}
]
}' Source config.
-
host / port / databaseSource endpoint. Must be reachable from your OriginChain instance — public, peered, or via PrivateLink on Enterprise. -
user / passwordRead-only Postgres role recommended. Credentials are held in memory for the duration of the sync. -
sslmode"require" or "verify-full". Plaintext is rejected.
Target schema mapping.
Each mapping picks a source table, a target OriginChain schema, a primary key, and the columns to pull. The connector creates the OriginChain schema if it does not exist, with column types inferred from the Postgres type map. You can declare the schema up-front in the manifest if you want explicit control.
Scheduling.
v1 is one-shot: each POST runs a complete sync. Schedule from your side (cron, a queue, or a tiny scheduler script) until the engine ships a managed scheduler. The endpoint is idempotent on primary key — the same row pulled twice overwrites; no duplicates.
import os, requests, schedule, time
def sync_once():
requests.post(
f"{os.environ['OC_ENDPOINT']}/v1/tenants/{os.environ['T']}/ingest/postgres/sync",
headers={"Authorization": f"Bearer {os.environ['OC_TOKEN']}"},
json={
"source": {...},
"mappings": [...],
},
timeout=900,
).raise_for_status()
schedule.every(15).minutes.do(sync_once)
while True:
schedule.run_pending()
time.sleep(1) What's next.
- Logical-decoding mode (CDC-style stream) is on the roadmap. v1 is full-table sync.
- Source-side column filters (push down predicates to the Postgres query) — coming alongside CDC.
- MySQL + SQLite connectors are next on the integration list.