
Managing Metadata and Time-Series Data Together: A Practical Guide for Facility and Sensor Signal Systems

Last updated: May 27, 2026
Published April 7, 2026 · Updated May 27, 2026 · 19 min read

Summary

What this post covers: A complete reference for designing systems that store facility metadata and high-frequency sensor time-series together, with SQL schemas, ingestion pipelines, Python code, and a manufacturing case study.

Key insights:

  • Metadata and time-series have fundamentally incompatible workloads — relational/hierarchical/slow-changing versus append-only/time-partitioned/high-volume — so forcing both into one storage engine produces queries that take minutes instead of milliseconds.
  • The correct architecture pairs PostgreSQL for metadata (facilities, equipment, sensors, maintenance logs) with TimescaleDB hypertables for measurements, bridged only by a sensor_id foreign key — not by embedding metadata into every reading.
  • Cross-domain queries like “show vibration anomalies on Building A’s CNC machines installed after 2023” should be answered with a metadata-filter-first pattern that resolves sensor IDs in PostgreSQL, then performs a time-windowed scan in TimescaleDB.
  • Scaling beyond billions of rows requires compressing chunks after roughly seven days, materializing continuous aggregates for dashboards, and pushing tag-rich metadata into a JSONB column to avoid schema explosion.
  • The most common failure modes are duplicating metadata in every time-series row, leaving orphaned sensor IDs when assets are retired, and skipping API-level joins so callers have to manually correlate two opaque payloads.

Main topics: Introduction, The Data Model Challenge, Architecture Patterns, Detailed Schema Design Best Practices, Data Ingestion Pipeline, Querying Across Metadata and Time-Series, API Design for Metadata + Time-Series, Handling Scale, Real-World Example: Manufacturing Plant, Common Pitfalls, Final Thoughts, References.

Introduction

Consider a factory floor with 500 sensors generating 2.6 billion data points per year. Every vibration reading, every temperature spike, and every pressure anomaly is faithfully captured and stored. Yet when an engineer asks a straightforward question, “Show me all vibration anomalies from Building A’s CNC machines installed after 2023,” the team cannot answer in under ten minutes. The data exist, scattered across three different systems, but nobody can extract them quickly.

This scenario recurs in manufacturing plants, energy grids, building management systems, and IoT deployments worldwide. The root cause is consistently the same: the team treated metadata and time-series data as separate problems and never designed the bridge between them. The choice of storage layer is an important first step, and the comparison of databases for preprocessed time-series data covers the options in depth.

Any industrial, manufacturing, or IoT system involves two fundamentally different types of data that must work in concert. First, there is metadata: information about facilities, equipment, sensors, locations, configurations, maintenance history, and calibration records. These data are relational, hierarchical, and change slowly. Second, there is time-series data: the actual sensor signals (temperature, vibration, pressure, torque, current, flow rate) streaming in at high frequency, sometimes thousands of readings per second. These data are append-only, voluminous, and indexed by time.

The relationship between these two data types is what enables the system to function. A sensor reading of “47.3” means nothing without the knowledge that sensor S-0142 is a thermocouple mounted on a FANUC CNC spindle in Building A, calibrated last month, with an operating range of 15 to 85 °C. The sensor_id is the bridge: metadata indicate what, while time-series indicate when and how much.

Most teams handle this relationship incorrectly. They embed metadata in every time-series row, creating substantial bloat; they separate the two completely without proper foreign keys, creating orphaned data; or they force everything into a single database that performs poorly on at least one workload. The outcome is consistent: queries that should take milliseconds take minutes, data that should be connected remain isolated, and engineers who should be detecting anomalies instead contend with data infrastructure.

This guide provides a reference for designing a system that manages metadata and time-series data together correctly. It examines four architecture patterns, complete SQL schemas, Python code using SQLAlchemy and FastAPI, ingestion pipelines, query optimisation strategies, and a real-world manufacturing example. By the conclusion, the reader will have the necessary material to build a system in which the “CNC vibration anomalies in Building A” query returns results in less than a second.

[Figure: Metadata + Time-Series Architecture. PostgreSQL (metadata: facilities, equipment, sensors, maintenance_logs; relational, hierarchical, slow-changing) is bridged by the sensor_id foreign key to TimescaleDB (measurements: sensor_readings hypertable, anomaly_events, continuous aggregates, chunks compressed after 7 days; append-only, time-partitioned, high-volume).]

The Data Model Challenge

Before considering solutions, it is necessary to understand clearly why these two data types are difficult to manage together. They have fundamentally different characteristics, and a database architecture that is optimal for one is almost always suboptimal for the other.

Metadata: Relational, Hierarchical, and Slowly Changing

Facility and sensor metadata follow a natural hierarchy. A typical industrial deployment is structured as follows:

Organisation → Site → Building → Production Line → Machine → Component → Sensor

Each level in this hierarchy carries substantial attributes. A sensor record may include sensor type, unit of measurement, sampling rate in Hz, minimum and maximum operating range, calibration date, firmware version, installation date, and the equipment on which it is mounted. A machine record includes manufacturer, model, serial number, commissioning date, maintenance schedule, and operating parameters.

These data are relational—sensors belong to equipment, equipment belongs to production lines, and production lines belong to buildings. They are hierarchical—queries such as “all sensors in Building A” require tree traversal. They are slowly changing—sensors are recalibrated, machines are moved to different production lines, and firmware is updated. They are schema-rich—each entity type has many attributes with different data types, constraints, and relationships.

[Figure: Entity Hierarchy, Facility to Measurements. Facility → Equipment → Sensor → Signal → Measurements, linked by facility_id, equipment_id, sensor_id, and signal_id. Key attributes at each level: Facility (name, location, facility_type, commissioned_date, status, metadata JSONB); Equipment (manufacturer, model, serial_number, production_line, operating_params JSONB); Sensor (sensor_type, unit, sampling_rate_hz, min/max range, calibration_date); Signal (channel, quality_code); Measurements (time TIMESTAMPTZ, sensor_id FK, value DOUBLE; a hypertable holding billions of rows).]

Time-Series: Append-Only, High Volume, and Time-Indexed

Sensor readings are the opposite in nearly every respect. A typical reading consists of just three fields: timestamp, sensor_id, and value. A few additional channels may exist for multi-axis sensors (x, y, z for accelerometers). The schema is narrow and rarely changes.

The volume, however, is substantial. A single vibration sensor sampling at 1 kHz generates 86.4 million readings per day. Even at a modest 1 Hz sampling rate, 500 sensors produce 43.2 million readings per day—approximately 15.8 billion per year. These data are append-only (historical readings are almost never updated), time-indexed (every query includes a time range), and write-heavy (ingestion throughput is important).
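
The volume figures above follow from simple arithmetic. A minimal sketch that reproduces them, assuming a 365-day year and a constant sampling rate (the function name is illustrative, not part of any library):

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def readings_per_day(sensor_count: int, sampling_rate_hz: float) -> int:
    """Readings produced per day by `sensor_count` sensors at a fixed rate."""
    return round(sensor_count * sampling_rate_hz * SECONDS_PER_DAY)


# One vibration sensor sampling at 1 kHz
single_khz = readings_per_day(1, 1000)    # 86,400,000

# 500 sensors sampling at 1 Hz
fleet_daily = readings_per_day(500, 1)    # 43,200,000
fleet_yearly = fleet_daily * 365          # 15,768,000,000 (about 15.8 billion)
```

Running the same function with a site's real sensor counts and rates gives a first estimate for capacity planning before any schema is chosen.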

Characteristics Comparison

| Characteristic | Metadata | Time-Series |
| --- | --- | --- |
| Schema | Wide, complex, many tables | Narrow (timestamp, id, value) |
| Volume | Thousands to millions of rows | Billions to trillions of rows |
| Write pattern | Infrequent updates, inserts | Continuous high-throughput appends |
| Read pattern | Lookups, JOINs, tree traversal | Range scans by time, aggregations |
| Relationships | Rich foreign keys, hierarchies | Single FK (sensor_id) |
| Mutability | Updates and deletes common | Append-only, rarely modified |
| Indexing | B-tree, GIN, full-text | Time-partitioned, BRIN |
| Retention | Keep forever | Tiered (raw → downsampled → archived) |

Common Mistakes

Teams typically fall into one of three traps:

Mistake 1: Embedding metadata in every time-series row. Instead of storing (timestamp, sensor_id, value), the row stores (timestamp, sensor_id, value, building_name, machine_name, manufacturer, sensor_type, unit, ...). A row that should be 24 bytes becomes 500 bytes. With billions of rows, this results in terabytes of redundant data, slower queries, and serious difficulty when metadata change (does one backfill every historical row?).
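
To put the bloat in concrete terms, the sketch below multiplies the per-row overhead by a year of readings. The 24-byte and 500-byte row sizes are the nominal figures from the paragraph above; real on-disk sizes also depend on tuple headers, indexes, and compression, which this estimate ignores.

```python
def redundant_bytes(row_count: int,
                    narrow_row_bytes: int = 24,
                    wide_row_bytes: int = 500) -> int:
    """Extra bytes stored when metadata is repeated in every reading."""
    return row_count * (wide_row_bytes - narrow_row_bytes)


# 500 sensors at 1 Hz for one 365-day year
rows_per_year = 500 * 86_400 * 365

overhead = redundant_bytes(rows_per_year)
overhead_tb = overhead / 1e12  # roughly 7.5 TB of repeated metadata per year
```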

Mistake 2: Complete separation without proper linking. Metadata reside in PostgreSQL, time-series in InfluxDB, and the only link is a sensor-name string entered manually. Sensor names change, new sensors are added to the time-series database without being registered in the metadata database, and suddenly 15 per cent of readings are orphaned: data exist for sensors absent from the metadata system. For teams operating this kind of split architecture and considering migration of the InfluxDB side to a lakehouse, the InfluxDB-to-AWS Iceberg pipeline guide describes how to do so while preserving the sensor-id bridge.
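
Orphaned readings can be detected with a periodic audit that compares the sensor IDs seen in the time-series store against those registered in the metadata store. The sketch below works on plain Python collections; how the two ID lists are fetched from each database is left out, and the function names are hypothetical.

```python
from collections import Counter


def find_orphans(registered_ids: set[int],
                 reading_sensor_ids: list[int]) -> dict[int, int]:
    """Return {sensor_id: reading_count} for sensors missing from the metadata."""
    counts = Counter(reading_sensor_ids)
    return {sid: n for sid, n in counts.items() if sid not in registered_ids}


def orphan_fraction(registered_ids: set[int],
                    reading_sensor_ids: list[int]) -> float:
    """Share of readings that belong to unregistered sensors."""
    if not reading_sensor_ids:
        return 0.0
    orphaned = sum(find_orphans(registered_ids, reading_sensor_ids).values())
    return orphaned / len(reading_sensor_ids)


# Invented sample: sensors 4 and 9 write readings but were never registered
registered = {1, 2, 3}
seen = [1, 1, 2, 3, 4, 4, 9]

orphans = find_orphans(registered, seen)
fraction = orphan_fraction(registered, seen)
```

Alerting when the fraction rises above zero catches unregistered sensors early, before a large share of the data has become unusable.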

Mistake 3: Using one database for everything. Forcing all data into plain PostgreSQL makes time-series queries slow (no automatic time-partitioning, no columnar compression). Forcing everything into InfluxDB makes metadata queries impractical (no foreign keys, no transactions, and only limited join support). Neither database excels at the other’s workload.

Key Takeaway: The sensor_id is the bridge between metadata and time-series. The architecture must make it straightforward to begin from either side—filtering by metadata attributes and then fetching time-series, or detecting time-series anomalies and then retrieving the metadata context.

Architecture Patterns

There is no single “right” architecture for combining metadata and time-series data. The most appropriate choice depends on scale, team expertise, existing infrastructure, and query patterns. Four proven patterns are described below, from the most commonly recommended to the most specialised.

Pattern 1: PostgreSQL with TimescaleDB (Recommended)

This is the pattern recommended for most teams and the one to which the discussion devotes the most attention. TimescaleDB is a PostgreSQL extension that adds time-series capabilities—hypertables, automatic time partitioning, continuous aggregates, and compression—while preserving full PostgreSQL functionality. Because it runs within PostgreSQL, native SQL JOINs are available between metadata tables and time-series hypertables.

The complete schema is shown below:

-- Enable TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- ============================================
-- METADATA TABLES
-- ============================================

CREATE TABLE facilities (
    id          SERIAL PRIMARY KEY,
    name        VARCHAR(200) NOT NULL,
    location    VARCHAR(500),
    facility_type VARCHAR(50) NOT NULL,  -- 'manufacturing', 'warehouse', 'office'
    commissioned_date DATE,
    status      VARCHAR(20) DEFAULT 'active',
    metadata    JSONB DEFAULT '{}',
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    updated_at  TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE equipment (
    id              SERIAL PRIMARY KEY,
    facility_id     INTEGER NOT NULL REFERENCES facilities(id),
    name            VARCHAR(200) NOT NULL,
    equipment_type  VARCHAR(50) NOT NULL,  -- 'cnc', 'robot', 'conveyor', 'pump'
    manufacturer    VARCHAR(200),
    model           VARCHAR(200),
    serial_number   VARCHAR(100) UNIQUE,
    install_date    DATE,
    production_line VARCHAR(100),
    status          VARCHAR(20) DEFAULT 'operational',
    operating_params JSONB DEFAULT '{}',
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_equipment_facility ON equipment(facility_id);
CREATE INDEX idx_equipment_type ON equipment(equipment_type);
CREATE INDEX idx_equipment_manufacturer ON equipment(manufacturer);
CREATE INDEX idx_equipment_line ON equipment(production_line);

CREATE TABLE sensors (
    id                SERIAL PRIMARY KEY,
    equipment_id      INTEGER NOT NULL REFERENCES equipment(id),
    name              VARCHAR(200) NOT NULL,
    sensor_type       VARCHAR(50) NOT NULL,   -- 'temperature', 'vibration', 'pressure'
    unit              VARCHAR(20) NOT NULL,    -- 'celsius', 'mm/s', 'bar', 'A'
    sampling_rate_hz  REAL DEFAULT 1.0,
    min_range         REAL,
    max_range         REAL,
    calibration_date  DATE,
    firmware_version  VARCHAR(50),
    is_active         BOOLEAN DEFAULT TRUE,
    tags              JSONB DEFAULT '{}',
    created_at        TIMESTAMPTZ DEFAULT NOW(),
    updated_at        TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_sensors_equipment ON sensors(equipment_id);
CREATE INDEX idx_sensors_type ON sensors(sensor_type);
CREATE INDEX idx_sensors_active ON sensors(is_active) WHERE is_active = TRUE;
CREATE INDEX idx_sensors_tags ON sensors USING GIN(tags);

CREATE TABLE maintenance_logs (
    id              SERIAL PRIMARY KEY,
    equipment_id    INTEGER NOT NULL REFERENCES equipment(id),
    maintenance_type VARCHAR(50) NOT NULL,  -- 'preventive', 'corrective', 'calibration'
    description     TEXT,
    performed_at    TIMESTAMPTZ NOT NULL,
    completed_at    TIMESTAMPTZ,
    technician      VARCHAR(200),
    parts_replaced  JSONB DEFAULT '[]',
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_maintenance_equipment ON maintenance_logs(equipment_id);
CREATE INDEX idx_maintenance_time ON maintenance_logs(performed_at);

-- ============================================
-- TIME-SERIES TABLES (TimescaleDB Hypertables)
-- ============================================

CREATE TABLE sensor_readings (
    time        TIMESTAMPTZ NOT NULL,
    sensor_id   INTEGER NOT NULL REFERENCES sensors(id),
    value       DOUBLE PRECISION NOT NULL
);

SELECT create_hypertable('sensor_readings', 'time');

CREATE INDEX idx_readings_sensor_time ON sensor_readings (sensor_id, time DESC);

-- Enable compression (after 7 days)
ALTER TABLE sensor_readings SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id',
    timescaledb.compress_orderby = 'time DESC'
);

SELECT add_compression_policy('sensor_readings', INTERVAL '7 days');

-- Anomaly events table
CREATE TABLE anomaly_events (
    id              SERIAL PRIMARY KEY,
    sensor_id       INTEGER NOT NULL REFERENCES sensors(id),
    start_time      TIMESTAMPTZ NOT NULL,
    end_time        TIMESTAMPTZ,
    anomaly_type    VARCHAR(50) NOT NULL,  -- 'threshold', 'trend', 'pattern'
    severity        VARCHAR(20) NOT NULL,  -- 'low', 'medium', 'high', 'critical'
    value_at_detection DOUBLE PRECISION,
    model_version   VARCHAR(50),
    notes           TEXT,
    acknowledged    BOOLEAN DEFAULT FALSE,
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_anomaly_sensor ON anomaly_events(sensor_id);
CREATE INDEX idx_anomaly_time ON anomaly_events(start_time);

Populating the anomaly_events table in real time is a natural fit for complex event processing with Apache Flink CEP, which can detect multi-event anomaly patterns across thousands of sensor streams with millisecond latency.

Tip: The compress_segmentby = 'sensor_id' setting is important. It instructs TimescaleDB to group compressed data by sensor, which means queries filtered by sensor_id only decompress the relevant segments. Without this setting, every query would decompress entire chunks.

The power of native JOINs is illustrated below. The following queries cross the metadata/time-series boundary without difficulty:

-- Query 1: Average temperature for all sensors in Building A, last 24 hours
SELECT
    f.name AS facility,
    e.name AS equipment,
    s.name AS sensor,
    AVG(r.value) AS avg_temp,
    MIN(r.value) AS min_temp,
    MAX(r.value) AS max_temp
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
  AND s.sensor_type = 'temperature'
  AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY f.name, e.name, s.name
ORDER BY avg_temp DESC;

-- Query 2: FANUC machines with vibration exceeding threshold
SELECT
    e.name AS machine,
    e.model,
    s.name AS sensor,
    s.max_range AS threshold,
    MAX(r.value) AS peak_vibration,
    COUNT(*) AS exceedance_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.manufacturer = 'FANUC'
  AND s.sensor_type = 'vibration'
  AND r.value > s.max_range
  AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, e.model, s.name, s.max_range
ORDER BY peak_vibration DESC;

-- Query 3: Compare vibration across CNC machines on Production Line 3
SELECT
    e.name AS machine,
    time_bucket('1 hour', r.time) AS hour,
    AVG(r.value) AS avg_vibration,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY r.value) AS p95_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
  AND e.equipment_type = 'cnc'
  AND s.sensor_type = 'vibration'
  AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, hour
ORDER BY e.name, hour;

Each query seamlessly combines metadata filters (facility name, manufacturer, production line, sensor type) with time-series operations (time ranges, aggregations, percentiles). This is the principal advantage of the PostgreSQL + TimescaleDB pattern: a single SQL statement can traverse the entire data model.
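
The single-statement pattern can be tried without a database server. The sketch below uses SQLite from Python's standard library purely as a stand-in: it has no hypertables or time_bucket, so only the JOIN shape of Query 2 carries over. Table and column names follow the schema above, trimmed to the columns the query needs, and all sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE equipment (id INTEGER PRIMARY KEY, name TEXT, manufacturer TEXT);
    CREATE TABLE sensors (
        id INTEGER PRIMARY KEY,
        equipment_id INTEGER REFERENCES equipment(id),
        sensor_type TEXT,
        max_range REAL
    );
    CREATE TABLE sensor_readings (
        time TEXT NOT NULL,
        sensor_id INTEGER NOT NULL REFERENCES sensors(id),
        value REAL NOT NULL
    );
    INSERT INTO equipment VALUES (1, 'CNC-001', 'FANUC'), (2, 'CNC-002', 'OtherCo');
    INSERT INTO sensors VALUES (10, 1, 'vibration', 5.0), (20, 2, 'vibration', 5.0);
    INSERT INTO sensor_readings VALUES
        ('2026-04-01T00:00:00Z', 10, 4.2),
        ('2026-04-01T00:00:01Z', 10, 6.1),
        ('2026-04-01T00:00:02Z', 10, 7.3),
        ('2026-04-01T00:00:00Z', 20, 9.9);
""")

# Metadata filters (manufacturer, sensor type) and a time-series predicate
# (value above the sensor's own max_range) combined in one statement.
rows = conn.execute("""
    SELECT e.name, MAX(r.value) AS peak_vibration, COUNT(*) AS exceedance_count
    FROM sensor_readings r
    JOIN sensors s ON s.id = r.sensor_id
    JOIN equipment e ON e.id = s.equipment_id
    WHERE e.manufacturer = ?
      AND s.sensor_type = 'vibration'
      AND r.value > s.max_range
    GROUP BY e.name
""", ("FANUC",)).fetchall()
```

Only the FANUC machine is returned, with its two readings above the threshold; the other machine's exceedance is excluded by the metadata filter.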

Pattern 2: PostgreSQL with InfluxDB

When InfluxDB is already part of the stack, or when write throughput exceeds what PostgreSQL can handle (generally above 500,000 inserts per second on a single node), a split architecture is appropriate. Metadata remain in PostgreSQL, time-series move to InfluxDB, and the application performs the JOIN.

import asyncpg
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta

class DualDatabaseQuery:
    def __init__(self, pg_dsn: str, influx_url: str, influx_token: str, influx_org: str):
        self.pg_dsn = pg_dsn
        self.influx = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
        self.query_api = self.influx.query_api()

    async def get_readings_by_facility(
        self, facility_name: str, sensor_type: str, hours: int = 24
    ):
        # Step 1: Query metadata from PostgreSQL
        conn = await asyncpg.connect(self.pg_dsn)
        try:
            sensors = await conn.fetch("""
                SELECT s.id, s.name, e.name AS equipment_name
                FROM sensors s
                JOIN equipment e ON e.id = s.equipment_id
                JOIN facilities f ON f.id = e.facility_id
                WHERE f.name = $1 AND s.sensor_type = $2 AND s.is_active = TRUE
            """, facility_name, sensor_type)
        finally:
            # Release the connection even if the query raises
            await conn.close()

        if not sensors:
            return []

        # Step 2: Query time-series from InfluxDB, filtered by sensor IDs
        sensor_ids = [str(s['id']) for s in sensors]
        sensor_filter = ' or '.join(
            f'r["sensor_id"] == "{sid}"' for sid in sensor_ids
        )

        flux_query = f'''
        from(bucket: "sensor_data")
          |> range(start: -{hours}h)
          |> filter(fn: (r) => r["_measurement"] == "readings")
          |> filter(fn: (r) => {sensor_filter})
          |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
        '''
        tables = self.query_api.query(flux_query)

        # Step 3: Merge metadata with time-series results
        sensor_lookup = {str(s['id']): s for s in sensors}
        results = []
        for table in tables:
            for record in table.records:
                sid = record.values.get("sensor_id")
                meta = sensor_lookup.get(sid, {})
                results.append({
                    "time": record.get_time(),
                    "sensor_id": sid,
                    "sensor_name": meta.get("name"),
                    "equipment": meta.get("equipment_name"),
                    "value": record.get_value(),
                })
        return results

Caution: The two-step query pattern (metadata first, then time-series) places consistency responsibilities on the application. If a sensor is deleted from PostgreSQL while its readings still exist in InfluxDB, the result is orphaned data. Sensor-id existence should always be validated before writing to InfluxDB.
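
One way to perform that validation is to partition each incoming batch against a set of known sensor IDs before anything is written. In the sketch below, known_sensor_ids is assumed to have been loaded from the PostgreSQL sensors table and refreshed periodically; the Reading type and function name are hypothetical, and the InfluxDB write itself is omitted.

```python
from typing import Iterable, NamedTuple


class Reading(NamedTuple):
    sensor_id: int
    timestamp: str
    value: float


def partition_by_registration(
    readings: Iterable[Reading], known_sensor_ids: set[int]
) -> tuple[list[Reading], list[Reading]]:
    """Split a batch into (accepted, rejected) by sensor registration."""
    accepted: list[Reading] = []
    rejected: list[Reading] = []
    for reading in readings:
        if reading.sensor_id in known_sensor_ids:
            accepted.append(reading)
        else:
            rejected.append(reading)
    return accepted, rejected


# Invented sample batch: sensor 99 is not registered in the metadata store
batch = [
    Reading(1, "2026-04-01T00:00:00Z", 4.5),
    Reading(99, "2026-04-01T00:00:00Z", 1.0),
    Reading(2, "2026-04-01T00:00:01Z", 67.3),
]
accepted, rejected = partition_by_registration(batch, known_sensor_ids={1, 2})
```

Rejected readings are better routed to a quarantine location than dropped, so they can be replayed once the sensor has been registered.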

The PostgreSQL + InfluxDB pattern works but sacrifices the elegance of native JOINs. Every cross-domain query requires two round-trips, and complex queries (such as “compare vibration patterns across machines by manufacturer”) require substantial application-level logic. This pattern is appropriate when InfluxDB is already in production and migration is not feasible, or when write throughput genuinely exceeds PostgreSQL/TimescaleDB limits.

Pattern 3: PostgreSQL with Parquet/Iceberg on S3

For very large-scale deployments (terabytes of time-series data) or when the primary consumer is batch ML training pipelines, storing time-series data as Parquet files on S3 is cost-effective and scalable. Metadata remain in PostgreSQL, and joins are performed at query time using DuckDB, Athena, or Spark.

import duckdb
import asyncpg

class ParquetTimeSeriesQuery:
    """
    Time-series stored as Parquet files on S3, partitioned by:
    s3://data-lake/sensor_readings/sensor_id={id}/date={YYYY-MM-DD}/data.parquet
    """

    def __init__(self, pg_dsn: str, s3_base: str):
        self.pg_dsn = pg_dsn
        self.s3_base = s3_base
        self.duck = duckdb.connect()
        self.duck.execute("INSTALL httpfs; LOAD httpfs;")
        self.duck.execute("SET s3_region='us-east-1';")

    async def query_with_metadata(
        self, facility_name: str, sensor_type: str, start_date: str, end_date: str
    ):
        # Step 1: Get relevant sensor IDs from PostgreSQL
        conn = await asyncpg.connect(self.pg_dsn)
        sensors = await conn.fetch("""
            SELECT s.id, s.name, s.unit, e.name AS equipment,
                   e.manufacturer, f.name AS facility
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE f.name = $1 AND s.sensor_type = $2
        """, facility_name, sensor_type)
        await conn.close()

        # Step 2: Build Parquet glob paths for relevant sensors
        sensor_ids = [s['id'] for s in sensors]
        paths = [
            f"{self.s3_base}/sensor_id={sid}/date=*/data.parquet"
            for sid in sensor_ids
        ]

        # Step 3: Query with DuckDB. The date bounds are passed as bound
        # parameters rather than interpolated into the SQL string.
        result = self.duck.execute(f"""
            SELECT
                sensor_id,
                date_trunc('hour', time) AS hour,
                AVG(value) AS avg_value,
                MAX(value) AS max_value,
                COUNT(*) AS reading_count
            FROM parquet_scan({paths})
            WHERE time BETWEEN CAST(? AS TIMESTAMP) AND CAST(? AS TIMESTAMP)
            GROUP BY sensor_id, hour
            ORDER BY sensor_id, hour
        """, [start_date, end_date]).fetchdf()

        # Step 4: Merge with metadata
        sensor_lookup = {s['id']: dict(s) for s in sensors}
        result['equipment'] = result['sensor_id'].map(
            lambda sid: sensor_lookup.get(sid, {}).get('equipment')
        )
        result['facility'] = result['sensor_id'].map(
            lambda sid: sensor_lookup.get(sid, {}).get('facility')
        )
        return result

This pattern is best suited to data lakes and ML training pipelines that need cost-effective processing of large volumes of historical data. Parquet’s columnar format compresses well (files are often ten to twenty times smaller than the equivalent CSV), and partitioning by sensor_id and date ensures that queries read only the relevant files. The pattern is poorly suited, however, to real-time queries or dashboards that require sub-second response times.
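
The partition layout in the class docstring above can be generated on the write side with a small helper. This is a sketch under one assumption not stated in the text: the date partition is derived from the reading's timestamp in UTC. The function name is illustrative.

```python
from datetime import datetime, timezone


def partition_path(s3_base: str, sensor_id: int, ts: datetime) -> str:
    """Build the object key for a reading: sensor_id={id}/date={YYYY-MM-DD}."""
    if ts.tzinfo is None:
        # Naive timestamps are ambiguous, so refuse them rather than guess
        raise ValueError("timestamp must be timezone-aware")
    day = ts.astimezone(timezone.utc).strftime("%Y-%m-%d")  # assumption: UTC dates
    return f"{s3_base.rstrip('/')}/sensor_id={sensor_id}/date={day}/data.parquet"


path = partition_path(
    "s3://data-lake/sensor_readings",
    42,
    datetime(2026, 4, 1, 23, 30, tzinfo=timezone.utc),
)
```

Keeping the writer and the query-side glob patterns in agreement on this layout is what allows a query engine to skip files for other sensors and dates.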

Pattern 4: TDengine Super Tables

TDengine takes a substantially different approach. Its “super table” concept embeds metadata as tags directly alongside time-series data. Each physical sensor receives a sub-table inheriting from a super table, and tags (metadata) are stored only once per sub-table rather than repeated in every row.

-- Create a super table with tags (metadata) and columns (time-series)
CREATE STABLE sensor_readings (
    ts          TIMESTAMP,
    value       DOUBLE,
    quality     INT
) TAGS (
    facility    NCHAR(200),
    building    NCHAR(100),
    equipment   NCHAR(200),
    manufacturer NCHAR(200),
    sensor_type NCHAR(50),
    unit        NCHAR(20),
    line        NCHAR(100)
);

-- Create sub-tables for each sensor (tags are set once)
CREATE TABLE sensor_0001 USING sensor_readings TAGS (
    'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'vibration', 'mm/s', 'Line 3'
);

CREATE TABLE sensor_0002 USING sensor_readings TAGS (
    'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'temperature', 'celsius', 'Line 3'
);

-- Insert data (just timestamp + values, no metadata repetition)
INSERT INTO sensor_0001 VALUES (NOW(), 4.52, 100);
INSERT INTO sensor_0002 VALUES (NOW(), 67.3, 100);

-- Query across all sensors using metadata tags
SELECT
    facility,
    equipment,
    AVG(value) AS avg_vibration
FROM sensor_readings
WHERE sensor_type = 'vibration'
  AND facility = 'Plant Chicago'
  AND ts > NOW() - 24h
GROUP BY facility, equipment;

TDengine’s approach is elegant for IoT: metadata reside alongside the data, tags are indexed automatically, and a separate metadata database is not required. The disadvantage is that complex metadata relationships (maintenance logs, calibration history, hierarchical queries) are difficult to model with flat tags. If the metadata are simple and relatively static, TDengine is worth considering; if rich relational metadata are required, Pattern 1 or Pattern 2 should be preferred.

Pattern Comparison

| Criteria | PG + TimescaleDB | PG + InfluxDB | PG + Parquet/S3 | TDengine |
| --- | --- | --- | --- | --- |
| Complexity | Low | Medium | Medium-High | Low |
| Native JOINs | Yes | No (app-level) | No (query engine) | Tags only |
| Write throughput | 100K-500K rows/s | 1M+ rows/s | Batch (unlimited) | 1M+ rows/s |
| Query flexibility | Full SQL | Flux + SQL | SQL (DuckDB/Athena) | SQL subset |
| Metadata richness | Full relational | Full relational | Full relational | Flat tags only |
| Scalability | TB scale | TB scale | PB scale | TB scale |
| Best for | Most teams | Existing InfluxDB | Data lakes, ML | Simple IoT |

Detailed Schema Design Best Practices

Regardless of the architecture pattern chosen, certain schema-design principles apply universally. The most important are discussed below.

Hierarchical Facility Modelling

Facility hierarchies are inherently tree-structured. Queries such as “all sensors in Building A” must be answered efficiently, which requires identifying every piece of equipment in every production line in that building. Two effective approaches exist in PostgreSQL.

Approach 1: the ltree extension.

CREATE EXTENSION IF NOT EXISTS ltree;

-- Add a path column to each entity
ALTER TABLE facilities ADD COLUMN path ltree;
ALTER TABLE equipment ADD COLUMN path ltree;
ALTER TABLE sensors ADD COLUMN path ltree;

-- Example paths
-- Facility: 'org.chicago'
-- Equipment: 'org.chicago.building_a.line_3.cnc_001'
-- Sensor: 'org.chicago.building_a.line_3.cnc_001.vibration_x'

CREATE INDEX idx_facility_path ON facilities USING GIST(path);
CREATE INDEX idx_equipment_path ON equipment USING GIST(path);
CREATE INDEX idx_sensor_path ON sensors USING GIST(path);

-- Find all sensors under Building A (any depth)
SELECT s.* FROM sensors s
WHERE s.path <@ 'org.chicago.building_a';

-- Find all equipment exactly 2 levels below org.chicago
SELECT e.* FROM equipment e
WHERE e.path ~ 'org.chicago.*{2}';
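
The descendant operator compares paths label by label, not character by character, which is why it is safer than a string-prefix match. A small Python sketch of the same semantics (a hypothetical helper, not part of ltree) makes the difference visible:

```python
def is_descendant(path: str, ancestor: str) -> bool:
    """Mimic ltree's `path <@ ancestor`: true when `ancestor` is a
    label-wise prefix of `path`, or the two paths are equal."""
    path_labels = path.split(".")
    ancestor_labels = ancestor.split(".")
    return path_labels[:len(ancestor_labels)] == ancestor_labels


sensor_path = "org.chicago.building_a.line_3.cnc_001.vibration_x"

inside = is_descendant(sensor_path, "org.chicago.building_a")

# A naive string-prefix test would wrongly accept this sibling building
sibling = is_descendant("org.chicago.building_ab.line_1", "org.chicago.building_a")
naive = "org.chicago.building_ab.line_1".startswith("org.chicago.building_a")
```

Here inside is True and sibling is False, whereas the naive prefix check returns True for the sibling building.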

Approach 2: recursive CTEs with adjacency list.

If extensions are to be avoided, recursive CTEs work well for moderate-sized hierarchies:

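-- Find all equipment under a specific facility, including nested facilities.
-- Assumption: facilities carries a self-referencing parent_id column, which
-- is not part of the schema shown earlier:
--   ALTER TABLE facilities ADD COLUMN parent_id INTEGER REFERENCES facilities(id);
WITH RECURSIVE facility_tree AS (
    -- Base case: the target facility
    SELECT id, name
    FROM facilities
    WHERE name = 'Building A'

    UNION ALL

    -- Recursive case: child facilities of nodes already in the tree
    SELECT f.id, f.name
    FROM facilities f
    JOIN facility_tree ft ON f.parent_id = ft.id
)
-- Equipment attached to any facility in the tree
SELECT e.*
FROM equipment e
JOIN facility_tree ft ON e.facility_id = ft.id;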

Slowly Changing Dimensions (SCD Type 2)

Equipment moves between production lines, sensors are recalibrated, and firmware is updated. Simply overwriting the old value removes the ability to interpret historical data correctly. A vibration reading from last month should be evaluated against the calibration that was active at that time, not against today's calibration.

SCD Type 2 addresses this requirement by maintaining a history of changes with effective date ranges:

CREATE TABLE sensor_history (
    id              SERIAL PRIMARY KEY,
    sensor_id       INTEGER NOT NULL REFERENCES sensors(id),
    equipment_id    INTEGER NOT NULL REFERENCES equipment(id),
    calibration_date DATE,
    min_range       REAL,
    max_range       REAL,
    firmware_version VARCHAR(50),
    effective_from  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    effective_to    TIMESTAMPTZ,  -- NULL means "current"
    is_current      BOOLEAN DEFAULT TRUE
);

CREATE INDEX idx_sensor_history_current
    ON sensor_history(sensor_id) WHERE is_current = TRUE;

CREATE INDEX idx_sensor_history_range
    ON sensor_history(sensor_id, effective_from, effective_to);

-- When recalibrating a sensor:
-- Step 1: Close the current record
UPDATE sensor_history
SET effective_to = NOW(), is_current = FALSE
WHERE sensor_id = 42 AND is_current = TRUE;

-- Step 2: Insert new record
INSERT INTO sensor_history
    (sensor_id, equipment_id, calibration_date, min_range, max_range,
     firmware_version, effective_from, is_current)
VALUES
    (42, 15, '2026-04-01', 0, 100, 'v3.2.1', NOW(), TRUE);

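-- Query: What was the calibration when this anomaly was detected?
-- The half-open range [effective_from, effective_to) ensures a timestamp that
-- falls exactly on a change boundary matches only one history row.
SELECT sh.*
FROM sensor_history sh
JOIN anomaly_events ae ON ae.sensor_id = sh.sensor_id
WHERE ae.id = 789
  AND ae.start_time >= sh.effective_from
  AND ae.start_time < COALESCE(sh.effective_to, 'infinity'::timestamptz);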
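
The same point-in-time lookup is sometimes needed in application code, for example when scoring cached readings against historical thresholds. The sketch below uses a half-open interval, effective_from inclusive and effective_to exclusive, so a timestamp exactly on a recalibration boundary resolves to a single version. The SensorVersion class mirrors a subset of the sensor_history columns; the sample values are invented.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Sequence


@dataclass(frozen=True)
class SensorVersion:
    """One row of sensor_history, trimmed to the fields used here."""
    sensor_id: int
    max_range: float
    effective_from: datetime
    effective_to: Optional[datetime]  # None means "current"


def version_at(history: Sequence[SensorVersion], sensor_id: int,
               at: datetime) -> Optional[SensorVersion]:
    """Return the version in effect at `at`, or None if there is none."""
    for v in history:
        if v.sensor_id != sensor_id:
            continue
        if v.effective_from <= at and (v.effective_to is None or at < v.effective_to):
            return v
    return None


# Illustrative data: sensor 42 was recalibrated on 2026-04-01
recalibrated = datetime(2026, 4, 1, tzinfo=timezone.utc)
history = [
    SensorVersion(42, 80.0, datetime(2025, 1, 1, tzinfo=timezone.utc), recalibrated),
    SensorVersion(42, 100.0, recalibrated, None),
]

before = version_at(history, 42, datetime(2026, 3, 15, tzinfo=timezone.utc))
on_boundary = version_at(history, 42, recalibrated)
```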

JSONB for Flexible Attributes

Not every piece of equipment shares the same attributes. A CNC machine has spindle speed and tool count; a conveyor has belt speed and length; a robot has axis count and payload capacity. Rather than creating separate tables for each equipment type, JSONB columns may be used for type-specific attributes:

-- Equipment with flexible operating parameters
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer,
                       model, operating_params)
VALUES
(1, 'CNC-001', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', '{
    "max_spindle_rpm": 24000,
    "tool_capacity": 21,
    "axes": 5,
    "max_feed_rate_mm_min": 54000
}'::jsonb),
(1, 'Robot-001', 'robot', 'ABB', 'IRB 6700', '{
    "axes": 6,
    "payload_kg": 150,
    "reach_mm": 2650,
    "repeatability_mm": 0.05
}'::jsonb);

-- Query: Find all robots with payload > 100kg
SELECT name, model, operating_params->>'payload_kg' AS payload
FROM equipment
WHERE equipment_type = 'robot'
  AND (operating_params->>'payload_kg')::numeric > 100;

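-- GIN index for containment (@>) and key-existence (?) queries on the JSONB column.
-- It does not speed up the ->> comparison above; that query would need an
-- expression index on ((operating_params->>'payload_kg')::numeric).
CREATE INDEX idx_equipment_params ON equipment USING GIN(operating_params);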

Tagging System for Ad-Hoc Grouping

Beyond the formal hierarchy, teams often need to group sensors by arbitrary criteria, such as "all sensors involved in the Q1 reliability study," "sensors monitored by the ML anomaly-detection model," or "critical sensors requiring 24/7 alerting." A flexible tagging system supports this requirement:

-- Sensors table already has a JSONB 'tags' column
-- Usage examples:
UPDATE sensors SET tags = '{
    "monitoring_group": "critical_24x7",
    "ml_model": "vibration_anomaly_v2",
    "study": "q1_reliability",
    "zone": "high_temperature"
}'::jsonb
WHERE id = 42;

-- Find all sensors in a monitoring group
SELECT s.*, e.name AS equipment
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
WHERE s.tags @> '{"monitoring_group": "critical_24x7"}';

-- Find sensors enrolled in a specific ML model
SELECT s.id, s.name, s.sensor_type
FROM sensors s
WHERE s.tags @> '{"ml_model": "vibration_anomaly_v2"}';
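
To make the matching rule behind `@>` concrete, the following is a simplified Python model of JSONB containment for objects and scalar values. The `jsonb_contains` function is a hypothetical helper written for this illustration; it deliberately does not model PostgreSQL's array containment rules.

```python
from typing import Any


def jsonb_contains(doc: Any, pattern: Any) -> bool:
    """Simplified model of `doc @> pattern` for objects and scalars."""
    if isinstance(pattern, dict):
        if not isinstance(doc, dict):
            return False
        # Every key in the pattern must exist in the document with a
        # matching (recursively contained) value; extra keys are ignored.
        return all(
            key in doc and jsonb_contains(doc[key], value)
            for key, value in pattern.items()
        )
    # Scalars must match exactly; arrays are deliberately not modelled.
    return doc == pattern


sensor_tags = {
    "monitoring_group": "critical_24x7",
    "ml_model": "vibration_anomaly_v2",
    "study": "q1_reliability",
}

matches = jsonb_contains(sensor_tags, {"monitoring_group": "critical_24x7"})
misses = jsonb_contains(
    sensor_tags, {"monitoring_group": "critical_24x7", "zone": "cold"}
)
print(matches, misses)
```

The first pattern matches because the sensor carries that key and value; the second fails because the sensor has no `zone` tag, which mirrors how a multi-key filter narrows the result set.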

Data Ingestion Pipeline

Reliable transfer of data from sensors into the database is half the work. A production ingestion pipeline typically follows this path:

Sensors → MQTT/Modbus → Kafka/MQTT Broker → Telegraf or Custom Consumer → Database

Telegraf Configuration

Telegraf is a widely used agent for collecting and forwarding sensor data. The configuration below reads from MQTT, enriches with metadata tags, and writes to TimescaleDB:

# telegraf.conf
[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt-broker:1883"]
  topics = ["sensors/+/readings"]
  data_format = "json"
  tag_keys = ["sensor_id"]
  json_time_key = "timestamp"
  json_time_format = "2006-01-02T15:04:05Z07:00"

# Enrich with a static sensor_id-to-sensor_type mapping (regenerate this block when sensor metadata changes)
[[processors.enum]]
  [[processors.enum.mapping]]
    tag = "sensor_id"
    dest = "sensor_type"
    [processors.enum.mapping.value_mappings]
      "S-0001" = "vibration"
      "S-0002" = "temperature"

[[outputs.postgresql]]
  connection = "postgres://user:pass@localhost/sensordb"
  table_template = """
    INSERT INTO sensor_readings (time, sensor_id, value)
    VALUES ({time}, {sensor_id}::integer, {value})
  """

Python Ingestion Script with Validation

For greater control, a custom Python ingestion script can validate sensor IDs against metadata, handle errors, and batch inserts:

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Optional

import asyncpg
import aiomqtt

logger = logging.getLogger(__name__)


class SensorDataIngester:
    """Ingests sensor readings with metadata validation."""

    def __init__(self, pg_dsn: str, mqtt_host: str, mqtt_port: int = 1883):
        self.pg_dsn = pg_dsn
        self.mqtt_host = mqtt_host
        self.mqtt_port = mqtt_port
        self.pool: Optional[asyncpg.Pool] = None
        self.valid_sensors: set[int] = set()
        self.batch: list[tuple] = []
        self.batch_size = 1000
        self.flush_interval = 5  # seconds

    async def start(self):
        """Initialize connections and start ingestion."""
        self.pool = await asyncpg.create_pool(self.pg_dsn, min_size=2, max_size=10)
        await self._load_valid_sensors()

        # Run batch flusher and MQTT listener concurrently
        await asyncio.gather(
            self._mqtt_listener(),
            self._periodic_flush(),
            self._periodic_sensor_refresh(),
        )

    async def _load_valid_sensors(self):
        """Load active sensor IDs from metadata database."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id FROM sensors WHERE is_active = TRUE"
            )
            self.valid_sensors = {row['id'] for row in rows}
            logger.info(f"Loaded {len(self.valid_sensors)} active sensors")

    async def _periodic_sensor_refresh(self):
        """Refresh valid sensor list every 5 minutes."""
        while True:
            await asyncio.sleep(300)
            await self._load_valid_sensors()

    async def _mqtt_listener(self):
        """Listen for sensor readings on MQTT."""
        async with aiomqtt.Client(self.mqtt_host, self.mqtt_port) as client:
            await client.subscribe("sensors/+/readings")
            async for message in client.messages:
                try:
                    payload = json.loads(message.payload)
                    sensor_id = int(payload['sensor_id'])

                    # Validate against metadata
                    if sensor_id not in self.valid_sensors:
                        logger.warning(
                            f"Rejected reading from unknown sensor {sensor_id}"
                        )
                        continue

                    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on
                    timestamp = datetime.fromisoformat(
                        str(payload['timestamp']).replace('Z', '+00:00')
                    )
                    if timestamp.tzinfo is None:
                        timestamp = timestamp.replace(tzinfo=timezone.utc)

                    value = float(payload['value'])

                    self.batch.append((timestamp, sensor_id, value))

                    if len(self.batch) >= self.batch_size:
                        await self._flush_batch()

                except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
                    logger.error(f"Invalid message: {e}")

    async def _periodic_flush(self):
        """Flush batch at regular intervals."""
        while True:
            await asyncio.sleep(self.flush_interval)
            if self.batch:
                await self._flush_batch()

    async def _flush_batch(self):
        """Insert batch of readings into TimescaleDB."""
        if not self.batch:
            return

        batch_to_insert = self.batch.copy()
        self.batch.clear()

        try:
            async with self.pool.acquire() as conn:
                await conn.executemany(
                    """INSERT INTO sensor_readings (time, sensor_id, value)
                       VALUES ($1, $2, $3)""",
                    batch_to_insert
                )
                logger.info(f"Inserted {len(batch_to_insert)} readings")
        except Exception as e:
            logger.error(f"Batch insert failed: {e}")
            # Re-add failed batch for retry
            self.batch.extend(batch_to_insert)


# Data quality checks
async def check_data_quality(pool: asyncpg.Pool):
    """Detect common data quality issues."""
    async with pool.acquire() as conn:
        # Orphaned readings (sensor_id not in sensors table)
        orphaned = await conn.fetchval("""
            SELECT COUNT(DISTINCT r.sensor_id)
            FROM sensor_readings r
            LEFT JOIN sensors s ON s.id = r.sensor_id
            WHERE s.id IS NULL
              AND r.time > NOW() - INTERVAL '24 hours'
        """)

        # Sensors with no recent readings (possible failure)
        silent = await conn.fetch("""
            SELECT s.id, s.name, e.name AS equipment,
                   MAX(r.time) AS last_reading
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            LEFT JOIN sensor_readings r ON r.sensor_id = s.id
                AND r.time > NOW() - INTERVAL '24 hours'
            WHERE s.is_active = TRUE
            GROUP BY s.id, s.name, e.name
            HAVING MAX(r.time) IS NULL
               OR MAX(r.time) < NOW() - INTERVAL '1 hour'
        """)

        # Sensors with values outside their calibrated range
        out_of_range = await conn.fetch("""
            SELECT s.id, s.name, s.min_range, s.max_range,
                   MIN(r.value) AS min_val, MAX(r.value) AS max_val,
                   COUNT(*) AS violation_count
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            WHERE r.time > NOW() - INTERVAL '24 hours'
              AND (r.value < s.min_range OR r.value > s.max_range)
            GROUP BY s.id, s.name, s.min_range, s.max_range
        """)

        return {
            # fetchval() returns a count of distinct IDs, not the IDs themselves
            "orphaned_sensor_count": orphaned,
            "silent_sensors": [dict(r) for r in silent],
            "out_of_range_sensors": [dict(r) for r in out_of_range],
        }
Tip: The _load_valid_sensors() method caches active sensor IDs in memory and refreshes every five minutes. This avoids a database round-trip for every incoming message while ensuring new sensor registrations are detected within a reasonable interval.
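
The per-message validation that the cached sensor set enables can also be factored into a pure function, which makes it testable without a broker or a database. The sketch below is one possible shape, assuming the same JSON payload fields as the script above; `parse_reading` is a hypothetical helper, not part of the original ingester.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def parse_reading(
    raw: bytes, valid_sensors: set[int]
) -> Optional[tuple[datetime, int, float]]:
    """Return (timestamp, sensor_id, value), or None if the message is unusable."""
    try:
        payload = json.loads(raw)
        sensor_id = int(payload["sensor_id"])
        if sensor_id not in valid_sensors:
            return None
        # fromisoformat() rejects a trailing "Z" before Python 3.11.
        stamp = datetime.fromisoformat(
            str(payload["timestamp"]).replace("Z", "+00:00")
        )
        if stamp.tzinfo is None:
            # Assumption: naive timestamps from devices are in UTC.
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp, sensor_id, float(payload["value"])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # Covers malformed JSON, missing fields, nulls, and bad numbers.
        return None


message = b'{"sensor_id": 42, "timestamp": "2026-04-01T12:00:00Z", "value": "3.5"}'
print(parse_reading(message, valid_sensors={42}))
```

Returning `None` for every rejection keeps the listener loop simple; a production version would probably distinguish unknown sensors from malformed payloads so that the two can be counted separately.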

Handling Late-Arriving and Out-of-Order Data

In real-world deployments, data do not always arrive in order. Network delays, edge-device buffering, and batch uploads from remote sites all produce out-of-order events. TimescaleDB handles this situation gracefully: inserts are not required to be in time order. If continuous aggregates or materialised views are used, however, a refresh policy must be configured that covers the maximum expected delay:

-- Continuous aggregate that tolerates late data (up to 1 hour)
CREATE MATERIALIZED VIEW hourly_averages
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;

-- Refresh policy: every 30 minutes, refresh the window between 2 hours and 30 minutes ago
SELECT add_continuous_aggregate_policy('hourly_averages',
    start_offset => INTERVAL '2 hours',
    end_offset => INTERVAL '30 minutes',
    schedule_interval => INTERVAL '30 minutes'
);
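
To reason about how much lateness a given policy tolerates, it can help to model the refresh window in a few lines of Python. This is a simplified model, not TimescaleDB's scheduler: it assumes a bucket is re-materialised only when it lies entirely inside the window between `start_offset` and `end_offset`, and it assumes buckets are aligned to a fixed origin. The function names and the `ORIGIN` constant are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: buckets are whole multiples of the width from a fixed origin.
ORIGIN = datetime(2000, 1, 3, tzinfo=timezone.utc)


def bucket_start(ts: datetime, width: timedelta) -> datetime:
    """Floor `ts` to the start of the bucket that contains it."""
    return ORIGIN + ((ts - ORIGIN) // width) * width


def refreshed_by_run(reading_time: datetime, run_time: datetime,
                     start_offset: timedelta, end_offset: timedelta,
                     width: timedelta) -> bool:
    """True if the reading's bucket lies fully inside this run's window."""
    window_start = run_time - start_offset
    window_end = run_time - end_offset
    start = bucket_start(reading_time, width)
    return window_start <= start and start + width <= window_end


utc = timezone.utc
policy = dict(
    start_offset=timedelta(hours=2),
    end_offset=timedelta(minutes=30),
    width=timedelta(hours=1),
)
late_reading = datetime(2026, 4, 1, 10, 20, tzinfo=utc)  # 10:00 bucket

on_time = refreshed_by_run(late_reading, datetime(2026, 4, 1, 11, 30, tzinfo=utc), **policy)
too_late = refreshed_by_run(late_reading, datetime(2026, 4, 1, 12, 30, tzinfo=utc), **policy)
print(on_time, too_late)
```

Under these assumptions the 10:00 bucket is still inside the window for the 11:30 run but has dropped out by 12:30, so a reading that arrives after that point would not be reflected until the window is widened or the aggregate is refreshed manually.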

Querying Across Metadata and Time-Series

The genuine value of a well-designed schema emerges when queries cross the metadata/time-series boundary. Five common query patterns are presented below in SQL, followed by a Python service class that wraps them for application code.

Figure: Query execution flow, from user query to aggregated result.

  1. User query: the client sends a structured request with filters (location, sensor_type, time window), for example "Vibration anomalies in Building A, CNC".
  2. Metadata join (PostgreSQL): JOIN facilities → equipment → sensors to collect the matching sensor_id set, using B-tree indexes.
  3. Time-series scan (TimescaleDB): hypertable chunks are pruned by time range, and only the matching sensor_id segments are decompressed.
  4. Aggregated result: time_bucket aggregations (AVG / MAX / P99) are returned with equipment name, facility, and sensor context attached.
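
The metadata-first flow can be sketched as two explicit steps in Python. To keep the example self-contained and runnable, it uses the standard-library `sqlite3` module as a stand-in for PostgreSQL/TimescaleDB, with a cut-down schema and made-up rows; the function names are hypothetical. Against the real database, the second step would typically bind the ID list as an array parameter and filter the hypertable by time range.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE facilities (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE equipment (id INTEGER PRIMARY KEY, facility_id INTEGER,
                            equipment_type TEXT);
    CREATE TABLE sensors (id INTEGER PRIMARY KEY, equipment_id INTEGER,
                          sensor_type TEXT);
    CREATE TABLE sensor_readings (time TEXT, sensor_id INTEGER, value REAL);
    INSERT INTO facilities VALUES (1, 'Building A'), (2, 'Building B');
    INSERT INTO equipment VALUES (10, 1, 'cnc'), (20, 2, 'cnc');
    INSERT INTO sensors VALUES (100, 10, 'vibration'), (101, 10, 'temperature'),
                               (200, 20, 'vibration');
    INSERT INTO sensor_readings VALUES
        ('2026-04-01T10:00:00', 100, 4.0), ('2026-04-01T10:05:00', 100, 6.0),
        ('2026-04-01T10:00:00', 101, 55.0), ('2026-04-01T10:00:00', 200, 9.0);
""")


def resolve_sensor_ids(facility: str, equipment_type: str,
                       sensor_type: str) -> list[int]:
    """Step 1: resolve metadata filters to a small set of sensor IDs."""
    rows = conn.execute(
        """SELECT s.id FROM sensors s
           JOIN equipment e ON e.id = s.equipment_id
           JOIN facilities f ON f.id = e.facility_id
           WHERE f.name = ? AND e.equipment_type = ? AND s.sensor_type = ?
           ORDER BY s.id""",
        (facility, equipment_type, sensor_type),
    ).fetchall()
    return [row[0] for row in rows]


def scan_readings(sensor_ids: list[int], start: str, end: str) -> list[tuple]:
    """Step 2: scan only those sensors inside the time window."""
    if not sensor_ids:
        return []
    placeholders = ", ".join("?" for _ in sensor_ids)
    return conn.execute(
        f"""SELECT sensor_id, AVG(value), MAX(value) FROM sensor_readings
            WHERE sensor_id IN ({placeholders}) AND time >= ? AND time < ?
            GROUP BY sensor_id ORDER BY sensor_id""",
        (*sensor_ids, start, end),
    ).fetchall()


ids = resolve_sensor_ids("Building A", "cnc", "vibration")
result = scan_readings(ids, "2026-04-01T00:00:00", "2026-04-02T00:00:00")
print(ids, result)
```

Splitting the query this way makes the size of the sensor set visible before the expensive scan runs, which is useful for rejecting overly broad requests early.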

All Readings by Location and Sensor Type

-- All vibration readings from sensors in Building A, last 7 days
-- Using TimescaleDB time_bucket for efficient aggregation
SELECT
    time_bucket('15 minutes', r.time) AS period,
    e.name AS equipment,
    s.name AS sensor,
    AVG(r.value) AS avg_vibration,
    MAX(r.value) AS peak_vibration,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY r.value) AS p99_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
  AND s.sensor_type = 'vibration'
  AND r.time > NOW() - INTERVAL '7 days'
GROUP BY period, e.name, s.name
ORDER BY period DESC, peak_vibration DESC;

Average Daily Values Grouped by Manufacturer

-- Average daily temperature per facility, grouped by equipment manufacturer
SELECT
    f.name AS facility,
    e.manufacturer,
    time_bucket('1 day', r.time) AS day,
    AVG(r.value) AS avg_temperature,
    COUNT(DISTINCT s.id) AS sensor_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE s.sensor_type = 'temperature'
  AND r.time > NOW() - INTERVAL '30 days'
GROUP BY f.name, e.manufacturer, day
ORDER BY f.name, e.manufacturer, day;

Equipment with Sensors Exceeding Their Range

-- Find equipment where any sensor exceeded its max_range in the past month
SELECT
    f.name AS facility,
    e.name AS equipment,
    e.manufacturer,
    s.name AS sensor,
    s.sensor_type,
    s.max_range AS threshold,
    MAX(r.value) AS peak_value,
    COUNT(*) FILTER (WHERE r.value > s.max_range) AS exceedance_count,
    MIN(r.time) FILTER (WHERE r.value > s.max_range) AS first_exceedance,
    MAX(r.time) FILTER (WHERE r.value > s.max_range) AS last_exceedance
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE r.time > NOW() - INTERVAL '30 days'
  AND s.max_range IS NOT NULL
GROUP BY f.name, e.name, e.manufacturer, s.name, s.sensor_type, s.max_range
HAVING COUNT(*) FILTER (WHERE r.value > s.max_range) > 0
ORDER BY exceedance_count DESC;

Readings Before and After Maintenance

-- Compare sensor readings 24 hours before and after a maintenance event
WITH maintenance AS (
    SELECT id, equipment_id, performed_at, maintenance_type
    FROM maintenance_logs
    WHERE id = 456  -- specific maintenance event
),
before_maintenance AS (
    SELECT
        s.name AS sensor,
        s.sensor_type,
        AVG(r.value) AS avg_value,
        STDDEV(r.value) AS stddev_value,
        'before' AS period
    FROM sensor_readings r
    JOIN sensors s ON s.id = r.sensor_id
    JOIN maintenance m ON s.equipment_id = m.equipment_id
    WHERE r.time BETWEEN m.performed_at - INTERVAL '24 hours' AND m.performed_at
    GROUP BY s.name, s.sensor_type
),
after_maintenance AS (
    SELECT
        s.name AS sensor,
        s.sensor_type,
        AVG(r.value) AS avg_value,
        STDDEV(r.value) AS stddev_value,
        'after' AS period
    FROM sensor_readings r
    JOIN sensors s ON s.id = r.sensor_id
    JOIN maintenance m ON s.equipment_id = m.equipment_id
    WHERE r.time BETWEEN m.performed_at AND m.performed_at + INTERVAL '24 hours'
    GROUP BY s.name, s.sensor_type
)
SELECT
    b.sensor,
    b.sensor_type,
    b.avg_value AS avg_before,
    a.avg_value AS avg_after,
    ROUND(((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0) * 100)::numeric, 2)
        AS pct_change,
    b.stddev_value AS stddev_before,
    a.stddev_value AS stddev_after
FROM before_maintenance b
JOIN after_maintenance a ON a.sensor = b.sensor
ORDER BY ABS((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0)) DESC;
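
The arithmetic in the final SELECT is easy to get subtly wrong, so a small Python reference implementation can serve as a cross-check on a handful of values. The `compare_windows` helper and the sample numbers are hypothetical; the sketch mirrors the SQL by returning no percentage when the baseline average is zero and no standard deviation when fewer than two samples exist.

```python
from statistics import mean, stdev
from typing import Optional


def compare_windows(before: list[float],
                    after: list[float]) -> dict[str, Optional[float]]:
    """Summarise readings before and after a maintenance event."""
    avg_before, avg_after = mean(before), mean(after)
    # Same guard as NULLIF(b.avg_value, 0) in the SQL version.
    pct_change = (
        None if avg_before == 0
        else round((avg_after - avg_before) / avg_before * 100, 2)
    )
    return {
        "avg_before": avg_before,
        "avg_after": avg_after,
        "pct_change": pct_change,
        # Sample standard deviation, undefined for a single reading.
        "stddev_before": stdev(before) if len(before) > 1 else None,
        "stddev_after": stdev(after) if len(after) > 1 else None,
    }


# Made-up vibration readings (mm/s) around a bearing replacement.
summary = compare_windows(before=[4.0, 6.0, 5.0], after=[3.0, 4.0, 5.0])
print(summary)
```

A negative `pct_change` on a vibration sensor after maintenance is the expected outcome; a rise would suggest the work introduced a new problem.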

Anomaly Events with Full Context

-- Anomaly events from the last 90 days for FANUC robots installed on or after 2024-01-01, with full context
SELECT
    ae.id AS anomaly_id,
    ae.anomaly_type,
    ae.severity,
    ae.start_time,
    ae.end_time,
    ae.value_at_detection,
    s.name AS sensor,
    s.sensor_type,
    s.max_range,
    e.name AS equipment,
    e.manufacturer,
    e.model,
    e.install_date,
    f.name AS facility
FROM anomaly_events ae
JOIN sensors s ON s.id = ae.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE e.manufacturer = 'FANUC'
  AND e.equipment_type = 'robot'
  AND e.install_date >= '2024-01-01'
  AND ae.start_time > NOW() - INTERVAL '90 days'
ORDER BY ae.severity DESC, ae.start_time DESC;

Python Query Service

Wrapping these queries in a service class provides a clean interface for application code:

import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

import asyncpg


@dataclass
class SensorReading:
    time: datetime
    sensor_id: int
    sensor_name: str
    equipment_name: str
    facility_name: str
    sensor_type: str
    value: float
    unit: str


class QueryService:
    """Combines metadata filtering with time-series queries."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def get_readings(
        self,
        facility: Optional[str] = None,
        equipment_type: Optional[str] = None,
        manufacturer: Optional[str] = None,
        sensor_type: Optional[str] = None,
        production_line: Optional[str] = None,
        tags: Optional[dict] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        bucket_interval: str = '1 hour',
    ) -> list[dict]:
        """
        Flexible query combining metadata filters with time-series aggregation.
        """
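        # Use timezone-aware defaults: a naive utcnow() value can be
        # misinterpreted as local time when bound to a TIMESTAMPTZ parameter.
        if start is None:
            start = datetime.now().astimezone() - timedelta(hours=24)
        if end is None:
            end = datetime.now().astimezone()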

        conditions = ["r.time >= $1", "r.time <= $2"]
        params: list = [start, end]
        param_idx = 3

        if facility:
            conditions.append(f"f.name = ${param_idx}")
            params.append(facility)
            param_idx += 1

        if equipment_type:
            conditions.append(f"e.equipment_type = ${param_idx}")
            params.append(equipment_type)
            param_idx += 1

        if manufacturer:
            conditions.append(f"e.manufacturer = ${param_idx}")
            params.append(manufacturer)
            param_idx += 1

        if sensor_type:
            conditions.append(f"s.sensor_type = ${param_idx}")
            params.append(sensor_type)
            param_idx += 1

        if production_line:
            conditions.append(f"e.production_line = ${param_idx}")
            params.append(production_line)
            param_idx += 1

        if tags:
            conditions.append(f"s.tags @> ${param_idx}::jsonb")
            params.append(json.dumps(tags))
            param_idx += 1

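        where_clause = " AND ".join(conditions)

        # Bind the bucket width as a parameter instead of formatting it into
        # the SQL text, so a caller-supplied string cannot inject SQL.
        params.append(bucket_interval)
        bucket_expr = f"time_bucket(${param_idx}::text::interval, r.time)"

        query = f"""
            SELECT
                {bucket_expr} AS bucket,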
                s.id AS sensor_id,
                s.name AS sensor_name,
                s.sensor_type,
                s.unit,
                e.name AS equipment_name,
                e.manufacturer,
                f.name AS facility_name,
                AVG(r.value) AS avg_value,
                MIN(r.value) AS min_value,
                MAX(r.value) AS max_value,
                COUNT(*) AS sample_count
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where_clause}
            GROUP BY bucket, s.id, s.name, s.sensor_type, s.unit,
                     e.name, e.manufacturer, f.name
            ORDER BY bucket DESC, sensor_name
        """

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [dict(r) for r in rows]

    async def get_equipment_health(self, equipment_id: int) -> dict:
        """Get comprehensive health status for a piece of equipment."""
        async with self.pool.acquire() as conn:
            # Equipment metadata
            equipment = await conn.fetchrow("""
                SELECT e.*, f.name AS facility_name
                FROM equipment e
                JOIN facilities f ON f.id = e.facility_id
                WHERE e.id = $1
            """, equipment_id)

            # Latest readings from all sensors
            latest_readings = await conn.fetch("""
                SELECT DISTINCT ON (s.id)
                    s.id AS sensor_id, s.name, s.sensor_type, s.unit,
                    s.min_range, s.max_range,
                    r.time AS last_reading_time,
                    r.value AS last_value,
                    CASE
                        WHEN r.value IS NULL THEN 'no_data'
                        WHEN r.value > s.max_range THEN 'exceeded'
                        WHEN r.value < s.min_range THEN 'below_range'
                        ELSE 'normal'
                    END AS range_status
                FROM sensors s
                LEFT JOIN sensor_readings r ON r.sensor_id = s.id
                    AND r.time > NOW() - INTERVAL '1 hour'
                WHERE s.equipment_id = $1 AND s.is_active = TRUE
                ORDER BY s.id, r.time DESC
            """, equipment_id)

            # Recent anomalies
            anomalies = await conn.fetch("""
                SELECT ae.*, s.name AS sensor_name, s.sensor_type
                FROM anomaly_events ae
                JOIN sensors s ON s.id = ae.sensor_id
                WHERE s.equipment_id = $1
                  AND ae.start_time > NOW() - INTERVAL '7 days'
                ORDER BY ae.start_time DESC
                LIMIT 20
            """, equipment_id)

            # Last maintenance
            last_maintenance = await conn.fetchrow("""
                SELECT * FROM maintenance_logs
                WHERE equipment_id = $1
                ORDER BY performed_at DESC LIMIT 1
            """, equipment_id)

            return {
                "equipment": dict(equipment) if equipment else None,
                "sensors": [dict(r) for r in latest_readings],
                "recent_anomalies": [dict(a) for a in anomalies],
                "last_maintenance": dict(last_maintenance) if last_maintenance else None,
                "overall_status": self._calculate_status(latest_readings, anomalies),
            }

    @staticmethod
    def _calculate_status(readings, anomalies) -> str:
        critical_anomalies = [a for a in anomalies if a['severity'] == 'critical']
        exceeded_sensors = [r for r in readings if r['range_status'] == 'exceeded']

        if critical_anomalies or len(exceeded_sensors) > 2:
            return "critical"
        elif exceeded_sensors or any(a['severity'] == 'high' for a in anomalies):
            return "warning"
        return "healthy"

API Design for Metadata and Time-Series

A well-designed API layer makes the combined metadata/time-series system accessible to dashboards, mobile applications, and other services. A FastAPI implementation that exposes the key endpoints is shown below:

from datetime import datetime, timedelta
from typing import Optional

import asyncpg
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

app = FastAPI(title="Sensor Data API")
pool: asyncpg.Pool = None


@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/sensordb",
        min_size=5, max_size=20
    )


@app.on_event("shutdown")
async def shutdown():
    await pool.close()


# ---- Pydantic Models ----

class FacilityResponse(BaseModel):
    id: int
    name: str
    location: Optional[str]
    facility_type: str
    status: str
    equipment_count: int


class EquipmentResponse(BaseModel):
    id: int
    name: str
    equipment_type: str
    manufacturer: Optional[str]
    model: Optional[str]
    status: str
    sensor_count: int
    production_line: Optional[str]


class SensorReadingResponse(BaseModel):
    time: datetime
    value: float
    sensor_name: str
    sensor_type: str
    unit: str


class EquipmentHealthResponse(BaseModel):
    equipment_id: int
    equipment_name: str
    facility: str
    status: str
    sensors: list[dict]
    recent_anomalies: list[dict]
    last_maintenance: Optional[dict]


# ---- Endpoints ----

@app.get("/facilities/{facility_id}/equipment",
         response_model=list[EquipmentResponse])
async def list_equipment(facility_id: int):
    """List all equipment in a facility with metadata."""
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT e.id, e.name, e.equipment_type, e.manufacturer,
                   e.model, e.status, e.production_line,
                   COUNT(s.id) AS sensor_count
            FROM equipment e
            LEFT JOIN sensors s ON s.equipment_id = e.id AND s.is_active = TRUE
            WHERE e.facility_id = $1
            GROUP BY e.id
            ORDER BY e.production_line, e.name
        """, facility_id)

        if not rows:
            raise HTTPException(404, "Facility not found or has no equipment")
        return [dict(r) for r in rows]


@app.get("/sensors/{sensor_id}/readings",
         response_model=list[SensorReadingResponse])
async def get_sensor_readings(
    sensor_id: int,
    start: datetime = Query(
        default_factory=lambda: datetime.now().astimezone() - timedelta(hours=24)
    ),
    end: datetime = Query(default_factory=lambda: datetime.now().astimezone()),
    bucket: str = Query(default="15 minutes",
                        description="Aggregation interval, e.g. '5 minutes', '1 hour'"),
):
    """Get time-series readings for a sensor with metadata context."""
    async with pool.acquire() as conn:
        # Verify sensor exists and get metadata
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.unit
            FROM sensors s WHERE s.id = $1
        """, sensor_id)

        if not sensor:
            raise HTTPException(404, "Sensor not found")

        # Bind the bucket width as a parameter rather than formatting it into
        # the SQL text, so the query string cannot be used for SQL injection.
        readings = await conn.fetch("""
            SELECT
                time_bucket($4::text::interval, r.time) AS time,
                AVG(r.value) AS value
            FROM sensor_readings r
            WHERE r.sensor_id = $1
              AND r.time BETWEEN $2 AND $3
            GROUP BY 1
            ORDER BY time DESC
        """, sensor_id, start, end, bucket)

        return [
            {
                "time": r["time"],
                "value": round(r["value"], 4),
                "sensor_name": sensor["name"],
                "sensor_type": sensor["sensor_type"],
                "unit": sensor["unit"],
            }
            for r in readings
        ]


@app.get("/equipment/{equipment_id}/health",
         response_model=EquipmentHealthResponse)
async def get_equipment_health(equipment_id: int):
    """
    Combined health view: latest sensor readings + metadata + anomalies.
    Single endpoint that crosses metadata and time-series boundaries.
    """
    query_service = QueryService(pool)
    health = await query_service.get_equipment_health(equipment_id)

    if not health["equipment"]:
        raise HTTPException(404, "Equipment not found")

    return {
        "equipment_id": equipment_id,
        "equipment_name": health["equipment"]["name"],
        "facility": health["equipment"]["facility_name"],
        "status": health["overall_status"],
        "sensors": health["sensors"],
        "recent_anomalies": health["recent_anomalies"],
        "last_maintenance": health["last_maintenance"],
    }


@app.get("/facilities/{facility_id}/sensors/readings")
async def get_facility_readings(
    facility_id: int,
    sensor_type: Optional[str] = None,
    manufacturer: Optional[str] = None,
    production_line: Optional[str] = None,
    start: datetime = Query(
        default_factory=lambda: datetime.now().astimezone() - timedelta(hours=24)
    ),
    end: datetime = Query(default_factory=lambda: datetime.now().astimezone()),
    bucket: str = "1 hour",
):
    """
    Get aggregated readings for all sensors in a facility,
    with optional metadata filters.
    """
    conditions = ["f.id = $1", "r.time >= $2", "r.time <= $3"]
    params = [facility_id, start, end]
    idx = 4

    if sensor_type:
        conditions.append(f"s.sensor_type = ${idx}")
        params.append(sensor_type)
        idx += 1

    if manufacturer:
        conditions.append(f"e.manufacturer = ${idx}")
        params.append(manufacturer)
        idx += 1

    if production_line:
        conditions.append(f"e.production_line = ${idx}")
        params.append(production_line)
        idx += 1

    where = " AND ".join(conditions)

    # Bind the bucket width as a parameter rather than formatting it into
    # the SQL text, so the query string cannot be used for SQL injection.
    params.append(bucket)
    bucket_expr = f"time_bucket(${idx}::text::interval, r.time)"

    async with pool.acquire() as conn:
        rows = await conn.fetch(f"""
            SELECT
                {bucket_expr} AS time,
                e.name AS equipment,
                e.manufacturer,
                s.name AS sensor,
                s.sensor_type,
                s.unit,
                AVG(r.value) AS avg_value,
                MAX(r.value) AS max_value,
                MIN(r.value) AS min_value
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where}
            GROUP BY 1,
                     e.name, e.manufacturer, s.name, s.sensor_type, s.unit
            ORDER BY time DESC
        """, *params)

        return [dict(r) for r in rows]
Key Takeaway: The /equipment/{id}/health endpoint illustrates the value of combining metadata and time-series in a single API response. A dashboard can render equipment details, live sensor values, anomaly alerts, and maintenance history from a single API call.
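
Because the `bucket` query parameter is free text supplied by the caller, it is worth validating at the API boundary so that a bad value produces a clear client error rather than a database error. The sketch below is one possible approach; `parse_bucket`, the accepted units, and the one-day upper limit are assumptions made for illustration, not part of the API above.

```python
import re
from datetime import timedelta

_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}
# At most four digits, one space, then a known unit with optional plural "s".
_BUCKET_RE = re.compile(r"(\d{1,4}) (second|minute|hour|day)s?")


def parse_bucket(text: str,
                 max_width: timedelta = timedelta(days=1)) -> timedelta:
    """Convert a string such as '15 minutes' into a validated timedelta."""
    match = _BUCKET_RE.fullmatch(text.strip().lower())
    if match is None:
        raise ValueError(f"Unsupported bucket: {text!r}")
    seconds = int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
    width = timedelta(seconds=seconds)
    if not timedelta(0) < width <= max_width:
        raise ValueError(f"Bucket out of range: {text!r}")
    return width


print(parse_bucket("15 minutes"))
```

In an endpoint, the `ValueError` would be translated into an HTTP 400 response, and the resulting `timedelta` could be passed to the query as the interval value.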

Handling Scale

A system with 500 sensors at 1 Hz generates approximately 43 million readings per day. At 10 Hz, the figure rises to 432 million. Over the course of a year, this amounts to roughly 16 to 158 billion rows. Without a data-lifecycle strategy, storage costs grow linearly with no upper bound.
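
The row counts follow directly from sensors × sampling rate × seconds per day. A short Python calculation makes the arithmetic explicit and is easy to adapt to other fleet sizes; the `rows_per_day` helper is a hypothetical name used only here.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def rows_per_day(sensors: int, rate_hz: float) -> int:
    """Readings generated per day by `sensors` devices sampling at `rate_hz`."""
    return int(sensors * rate_hz * SECONDS_PER_DAY)


for rate_hz in (1, 10):
    daily = rows_per_day(500, rate_hz)
    yearly = daily * 365
    print(f"{rate_hz} Hz: {daily:,} rows/day, {yearly:,} rows/year")
```

For 500 sensors this gives 43.2 million rows per day at 1 Hz and 432 million at 10 Hz, which over 365 days comes to about 15.8 and 157.7 billion rows respectively.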

Data Retention Policies

Data Tier   | Resolution                    | Retention | Storage                          | Use Case
Raw         | Full resolution (1-1000 Hz)   | 30 days   | TimescaleDB (compressed)         | Real-time dashboards, debugging
Downsampled | 1-minute or 5-minute averages | 1 year    | TimescaleDB continuous aggregate | Trend analysis, weekly reports
Aggregated  | Hourly or daily summaries     | Forever   | PostgreSQL regular table         | Historical comparisons, audits
Archived    | Full resolution               | 7 years   | Parquet on S3/Glacier            | Compliance, ML retraining

 

Implementing this with TimescaleDB:

-- Continuous aggregate: 5-minute downsampling (auto-maintained)
CREATE MATERIALIZED VIEW readings_5min
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('5 minutes', time) AS bucket,
    sensor_id,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) AS median_value,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;

SELECT add_continuous_aggregate_policy('readings_5min',
    start_offset => INTERVAL '2 hours',
    end_offset => INTERVAL '30 minutes',
    schedule_interval => INTERVAL '30 minutes'
);

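-- Continuous aggregate: hourly, built on top of the 5-minute aggregate
-- (hierarchical continuous aggregates require TimescaleDB 2.9 or later)
CREATE MATERIALIZED VIEW readings_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', bucket) AS bucket,
    sensor_id,
    -- Weight each 5-minute average by its sample count; a plain AVG(avg_value)
    -- is biased when buckets contain different numbers of readings
    SUM(avg_value * sample_count) / SUM(sample_count) AS avg_value,
    MIN(min_value) AS min_value,
    MAX(max_value) AS max_value,
    SUM(sample_count) AS sample_count
FROM readings_5min
GROUP BY time_bucket('1 hour', bucket), sensor_id
WITH NO DATA;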

SELECT add_continuous_aggregate_policy('readings_hourly',
    start_offset => INTERVAL '4 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Drop raw data after 30 days
SELECT add_retention_policy('sensor_readings', INTERVAL '30 days');

-- Keep 5-minute aggregates for 1 year
SELECT add_retention_policy('readings_5min', INTERVAL '1 year');
Caution: Before enabling retention policies, the archival pipeline must be confirmed to be operational. Once add_retention_policy drops a chunk, the raw data are gone. Export to Parquet on S3 should precede retention if long-term raw data access is required for compliance or ML training.
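
Once retention policies are active, application code has to know which tier can still answer a given query. A minimal routing sketch in Python is shown below, assuming the retention periods and object names from the SQL above; `pick_source` is a hypothetical helper, and real chunk boundaries mean the cut-over is not exact to the second.

```python
from datetime import datetime, timedelta, timezone

RAW_RETENTION = timedelta(days=30)        # sensor_readings
FIVE_MIN_RETENTION = timedelta(days=365)  # readings_5min


def pick_source(start: datetime, now: datetime) -> str:
    """Choose the finest-grained relation that still covers `start`."""
    age = now - start
    if age <= RAW_RETENTION:
        return "sensor_readings"
    if age <= FIVE_MIN_RETENTION:
        return "readings_5min"
    return "readings_hourly"


now = datetime(2026, 5, 27, tzinfo=timezone.utc)
for days_back in (1, 90, 800):
    print(days_back, pick_source(now - timedelta(days=days_back), now))
```

A query that starts yesterday is served from raw data, one that starts 90 days back falls through to the 5-minute aggregate, and anything older than a year is answered from the hourly summaries.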

Real-World Example: Manufacturing Plant

A complete real-world scenario ties the preceding elements together. Consider a manufacturing plant with the following configuration:

  • 3 buildings (A, B, C) on a single campus
  • 50 machines: 20 CNC machines (FANUC, DMG Mori), 15 robots (ABB, KUKA), 10 conveyors, 5 pumps
  • 500 sensors: vibration, temperature, pressure, current, torque, flow rate
  • Average sampling rate: 10 Hz (some vibration sensors at 1 kHz for spectral analysis)

The Schema

-- Seed the metadata
INSERT INTO facilities (name, location, facility_type, commissioned_date, status) VALUES
('Building A', 'North Campus, Chicago IL', 'manufacturing', '2019-03-15', 'active'),
('Building B', 'North Campus, Chicago IL', 'manufacturing', '2021-07-01', 'active'),
('Building C', 'North Campus, Chicago IL', 'warehouse', '2022-01-10', 'active');

-- Sample equipment (showing pattern, not all 50)
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer, model,
                       serial_number, install_date, production_line, status,
                       operating_params) VALUES
(1, 'CNC-A01', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', 'FN-2024-0891',
 '2024-03-15', 'Line 1', 'operational',
 '{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}'),
(1, 'CNC-A02', 'cnc', 'DMG Mori', 'DMU 50', 'DM-2023-4521',
 '2023-09-01', 'Line 1', 'operational',
 '{"max_spindle_rpm": 20000, "tool_capacity": 30, "axes": 5}'),
(1, 'Robot-A01', 'robot', 'ABB', 'IRB 6700', 'ABB-2024-1122',
 '2024-06-10', 'Line 2', 'operational',
 '{"axes": 6, "payload_kg": 150, "reach_mm": 2650}'),
(2, 'CNC-B01', 'cnc', 'FANUC', 'Robodrill a-D21LiB5ADV', 'FN-2024-1205',
 '2024-11-20', 'Line 3', 'operational',
 '{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}');

-- Sensors for CNC-A01 (typical: vibration, temperature, spindle current)
INSERT INTO sensors (equipment_id, name, sensor_type, unit, sampling_rate_hz,
                     min_range, max_range, calibration_date, is_active, tags) VALUES
(1, 'CNC-A01-VIB-X', 'vibration', 'mm/s', 1000, 0, 50,
 '2026-01-15', TRUE, '{"axis": "x", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-VIB-Y', 'vibration', 'mm/s', 1000, 0, 50,
 '2026-01-15', TRUE, '{"axis": "y", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-TEMP-SPINDLE', 'temperature', 'celsius', 1, 10, 85,
 '2026-02-01', TRUE, '{"location": "spindle_bearing"}'),
(1, 'CNC-A01-CURRENT', 'current', 'ampere', 10, 0, 30,
 '2026-02-01', TRUE, '{"phase": "main_spindle"}');

Data Flow

In this plant the data flow proceeds as follows:

  1. Sensors output analogue/digital signals to edge PLCs (programmable logic controllers).
  2. Edge PLCs digitise and publish to an MQTT broker via the Sparkplug B protocol.
  3. Telegraf agents (one per building) subscribe to MQTT, buffer locally, and forward to the central database.
  4. TimescaleDB receives inserts via the Telegraf PostgreSQL output plugin.
  5. The ingestion validator (the Python script described earlier) runs as a sidecar, monitoring for unknown sensor IDs.

With 500 sensors averaging 10 Hz, the system handles approximately 5,000 inserts per second during normal operation, with bursts of up to 50,000 per second when high-frequency vibration captures are triggered. TimescaleDB on a single node (16 vCPU, 64 GB RAM, NVMe SSD) handles this load comfortably with batch inserts.
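
The steady-state figure follows from simple arithmetic, and the same calculation gives a rough row count for the raw tier. The sketch below uses this section's numbers (500 sensors, 10 Hz average, 30-day raw retention). The helper name `ingest_estimate` is hypothetical, and the estimate deliberately ignores burst captures and compression.

```python
def ingest_estimate(sensors: int, avg_rate_hz: float, retention_days: int) -> dict:
    """Back-of-the-envelope ingest and raw-row estimates (bursts excluded)."""
    rows_per_second = sensors * avg_rate_hz
    rows_per_day = rows_per_second * 86_400  # seconds per day
    return {
        "rows_per_second": rows_per_second,
        "rows_per_day": rows_per_day,
        "rows_in_raw_tier": rows_per_day * retention_days,
    }


estimate = ingest_estimate(sensors=500, avg_rate_hz=10, retention_days=30)
for name, rows in estimate.items():
    print(f"{name}: {rows:,.0f}")
```

At the average rate this comes to 432 million rows per day and roughly 13 billion rows in the 30-day raw tier, which is why compression and continuous aggregates matter even for a single plant.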

Dashboard Queries

The operations team uses a Grafana dashboard backed by the following queries:

-- Dashboard Panel 1: Plant Overview — current status of all equipment
SELECT
    f.name AS building,
    e.name AS machine,
    e.equipment_type,
    e.status AS equipment_status,
    -- DISTINCT is required: the three LEFT JOINs multiply rows per machine
    COUNT(DISTINCT s.id) FILTER (WHERE s.is_active) AS active_sensors,
    COUNT(DISTINCT ae.id) FILTER (WHERE ae.severity IN ('high', 'critical')
        AND ae.start_time > NOW() - INTERVAL '24 hours') AS critical_anomalies_24h,
    MAX(ml.performed_at) AS last_maintenance
FROM equipment e
JOIN facilities f ON f.id = e.facility_id
LEFT JOIN sensors s ON s.equipment_id = e.id
LEFT JOIN anomaly_events ae ON ae.sensor_id = s.id
LEFT JOIN maintenance_logs ml ON ml.equipment_id = e.id
GROUP BY f.name, e.name, e.equipment_type, e.status
ORDER BY critical_anomalies_24h DESC, f.name, e.name;

-- Dashboard Panel 2: Vibration trends for Line 3 CNC machines (last 24h)
SELECT
    time_bucket('15 minutes', r.time) AS period,
    e.name AS machine,
    AVG(r.value) AS avg_vibration,
    MAX(r.value) AS peak_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
  AND e.equipment_type = 'cnc'
  AND s.sensor_type = 'vibration'
  AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY period, e.name
ORDER BY period, e.name;

-- Dashboard Panel 3: Equipment needing attention
-- (sensors exceeding 80% of their max range)
SELECT
    e.name AS machine,
    s.name AS sensor,
    s.sensor_type,
    s.max_range,
    latest.last_value,
    ROUND((latest.last_value / s.max_range * 100)::numeric, 1) AS pct_of_max
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
CROSS JOIN LATERAL (
    SELECT value AS last_value
    FROM sensor_readings
    WHERE sensor_id = s.id
    ORDER BY time DESC
    LIMIT 1
) latest
WHERE s.is_active = TRUE
  AND s.max_range > 0  -- also excludes NULL; avoids division by zero
  AND latest.last_value > s.max_range * 0.8
ORDER BY pct_of_max DESC;
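
Queries shaped like Panel 1 join several child tables (sensors, anomaly events, maintenance logs) to each machine, and joins of that shape multiply rows: a machine with 2 sensors and 3 maintenance records produces 6 joined rows, so counts must be taken over distinct IDs. The sketch below reproduces the effect with Python's built-in sqlite3 module on a toy schema. The table contents are invented for illustration, and SQLite stands in for PostgreSQL only to keep the example self-contained.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE equipment (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE sensors (id INTEGER PRIMARY KEY, equipment_id INTEGER);
    CREATE TABLE maintenance_logs (id INTEGER PRIMARY KEY, equipment_id INTEGER);

    -- Toy data: one machine, 2 sensors, 3 maintenance records
    INSERT INTO equipment VALUES (1, 'CNC-A01');
    INSERT INTO sensors VALUES (10, 1), (11, 1);
    INSERT INTO maintenance_logs VALUES (100, 1), (101, 1), (102, 1);
""")

# Same join shape as Panel 1: two child tables hanging off one parent row
naive, distinct = conn.execute("""
    SELECT COUNT(s.id), COUNT(DISTINCT s.id)
    FROM equipment e
    LEFT JOIN sensors s ON s.equipment_id = e.id
    LEFT JOIN maintenance_logs ml ON ml.equipment_id = e.id
    GROUP BY e.id
""").fetchone()

print(naive, distinct)  # 6 2
```

Counting distinct IDs, or aggregating each child table in its own subquery before joining, keeps the totals correct.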

Anomaly Detection Integration

When an ML anomaly-detection model flags unusual behaviour, it writes to the anomaly_events table with full metadata context. A representative Python worker is shown below:

import logging

import asyncpg

logger = logging.getLogger(__name__)


async def record_anomaly(
    pool: asyncpg.Pool,
    sensor_id: int,
    anomaly_type: str,
    severity: str,
    value_at_detection: float,
    model_version: str,
) -> int:
    """Record an anomaly event with metadata validation."""
    async with pool.acquire() as conn:
        # Validate sensor exists and get context for logging
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.max_range,
                   e.name AS equipment, f.name AS facility
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE s.id = $1
        """, sensor_id)

        if not sensor:
            raise ValueError(f"Sensor {sensor_id} not found in metadata")

        anomaly_id = await conn.fetchval("""
            INSERT INTO anomaly_events
                (sensor_id, start_time, anomaly_type, severity,
                 value_at_detection, model_version)
            VALUES ($1, NOW(), $2, $3, $4, $5)
            RETURNING id
        """, sensor_id, anomaly_type, severity, value_at_detection, model_version)

        logger.warning(
            f"Anomaly #{anomaly_id}: {severity} {anomaly_type} on "
            f"{sensor['equipment']}/{sensor['name']} ({sensor['facility']}) "
            f"value={value_at_detection} (max={sensor['max_range']})"
        )

        return anomaly_id

Common Pitfalls

The following errors recur most frequently across the sensor-data architectures the author has reviewed:

| Pitfall | Impact | Solution |
| --- | --- | --- |
| Denormalizing metadata into every time-series row | 10-20x storage bloat, metadata updates require backfilling billions of rows | Store only sensor_id in time-series, JOIN at query time |
| No foreign key validation | Orphaned readings accumulate, 10-20% of data becomes unlinkable | Validate sensor_id at ingestion, run periodic quality checks |
| Single database for everything | Either metadata or time-series queries suffer poor performance | Use TimescaleDB (best of both) or a split architecture |
| Not planning for sensor changes | Historical data misinterpreted after recalibration or replacement | Implement SCD Type 2 for sensor history |
| Ignoring time zones | Time shifts corrupt analysis, especially across multi-site deployments | Always use TIMESTAMPTZ, store in UTC, convert at display time |
| Missing indexes on JOIN columns | Cross-domain queries take minutes instead of milliseconds | Index (sensor_id, time DESC) on time-series, all FKs on metadata |
| No retention policy | Storage costs grow linearly forever, query performance degrades | Tiered retention: raw (30d) → downsampled (1y) → archived (S3) |
| String-based sensor identification | Name changes break links, inconsistent naming across teams | Use integer IDs as primary key, names as human-readable labels |

Tip: The data-quality checks from the ingestion script should be run on a daily schedule. Alerts should be configured for orphaned sensor IDs (readings from sensors not in the metadata registry) and silent sensors (registered sensors with no recent readings). These are early indicators of infrastructure problems.
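
Both alerts reduce to set comparisons between the sensor registry and the sensors seen in recent readings. The following is a minimal sketch, assuming the two ID sets have already been fetched (for example, with a SELECT on active sensors and a SELECT DISTINCT sensor_id over the last day of readings). The function name `classify_sensors` is hypothetical.

```python
def classify_sensors(registered: set[int], reporting: set[int]) -> dict[str, set[int]]:
    """Compare registry IDs against IDs seen in recent readings.

    registered: IDs of active sensors in the metadata registry.
    reporting:  IDs that produced at least one reading in the check window.
    """
    return {
        "orphaned": reporting - registered,  # readings with no metadata row
        "silent": registered - reporting,    # registered but not reporting
    }


result = classify_sensors(registered={1, 2, 3, 4}, reporting={1, 2, 4, 99})
print(result)  # {'orphaned': {99}, 'silent': {3}}
```

Either set being non-empty is a reasonable trigger for an alert; the check window (one day in the assumption above) should be longer than the slowest sensor's expected reporting interval.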

Final Thoughts

Managing metadata and time-series data together is not a luxury; it is a fundamental requirement for any system seeking to derive actionable insights from sensor data. The sensor_id is the bridge between what the sensors are (metadata) and what they are measuring (time-series), and the architecture must make crossing that bridge in both directions straightforward.

For most teams, PostgreSQL with TimescaleDB is the appropriate starting point. It offers native SQL JOINs across metadata and time-series tables, a single connection string, familiar tooling, and excellent performance up to terabyte scale. Once metadata and sensor data are properly connected, feeding the data into modern time-series forecasting models becomes substantially simpler. When the system outgrows that platform, the patterns for InfluxDB integration, Parquet data lakes, and TDengine super tables provide a clear upgrade path.

The principal design principles are as follows:

  • Separate but connected: Metadata in relational tables, time-series in optimised storage, linked by sensor_id.
  • Sensor registry: Sensors should be treated as first-class entities with rich metadata (type, unit, range, calibration, sampling rate).
  • Slowly changing dimensions: Metadata changes should be tracked over time so that historical data can be interpreted correctly.
  • Validate at ingestion: A time-series reading should never be inserted without confirmation that the sensor exists in metadata.
  • Tiered retention: Raw data (30 days) → downsampled (1 year) → aggregated (indefinite) → archived (cold storage). For the archival tier, an InfluxDB-to-Iceberg pipeline can move older data to S3 at a fraction of the cost.
  • Index the bridge: Composite indexes on (sensor_id, time DESC) render cross-domain queries fast.
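
Tiered retention also implies that a query has to be routed to whichever tier still holds the requested window. The following is a minimal sketch of that routing, using the retention intervals and relation names from this guide (`sensor_readings`, `readings_5min`, `readings_hourly`). The function `pick_source` is hypothetical, one year is approximated as 365 days, and a real router would likely also weigh the resolution the caller asked for.

```python
from datetime import datetime, timedelta, timezone

# (relation, how long it retains data); None means kept indefinitely.
TIERS = [
    ("sensor_readings", timedelta(days=30)),  # raw
    ("readings_5min", timedelta(days=365)),   # 5-minute aggregate
    ("readings_hourly", None),                # hourly aggregate
]


def pick_source(window_start: datetime, now: datetime) -> str:
    """Return the finest-grained relation that still covers window_start."""
    age = now - window_start
    for relation, retention in TIERS:
        if retention is None or age <= retention:
            return relation
    raise ValueError("no tier covers the requested window")


now = datetime(2026, 5, 27, tzinfo=timezone.utc)
print(pick_source(now - timedelta(days=7), now))    # sensor_readings
print(pick_source(now - timedelta(days=90), now))   # readings_5min
print(pick_source(now - timedelta(days=800), now))  # readings_hourly
```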

The complete schema, ingestion pipeline, query patterns, and API design in this guide provide a production-ready blueprint. The recommended sequence is to begin with the PostgreSQL + TimescaleDB pattern, add the sensor registry and validation layer, implement continuous aggregates for downsampling, and construct the API layer with FastAPI. In the resulting system, "show me all vibration anomalies from Building A's CNC machines installed after 2023" is a query that returns in milliseconds rather than a question the team cannot answer.

References
