PostgreSQL Data Pipeline Architecture

Overview

This is a breakdown of a production-ready init.sql file that creates a complete data engineering infrastructure with three layers: staging, production, and analytics.


Database & Extensions

CREATE DATABASE dev_db;
\c dev_db

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS postgis;

Extensions:

  • uuid-ossp — Generate unique IDs for distributed systems
  • postgis — Geographic data support (GPS coordinates, distance calculations)
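
A minimal sketch of each extension in action (the coordinates are arbitrary example values):

-- uuid-ossp: generate a random v4 UUID
SELECT uuid_generate_v4();

-- postgis: distance in meters between two points
SELECT ST_Distance(
    ST_MakePoint(-73.9857, 40.7484)::geography,   -- New York
    ST_MakePoint(-0.1278, 51.5074)::geography     -- London
);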

Three-Layer Architecture

CREATE SCHEMA IF NOT EXISTS staging;
CREATE SCHEMA IF NOT EXISTS production;
CREATE SCHEMA IF NOT EXISTS analytics;

| Schema     | Purpose                 | Data Quality       | Retention           |
|------------|-------------------------|--------------------|---------------------|
| staging    | Raw data landing zone   | No constraints     | Short-term          |
| production | Cleaned, validated data | Strict constraints | Long-term           |
| analytics  | Aggregations, KPIs      | Pre-computed       | Refreshed regularly |

Analogy:

  • staging = warehouse receiving dock (unopened boxes)
  • production = organized store (shelved products)
  • analytics = display window (curated highlights)

Staging Tables (Raw Data)

CREATE TABLE IF NOT EXISTS staging.raw_users (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id INTEGER NOT NULL,
    username VARCHAR(100),
    email VARCHAR(255),
    created_at TIMESTAMP,
    country VARCHAR(100),
    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system VARCHAR(50) DEFAULT 'api'
);

Key points:

  • UUID as primary key — unique across distributed systems
  • ingested_at — when data arrived (for debugging)
  • source_system — where data came from (for auditing)
  • No strict constraints — raw data can be incomplete
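
For illustration, staging accepts incomplete raw rows (the values here are hypothetical):

INSERT INTO staging.raw_users (user_id, username, email, country)
VALUES (1001, 'alice', NULL, 'US');   -- a missing email is tolerated here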

JSONB for Flexible Data

CREATE TABLE IF NOT EXISTS staging.raw_events (
    ...
    event_data JSONB,
    ...
);

JSONB stores variable structures:

{
  "page": "/home",
  "duration": 45,
  "device": "mobile"
}
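
Once stored, JSONB payloads can be queried directly. A sketch against the example payload above:

-- ->> extracts a field as text; @> tests containment
SELECT event_data->>'page' AS page,
       (event_data->>'duration')::int AS duration
FROM staging.raw_events
WHERE event_data @> '{"device": "mobile"}';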

Production Tables (Clean Data)

CREATE TABLE IF NOT EXISTS production.users (
    user_id SERIAL PRIMARY KEY,
    username VARCHAR(100) NOT NULL UNIQUE,
    email VARCHAR(255) NOT NULL UNIQUE,
    email_verified BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT valid_email CHECK (
        email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'
    )
);

Production constraints:

  • SERIAL — Auto-incrementing IDs
  • NOT NULL — Required fields
  • UNIQUE — No duplicates
  • CHECK — Regex validation for email format
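
As a quick illustration, an insert that violates the CHECK constraint is rejected outright ('bob' is a hypothetical row):

INSERT INTO production.users (username, email)
VALUES ('bob', 'not-an-email');
-- rejected: violates check constraint "valid_email"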

Foreign Keys

CREATE TABLE IF NOT EXISTS production.transactions (
    transaction_id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    CONSTRAINT fk_user
        FOREIGN KEY (user_id)
        REFERENCES production.users(user_id)
        ON DELETE CASCADE,
    CONSTRAINT positive_amount CHECK (amount > 0),
    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'completed', 'failed', 'refunded')
    )
);

Notes:

  • DECIMAL(10, 2) — Never use FLOAT for money (rounding errors)
  • ON DELETE CASCADE — Delete transactions when user is deleted
  • Multiple CHECK constraints for data integrity
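
Two short illustrations of these constraints in action (user_id 42 is hypothetical):

-- Rejected by the positive_amount CHECK constraint
INSERT INTO production.transactions (user_id, amount)
VALUES (42, -5.00);

-- Deleting a user also deletes their transactions via ON DELETE CASCADE
DELETE FROM production.users WHERE user_id = 42;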

Date Dimension Table

CREATE TABLE IF NOT EXISTS production.dim_date (
    date_id SERIAL PRIMARY KEY,
    full_date DATE NOT NULL UNIQUE,
    year INTEGER NOT NULL,
    quarter INTEGER NOT NULL,
    month INTEGER NOT NULL,
    month_name VARCHAR(20),
    week INTEGER NOT NULL,
    day_of_week INTEGER NOT NULL,
    day_name VARCHAR(20),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN DEFAULT FALSE
);

Why a date table?

  • Faster analytical queries
  • Easy filtering by quarter, weekend, holidays
  • Standardized definitions across the organization

Usage:

SELECT d.quarter, SUM(t.amount) AS revenue
FROM production.transactions t
JOIN production.dim_date d ON DATE(t.transaction_date) = d.full_date
GROUP BY d.quarter;
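
The dimension table still has to be filled. One common approach is a generate_series backfill; a sketch with an arbitrary date range:

INSERT INTO production.dim_date
    (full_date, year, quarter, month, month_name, week, day_of_week, day_name, is_weekend)
SELECT d::date,
       EXTRACT(YEAR FROM d)::int,
       EXTRACT(QUARTER FROM d)::int,
       EXTRACT(MONTH FROM d)::int,
       TRIM(TO_CHAR(d, 'Month')),
       EXTRACT(WEEK FROM d)::int,
       EXTRACT(ISODOW FROM d)::int,        -- 1 = Monday ... 7 = Sunday
       TRIM(TO_CHAR(d, 'Day')),
       EXTRACT(ISODOW FROM d) IN (6, 7)    -- Saturday or Sunday
FROM generate_series('2020-01-01'::date, '2030-12-31'::date, '1 day') AS d
ON CONFLICT (full_date) DO NOTHING;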

Materialized Views

CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.daily_user_metrics AS
SELECT
    DATE(t.transaction_date) as date,
    u.country,
    COUNT(DISTINCT u.user_id) as active_users,
    SUM(t.amount) as total_revenue,
    AVG(t.amount) as avg_transaction
FROM production.users u
LEFT JOIN production.transactions t ON u.user_id = t.user_id
GROUP BY DATE(t.transaction_date), u.country;

| Regular View           | Materialized View |
|------------------------|-------------------|
| Computed on each query | Stored physically |
| Always current         | Needs REFRESH     |
| Slower                 | Much faster       |
| No disk space          | Uses disk space   |

Refresh:

REFRESH MATERIALIZED VIEW analytics.daily_user_metrics;
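
A plain REFRESH locks the view against readers while it rebuilds. If that matters, REFRESH ... CONCURRENTLY avoids the lock, but it requires a unique index on the view (the index name here is illustrative):

CREATE UNIQUE INDEX IF NOT EXISTS idx_daily_metrics_unique
    ON analytics.daily_user_metrics (date, country);

REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.daily_user_metrics;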

Indexes

CREATE INDEX IF NOT EXISTS idx_transactions_user_id
    ON production.transactions(user_id);

CREATE INDEX IF NOT EXISTS idx_user_date
    ON production.transactions(user_id, transaction_date);

CREATE INDEX IF NOT EXISTS idx_events_data
    ON staging.raw_events USING GIN(event_data);

Index types:

  • B-Tree (default) — Standard lookups
  • Composite — Multiple columns (order matters)
  • GIN — For JSONB data

Index the columns that appear in:

  • WHERE clauses
  • JOIN conditions
  • ORDER BY / GROUP BY

Trade-off: Faster reads, slower writes, uses disk space.
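
To verify an index is actually used, EXPLAIN the query. For example, the GIN index above supports JSONB containment queries:

EXPLAIN
SELECT *
FROM staging.raw_events
WHERE event_data @> '{"device": "mobile"}';
-- on a large enough table, the plan should show a Bitmap Index Scan on idx_events_data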


Triggers

CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_users_updated_at
    BEFORE UPDATE ON production.users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

Trigger concepts:

  • BEFORE — Can modify data before it’s saved
  • AFTER — For logging, notifications
  • NEW — New row values
  • OLD — Previous row values

Use cases:

  • Audit trails
  • Auto-updating timestamps
  • Complex validation
  • Synchronization between tables
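
With the trigger in place, updated_at is maintained automatically (the row below is hypothetical):

UPDATE production.users
SET email_verified = TRUE
WHERE username = 'alice';
-- updated_at now reflects this change, with no application code involved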

Utility Functions

Cleanup Old Data

CREATE OR REPLACE FUNCTION staging.cleanup_old_data(
    days_to_keep INTEGER DEFAULT 30
)
RETURNS INTEGER AS $$
DECLARE
    rows_deleted INTEGER;
BEGIN
    DELETE FROM staging.raw_users
    WHERE ingested_at < CURRENT_TIMESTAMP - (days_to_keep || ' days')::INTERVAL;

    GET DIAGNOSTICS rows_deleted = ROW_COUNT;
    RETURN rows_deleted;
END;
$$ LANGUAGE plpgsql;

Usage:

SELECT staging.cleanup_old_data(7);  -- Keep 7 days
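
The call can also be scheduled. For instance, if the pg_cron extension is installed (job name and time are illustrative):

-- Run the cleanup nightly at 03:00, keeping 30 days of raw data
SELECT cron.schedule('staging-cleanup', '0 3 * * *',
                     $$SELECT staging.cleanup_old_data(30)$$);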

Idempotent Patterns

-- Safe to run multiple times
CREATE TABLE IF NOT EXISTS ...
CREATE INDEX IF NOT EXISTS ...

-- Upsert pattern
INSERT INTO production.users (username, email)
VALUES ('alice', 'alice@example.com')
ON CONFLICT (username) DO NOTHING;

-- Or update on conflict
INSERT INTO production.users (username, email)
VALUES ('alice', 'alice@example.com')
ON CONFLICT (username) DO UPDATE
SET email = EXCLUDED.email;

Data Engineering Concepts

| Concept                | Implementation                         |
|------------------------|----------------------------------------|
| Medallion Architecture | staging → production → analytics       |
| Data Quality           | NOT NULL, UNIQUE, CHECK, FK            |
| Metadata               | ingested_at, source_system, updated_at |
| Idempotence            | IF NOT EXISTS, ON CONFLICT             |
| Star Schema            | dim_date + fact tables                 |

Typical Pipeline Workflow

1. Ingestion → staging.raw_*
2. Transform → production.* (sketched below)
3. Aggregate → analytics.*
4. Visualize → BI tools
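
A minimal sketch of step 2, promoting deduplicated, valid rows from staging into production (column choices follow the tables defined above):

INSERT INTO production.users (username, email)
SELECT DISTINCT ON (username) username, email
FROM staging.raw_users
WHERE username IS NOT NULL
  AND email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'
ORDER BY username, ingested_at DESC    -- keep the most recent record per user
ON CONFLICT (username) DO UPDATE
SET email = EXCLUDED.email;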

Frequency:

  • staging → production: Hourly
  • production → analytics: Nightly
  • Materialized view refresh: On demand

Verification Commands

\dn                      -- List schemas
\dt production.*         -- List tables in schema
\di production.*         -- List indexes in schema
\d+ production.users     -- Table details, including indexes and constraints

A complete PostgreSQL setup for data engineering pipelines, built on the three-layer (medallion) architecture pattern.