Postgres integration
Use this after the Rewrite API when your app keeps the Postgres connection.
SQL Tamer plans the rewrite. Your app executes that plan on one Postgres session.
The whole flow
- Call
POST /v1/rewrite. - Begin one transaction.
- Run every statement in
setup_sql. - Execute every
load_opsentry on the same connection. - Run each
main_sql[i]withmain_bindings[i]. - Run every statement in
cleanup_sql. - Commit or roll back.
Every step must stay on the same Postgres session.
Helper rules
postgres_bigint_arraymeans "pass this array as one Postgres parameter"postgres_copy_textmeans "run COPY FROM STDIN intotarget_tablefor thesecolumnsandrows"- Empty
setup_sql,load_ops, andcleanup_sqlarrays are normal
Integration skeleton
const plan = await rewriteResponse.json();
await pg.query("BEGIN");
try {
for (const sql of plan.setup_sql) {
await pg.query(sql);
}
for (const loadOp of plan.load_ops) {
await copyLoadOp(pg, loadOp);
}
for (const [index, sql] of plan.main_sql.entries()) {
const params = decodeBindings(plan.main_bindings?.[index] ?? []);
await pg.query(sql, params);
}
for (const sql of plan.cleanup_sql) {
await pg.query(sql);
}
await pg.query("COMMIT");
} catch (error) {
await pg.query("ROLLBACK");
throw error;
}
plan = response.json()
with psycopg.connect(database_url) as conn:
with conn.cursor() as cur:
for sql in plan["setup_sql"]:
cur.execute(sql)
for load_op in plan["load_ops"]:
copy_load_op(cur, load_op)
binding_sets = plan.get("main_bindings", [])
for index, sql in enumerate(plan["main_sql"]):
params = decode_bindings(binding_sets[index] if index < len(binding_sets) else [])
cur.execute(sql, params)
for sql in plan["cleanup_sql"]:
cur.execute(sql)
conn.commit()
$plan = json_decode($response, true, 512, JSON_THROW_ON_ERROR);
runStatement($pg, 'BEGIN');
try {
foreach ($plan['setup_sql'] as $sql) {
runStatement($pg, $sql);
}
foreach ($plan['load_ops'] as $loadOp) {
copyLoadOp($pg, $loadOp);
}
$bindingSets = $plan['main_bindings'] ?? [];
foreach ($plan['main_sql'] as $index => $sql) {
$params = decodeBindings($bindingSets[$index] ?? []);
$params ? pg_query_params($pg, $sql, $params) : pg_query($pg, $sql);
}
foreach ($plan['cleanup_sql'] as $sql) {
runStatement($pg, $sql);
}
runStatement($pg, 'COMMIT');
} catch (Throwable $e) {
@pg_query($pg, 'ROLLBACK');
throw $e;
}
What the helpers do
decodeBindings:
postgres_bigint_array-> passbinding.valuesas one driver parameter
copyLoadOp / copy_load_op:
- Build
COPY target_table (columns...) FROM STDIN - Stream every row from
load_op.rows - Finish the copy before running the main query
Failure rules
- Roll back on any error
- Do not split the plan across connections
- If the plan only contains
main_sqlandmain_bindings, you can skip the COPY helper path
Repo examples:
examples/api-demo-fetch.jsexamples/api-demo-python.pyexamples/api-demo-php.php