RAG / LLM apps
Retrieval-augmented generation (RAG) means: pull the most relevant chunks from your data, give them to an LLM as context, ask the LLM to answer using only that context. OriginChain handles the storage + retrieval; you handle the chunking + the LLM call.
The big advantage over a Pinecone + Elasticsearch + Postgres stack: rows, vector embeddings, and full-text indexes all live in one place. No ETL, no eventual consistency between systems, one bearer token. Your app's authorization rules apply to retrieval automatically because there's only one store.
1. Schema for chunks.
A chunk is one passage of text addressable as a single row. Each chunk gets a primary key (typically docId:chunkIndex), the source document ID for filtering, page / location metadata for citations, and the chunk text itself.
# manifest.toml - the row schema for your chunks.
# Vector embeddings and the BM25 index live on separate runtime endpoints,
# linked back to this row by primary key.
namespace = "rag"
table = "chunks"
primary_key = ["id"]
[[columns]]
name = "id"
ty = "str"
required = true
[[columns]]
name = "doc_id"
ty = "str"
required = true
[[columns]]
name = "source"
ty = "str"
[[columns]]
name = "page"
ty = "i64"
[[columns]]
name = "text"
ty = "str"
[[columns]]
name = "created_ms"
ty = "u64"
# Index doc_id so "all chunks for this doc" is fast.
[[indexes]]
name = "by_doc"
columns = ["doc_id"]
Register this with POST /v1/tenants/:t/schemas (see Schemas overview). Vector dim and FTS analyzer are set per-call at the runtime endpoints - not on this schema.
2. Ingest path.
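For each chunk: one row write, one vector put, one FTS index call. These are three separate endpoints today (helpers that combine them ship in a future release). Each call is atomic on its own, but the three are not atomic as a group, so a failure partway through can leave a chunk with a row but no vector or index entry.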
# 1. Chunk the source text into ~512-token windows with 50-token overlap.
# 2. Embed each chunk with whatever embedding model you use.
# 3. Write the chunk row + the embedding + the FTS index entry.
from originchain import OriginChain
from openai import OpenAI
import os
import time

db = OriginChain(
    base_url=f"https://{os.environ['OC_HOST']}",
    bearer=os.environ["OC_TOKEN"],
    tenant=os.environ["OC_TENANT"],
)
ai = OpenAI()

def ingest(doc_id, source, full_text):
    # chunk_text is your own chunker; each chunk exposes .text and .page.
    chunks = chunk_text(full_text, tokens=512, overlap=50)
    # One batched embedding call for all chunks.
    emb = ai.embeddings.create(
        model="text-embedding-3-small",
        input=[c.text for c in chunks],
    )
    for i, c in enumerate(chunks):
        cid = f"{doc_id}:{i}"
        # 1. The row.
        db.rows.put("rag.chunks", {
            "id": cid,
            "doc_id": doc_id,
            "source": source,
            "page": c.page,
            "text": c.text,
            "created_ms": int(time.time() * 1000),
        })
        # 2. The vector embedding.
        db.vector.put("rag.chunks", cid, emb.data[i].embedding,
                      metadata={"source": source, "page": c.page})
        # 3. The BM25 index.
        db.fts.index("rag.chunks", "text", doc_id=cid, text=c.text)

// TS SDK doesn't wrap row writes yet - using fetch for the row write,
// SDK for vector + FTS. Same logic as Python above.
import { OriginChainClient } from "@originchain/sdk";
import OpenAI from "openai";

const db = new OriginChainClient({
  baseUrl: `https://${process.env.OC_HOST}`,
  bearer: process.env.OC_TOKEN!,
});
const ai = new OpenAI();

async function ingest(docId: string, source: string, fullText: string) {
  // chunkText is your own chunker; each chunk exposes .text and .page.
  const chunks = chunkText(fullText, { tokens: 512, overlap: 50 });
  const emb = await ai.embeddings.create({
    model: "text-embedding-3-small",
    input: chunks.map(c => c.text),
  });
  for (let i = 0; i < chunks.length; i++) {
    const cid = `${docId}:${i}`;
    // 1. Row write via fetch.
    const res = await fetch(`https://${process.env.OC_HOST}/v1/tenants/${process.env.OC_TENANT}/rows/rag.chunks`, {
      method: "POST",
      headers: {
        "Authorization": `Bearer ${process.env.OC_TOKEN}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        id: cid, doc_id: docId, source, page: chunks[i].page,
        text: chunks[i].text, created_ms: Date.now(),
      }),
    });
    // fetch does not reject on HTTP errors, so check the status explicitly.
    if (!res.ok) throw new Error(`row write failed for ${cid}: ${res.status}`);
    // 2. Vector embedding.
    await db.vectorPut("rag.chunks", {
      id: cid, embedding: emb.data[i].embedding, dim: 1536, metric: "cosine",
      metadata: { source, page: chunks[i].page },
    });
    // 3. BM25 index.
    await db.ftsIndex("rag.chunks", "text", { doc_id: cid, text: chunks[i].text });
  }
}

- Chunks too big. Most embedding models lose quality past ~512 tokens. Past ~1500 you're throwing away precision.
- No overlap. Without 10-20% chunk overlap, key sentences land at chunk boundaries and disappear from retrieval.
- Embedding only - or BM25 only. Vector misses exact tokens (SKUs, error codes, model numbers). BM25 misses synonyms. Index both, fuse the results.
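The ingest code calls a chunker without defining it. A minimal sketch of one in Python follows, under loud assumptions: the names Chunk and chunk_text and the signature are hypothetical (chosen to match the call in the ingest example), whitespace-separated words stand in for tokens, and a form feed ("\f") is assumed to mark a page break. Real tokenizers usually produce more tokens than words, so swap in your embedding model's tokenizer if you need exact window sizes.

```python
from dataclasses import dataclass


@dataclass
class Chunk:
    text: str
    page: int  # 1-based page on which the chunk starts


def chunk_text(full_text, tokens=512, overlap=50):
    """Split text into overlapping windows of roughly `tokens` words."""
    if not 0 <= overlap < tokens:
        raise ValueError("overlap must be >= 0 and smaller than tokens")
    # Tag every word with its page. Assumption: "\f" marks a page break.
    words = []
    for page_no, page in enumerate(full_text.split("\f"), start=1):
        words.extend((w, page_no) for w in page.split())
    chunks = []
    step = tokens - overlap  # how far the window advances each time
    for start in range(0, len(words), step):
        window = words[start:start + tokens]
        chunks.append(Chunk(" ".join(w for w, _ in window), window[0][1]))
        if start + tokens >= len(words):
            break  # this window already reached the end of the text
    return chunks
```

With tokens=512 and overlap=50 the window advances 462 words at a time, so the last 50 words of each chunk are repeated at the start of the next, which is roughly the 10% overlap recommended above.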
3. Retrieval + fusion.
Run vector and BM25 in parallel, fuse the results, fetch the full chunks for the top-N fused IDs, send to the LLM with a strict "use only this context" instruction.
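The Python version of this step issues the two searches one after the other. One way to overlap them is a small thread pool, sketched here. The helper name run_parallel and the two fake search functions are hypothetical stand-ins so the block runs on its own, and the sketch assumes the SDK client is safe to call from more than one thread, which you should confirm before relying on it.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(*calls):
    """Run zero-argument callables concurrently; return results in call order."""
    if not calls:
        return []
    with ThreadPoolExecutor(max_workers=len(calls)) as pool:
        futures = [pool.submit(call) for call in calls]
        # result() re-raises any exception raised in the worker thread.
        return [f.result() for f in futures]


# Stand-ins for the two searches (hypothetical data, illustration only).
def fake_vector_search():
    return ["doc:0", "doc:3"]


def fake_bm25_search():
    return ["doc:3", "doc:7"]


vec_hits, fts_hits = run_parallel(fake_vector_search, fake_bm25_search)
```

In the retrieval function you would pass two lambdas that wrap the vector topk call and the BM25 search call in place of the fake functions.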
# 1. Embed the question.
# 2. Run vector topk + BM25 search in parallel.
# 3. Fuse with Reciprocal Rank Fusion.
# 4. Pull the chunk rows for the top-k IDs.
# 5. Send them to the LLM as context.
def answer(question, source_filter=None):
    # 1.
    q_vec = ai.embeddings.create(
        model="text-embedding-3-small",
        input=question,
    ).data[0].embedding
    # 2. Parallel retrieval (one round-trip each, can run concurrently).
    # Note: only the vector call is filtered by source here.
    vec_hits = db.vector_topk(
        "rag.chunks",
        query=q_vec, k=20, dim=1536, metric="cosine",
        filter={"source": source_filter} if source_filter else None,
    )
    fts_hits = db.fts.search("rag.chunks", "text", q=question, mode="bm25", k=20)
    # 3. Reciprocal Rank Fusion. Pick the top 5 fused IDs.
    fused_ids = rrf([h.id for h in vec_hits], [h.doc_id for h in fts_hits])[:5]
    # 4. Pull full chunk text + metadata.
    rows = [db.rows.get("rag.chunks", cid) for cid in fused_ids]
    # 5. Send to the LLM as bounded, cited context.
    context = "\n\n".join(
        f"[Source {i+1}] ({r['source']} p.{r['page']}) {r['text']}"
        for i, r in enumerate(rows)
    )
    completion = ai.chat.completions.create(
        model="gpt-4o",
        temperature=0.0,
        messages=[
            {"role": "system", "content":
                "Answer using ONLY the context below. Cite every claim with [Source N]. "
                "If the context is insufficient, say so."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ],
    )
    return completion.choices[0].message.content

async function answer(question: string, sourceFilter?: string) {
  // 1. Embed the question.
  const qVec = (await ai.embeddings.create({
    model: "text-embedding-3-small", input: question,
  })).data[0].embedding;
  // 2. Parallel retrieval.
  const [vecHits, ftsHits] = await Promise.all([
    db.vectorTopk("rag.chunks", {
      query: qVec, k: 20, dim: 1536, metric: "cosine",
      filter: sourceFilter ? { source: sourceFilter } : undefined,
    }),
    db.ftsSearch("rag.chunks", "text", { q: question, mode: "bm25", k: 20 }),
  ]);
  // 3. Reciprocal Rank Fusion. Pick top 5.
  const fusedIds = rrf(
    vecHits.map(h => h.id),
    (ftsHits as { doc_id: string }[]).map(h => h.doc_id),
  ).slice(0, 5);
  // 4. Fetch chunks via raw fetch (row helpers ship soon).
  const rows = await Promise.all(fusedIds.map(async (cid) => {
    const r = await fetch(
      `https://${process.env.OC_HOST}/v1/tenants/${process.env.OC_TENANT}/rows/rag.chunks/${cid}`,
      { headers: { "Authorization": `Bearer ${process.env.OC_TOKEN}` } },
    );
    // fetch does not reject on HTTP errors, so check the status explicitly.
    if (!r.ok) throw new Error(`row fetch failed for ${cid}: ${r.status}`);
    return r.json();
  }));
  // 5. Send to LLM.
  const context = rows.map((r, i) =>
    `[Source ${i+1}] (${r.source} p.${r.page}) ${r.text}`,
  ).join("\n\n");
  const completion = await ai.chat.completions.create({
    model: "gpt-4o", temperature: 0.0,
    messages: [
      { role: "system", content:
        "Answer using ONLY the context below. Cite every claim with [Source N]. " +
        "If the context is insufficient, say so." },
      { role: "user", content: `Context:\n${context}\n\nQuestion: ${question}` },
    ],
  });
  return completion.choices[0].message.content;
}

4. The RRF helper.
Reciprocal Rank Fusion in a few lines. It works for any number of ranked lists. k=60 is the constant from the original paper and is rarely worth tuning.
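Written out, the score that the helper below computes for an ID d over ranked lists L_1 ... L_n is as follows, where rank_i(d) is the 1-based position of d in list L_i and lists that do not contain d contribute nothing (the symbol names are chosen here for illustration):

```latex
\mathrm{RRF}(d) = \sum_{i \,:\, d \in L_i} \frac{1}{k + \operatorname{rank}_i(d)}, \qquad k = 60
```

For example, a chunk ranked 1st by vector search and 3rd by BM25 scores 1/61 + 1/63 ≈ 0.0323, which beats a chunk ranked 1st in only one list (1/61 ≈ 0.0164). Agreement between the two retrievers is rewarded even when neither ranks the chunk at the very top.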
# Reciprocal Rank Fusion - simple, language-agnostic.
# Takes ranked lists of IDs, returns a fused ranked list.
# k=60 is the standard constant from the original paper.
def rrf(*ranked_lists, k=60):
    scores = {}
    for lst in ranked_lists:
        # enumerate is 0-based, so rank + 1 is the 1-based position.
        for rank, doc_id in enumerate(lst):
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
    return [doc_id for doc_id, _ in
            sorted(scores.items(), key=lambda x: -x[1])]

5. Tips + tradeoffs.
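- Optional reranker. A cross-encoder reranker (Cohere Rerank, Voyage Rerank, or a self-hosted bge-reranker) on the fused top-20 typically gains 5-15% NDCG. Add it as step 4.5: fetch the rows for the fused top-20 instead of the top 5, rerank them against the question, and pass the best 5 to the LLM.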
- Filter on metadata for tenant isolation. Pass filter: {tenant_id: "..."} on vector topk so users only see their own chunks. The filter applies during search, not after - so it's still fast.
- Mode selector for speed. Set mode: "fast" on the vector topk when latency matters more than recall (e.g., search-as-you-type). The hit count drops slightly but the call gets ~3x faster.
- Semantic cache for hot questions. Embed user questions into a small rag.query_cache table. Before going through retrieval, do a vector topk on it - if any past question has cosine ≥ 0.97 to this one, return the cached answer.
- Cite every claim. Always pass page numbers + source IDs in the context. Force the LLM to cite. Users trust answers they can verify.
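The semantic cache tip can be sketched in plain Python to show the threshold logic. This is an in-memory illustration only: the SemanticCache class and the cosine helper are hypothetical names, the list stands in for the rag.query_cache table, and the tiny two-dimensional vectors stand in for real question embeddings.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticCache:
    """In-memory stand-in for the rag.query_cache table (illustration only)."""

    def __init__(self, threshold=0.97):
        self.threshold = threshold
        self.entries = []  # list of (question_embedding, cached_answer)

    def get(self, q_vec):
        """Return the answer of the most similar past question, or None on a miss."""
        best_score, best_answer = 0.0, None
        for vec, cached in self.entries:
            score = cosine(q_vec, vec)
            if score > best_score:
                best_score, best_answer = score, cached
        return best_answer if best_score >= self.threshold else None

    def put(self, q_vec, answer_text):
        self.entries.append((q_vec, answer_text))


cache = SemanticCache()
cache.put([1.0, 0.0], "cached answer")
near_hit = cache.get([0.99, 0.05])  # nearly the same direction: cache hit
miss = cache.get([0.0, 1.0])        # orthogonal question: cache miss
```

Against the real store, the linear scan would presumably become a vector topk with k=1 on the cache table, followed by the same threshold check. Cached answers go stale when the underlying documents change, so some invalidation rule (for example, a time-to-live) is worth planning for.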