
Postgres integration

Use this guide after calling the Rewrite API, when your app owns the Postgres connection.

SQL Tamer plans the rewrite. Your app executes that plan on one Postgres session.

The whole flow

  1. Call POST /v1/rewrite.
  2. Begin one transaction.
  3. Run every statement in setup_sql.
  4. Execute every load_ops entry on the same connection.
  5. Run each main_sql[i] with main_bindings[i].
  6. Run every statement in cleanup_sql.
  7. Commit or roll back.

Every step must stay on the same Postgres session: session-local state (for example, temporary tables created by setup_sql or rows loaded by load_ops) is invisible from any other connection.

Helper rules

  • postgres_bigint_array means "pass this array as one Postgres parameter"
  • postgres_copy_text means "run COPY FROM STDIN into target_table for these columns and rows"
  • Empty setup_sql, load_ops, and cleanup_sql arrays are normal
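Concretely, a plan that uses both helper kinds might look like the sketch below. The SQL and values are illustrative, and the `kind` discriminator field name is an assumption; `target_table`, `rows`, and `values` follow the rules above.

```python
# Illustrative plan payload from POST /v1/rewrite.
# The "kind" field name is an assumption; values are made up.
plan = {
    "setup_sql": ["CREATE TEMP TABLE tmp_ids (id bigint) ON COMMIT DROP"],
    "load_ops": [
        {
            "kind": "postgres_copy_text",
            "target_table": "tmp_ids",
            "columns": ["id"],
            "rows": [[1], [2], [3]],
        }
    ],
    "main_sql": ["SELECT * FROM orders WHERE customer_id = ANY($1)"],
    "main_bindings": [
        [{"kind": "postgres_bigint_array", "values": [101, 102]}]
    ],
    # Empty setup_sql, load_ops, and cleanup_sql arrays are normal.
    "cleanup_sql": [],
}
```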

Integration skeleton

JavaScript (node-postgres):

const plan = await rewriteResponse.json();

await pg.query("BEGIN");
try {
  for (const sql of plan.setup_sql) {
    await pg.query(sql);
  }

  for (const loadOp of plan.load_ops) {
    await copyLoadOp(pg, loadOp);
  }

  for (const [index, sql] of plan.main_sql.entries()) {
    const params = decodeBindings(plan.main_bindings?.[index] ?? []);
    await pg.query(sql, params);
  }

  for (const sql of plan.cleanup_sql) {
    await pg.query(sql);
  }

  await pg.query("COMMIT");
} catch (error) {
  await pg.query("ROLLBACK");
  throw error;
}

Python (psycopg 3):

plan = response.json()  # response from POST /v1/rewrite

with psycopg.connect(database_url) as conn:
    with conn.cursor() as cur:
        for sql in plan["setup_sql"]:
            cur.execute(sql)

        for load_op in plan["load_ops"]:
            copy_load_op(cur, load_op)

        binding_sets = plan.get("main_bindings", [])
        for index, sql in enumerate(plan["main_sql"]):
            params = decode_bindings(binding_sets[index] if index < len(binding_sets) else [])
            cur.execute(sql, params)

        for sql in plan["cleanup_sql"]:
            cur.execute(sql)

    conn.commit()

PHP (pgsql extension):

$plan = json_decode($response, true, 512, JSON_THROW_ON_ERROR);

runStatement($pg, 'BEGIN');
try {
    foreach ($plan['setup_sql'] as $sql) {
        runStatement($pg, $sql);
    }

    foreach ($plan['load_ops'] as $loadOp) {
        copyLoadOp($pg, $loadOp);
    }

    $bindingSets = $plan['main_bindings'] ?? [];
    foreach ($plan['main_sql'] as $index => $sql) {
        $params = decodeBindings($bindingSets[$index] ?? []);
        $params ? pg_query_params($pg, $sql, $params) : pg_query($pg, $sql);
    }

    foreach ($plan['cleanup_sql'] as $sql) {
        runStatement($pg, $sql);
    }

    runStatement($pg, 'COMMIT');
} catch (Throwable $e) {
    @pg_query($pg, 'ROLLBACK');
    throw $e;
}

What the helpers do

decodeBindings:

  • postgres_bigint_array -> pass binding.values as one driver parameter
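A minimal decode_bindings sketch, assuming each binding is an object with `kind` and `values` fields (adjust the field names to match your actual payload):

```python
def decode_bindings(bindings):
    """Turn one main_bindings[i] entry into driver parameters.

    Assumes each binding is a dict with "kind" and "values" keys;
    the field names are an assumption, not part of this doc's spec.
    """
    params = []
    for binding in bindings:
        if binding["kind"] == "postgres_bigint_array":
            # Pass the whole array as ONE Postgres parameter, e.g. for
            # "WHERE id = ANY($1)" / "WHERE id = ANY(%s)".
            params.append([int(v) for v in binding["values"]])
        else:
            raise ValueError(f"unknown binding kind: {binding['kind']}")
    return params
```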

copyLoadOp / copy_load_op:

  1. Build COPY target_table (columns...) FROM STDIN
  2. Stream every row from load_op.rows
  3. Finish the copy before running the main query
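The row-streaming step boils down to encoding each load_op row in Postgres COPY text format before writing it to the copy stream. A sketch of that encoder (helper name is illustrative, and it covers only the common escapes, not every rule in the Postgres COPY docs):

```python
def copy_text_line(row):
    """Encode one load_op row as a line of COPY ... FROM STDIN text format.

    NULL becomes \\N; backslash, tab, and newline are escaped; columns
    are tab-separated. A sketch, not a complete COPY encoder.
    """
    fields = []
    for value in row:
        if value is None:
            fields.append(r"\N")
        else:
            s = str(value)
            s = s.replace("\\", "\\\\").replace("\t", "\\t").replace("\n", "\\n")
            fields.append(s)
    return "\t".join(fields) + "\n"
```

With psycopg 3, these lines would be written inside `with cur.copy(...) as copy:` via `copy.write(...)`, and the copy must finish (the `with` block must exit) before the main query runs.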

Failure rules

  • Roll back on any error
  • Do not split the plan across connections
  • If the plan only contains main_sql and main_bindings, you can skip the COPY helper path

Repo examples:

  • examples/api-demo-fetch.js
  • examples/api-demo-python.py
  • examples/api-demo-php.php