Summary
What this post covers: A complete reference for designing systems that store facility metadata and high-frequency sensor time-series together, with SQL schemas, ingestion pipelines, Python code, and a manufacturing case study.
Key insights:
- Metadata and time-series have fundamentally incompatible workloads — relational/hierarchical/slow-changing versus append-only/time-partitioned/high-volume — so forcing both into one storage engine produces queries that take minutes instead of milliseconds.
- The correct architecture pairs PostgreSQL for metadata (facilities, equipment, sensors, maintenance logs) with TimescaleDB hypertables for measurements, bridged only by a sensor_id foreign key — not by embedding metadata into every reading.
- Cross-domain queries like “show vibration anomalies on Building A’s CNC machines installed after 2023” should be answered with a metadata-filter-first pattern that resolves sensor IDs in PostgreSQL, then performs a time-windowed scan in TimescaleDB.
- Scaling beyond billions of rows requires compressing chunks after roughly seven days, materializing continuous aggregates for dashboards, and pushing tag-rich metadata into a JSONB column to avoid schema explosion.
- The most common failure modes are duplicating metadata in every time-series row, leaving orphaned sensor IDs when assets are retired, and skipping API-level joins so callers have to manually correlate two opaque payloads.
Main topics: Introduction, The Data Model Challenge, Architecture Patterns, Detailed Schema Design Best Practices, Data Ingestion Pipeline, Querying Across Metadata and Time-Series, API Design for Metadata + Time-Series, Handling Scale, Real-World Example: Manufacturing Plant, Common Pitfalls, Final Thoughts, References.
Introduction
Consider a factory floor with 500 sensors generating 2.6 billion data points per year. Every vibration reading, every temperature spike, and every pressure anomaly is faithfully captured and stored. When an engineer asks a straightforward question, “Show me all vibration anomalies from Building A’s CNC machines installed after 2023”, the team is unable to provide an answer in under ten minutes. The data exist, scattered across three different systems, but nobody can extract them quickly.
This scenario recurs in manufacturing plants, energy grids, building management systems, and IoT deployments worldwide. The root cause is consistently the same: the team treated metadata and time-series data as separate problems and never designed the bridge between them. The choice of storage layer is an important first step, and the comparison of databases for preprocessed time-series data covers the options in depth.
Any industrial, manufacturing, or IoT system involves two fundamentally different types of data that must work in concert. First, there is metadata: information about facilities, equipment, sensors, locations, configurations, maintenance history, and calibration records. These data are relational, hierarchical, and change slowly. Second, there is time-series data: the actual sensor signals (temperature, vibration, pressure, torque, current, flow rate) streaming in at high frequency, sometimes thousands of readings per second. These data are append-only, voluminous, and indexed by time.
The relationship between these two data types is what enables the system to function. A sensor reading of “47.3” means nothing without the knowledge that sensor S-0142 is a thermocouple mounted on a FANUC CNC spindle in Building A, calibrated last month, with an operating range of 15 to 85 °C. The sensor_id is the bridge: metadata indicate what, while time-series indicate when and how much.
Most teams handle this relationship incorrectly. They embed metadata in every time-series row, creating substantial bloat; they separate the two completely without proper foreign keys, creating orphaned data; or they force everything into a single database that performs poorly on at least one workload. The outcome is consistent: queries that should take milliseconds take minutes, data that should be connected remain isolated, and engineers who should be detecting anomalies instead contend with data infrastructure.
This guide provides a reference for designing a system that manages metadata and time-series data together correctly. It examines four architecture patterns, complete SQL schemas, Python code using SQLAlchemy and FastAPI, ingestion pipelines, query optimisation strategies, and a real-world manufacturing example. By the conclusion, the reader will have the necessary material to build a system in which the “CNC vibration anomalies in Building A” query returns results in less than a second.
The Data Model Challenge
Before considering solutions, it is necessary to understand clearly why these two data types are difficult to manage together. They have fundamentally different characteristics, and a database architecture that is optimal for one is almost always suboptimal for the other.
Metadata: Relational, Hierarchical, and Slowly Changing
Facility and sensor metadata follow a natural hierarchy. A typical industrial deployment is structured as follows:
Organisation → Site → Building → Production Line → Machine → Component → Sensor
Each level in this hierarchy carries substantial attributes. A sensor record may include sensor type, unit of measurement, sampling rate in Hz, minimum and maximum operating range, calibration date, firmware version, installation date, and the equipment on which it is mounted. A machine record includes manufacturer, model, serial number, commissioning date, maintenance schedule, and operating parameters.
These data are relational—sensors belong to equipment, equipment belongs to production lines, and production lines belong to buildings. They are hierarchical—queries such as “all sensors in Building A” require tree traversal. They are slowly changing—sensors are recalibrated, machines are moved to different production lines, and firmware is updated. They are schema-rich—each entity type has many attributes with different data types, constraints, and relationships.
Time-Series: Append-Only, High Volume, and Time-Indexed
Sensor readings are the opposite in nearly every respect. A typical reading consists of just three fields: timestamp, sensor_id, and value. A few additional channels may exist for multi-axis sensors (x, y, z for accelerometers). The schema is narrow and rarely changes.
The volume, however, is substantial. A single vibration sensor sampling at 1 kHz generates 86.4 million readings per day. Even at a modest 1 Hz sampling rate, 500 sensors produce 43.2 million readings per day—approximately 15.8 billion per year. These data are append-only (historical readings are almost never updated), time-indexed (every query includes a time range), and write-heavy (ingestion throughput is important).
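These figures can be checked with a few lines of Python. The sketch assumes continuous sampling with no gaps and a 365-day year.

```python
SECONDS_PER_DAY = 86_400
DAYS_PER_YEAR = 365


def readings_per_day(num_sensors: int, sampling_rate_hz: float) -> float:
    """Readings produced per day by num_sensors sampling at sampling_rate_hz."""
    return num_sensors * sampling_rate_hz * SECONDS_PER_DAY


def readings_per_year(num_sensors: int, sampling_rate_hz: float) -> float:
    """Readings per year, assuming continuous operation with no gaps."""
    return readings_per_day(num_sensors, sampling_rate_hz) * DAYS_PER_YEAR


# One vibration sensor at 1 kHz
print(f"{readings_per_day(1, 1000):,.0f} readings/day")  # 86,400,000 readings/day
# 500 sensors at 1 Hz
print(f"{readings_per_day(500, 1):,.0f} readings/day")  # 43,200,000 readings/day
print(f"{readings_per_year(500, 1) / 1e9:.1f} billion readings/year")  # 15.8 billion readings/year
```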
Characteristics Comparison
| Characteristic | Metadata | Time-Series |
|---|---|---|
| Schema | Wide, complex, many tables | Narrow (timestamp, id, value) |
| Volume | Thousands to millions of rows | Billions to trillions of rows |
| Write pattern | Infrequent updates, inserts | Continuous high-throughput appends |
| Read pattern | Lookups, JOINs, tree traversal | Range scans by time, aggregations |
| Relationships | Rich foreign keys, hierarchies | Single FK (sensor_id) |
| Mutability | Updates and deletes common | Append-only, rarely modified |
| Indexing | B-tree, GIN, full-text | Time-partitioned, BRIN |
| Retention | Keep forever | Tiered (raw → downsampled → archived) |
Common Mistakes
Teams typically fall into one of three traps:
Mistake 1: Embedding metadata in every time-series row. Instead of storing (timestamp, sensor_id, value), the row stores (timestamp, sensor_id, value, building_name, machine_name, manufacturer, sensor_type, unit, ...). A row that should be 24 bytes becomes 500 bytes. With billions of rows, this results in terabytes of redundant data, slower queries, and serious difficulty when metadata change (does one backfill every historical row?).
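A rough estimate of the redundant payload follows from the figures above: 24 versus 500 bytes per row, and the 500-sensor, 1 Hz deployment (about 15.8 billion rows per year). The sketch deliberately ignores PostgreSQL per-row overhead, indexes and compression, so it gives an order of magnitude only.

```python
def storage_bytes(rows: int, bytes_per_row: int) -> int:
    """Raw payload size; ignores per-row storage overhead, indexes and compression."""
    return rows * bytes_per_row


ROWS_PER_YEAR = 15_768_000_000  # 500 sensors at 1 Hz for 365 days
TB = 10**12

narrow = storage_bytes(ROWS_PER_YEAR, 24)  # (timestamp, sensor_id, value)
wide = storage_bytes(ROWS_PER_YEAR, 500)   # same row plus denormalised metadata

print(f"narrow: {narrow / TB:.2f} TB/year")              # narrow: 0.38 TB/year
print(f"wide: {wide / TB:.2f} TB/year")                  # wide: 7.88 TB/year
print(f"redundant: {(wide - narrow) / TB:.2f} TB/year")  # redundant: 7.51 TB/year
```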
Mistake 2: Complete separation without proper linking. Metadata reside in PostgreSQL, time-series in InfluxDB, and the only link is a sensor-name string entered manually. Sensor names change, and new sensors are added to the time-series database without being registered in the metadata database. Suddenly 15 per cent of readings are orphaned: data exist for sensors that are absent from the metadata system. For teams operating this kind of split architecture and considering migrating the InfluxDB side to a lakehouse, the InfluxDB-to-AWS Iceberg pipeline guide describes how to do so while preserving the sensor-ID bridge.
Mistake 3: Using one database for everything. Forcing all data into plain PostgreSQL makes time-series queries slow at scale (no automatic time partitioning, no columnar compression). Forcing everything into InfluxDB makes relational metadata queries impractical (limited or no JOIN support depending on the version, no foreign keys, no multi-statement transactions). Neither database excels at the other’s workload.
sensor_id is the bridge between metadata and time-series. The architecture must make it straightforward to begin from either side—filtering by metadata attributes and then fetching time-series, or detecting time-series anomalies and then retrieving the metadata context.
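Both directions across the bridge can be sketched with in-memory stand-ins for the two stores. The names `Sensor`, `Reading`, `sensors_matching`, `readings_for` and `context_for` are hypothetical. The `building` attribute is flattened onto the sensor for brevity; in the real schema it would come from joins.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Sensor:
    id: int
    building: str
    equipment_type: str
    sensor_type: str


@dataclass(frozen=True)
class Reading:
    time: datetime
    sensor_id: int
    value: float


def sensors_matching(sensors, **filters):
    """Metadata side: resolve the sensor IDs whose attributes match every filter."""
    return {s.id for s in sensors
            if all(getattr(s, key) == want for key, want in filters.items())}


def readings_for(readings, sensor_ids, start, end):
    """Time-series side: a time-windowed scan restricted to the resolved IDs."""
    return [r for r in readings
            if r.sensor_id in sensor_ids and start <= r.time < end]


def context_for(readings, sensors):
    """Reverse direction: attach metadata to readings (None marks an orphan)."""
    by_id = {s.id: s for s in sensors}
    return [(r, by_id.get(r.sensor_id)) for r in readings]


sensors = [Sensor(1, "Building A", "cnc", "vibration"),
           Sensor(2, "Building A", "cnc", "temperature"),
           Sensor(3, "Building B", "cnc", "vibration")]
t0 = datetime(2026, 1, 1)
readings = [Reading(t0 + timedelta(minutes=i), sid, 1.0 * i)
            for i in range(3) for sid in (1, 2, 3)]

# Metadata filter first, then the time-windowed scan
ids = sensors_matching(sensors, building="Building A", sensor_type="vibration")
window = readings_for(readings, ids, t0, t0 + timedelta(hours=1))
print(ids, len(window))  # {1} 3
```

Resolving the small set of IDs first keeps the expensive time-series scan narrow. Running the reverse lookup over a dictionary keyed by `sensor_id` keeps enrichment of anomaly results cheap.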
Architecture Patterns
There is no single “right” architecture for combining metadata and time-series data. The most appropriate choice depends on scale, team expertise, existing infrastructure, and query patterns. Four proven patterns are described below, from the most commonly recommended to the most specialised.
Pattern 1: PostgreSQL with TimescaleDB (Recommended)
This is the pattern recommended for most teams and the one to which the discussion devotes the most attention. TimescaleDB is a PostgreSQL extension that adds time-series capabilities—hypertables, automatic time partitioning, continuous aggregates, and compression—while preserving full PostgreSQL functionality. Because it runs within PostgreSQL, native SQL JOINs are available between metadata tables and time-series hypertables.
The complete schema is shown below:
-- Enable TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- ============================================
-- METADATA TABLES
-- ============================================
CREATE TABLE facilities (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
location VARCHAR(500),
facility_type VARCHAR(50) NOT NULL, -- 'manufacturing', 'warehouse', 'office'
commissioned_date DATE,
status VARCHAR(20) DEFAULT 'active',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE equipment (
id SERIAL PRIMARY KEY,
facility_id INTEGER NOT NULL REFERENCES facilities(id),
name VARCHAR(200) NOT NULL,
equipment_type VARCHAR(50) NOT NULL, -- 'cnc', 'robot', 'conveyor', 'pump'
manufacturer VARCHAR(200),
model VARCHAR(200),
serial_number VARCHAR(100) UNIQUE,
install_date DATE,
production_line VARCHAR(100),
status VARCHAR(20) DEFAULT 'operational',
operating_params JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_equipment_facility ON equipment(facility_id);
CREATE INDEX idx_equipment_type ON equipment(equipment_type);
CREATE INDEX idx_equipment_manufacturer ON equipment(manufacturer);
CREATE INDEX idx_equipment_line ON equipment(production_line);
CREATE TABLE sensors (
id SERIAL PRIMARY KEY,
equipment_id INTEGER NOT NULL REFERENCES equipment(id),
name VARCHAR(200) NOT NULL,
sensor_type VARCHAR(50) NOT NULL, -- 'temperature', 'vibration', 'pressure'
unit VARCHAR(20) NOT NULL, -- 'celsius', 'mm/s', 'bar', 'A'
sampling_rate_hz REAL DEFAULT 1.0,
min_range REAL,
max_range REAL,
calibration_date DATE,
firmware_version VARCHAR(50),
is_active BOOLEAN DEFAULT TRUE,
tags JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_sensors_equipment ON sensors(equipment_id);
CREATE INDEX idx_sensors_type ON sensors(sensor_type);
CREATE INDEX idx_sensors_active ON sensors(is_active) WHERE is_active = TRUE;
CREATE INDEX idx_sensors_tags ON sensors USING GIN(tags);
CREATE TABLE maintenance_logs (
id SERIAL PRIMARY KEY,
equipment_id INTEGER NOT NULL REFERENCES equipment(id),
maintenance_type VARCHAR(50) NOT NULL, -- 'preventive', 'corrective', 'calibration'
description TEXT,
performed_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
technician VARCHAR(200),
parts_replaced JSONB DEFAULT '[]',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_maintenance_equipment ON maintenance_logs(equipment_id);
CREATE INDEX idx_maintenance_time ON maintenance_logs(performed_at);
-- ============================================
-- TIME-SERIES TABLES (TimescaleDB Hypertables)
-- ============================================
CREATE TABLE sensor_readings (
time TIMESTAMPTZ NOT NULL,
sensor_id INTEGER NOT NULL REFERENCES sensors(id),
value DOUBLE PRECISION NOT NULL
);
SELECT create_hypertable('sensor_readings', 'time');
CREATE INDEX idx_readings_sensor_time ON sensor_readings (sensor_id, time DESC);
-- Enable compression (after 7 days)
ALTER TABLE sensor_readings SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id',
timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('sensor_readings', INTERVAL '7 days');
-- Anomaly events table
CREATE TABLE anomaly_events (
id SERIAL PRIMARY KEY,
sensor_id INTEGER NOT NULL REFERENCES sensors(id),
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ,
anomaly_type VARCHAR(50) NOT NULL, -- 'threshold', 'trend', 'pattern'
severity VARCHAR(20) NOT NULL, -- 'low', 'medium', 'high', 'critical'
value_at_detection DOUBLE PRECISION,
model_version VARCHAR(50),
notes TEXT,
acknowledged BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_anomaly_sensor ON anomaly_events(sensor_id);
CREATE INDEX idx_anomaly_time ON anomaly_events(start_time);
Populating the anomaly_events table in real time is a natural fit for complex event processing with Apache Flink CEP, which can detect multi-event anomaly patterns across thousands of sensor streams with millisecond latency.
The compress_segmentby = 'sensor_id' setting is important. It instructs TimescaleDB to group compressed data by sensor, so queries filtered by sensor_id decompress only the matching segments. Without it, a query for a single sensor would have to decompress the data for every sensor in each chunk it touches.
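The following toy model illustrates why segmenting matters. It is plain Python, not TimescaleDB's actual storage format. Each sensor's rows within one chunk are compressed as an independent segment, ordered by time descending as with compress_orderby, so a single-sensor query decompresses one segment out of a hundred.

```python
import json
import zlib
from collections import defaultdict


def compress_chunk(rows):
    """Compress one chunk's rows as independent per-sensor segments.

    rows: iterable of (time_iso, sensor_id, value) tuples.
    """
    segments = defaultdict(list)
    for time_iso, sensor_id, value in rows:
        segments[sensor_id].append((time_iso, value))
    return {
        sid: zlib.compress(json.dumps(sorted(points, reverse=True)).encode())
        for sid, points in segments.items()
    }


def query_sensor(chunk, sensor_id):
    """Decompress only the segment for sensor_id; also report segments touched."""
    blob = chunk.get(sensor_id)
    if blob is None:
        return [], 0
    return json.loads(zlib.decompress(blob)), 1


# 100 sensors x 60 readings in one chunk
rows = [(f"2026-01-01T00:00:{sec:02d}", sid, float(sid))
        for sec in range(60) for sid in range(100)]
chunk = compress_chunk(rows)
points, segments_touched = query_sensor(chunk, 42)
print(len(chunk), len(points), segments_touched)  # 100 60 1
```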
The power of native JOINs is illustrated below. The following queries cross the metadata/time-series boundary without difficulty:
-- Query 1: Average temperature for all sensors in Building A, last 24 hours
SELECT
f.name AS facility,
e.name AS equipment,
s.name AS sensor,
AVG(r.value) AS avg_temp,
MIN(r.value) AS min_temp,
MAX(r.value) AS max_temp
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
AND s.sensor_type = 'temperature'
AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY f.name, e.name, s.name
ORDER BY avg_temp DESC;
-- Query 2: FANUC machines with vibration exceeding threshold
SELECT
e.name AS machine,
e.model,
s.name AS sensor,
s.max_range AS threshold,
MAX(r.value) AS peak_vibration,
COUNT(*) AS exceedance_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.manufacturer = 'FANUC'
AND s.sensor_type = 'vibration'
AND r.value > s.max_range
AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, e.model, s.name, s.max_range
ORDER BY peak_vibration DESC;
-- Query 3: Compare vibration across CNC machines on Production Line 3
SELECT
e.name AS machine,
time_bucket('1 hour', r.time) AS hour,
AVG(r.value) AS avg_vibration,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY r.value) AS p95_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
AND e.equipment_type = 'cnc'
AND s.sensor_type = 'vibration'
AND r.time > NOW() - INTERVAL '7 days'
GROUP BY e.name, hour
ORDER BY e.name, hour;
Each query seamlessly combines metadata filters (facility name, manufacturer, production line, sensor type) with time-series operations (time ranges, aggregations, percentiles). This is the principal advantage of the PostgreSQL + TimescaleDB pattern: a single SQL statement can traverse the entire data model.
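The join shape can be tried without a TimescaleDB instance. The sketch below uses Python's built-in sqlite3 module as a stand-in, so there are no hypertables, no compression and no time_bucket. It uses a trimmed copy of the schema and made-up readings to run the equivalent of Query 1.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE facilities (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
CREATE TABLE equipment (id INTEGER PRIMARY KEY,
    facility_id INTEGER NOT NULL REFERENCES facilities(id), name TEXT NOT NULL);
CREATE TABLE sensors (id INTEGER PRIMARY KEY,
    equipment_id INTEGER NOT NULL REFERENCES equipment(id),
    name TEXT NOT NULL, sensor_type TEXT NOT NULL);
CREATE TABLE sensor_readings (time TEXT NOT NULL,
    sensor_id INTEGER NOT NULL REFERENCES sensors(id), value REAL NOT NULL);
INSERT INTO facilities VALUES (1, 'Building A'), (2, 'Building B');
INSERT INTO equipment VALUES (10, 1, 'CNC-001'), (20, 2, 'CNC-101');
INSERT INTO sensors VALUES (100, 10, 'spindle_temp', 'temperature'),
                           (200, 20, 'spindle_temp', 'temperature');
""")

# Made-up readings; ISO-8601 strings in one time zone compare correctly as text
now = datetime(2026, 1, 2, tzinfo=timezone.utc)
rows = [((now - timedelta(hours=h)).isoformat(), sid, v)
        for h, sid, v in [(1, 100, 40.0), (2, 100, 50.0), (30, 100, 99.0),
                          (1, 200, 70.0)]]
conn.executemany("INSERT INTO sensor_readings VALUES (?, ?, ?)", rows)

# Same shape as Query 1: metadata filters and a time window in one statement
result = conn.execute("""
SELECT f.name, e.name, s.name, AVG(r.value), MIN(r.value), MAX(r.value)
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = ? AND s.sensor_type = 'temperature' AND r.time > ?
GROUP BY f.name, e.name, s.name
""", ("Building A", (now - timedelta(hours=24)).isoformat())).fetchall()
print(result)  # [('Building A', 'CNC-001', 'spindle_temp', 45.0, 40.0, 50.0)]
```

The 30-hour-old reading falls outside the window, and the Building B sensor is removed by the metadata filter.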
Pattern 2: PostgreSQL with InfluxDB
When InfluxDB is already part of the stack, or when sustained write throughput exceeds what a single PostgreSQL/TimescaleDB node can handle (as a rough guide, beyond about 500,000 rows per second), a split architecture is appropriate. Metadata remain in PostgreSQL, time-series move to InfluxDB, and the application performs the JOIN.
import asyncio
import asyncpg
from influxdb_client import InfluxDBClient
class DualDatabaseQuery:
    def __init__(self, pg_dsn: str, influx_url: str, influx_token: str, influx_org: str):
        self.pg_dsn = pg_dsn
        self.influx = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
        self.query_api = self.influx.query_api()
    async def get_readings_by_facility(
        self, facility_name: str, sensor_type: str, hours: int = 24
    ):
        # Step 1: Query metadata from PostgreSQL (close the connection even on error)
        conn = await asyncpg.connect(self.pg_dsn)
        try:
            sensors = await conn.fetch("""
                SELECT s.id, s.name, e.name AS equipment_name
                FROM sensors s
                JOIN equipment e ON e.id = s.equipment_id
                JOIN facilities f ON f.id = e.facility_id
                WHERE f.name = $1 AND s.sensor_type = $2 AND s.is_active = TRUE
            """, facility_name, sensor_type)
        finally:
            await conn.close()
        if not sensors:
            return []
        # Step 2: Query time-series from InfluxDB, filtered by sensor IDs.
        # IDs come from an INTEGER column, so interpolating them is safe;
        # InfluxDB stores tag values as strings, hence the quotes.
        sensor_ids = [str(s['id']) for s in sensors]
        sensor_filter = ' or '.join(
            f'r["sensor_id"] == "{sid}"' for sid in sensor_ids
        )
        flux_query = f'''
            from(bucket: "sensor_data")
            |> range(start: -{int(hours)}h)
            |> filter(fn: (r) => r["_measurement"] == "readings")
            |> filter(fn: (r) => {sensor_filter})
            |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
        '''
        # The InfluxDB query API is synchronous; run it in a worker thread
        # so it does not block the event loop
        tables = await asyncio.to_thread(self.query_api.query, flux_query)
        # Step 3: Merge metadata with time-series results
        sensor_lookup = {str(s['id']): s for s in sensors}
        results = []
        for table in tables:
            for record in table.records:
                sid = record.values.get("sensor_id")
                meta = sensor_lookup.get(sid, {})
                results.append({
                    "time": record.get_time(),
                    "sensor_id": sid,
                    "sensor_name": meta.get("name"),
                    "equipment": meta.get("equipment_name"),
                    "value": record.get_value(),
                })
        return results
The PostgreSQL + InfluxDB pattern works but sacrifices the elegance of native JOINs. Every cross-domain query requires two round-trips, and complex queries (such as “compare vibration patterns across machines by manufacturer”) require substantial application-level logic. This pattern is appropriate when InfluxDB is already in production and migration is not feasible, or when write throughput genuinely exceeds PostgreSQL/TimescaleDB limits.
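Because the Flux text is assembled with f-strings, it can help to isolate and validate that step. The sketch below shows one way to do so; the helper name `build_flux_query` and the bucket-name whitelist are assumptions, not part of the InfluxDB client library. IDs are coerced to integers before interpolation, so a malformed ID fails loudly instead of altering the predicate.

```python
import re

# Conservative whitelists (assumptions): a safe subset of bucket names and
# Flux duration literals such as 1h, 30m or 1h30m
BUCKET_RE = re.compile(r"[A-Za-z0-9_\-]+")
DURATION_RE = re.compile(r"(\d+(mo|ns|us|ms|s|m|h|d|w|y))+")


def build_flux_query(bucket: str, sensor_ids, hours: int, every: str = "1h") -> str:
    """Build the per-facility Flux query from validated inputs.

    int() raises ValueError for anything that is not an integer ID, so a
    malformed ID can never change the predicate. IDs are then quoted because
    InfluxDB stores tag values as strings.
    """
    if not BUCKET_RE.fullmatch(bucket):
        raise ValueError(f"invalid bucket name: {bucket!r}")
    if not DURATION_RE.fullmatch(every):
        raise ValueError(f"invalid Flux duration: {every!r}")
    if int(hours) <= 0:
        raise ValueError("hours must be positive")
    ids = sorted({int(sid) for sid in sensor_ids})
    if not ids:
        raise ValueError("no sensor IDs to query")
    predicate = " or ".join(f'r["sensor_id"] == "{sid}"' for sid in ids)
    return (
        f'from(bucket: "{bucket}")\n'
        f"  |> range(start: -{int(hours)}h)\n"
        f'  |> filter(fn: (r) => r["_measurement"] == "readings")\n'
        f"  |> filter(fn: (r) => {predicate})\n"
        f"  |> aggregateWindow(every: {every}, fn: mean, createEmpty: false)\n"
    )


# Duplicates and string IDs are normalised to a sorted set of integers
query = build_flux_query("sensor_data", [3, 1, "2", 1], hours=24)
print(query)
```

The predicate grows linearly with the number of sensors, so very large facilities may need their IDs split across several queries.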
Pattern 3: PostgreSQL with Parquet/Iceberg on S3
For very large-scale deployments (terabytes of time-series data) or when the primary consumer is batch ML training pipelines, storing time-series data as Parquet files on S3 is cost-effective and scalable. Metadata remain in PostgreSQL, and joins are performed at query time using DuckDB, Athena, or Spark.
import duckdb
import asyncpg
import pandas as pd
class ParquetTimeSeriesQuery:
    """
    Time-series stored as Parquet files on S3, partitioned by:
    s3://data-lake/sensor_readings/sensor_id={id}/date={YYYY-MM-DD}/data.parquet
    """
    def __init__(self, pg_dsn: str, s3_base: str):
        self.pg_dsn = pg_dsn
        self.s3_base = s3_base
        self.duck = duckdb.connect()
        self.duck.execute("INSTALL httpfs; LOAD httpfs;")
        self.duck.execute("SET s3_region='us-east-1';")
    async def query_with_metadata(
        self, facility_name: str, sensor_type: str, start_date: str, end_date: str
    ) -> pd.DataFrame:
        # Step 1: Get relevant sensor IDs from PostgreSQL
        conn = await asyncpg.connect(self.pg_dsn)
        try:
            sensors = await conn.fetch("""
                SELECT s.id, s.name, s.unit, e.name AS equipment,
                       e.manufacturer, f.name AS facility
                FROM sensors s
                JOIN equipment e ON e.id = s.equipment_id
                JOIN facilities f ON f.id = e.facility_id
                WHERE f.name = $1 AND s.sensor_type = $2
            """, facility_name, sensor_type)
        finally:
            await conn.close()
        if not sensors:
            # parquet_scan() fails on an empty file list
            return pd.DataFrame()
        # Step 2: Build Parquet glob paths for relevant sensors
        sensor_ids = [s['id'] for s in sensors]
        paths = [
            f"{self.s3_base}/sensor_id={sid}/date=*/data.parquet"
            for sid in sensor_ids
        ]
        # Step 3: Query with DuckDB. The path list (built from integer IDs) is
        # rendered as a DuckDB list literal; the dates are bound parameters.
        # hive_partitioning exposes sensor_id from the directory names.
        result = self.duck.execute(f"""
            SELECT
                sensor_id,
                date_trunc('hour', time) AS hour,
                AVG(value) AS avg_value,
                MAX(value) AS max_value,
                COUNT(*) AS reading_count
            FROM parquet_scan({paths}, hive_partitioning = true)
            WHERE time BETWEEN CAST(? AS TIMESTAMP) AND CAST(? AS TIMESTAMP)
            GROUP BY sensor_id, hour
            ORDER BY sensor_id, hour
        """, [start_date, end_date]).fetchdf()
        # Step 4: Merge with metadata
        sensor_lookup = {s['id']: dict(s) for s in sensors}
        result['equipment'] = result['sensor_id'].map(
            lambda sid: sensor_lookup.get(sid, {}).get('equipment')
        )
        result['facility'] = result['sensor_id'].map(
            lambda sid: sensor_lookup.get(sid, {}).get('facility')
        )
        return result
This pattern is best suited to data lakes and ML training pipelines that need cost-effective processing of large volumes of historical data. Parquet’s columnar format often compresses sensor data ten to twenty times better than CSV. Partitioning by sensor_id and date lets a query skip the files of unrelated sensors and, when the date partitions are filtered rather than globbed with date=*, unrelated days as well. The pattern is poorly suited, however, to real-time queries or dashboards that require sub-second response times.
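Pruning dates as well as sensors can be done by enumerating explicit partitions instead of globbing date=*. This is a minimal sketch assuming the sensor_id=…/date=…/data.parquet layout from the class above; the helper name `partition_paths` is hypothetical. A listed partition that does not exist (a day with no data) would make DuckDB fail, so in practice the list may need to be checked against an S3 listing first.

```python
from datetime import date, timedelta


def partition_paths(s3_base: str, sensor_ids, start: date, end: date) -> list[str]:
    """List Hive-style partition files for the given sensors and days (inclusive)."""
    if end < start:
        raise ValueError("end must not be before start")
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    return [
        f"{s3_base.rstrip('/')}/sensor_id={int(sid)}/date={d.isoformat()}/data.parquet"
        for sid in sorted(set(sensor_ids))
        for d in days
    ]


paths = partition_paths("s3://data-lake/sensor_readings/", [7, 3],
                        date(2026, 3, 30), date(2026, 4, 1))
print(len(paths))  # 6
print(paths[0])    # s3://data-lake/sensor_readings/sensor_id=3/date=2026-03-30/data.parquet
```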
Pattern 4: TDengine Super Tables
TDengine takes a substantially different approach. Its “super table” concept embeds metadata as tags directly alongside time-series data. Each physical sensor receives a sub-table inheriting from a super table, and tags (metadata) are stored only once per sub-table rather than repeated in every row.
-- Create a super table with tags (metadata) and columns (time-series)
CREATE STABLE sensor_readings (
ts TIMESTAMP,
value DOUBLE,
quality INT
) TAGS (
facility NCHAR(200),
building NCHAR(100),
equipment NCHAR(200),
manufacturer NCHAR(200),
sensor_type NCHAR(50),
unit NCHAR(20),
line NCHAR(100)
);
-- Create sub-tables for each sensor (tags are set once)
CREATE TABLE sensor_0001 USING sensor_readings TAGS (
'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'vibration', 'mm/s', 'Line 3'
);
CREATE TABLE sensor_0002 USING sensor_readings TAGS (
'Plant Chicago', 'Building A', 'CNC-001', 'FANUC', 'temperature', 'celsius', 'Line 3'
);
-- Insert data (just timestamp + values, no metadata repetition)
INSERT INTO sensor_0001 VALUES (NOW(), 4.52, 100);
INSERT INTO sensor_0002 VALUES (NOW(), 67.3, 100);
-- Query across all sensors using metadata tags
SELECT
facility,
equipment,
AVG(value) AS avg_vibration
FROM sensor_readings
WHERE sensor_type = 'vibration'
AND facility = 'Plant Chicago'
AND ts > NOW() - 24h
GROUP BY facility, equipment;
TDengine’s approach is elegant for IoT: metadata reside alongside the data, tag filters are evaluated once per sub-table rather than once per row, and a separate metadata database is not required. The disadvantage is that complex metadata relationships (maintenance logs, calibration history, hierarchical queries) are difficult to model with flat tags. If the metadata are simple and relatively static, TDengine is worth considering; if rich relational metadata are required, Pattern 1 or Pattern 2 should be preferred.
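Because tags are set once per sub-table, a common approach is to generate the sub-table DDL from whatever system holds the authoritative sensor list. The sketch below assumes the super table above. The names `SensorMeta`, `tdengine_literal` and `create_subtable_sql` are hypothetical, and the backslash-escaping rule for quotes is an assumption that should be checked against the TDengine documentation.

```python
from dataclasses import dataclass

# Must match the TAGS order of the sensor_readings super table
TAG_ORDER = ("facility", "building", "equipment", "manufacturer",
             "sensor_type", "unit", "line")


@dataclass(frozen=True)
class SensorMeta:
    sensor_id: int
    facility: str
    building: str
    equipment: str
    manufacturer: str
    sensor_type: str
    unit: str
    line: str


def tdengine_literal(text: str) -> str:
    """Single-quote a tag value, escaping backslashes and quotes (assumed rule)."""
    return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"


def create_subtable_sql(meta: SensorMeta, stable: str = "sensor_readings") -> str:
    """CREATE TABLE ... USING ... TAGS (...) for one sensor, tags in super-table order."""
    values = ", ".join(tdengine_literal(getattr(meta, tag)) for tag in TAG_ORDER)
    return (f"CREATE TABLE IF NOT EXISTS sensor_{meta.sensor_id:04d} "
            f"USING {stable} TAGS ({values});")


sql = create_subtable_sql(SensorMeta(1, "Plant Chicago", "Building A", "CNC-001",
                                     "FANUC", "vibration", "mm/s", "Line 3"))
print(sql)
```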
Pattern Comparison
| Criteria | PG + TimescaleDB | PG + InfluxDB | PG + Parquet/S3 | TDengine |
|---|---|---|---|---|
| Complexity | Low | Medium | Medium-High | Low |
| Native JOINs | Yes | No (app-level) | No (query engine) | Tags only |
| Write throughput | 100K-500K rows/s | 1M+ rows/s | Batch (unlimited) | 1M+ rows/s |
| Query flexibility | Full SQL | Flux + SQL | SQL (DuckDB/Athena) | SQL subset |
| Metadata richness | Full relational | Full relational | Full relational | Flat tags only |
| Scalability | TB scale | TB scale | PB scale | TB scale |
| Best for | Most teams | Existing InfluxDB | Data lakes, ML | Simple IoT |
Detailed Schema Design Best Practices
Regardless of the architecture pattern chosen, certain schema-design principles apply universally. The most important are discussed below.
Hierarchical Facility Modelling
Facility hierarchies are inherently tree-structured. Queries such as “all sensors in Building A” must be answered efficiently, which requires identifying every piece of equipment in every production line in that building. Two effective approaches exist in PostgreSQL.
Approach 1: the ltree extension.
CREATE EXTENSION IF NOT EXISTS ltree;
-- Add a path column to each entity
ALTER TABLE facilities ADD COLUMN path ltree;
ALTER TABLE equipment ADD COLUMN path ltree;
ALTER TABLE sensors ADD COLUMN path ltree;
-- Example paths
-- Facility: 'org.chicago'
-- Equipment: 'org.chicago.building_a.line_3.cnc_001'
-- Sensor: 'org.chicago.building_a.line_3.cnc_001.vibration_x'
CREATE INDEX idx_facility_path ON facilities USING GIST(path);
CREATE INDEX idx_equipment_path ON equipment USING GIST(path);
CREATE INDEX idx_sensor_path ON sensors USING GIST(path);
-- Find all sensors under Building A (any depth)
SELECT s.* FROM sensors s
WHERE s.path <@ 'org.chicago.building_a';
-- Find all equipment exactly 3 levels below org.chicago
-- (building.line.machine, e.g. 'org.chicago.building_a.line_3.cnc_001')
SELECT e.* FROM equipment e
WHERE e.path ~ 'org.chicago.*{3}';
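Path construction and the `<@` (descendant-or-equal) check can be mirrored in application code, for example to build paths consistently before inserting them. The helpers below are hypothetical. They lower-case labels and keep only letters, digits and underscores, which is the portable subset of ltree label characters.

```python
import re


def make_path(*labels: str) -> str:
    """Join hierarchy labels into an ltree-style dotted path."""
    cleaned = []
    for label in labels:
        label = re.sub(r"[^a-z0-9_]", "_", label.strip().lower())
        if not label:
            raise ValueError("empty ltree label")
        cleaned.append(label)
    return ".".join(cleaned)


def is_descendant(path: str, ancestor: str) -> bool:
    """Mirror `path <@ ancestor`: ancestor is a label-wise prefix of path (or equal)."""
    p, a = path.split("."), ancestor.split(".")
    return p[:len(a)] == a


sensor = make_path("org", "Chicago", "Building A", "Line 3", "CNC-001", "vibration_x")
print(sensor)  # org.chicago.building_a.line_3.cnc_001.vibration_x
print(is_descendant(sensor, "org.chicago.building_a"))                     # True
print(is_descendant("org.chicago.building_ab.x", "org.chicago.building_a"))  # False
```

The last call shows why comparison must be label by label: a plain string-prefix test would wrongly place building_ab under building_a.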
Approach 2: recursive CTEs with adjacency list.
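If extensions are to be avoided, recursive CTEs work well for moderate-sized hierarchies. The adjacency list here is a parent_id column on facilities (an addition to the schema above) that points each site, building or sub-facility at the facility containing it. The CTE walks that tree and then joins the equipment:
-- Adjacency list: each facility points to its parent (NULL for a root)
ALTER TABLE facilities ADD COLUMN parent_id INTEGER REFERENCES facilities(id);
-- Find all equipment under a specific facility, including nested sub-facilities
WITH RECURSIVE facility_tree AS (
-- Base case: the target facility
SELECT id, name
FROM facilities
WHERE name = 'Building A'
UNION ALL
-- Recursive case: child facilities of any facility already in the tree
SELECT f.id, f.name
FROM facilities f
JOIN facility_tree ft ON f.parent_id = ft.id
)
SELECT e.id, e.name, e.equipment_type, ft.name AS facility
FROM facility_tree ft
JOIN equipment e ON e.facility_id = ft.id;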
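The adjacency-list traversal can be exercised locally with Python's sqlite3 module, which also supports WITH RECURSIVE. The sketch assumes a parent_id column on facilities (NULL for a root), and the facility and equipment rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE facilities (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    parent_id INTEGER REFERENCES facilities(id)  -- NULL for a root node
);
CREATE TABLE equipment (
    id INTEGER PRIMARY KEY,
    facility_id INTEGER NOT NULL REFERENCES facilities(id),
    name TEXT NOT NULL
);
INSERT INTO facilities VALUES
    (1, 'Plant Chicago', NULL),
    (2, 'Building A', 1),
    (3, 'Annex A1', 2),      -- nested inside Building A
    (4, 'Building B', 1);
INSERT INTO equipment VALUES
    (10, 2, 'Compressor-01'),
    (11, 3, 'CNC-001'),
    (12, 4, 'CNC-101');
""")

# Walk down from the target facility, then join equipment at every level
rows = conn.execute("""
WITH RECURSIVE facility_tree(id) AS (
    SELECT id FROM facilities WHERE name = ?
    UNION ALL
    SELECT f.id FROM facilities f JOIN facility_tree ft ON f.parent_id = ft.id
)
SELECT e.name FROM equipment e JOIN facility_tree ft ON e.facility_id = ft.id
ORDER BY e.name
""", ("Building A",)).fetchall()
print([name for (name,) in rows])  # ['CNC-001', 'Compressor-01']
```

CNC-001 is found through the nested annex, while CNC-101 in Building B is excluded.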
Slowly Changing Dimensions (SCD Type 2)
Equipment moves between production lines, sensors are recalibrated, and firmware is updated. Simply overwriting the old value removes the ability to interpret historical data correctly. A vibration reading from last month should be evaluated against the calibration that was active at that time, not against today's calibration.
SCD Type 2 addresses this requirement by maintaining a history of changes with effective date ranges:
CREATE TABLE sensor_history (
id SERIAL PRIMARY KEY,
sensor_id INTEGER NOT NULL REFERENCES sensors(id),
equipment_id INTEGER NOT NULL REFERENCES equipment(id),
calibration_date DATE,
min_range REAL,
max_range REAL,
firmware_version VARCHAR(50),
effective_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
effective_to TIMESTAMPTZ, -- NULL means "current"
is_current BOOLEAN DEFAULT TRUE
);
CREATE INDEX idx_sensor_history_current
ON sensor_history(sensor_id) WHERE is_current = TRUE;
CREATE INDEX idx_sensor_history_range
ON sensor_history(sensor_id, effective_from, effective_to);
-- When recalibrating a sensor, run both steps in one transaction. NOW() is
-- fixed for the whole transaction, so the old effective_to and the new
-- effective_from are identical and leave no gap.
BEGIN;
-- Step 1: Close the current record
UPDATE sensor_history
SET effective_to = NOW(), is_current = FALSE
WHERE sensor_id = 42 AND is_current = TRUE;
-- Step 2: Insert new record
INSERT INTO sensor_history
(sensor_id, equipment_id, calibration_date, min_range, max_range,
firmware_version, effective_from, is_current)
VALUES
(42, 15, '2026-04-01', 0, 100, 'v3.2.1', NOW(), TRUE);
COMMIT;
-- Query: What was the calibration when this anomaly was detected?
-- Ranges are half-open [effective_from, effective_to), so a timestamp that
-- falls exactly on a boundary matches only one version.
SELECT sh.*
FROM sensor_history sh
JOIN anomaly_events ae ON ae.sensor_id = sh.sensor_id
WHERE ae.id = 789
AND ae.start_time >= sh.effective_from
AND (sh.effective_to IS NULL OR ae.start_time < sh.effective_to);
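The same as-of lookup is useful in application code, for example when a service caches sensor_history to evaluate readings against the calibration active at the time. This is a minimal sketch using binary search over versions sorted by effective_from, with half-open validity intervals. `CalibrationVersion` and `version_at` are hypothetical names.

```python
from bisect import bisect_right
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class CalibrationVersion:
    effective_from: datetime
    effective_to: Optional[datetime]  # None means "current"
    min_range: float
    max_range: float


def version_at(history: list[CalibrationVersion],
               ts: datetime) -> Optional[CalibrationVersion]:
    """Return the version whose [effective_from, effective_to) interval contains ts.

    history must be sorted by effective_from and non-overlapping.
    """
    starts = [v.effective_from for v in history]
    i = bisect_right(starts, ts) - 1
    if i < 0:
        return None  # ts predates the first recorded version
    v = history[i]
    if v.effective_to is not None and ts >= v.effective_to:
        return None  # ts falls in a gap between versions
    return v


march, april = datetime(2026, 3, 1), datetime(2026, 4, 1)
history = [
    CalibrationVersion(datetime(2025, 1, 1), april, 0, 80),
    CalibrationVersion(april, None, 0, 100),
]
print(version_at(history, march).max_range)       # 80
print(version_at(history, april).max_range)       # 100 (boundary belongs to the new version)
print(version_at(history, datetime(2024, 1, 1)))  # None
```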
JSONB for Flexible Attributes
Not every piece of equipment shares the same attributes. A CNC machine has spindle speed and tool count; a conveyor has belt speed and length; a robot has axis count and payload capacity. Rather than creating separate tables for each equipment type, JSONB columns may be used for type-specific attributes:
-- Equipment with flexible operating parameters
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer,
model, operating_params)
VALUES
(1, 'CNC-001', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', '{
"max_spindle_rpm": 24000,
"tool_capacity": 21,
"axes": 5,
"max_feed_rate_mm_min": 54000
}'::jsonb),
(1, 'Robot-001', 'robot', 'ABB', 'IRB 6700', '{
"axes": 6,
"payload_kg": 150,
"reach_mm": 2650,
"repeatability_mm": 0.05
}'::jsonb);
-- Query: Find all robots with payload > 100kg
SELECT name, model, operating_params->>'payload_kg' AS payload
FROM equipment
WHERE equipment_type = 'robot'
AND (operating_params->>'payload_kg')::numeric > 100;
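-- A GIN index accelerates containment (@>) and key-existence (?) queries on the column
CREATE INDEX idx_equipment_params ON equipment USING GIN(operating_params);
-- Range filters on a single key, like the payload query above, need an expression index
CREATE INDEX idx_equipment_payload_kg
ON equipment (((operating_params->>'payload_kg')::numeric));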
Tagging System for Ad-Hoc Grouping
Beyond the formal hierarchy, teams often need to group sensors by arbitrary criteria, such as "all sensors involved in the Q1 reliability study," "sensors monitored by the ML anomaly-detection model," or "critical sensors requiring 24/7 alerting." A flexible tagging system supports this requirement:
-- Sensors table already has a JSONB 'tags' column
-- Usage examples:
UPDATE sensors SET tags = '{
"monitoring_group": "critical_24x7",
"ml_model": "vibration_anomaly_v2",
"study": "q1_reliability",
"zone": "high_temperature"
}'::jsonb
WHERE id = 42;
-- Find all sensors in a monitoring group
SELECT s.*, e.name AS equipment
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
WHERE s.tags @> '{"monitoring_group": "critical_24x7"}';
-- Find sensors enrolled in a specific ML model
SELECT s.id, s.name, s.sensor_type
FROM sensors s
WHERE s.tags @> '{"ml_model": "vibration_anomaly_v2"}';
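For unit tests, or for an in-process cache of sensor tags, the @> containment semantics used above can be approximated in Python. The helper `jsonb_contains` is hypothetical and is not a complete reimplementation: PostgreSQL's special case that lets an array contain a bare primitive is omitted.

```python
def jsonb_contains(doc, pattern) -> bool:
    """Approximate PostgreSQL's `doc @> pattern` for JSON-like Python values.

    Objects: every key in pattern must exist in doc with a contained value.
    Arrays: every element of pattern must be contained in some element of doc.
    Scalars: must be equal; JSON booleans never equal numbers.
    """
    if isinstance(pattern, dict):
        return isinstance(doc, dict) and all(
            key in doc and jsonb_contains(doc[key], value)
            for key, value in pattern.items())
    if isinstance(pattern, list):
        return isinstance(doc, list) and all(
            any(jsonb_contains(d, p) for d in doc) for p in pattern)
    if isinstance(doc, bool) or isinstance(pattern, bool):
        return doc is pattern
    return doc == pattern


tags = {"monitoring_group": "critical_24x7", "ml_model": "vibration_anomaly_v2",
        "study": "q1_reliability", "zone": "high_temperature"}
sensor_tags = {42: tags, 43: {"monitoring_group": "standard"}}
critical = [sid for sid, t in sensor_tags.items()
            if jsonb_contains(t, {"monitoring_group": "critical_24x7"})]
print(critical)  # [42]
```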
Data Ingestion Pipeline
Reliable transfer of data from sensors into the database is half the work. A production ingestion pipeline typically follows this path:
Sensors → MQTT/Modbus → Kafka/MQTT Broker → Telegraf or Custom Consumer → Database
Telegraf Configuration
Telegraf is a widely used agent for collecting and forwarding sensor data. The configuration below reads from MQTT, enriches with metadata tags, and writes to TimescaleDB:
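# telegraf.conf
[[inputs.mqtt_consumer]]
servers = ["tcp://mqtt-broker:1883"]
topics = ["sensors/+/readings"]
data_format = "json"
tag_keys = ["sensor_id"]
json_time_key = "timestamp"
json_time_format = "2006-01-02T15:04:05Z07:00"
# Name the metric after the target table (the default name is "mqtt_consumer")
name_override = "sensor_readings"
# Enrich with metadata from a static mapping (regenerated periodically).
# sensor_id values must be integers to fit sensor_readings.sensor_id.
[[processors.enum]]
[[processors.enum.mapping]]
tag = "sensor_id"
dest = "sensor_type"
[processors.enum.mapping.value_mappings]
"1" = "vibration"
"2" = "temperature"
# The postgresql output has no INSERT template: it writes each metric into a
# table named after the metric, with tags and fields as columns, and can add
# missing columns. Drop the enrichment tag at this output so it is not copied
# into every row (Mistake 1), and check that the column types it writes match
# the existing hypertable.
[[outputs.postgresql]]
connection = "postgres://user:pass@localhost/sensordb"
tagexclude = ["sensor_type"]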
Python Ingestion Script with Validation
For greater control, a custom Python ingestion script can validate sensor IDs against metadata, handle errors, and batch inserts:
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Optional
import asyncpg
import aiomqtt
logger = logging.getLogger(__name__)
class SensorDataIngester:
    """Ingests sensor readings with metadata validation."""
    def __init__(self, pg_dsn: str, mqtt_host: str, mqtt_port: int = 1883):
        self.pg_dsn = pg_dsn
        self.mqtt_host = mqtt_host
        self.mqtt_port = mqtt_port
        self.pool: Optional[asyncpg.Pool] = None
        self.valid_sensors: set[int] = set()
        self.batch: list[tuple] = []
        self.batch_size = 1000
        self.flush_interval = 5  # seconds
    async def start(self):
        """Initialize connections and start ingestion."""
        self.pool = await asyncpg.create_pool(self.pg_dsn, min_size=2, max_size=10)
        await self._load_valid_sensors()
        # Run the MQTT listener, batch flusher and sensor refresher concurrently
        await asyncio.gather(
            self._mqtt_listener(),
            self._periodic_flush(),
            self._periodic_sensor_refresh(),
        )
    async def _load_valid_sensors(self):
        """Load active sensor IDs from metadata database."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id FROM sensors WHERE is_active = TRUE"
            )
        self.valid_sensors = {row['id'] for row in rows}
        logger.info(f"Loaded {len(self.valid_sensors)} active sensors")
    async def _periodic_sensor_refresh(self):
        """Refresh valid sensor list every 5 minutes."""
        while True:
            await asyncio.sleep(300)
            await self._load_valid_sensors()
    async def _mqtt_listener(self):
        """Listen for sensor readings on MQTT (aiomqtt 2.x API)."""
        async with aiomqtt.Client(self.mqtt_host, self.mqtt_port) as client:
            await client.subscribe("sensors/+/readings")
            async for message in client.messages:
                try:
                    payload = json.loads(message.payload)
                    sensor_id = int(payload['sensor_id'])
                    # Validate against metadata
                    if sensor_id not in self.valid_sensors:
                        logger.warning(
                            f"Rejected reading from unknown sensor {sensor_id}"
                        )
                        continue
                    # Accept a trailing "Z", which fromisoformat() rejects before Python 3.11
                    timestamp = datetime.fromisoformat(
                        str(payload['timestamp']).replace("Z", "+00:00")
                    )
                    if timestamp.tzinfo is None:
                        timestamp = timestamp.replace(tzinfo=timezone.utc)
                    value = float(payload['value'])
                    self.batch.append((timestamp, sensor_id, value))
                    if len(self.batch) >= self.batch_size:
                        await self._flush_batch()
                except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
                    logger.error(f"Invalid message: {e}")
    async def _periodic_flush(self):
        """Flush batch at regular intervals."""
        while True:
            await asyncio.sleep(self.flush_interval)
            if self.batch:
                await self._flush_batch()
    async def _flush_batch(self):
        """Insert batch of readings into TimescaleDB."""
        if not self.batch:
            return
        # Take the batch before awaiting, so readings arriving during the
        # insert go into a fresh batch
        batch_to_insert = self.batch.copy()
        self.batch.clear()
        try:
            async with self.pool.acquire() as conn:
                await conn.executemany(
                    """INSERT INTO sensor_readings (time, sensor_id, value)
                    VALUES ($1, $2, $3)""",
                    batch_to_insert
                )
            logger.info(f"Inserted {len(batch_to_insert)} readings")
        except Exception as e:
            logger.error(f"Batch insert failed: {e}")
            # Re-add failed batch for retry
            self.batch.extend(batch_to_insert)
# Data quality checks
async def check_data_quality(pool: asyncpg.Pool):
"""Detect common data quality issues."""
async with pool.acquire() as conn:
# Orphaned readings (sensor_id not in sensors table)
orphaned = await conn.fetchval("""
SELECT COUNT(DISTINCT r.sensor_id)
FROM sensor_readings r
LEFT JOIN sensors s ON s.id = r.sensor_id
WHERE s.id IS NULL
AND r.time > NOW() - INTERVAL '24 hours'
""")
# Sensors with no recent readings (possible failure)
silent = await conn.fetch("""
SELECT s.id, s.name, e.name AS equipment,
MAX(r.time) AS last_reading
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
LEFT JOIN sensor_readings r ON r.sensor_id = s.id
AND r.time > NOW() - INTERVAL '24 hours'
WHERE s.is_active = TRUE
GROUP BY s.id, s.name, e.name
HAVING MAX(r.time) IS NULL
OR MAX(r.time) < NOW() - INTERVAL '1 hour'
""")
# Sensors with values outside their calibrated range
out_of_range = await conn.fetch("""
SELECT s.id, s.name, s.min_range, s.max_range,
MIN(r.value) AS min_val, MAX(r.value) AS max_val,
COUNT(*) AS violation_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
WHERE r.time > NOW() - INTERVAL '24 hours'
AND (r.value < s.min_range OR r.value > s.max_range)
GROUP BY s.id, s.name, s.min_range, s.max_range
""")
return {
"orphaned_sensor_ids": orphaned,
"silent_sensors": [dict(r) for r in silent],
"out_of_range_sensors": [dict(r) for r in out_of_range],
}
The _load_valid_sensors() method caches active sensor IDs in memory and refreshes them every five minutes. This avoids a database round-trip for every incoming message while ensuring that newly registered sensors are accepted within at most five minutes.
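Pulling the per-message validation out of the MQTT loop makes the rules easy to unit-test without a broker or a database. A minimal sketch, assuming the same JSON payload shape as above (`sensor_id`, `timestamp`, `value`); the function name `parse_reading` is hypothetical, and rejecting NaN or infinite values is an added rule that the script above does not apply.

```python
import json
import math
from datetime import datetime, timezone
from typing import Optional


def parse_reading(
    raw: bytes, valid_sensors: set[int]
) -> Optional[tuple[datetime, int, float]]:
    """Return (time, sensor_id, value) for a valid payload, else None."""
    try:
        payload = json.loads(raw)
        sensor_id = int(payload["sensor_id"])
        ts = str(payload["timestamp"])
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        timestamp = datetime.fromisoformat(ts)
        value = float(payload["value"])
    except (ValueError, KeyError, TypeError):  # JSONDecodeError is a ValueError
        return None
    # Reject unknown sensors and non-finite measurements
    if sensor_id not in valid_sensors or not math.isfinite(value):
        return None
    # Treat naive timestamps as UTC, then normalise everything to UTC
    if timestamp.tzinfo is None:
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    return timestamp.astimezone(timezone.utc), sensor_id, value
```

The async listener would then call `parse_reading(message.payload, self.valid_sensors)` and append the tuple to the batch when the result is not None.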
Handling Late-Arriving and Out-of-Order Data
In real-world deployments, data do not always arrive in order. Network delays, edge-device buffering, and batch uploads from remote sites all produce out-of-order events. TimescaleDB handles this situation gracefully: inserts are not required to be in time order. If continuous aggregates or materialised views are used, however, a refresh policy must be configured that covers the maximum expected delay:
-- Continuous aggregate that tolerates late data (up to 1 hour)
CREATE MATERIALIZED VIEW hourly_averages
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS bucket,
sensor_id,
AVG(value) AS avg_value,
MIN(value) AS min_value,
MAX(value) AS max_value,
COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;
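-- Refresh policy: every 30 minutes, refresh the buckets between 3 hours and
-- 30 minutes ago. The window must span at least two buckets (2 hours here) and
-- must reach back far enough to catch readings that arrive up to 1 hour late.
SELECT add_continuous_aggregate_policy('hourly_averages',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '30 minutes',
schedule_interval => INTERVAL '30 minutes'
);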
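Whether a policy really absorbs a given delay can be checked with a small simulation. This is a simplified model, not TimescaleDB's scheduler: it assumes runs fire exactly every `schedule_interval` from a fixed first run, and that each run re-materialises only buckets lying entirely inside `[run - start_offset, run - end_offset]`. Under those assumptions, `start_offset` needs to be at least roughly one bucket width, plus the larger of the maximum delay and `end_offset`, plus `schedule_interval`; for one-hour buckets, a one-hour delay, and 30-minute offsets and schedule, that comes to 2.5 hours. The function names are illustrative.

```python
from datetime import datetime, timedelta
from typing import Optional

EPOCH = datetime(1970, 1, 1)


def bucket_start(t: datetime, width: timedelta) -> datetime:
    """Floor t to an epoch-aligned bucket boundary."""
    return t - (t - EPOCH) % width


def first_refresh_covering(
    event_time: datetime,
    arrival_time: datetime,
    bucket: timedelta,
    start_offset: timedelta,
    end_offset: timedelta,
    schedule: timedelta,
    first_run: datetime,
) -> Optional[datetime]:
    """First policy run that re-materialises the reading's bucket, or None."""
    b0 = bucket_start(event_time, bucket)
    b1 = b0 + bucket
    # Runs before the reading arrived cannot include it
    run = first_run
    while run < arrival_time:
        run += schedule
    # Once the window start has moved past the bucket, no later run covers it
    while run - start_offset <= b0:
        if b1 <= run - end_offset:
            return run
        run += schedule
    return None


if __name__ == "__main__":
    day = datetime(2026, 3, 1)
    late = dict(
        event_time=day + timedelta(hours=10, minutes=59),
        arrival_time=day + timedelta(hours=11, minutes=59),  # one hour late
        bucket=timedelta(hours=1),
        end_offset=timedelta(minutes=30),
        schedule=timedelta(minutes=30),
        first_run=day + timedelta(minutes=15),  # runs at :15 and :45
    )
    print(first_refresh_covering(start_offset=timedelta(hours=2), **late))  # None
    print(first_refresh_covering(start_offset=timedelta(hours=3), **late))  # 2026-03-01 12:15:00
```

With a 2-hour `start_offset`, the one-hour-late reading is never folded into its bucket in this model when the runs are not aligned to bucket boundaries; 3 hours leaves headroom.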
Querying Across Metadata and Time-Series
The genuine value of a well-designed schema emerges when queries cross the metadata/time-series boundary. Five common query patterns are presented below as complete SQL, followed by a Python service class that exposes this kind of query to application code.
All Readings by Location and Sensor Type
-- All vibration readings from sensors in Building A, last 7 days
-- Using TimescaleDB time_bucket for efficient aggregation
SELECT
time_bucket('15 minutes', r.time) AS period,
e.name AS equipment,
s.name AS sensor,
AVG(r.value) AS avg_vibration,
MAX(r.value) AS peak_vibration,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY r.value) AS p99_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE f.name = 'Building A'
AND s.sensor_type = 'vibration'
AND r.time > NOW() - INTERVAL '7 days'
GROUP BY period, e.name, s.name
ORDER BY period DESC, peak_vibration DESC;
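The same question can also be answered with the metadata-filter-first pattern: resolve the matching sensor IDs from the small metadata tables, then run a time-windowed scan restricted to those IDs. The sketch uses `sqlite3` as a stand-in so it runs without PostgreSQL, and the table shapes are simplified assumptions. In PostgreSQL with asyncpg, step 2 would typically pass the IDs as a single array parameter, e.g. `WHERE sensor_id = ANY($1::int[])`, rather than building an `IN (...)` list.

```python
import sqlite3


def demo_connection() -> sqlite3.Connection:
    """In-memory stand-in for the metadata tables and sensor_readings."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE facilities (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE equipment (id INTEGER PRIMARY KEY, facility_id INTEGER, name TEXT);
        CREATE TABLE sensors (id INTEGER PRIMARY KEY, equipment_id INTEGER, sensor_type TEXT);
        CREATE TABLE sensor_readings (time TEXT, sensor_id INTEGER, value REAL);
        INSERT INTO facilities VALUES (1, 'Building A'), (2, 'Building B');
        INSERT INTO equipment VALUES (1, 1, 'CNC-A01'), (2, 2, 'CNC-B01');
        INSERT INTO sensors VALUES (1, 1, 'vibration'), (2, 1, 'temperature'), (3, 2, 'vibration');
        INSERT INTO sensor_readings VALUES
            ('2026-02-01T11:00:00Z', 1, 40.0),
            ('2026-03-01T10:00:00Z', 1, 3.5),
            ('2026-03-01T11:00:00Z', 1, 7.25),
            ('2026-03-01T11:00:00Z', 2, 61.0),
            ('2026-03-01T11:00:00Z', 3, 9.0);
    """)
    return conn


def readings_for(conn, facility: str, sensor_type: str, since: str) -> list[tuple]:
    """Resolve sensor IDs from metadata, then scan only those IDs in the window."""
    # Step 1: touch only the small metadata tables
    sensor_ids = [row[0] for row in conn.execute(
        """SELECT s.id FROM sensors s
           JOIN equipment e ON e.id = s.equipment_id
           JOIN facilities f ON f.id = e.facility_id
           WHERE f.name = ? AND s.sensor_type = ?""",
        (facility, sensor_type),
    )]
    if not sensor_ids:
        return []  # no matching sensors, so the time-series table is never read
    # Step 2: time-windowed scan restricted to the resolved IDs
    placeholders = ",".join("?" * len(sensor_ids))
    return conn.execute(
        f"""SELECT sensor_id, COUNT(*), MAX(value) FROM sensor_readings
            WHERE sensor_id IN ({placeholders}) AND time >= ?
            GROUP BY sensor_id ORDER BY sensor_id""",
        (*sensor_ids, since),
    ).fetchall()
```

For example, `readings_for(demo_connection(), "Building A", "vibration", "2026-02-22T00:00:00Z")` returns one row for sensor 1 (two readings in the window, peak 7.25); the February reading and Building B's sensor are never aggregated.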
Average Daily Values Grouped by Manufacturer
-- Average daily temperature per facility, grouped by equipment manufacturer
SELECT
f.name AS facility,
e.manufacturer,
time_bucket('1 day', r.time) AS day,
AVG(r.value) AS avg_temperature,
COUNT(DISTINCT s.id) AS sensor_count
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE s.sensor_type = 'temperature'
AND r.time > NOW() - INTERVAL '30 days'
GROUP BY f.name, e.manufacturer, day
ORDER BY f.name, e.manufacturer, day;
Equipment with Sensors Exceeding Their Range
-- Find equipment where any sensor exceeded its max_range in the past month
SELECT
f.name AS facility,
e.name AS equipment,
e.manufacturer,
s.name AS sensor,
s.sensor_type,
s.max_range AS threshold,
MAX(r.value) AS peak_value,
COUNT(*) FILTER (WHERE r.value > s.max_range) AS exceedance_count,
MIN(r.time) FILTER (WHERE r.value > s.max_range) AS first_exceedance,
MAX(r.time) FILTER (WHERE r.value > s.max_range) AS last_exceedance
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE r.time > NOW() - INTERVAL '30 days'
AND s.max_range IS NOT NULL
GROUP BY f.name, e.name, e.manufacturer, s.name, s.sensor_type, s.max_range
HAVING COUNT(*) FILTER (WHERE r.value > s.max_range) > 0
ORDER BY exceedance_count DESC;
Readings Before and After Maintenance
-- Compare sensor readings 24 hours before and after a maintenance event
WITH maintenance AS (
SELECT id, equipment_id, performed_at, maintenance_type
FROM maintenance_logs
WHERE id = 456 -- specific maintenance event
),
before_maintenance AS (
SELECT
s.name AS sensor,
s.sensor_type,
AVG(r.value) AS avg_value,
STDDEV(r.value) AS stddev_value,
'before' AS period
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN maintenance m ON s.equipment_id = m.equipment_id
-- Half-open windows, so a reading taken exactly at performed_at counts once (as "after")
WHERE r.time >= m.performed_at - INTERVAL '24 hours' AND r.time < m.performed_at
GROUP BY s.name, s.sensor_type
),
after_maintenance AS (
SELECT
s.name AS sensor,
s.sensor_type,
AVG(r.value) AS avg_value,
STDDEV(r.value) AS stddev_value,
'after' AS period
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN maintenance m ON s.equipment_id = m.equipment_id
WHERE r.time >= m.performed_at AND r.time < m.performed_at + INTERVAL '24 hours'
GROUP BY s.name, s.sensor_type
)
SELECT
b.sensor,
b.sensor_type,
b.avg_value AS avg_before,
a.avg_value AS avg_after,
ROUND(((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0) * 100)::numeric, 2)
AS pct_change,
b.stddev_value AS stddev_before,
a.stddev_value AS stddev_after
FROM before_maintenance b
JOIN after_maintenance a ON a.sensor = b.sensor
ORDER BY ABS((a.avg_value - b.avg_value) / NULLIF(b.avg_value, 0)) DESC;
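A plain-Python version of the same comparison is handy for unit-testing the window logic. It uses half-open windows `[t - 24h, t)` and `[t, t + 24h)`, so a reading taken exactly at `performed_at` is counted once, and it returns None for a zero baseline, mirroring `NULLIF(b.avg_value, 0)`. The function name and the `(time, value)` input shape are illustrative assumptions.

```python
from datetime import datetime, timedelta
from statistics import mean
from typing import Optional


def before_after_change(
    readings: list[tuple[datetime, float]],
    performed_at: datetime,
    window: timedelta = timedelta(hours=24),
) -> Optional[float]:
    """Percent change of the mean value after maintenance vs. before it."""
    before = [v for t, v in readings if performed_at - window <= t < performed_at]
    after = [v for t, v in readings if performed_at <= t < performed_at + window]
    if not before or not after:
        return None  # one side has no data, as the inner JOIN in the SQL would drop it
    baseline = mean(before)
    if baseline == 0:
        return None  # mirrors NULLIF(b.avg_value, 0)
    return round((mean(after) - baseline) / baseline * 100, 2)
```

With readings of 10 and 14 in the day before and 9 and 9 in the day after, the result is -25.0 (a mean of 12 falling to 9).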
Anomaly Events with Full Context
-- Anomaly events for FANUC robots installed in 2024, with full context
SELECT
ae.id AS anomaly_id,
ae.anomaly_type,
ae.severity,
ae.start_time,
ae.end_time,
ae.value_at_detection,
s.name AS sensor,
s.sensor_type,
s.max_range,
e.name AS equipment,
e.manufacturer,
e.model,
e.install_date,
f.name AS facility
FROM anomaly_events ae
JOIN sensors s ON s.id = ae.sensor_id
JOIN equipment e ON e.id = s.equipment_id
JOIN facilities f ON f.id = e.facility_id
WHERE e.manufacturer = 'FANUC'
AND e.equipment_type = 'robot'
AND e.install_date >= '2024-01-01'
AND ae.start_time > NOW() - INTERVAL '90 days'
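-- Rank severities explicitly: sorted as plain text, 'medium' would come before
-- 'critical' (the 'medium' and 'low' labels are assumed here)
ORDER BY
CASE ae.severity
WHEN 'critical' THEN 4
WHEN 'high' THEN 3
WHEN 'medium' THEN 2
WHEN 'low' THEN 1
ELSE 0
END DESC,
ae.start_time DESC;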
Python Query Service
Wrapping these queries in a service class provides a clean interface for application code:
import json
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional
import asyncpg
# time_bucket() intervals are interpolated into the SQL text below, so only
# values from this fixed allow-list are accepted (never raw caller input)
ALLOWED_BUCKETS = {"1 minute", "5 minutes", "15 minutes", "1 hour", "1 day"}
@dataclass
class SensorReading:
    time: datetime
    sensor_id: int
    sensor_name: str
    equipment_name: str
    facility_name: str
    sensor_type: str
    value: float
    unit: str
class QueryService:
    """Combines metadata filtering with time-series queries."""
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    async def get_readings(
        self,
        facility: Optional[str] = None,
        equipment_type: Optional[str] = None,
        manufacturer: Optional[str] = None,
        sensor_type: Optional[str] = None,
        production_line: Optional[str] = None,
        tags: Optional[dict] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        bucket_interval: str = '1 hour',
    ) -> list[dict]:
        """
        Flexible query combining metadata filters with time-series aggregation.
        """
        if bucket_interval not in ALLOWED_BUCKETS:
            raise ValueError(f"Unsupported bucket interval: {bucket_interval!r}")
        # Timezone-aware UTC defaults, matching TIMESTAMPTZ columns
        now = datetime.now(timezone.utc)
        if start is None:
            start = now - timedelta(hours=24)
        if end is None:
            end = now
        conditions = ["r.time >= $1", "r.time <= $2"]
        params: list = [start, end]
        param_idx = 3
        if facility:
            conditions.append(f"f.name = ${param_idx}")
            params.append(facility)
            param_idx += 1
        if equipment_type:
            conditions.append(f"e.equipment_type = ${param_idx}")
            params.append(equipment_type)
            param_idx += 1
        if manufacturer:
            conditions.append(f"e.manufacturer = ${param_idx}")
            params.append(manufacturer)
            param_idx += 1
        if sensor_type:
            conditions.append(f"s.sensor_type = ${param_idx}")
            params.append(sensor_type)
            param_idx += 1
        if production_line:
            conditions.append(f"e.production_line = ${param_idx}")
            params.append(production_line)
            param_idx += 1
        if tags:
            # JSONB containment: every given key/value must be present in s.tags
            conditions.append(f"s.tags @> ${param_idx}::jsonb")
            params.append(json.dumps(tags))
            param_idx += 1
        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT
                time_bucket('{bucket_interval}', r.time) AS bucket,
                s.id AS sensor_id,
                s.name AS sensor_name,
                s.sensor_type,
                s.unit,
                e.name AS equipment_name,
                e.manufacturer,
                f.name AS facility_name,
                AVG(r.value) AS avg_value,
                MIN(r.value) AS min_value,
                MAX(r.value) AS max_value,
                COUNT(*) AS sample_count
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where_clause}
            GROUP BY bucket, s.id, s.name, s.sensor_type, s.unit,
                     e.name, e.manufacturer, f.name
            ORDER BY bucket DESC, sensor_name
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
        return [dict(r) for r in rows]
    async def get_equipment_health(self, equipment_id: int) -> dict:
        """Get comprehensive health status for a piece of equipment."""
        async with self.pool.acquire() as conn:
            # Equipment metadata
            equipment = await conn.fetchrow("""
                SELECT e.*, f.name AS facility_name
                FROM equipment e
                JOIN facilities f ON f.id = e.facility_id
                WHERE e.id = $1
            """, equipment_id)
            # Latest reading (within the last hour) from every active sensor
            latest_readings = await conn.fetch("""
                SELECT DISTINCT ON (s.id)
                    s.id AS sensor_id, s.name, s.sensor_type, s.unit,
                    s.min_range, s.max_range,
                    r.time AS last_reading_time,
                    r.value AS last_value,
                    CASE
                        WHEN r.value IS NULL THEN 'no_data'
                        WHEN r.value > s.max_range THEN 'exceeded'
                        WHEN r.value < s.min_range THEN 'below_range'
                        ELSE 'normal'
                    END AS range_status
                FROM sensors s
                LEFT JOIN sensor_readings r ON r.sensor_id = s.id
                    AND r.time > NOW() - INTERVAL '1 hour'
                WHERE s.equipment_id = $1 AND s.is_active = TRUE
                ORDER BY s.id, r.time DESC
            """, equipment_id)
            # Recent anomalies
            anomalies = await conn.fetch("""
                SELECT ae.*, s.name AS sensor_name, s.sensor_type
                FROM anomaly_events ae
                JOIN sensors s ON s.id = ae.sensor_id
                WHERE s.equipment_id = $1
                  AND ae.start_time > NOW() - INTERVAL '7 days'
                ORDER BY ae.start_time DESC
                LIMIT 20
            """, equipment_id)
            # Last maintenance
            last_maintenance = await conn.fetchrow("""
                SELECT * FROM maintenance_logs
                WHERE equipment_id = $1
                ORDER BY performed_at DESC LIMIT 1
            """, equipment_id)
        return {
            "equipment": dict(equipment) if equipment else None,
            "sensors": [dict(r) for r in latest_readings],
            "recent_anomalies": [dict(a) for a in anomalies],
            "last_maintenance": dict(last_maintenance) if last_maintenance else None,
            "overall_status": self._calculate_status(latest_readings, anomalies),
        }
    @staticmethod
    def _calculate_status(readings, anomalies) -> str:
        critical_anomalies = [a for a in anomalies if a['severity'] == 'critical']
        exceeded_sensors = [r for r in readings if r['range_status'] == 'exceeded']
        if critical_anomalies or len(exceeded_sensors) > 2:
            return "critical"
        elif exceeded_sensors or any(a['severity'] == 'high' for a in anomalies):
            return "warning"
        return "healthy"
API Design for Metadata and Time-Series
A well-designed API layer makes the combined metadata/time-series system accessible to dashboards, mobile applications, and other services. A FastAPI implementation that exposes the key endpoints is shown below:
from datetime import datetime, timedelta, timezone
from typing import Optional
import asyncpg
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
# Assumes the QueryService class shown earlier is saved as query_service.py
from query_service import QueryService
app = FastAPI(title="Sensor Data API")
pool: Optional[asyncpg.Pool] = None
# time_bucket() intervals are interpolated into SQL, so only these are accepted
ALLOWED_BUCKETS = {"1 minute", "5 minutes", "15 minutes", "1 hour", "1 day"}
def check_bucket(bucket: str) -> None:
    """Reject bucket intervals outside the allow-list (prevents SQL injection)."""
    if bucket not in ALLOWED_BUCKETS:
        raise HTTPException(400, f"bucket must be one of {sorted(ALLOWED_BUCKETS)}")
def default_window(start: Optional[datetime], end: Optional[datetime]):
    """Default to the last 24 hours, using timezone-aware UTC timestamps."""
    end = end or datetime.now(timezone.utc)
    start = start or end - timedelta(hours=24)
    return start, end
@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/sensordb",
        min_size=5, max_size=20
    )
@app.on_event("shutdown")
async def shutdown():
    await pool.close()
# ---- Pydantic Models ----
class FacilityResponse(BaseModel):
    id: int
    name: str
    location: Optional[str]
    facility_type: str
    status: str
    equipment_count: int
class EquipmentResponse(BaseModel):
    id: int
    name: str
    equipment_type: str
    manufacturer: Optional[str]
    model: Optional[str]
    status: str
    sensor_count: int
    production_line: Optional[str]
class SensorReadingResponse(BaseModel):
    time: datetime
    value: float
    sensor_name: str
    sensor_type: str
    unit: str
class EquipmentHealthResponse(BaseModel):
    equipment_id: int
    equipment_name: str
    facility: str
    status: str
    sensors: list[dict]
    recent_anomalies: list[dict]
    last_maintenance: Optional[dict]
# ---- Endpoints ----
@app.get("/facilities/{facility_id}/equipment",
         response_model=list[EquipmentResponse])
async def list_equipment(facility_id: int):
    """List all equipment in a facility with metadata."""
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT e.id, e.name, e.equipment_type, e.manufacturer,
                   e.model, e.status, e.production_line,
                   COUNT(s.id) AS sensor_count
            FROM equipment e
            LEFT JOIN sensors s ON s.equipment_id = e.id AND s.is_active = TRUE
            WHERE e.facility_id = $1
            GROUP BY e.id
            ORDER BY e.production_line, e.name
        """, facility_id)
    if not rows:
        raise HTTPException(404, "Facility not found or has no equipment")
    return [dict(r) for r in rows]
@app.get("/sensors/{sensor_id}/readings",
         response_model=list[SensorReadingResponse])
async def get_sensor_readings(
    sensor_id: int,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    bucket: str = Query(default="15 minutes",
                        description="One of '1 minute', '5 minutes', '15 minutes', '1 hour', '1 day'"),
):
    """Get time-series readings for a sensor with metadata context."""
    check_bucket(bucket)
    start, end = default_window(start, end)
    async with pool.acquire() as conn:
        # Verify sensor exists and get metadata
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.unit
            FROM sensors s WHERE s.id = $1
        """, sensor_id)
        if not sensor:
            raise HTTPException(404, "Sensor not found")
        readings = await conn.fetch(f"""
            SELECT
                time_bucket('{bucket}', r.time) AS time,
                AVG(r.value) AS value
            FROM sensor_readings r
            WHERE r.sensor_id = $1
              AND r.time BETWEEN $2 AND $3
            GROUP BY time_bucket('{bucket}', r.time)
            ORDER BY time DESC
        """, sensor_id, start, end)
    return [
        {
            "time": r["time"],
            "value": round(r["value"], 4),
            "sensor_name": sensor["name"],
            "sensor_type": sensor["sensor_type"],
            "unit": sensor["unit"],
        }
        for r in readings
    ]
@app.get("/equipment/{equipment_id}/health",
         response_model=EquipmentHealthResponse)
async def get_equipment_health(equipment_id: int):
    """
    Combined health view: latest sensor readings + metadata + anomalies.
    Single endpoint that crosses metadata and time-series boundaries.
    """
    query_service = QueryService(pool)
    health = await query_service.get_equipment_health(equipment_id)
    if not health["equipment"]:
        raise HTTPException(404, "Equipment not found")
    return {
        "equipment_id": equipment_id,
        "equipment_name": health["equipment"]["name"],
        "facility": health["equipment"]["facility_name"],
        "status": health["overall_status"],
        "sensors": health["sensors"],
        "recent_anomalies": health["recent_anomalies"],
        "last_maintenance": health["last_maintenance"],
    }
@app.get("/facilities/{facility_id}/sensors/readings")
async def get_facility_readings(
    facility_id: int,
    sensor_type: Optional[str] = None,
    manufacturer: Optional[str] = None,
    production_line: Optional[str] = None,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    bucket: str = "1 hour",
):
    """
    Get aggregated readings for all sensors in a facility,
    with optional metadata filters.
    """
    check_bucket(bucket)
    start, end = default_window(start, end)
    conditions = ["f.id = $1", "r.time >= $2", "r.time <= $3"]
    params = [facility_id, start, end]
    idx = 4
    if sensor_type:
        conditions.append(f"s.sensor_type = ${idx}")
        params.append(sensor_type)
        idx += 1
    if manufacturer:
        conditions.append(f"e.manufacturer = ${idx}")
        params.append(manufacturer)
        idx += 1
    if production_line:
        conditions.append(f"e.production_line = ${idx}")
        params.append(production_line)
        idx += 1
    where = " AND ".join(conditions)
    async with pool.acquire() as conn:
        rows = await conn.fetch(f"""
            SELECT
                time_bucket('{bucket}', r.time) AS time,
                e.name AS equipment,
                e.manufacturer,
                s.name AS sensor,
                s.sensor_type,
                s.unit,
                AVG(r.value) AS avg_value,
                MAX(r.value) AS max_value,
                MIN(r.value) AS min_value
            FROM sensor_readings r
            JOIN sensors s ON s.id = r.sensor_id
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE {where}
            GROUP BY time_bucket('{bucket}', r.time),
                     e.name, e.manufacturer, s.name, s.sensor_type, s.unit
            ORDER BY time DESC
        """, *params)
    return [dict(r) for r in rows]
The /equipment/{id}/health endpoint illustrates the value of combining metadata and time-series in a single API response: a dashboard can render equipment details, live sensor values, anomaly alerts, and maintenance history from one call.
Handling Scale
A system with 500 sensors at 1 Hz generates approximately 43 million readings per day (500 × 86,400 seconds). At 10 Hz, the figure rises to 432 million. Over a year, that amounts to roughly 16 to 158 billion rows. Without a data-lifecycle strategy, storage grows linearly and without bound.
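The volumes follow from rows = sensors × sampling rate × seconds. A quick check of the arithmetic, plus the size of a 30-day raw tier; the helper names are illustrative and nothing here models storage bytes or compression.

```python
SECONDS_PER_DAY = 86_400


def rows_per_day(sensors: int, hz: float) -> int:
    """Readings per day for `sensors` sensors each sampled at `hz`."""
    return round(sensors * hz * SECONDS_PER_DAY)


def rows_per_year(sensors: int, hz: float, days: int = 365) -> int:
    return rows_per_day(sensors, hz) * days


if __name__ == "__main__":
    for hz in (1, 10):
        print(f"{hz:>2} Hz: {rows_per_day(500, hz):,} rows/day, "
              f"{rows_per_year(500, hz):,} rows/year, "
              f"{rows_per_day(500, hz) * 30:,} rows in a 30-day raw tier")
```

At 10 Hz, the 30-day raw tier alone holds about 13 billion rows, which is why the downsampled tiers below carry most long-range queries.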
Data Retention Policies
| Data Tier | Resolution | Retention | Storage | Use Case |
|---|---|---|---|---|
| Raw | Full resolution (1-1000 Hz) | 30 days | TimescaleDB (compressed) | Real-time dashboards, debugging |
| Downsampled | 1-minute or 5-minute averages | 1 year | TimescaleDB continuous aggregate | Trend analysis, weekly reports |
| Aggregated | Hourly or daily summaries | Forever | PostgreSQL regular table | Historical comparisons, audits |
| Archived | Full resolution | 7 years | Parquet on S3/Glacier | Compliance, ML retraining |
Implementing this with TimescaleDB:
-- Continuous aggregate: 5-minute downsampling (auto-maintained)
CREATE MATERIALIZED VIEW readings_5min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', time) AS bucket,
sensor_id,
AVG(value) AS avg_value,
MIN(value) AS min_value,
MAX(value) AS max_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) AS median_value,
COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id
WITH NO DATA;
SELECT add_continuous_aggregate_policy('readings_5min',
start_offset => INTERVAL '2 hours',
end_offset => INTERVAL '30 minutes',
schedule_interval => INTERVAL '30 minutes'
);
-- Continuous aggregate: hourly (built on top of 5-min aggregate)
CREATE MATERIALIZED VIEW readings_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', bucket) AS bucket,
sensor_id,
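-- Weight each 5-minute average by its sample count (a plain AVG would not)
SUM(avg_value * sample_count) / SUM(sample_count) AS avg_value,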
MIN(min_value) AS min_value,
MAX(max_value) AS max_value,
SUM(sample_count) AS sample_count
FROM readings_5min
GROUP BY time_bucket('1 hour', bucket), sensor_id
WITH NO DATA;
SELECT add_continuous_aggregate_policy('readings_hourly',
start_offset => INTERVAL '4 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
-- Drop raw data after 30 days
SELECT add_retention_policy('sensor_readings', INTERVAL '30 days');
-- Keep 5-minute aggregates for 1 year
SELECT add_retention_policy('readings_5min', INTERVAL '1 year');
Once add_retention_policy drops a chunk, the raw data are gone. If long-term raw data access is required for compliance or ML training, export the data to Parquet on S3 before the retention policy removes it.
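When the hourly view is built from the 5-minute view, averaging the 5-minute averages with equal weight skews the result whenever buckets hold different numbers of samples (gaps, restarts, rate changes). Weighting by `sample_count` recovers the true hourly mean; in SQL that is `SUM(avg_value * sample_count) / SUM(sample_count)`. A small Python check of the rollup arithmetic, with an illustrative function name and `(avg, min, max, count)` tuples standing in for aggregate rows:

```python
def rollup(
    buckets: list[tuple[float, float, float, int]],
) -> tuple[float, float, float, int]:
    """Combine (avg, min, max, count) buckets into one coarser bucket."""
    total = sum(count for *_, count in buckets)
    if total == 0:
        raise ValueError("no samples to roll up")
    # Sample-weighted mean: equivalent to averaging all underlying raw values
    weighted_avg = sum(avg * count for avg, _, _, count in buckets) / total
    return (
        weighted_avg,
        min(b[1] for b in buckets),  # min of mins
        max(b[2] for b in buckets),  # max of maxes
        total,
    )
```

For buckets `(10.0, 9.0, 11.0, 300)` and `(20.0, 19.0, 21.0, 100)`, the weighted mean is 12.5, whereas an unweighted average of the two averages would report 15.0.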
Real-World Example: Manufacturing Plant
A complete real-world scenario ties the preceding elements together. Consider a manufacturing plant with the following configuration:
- 3 buildings (A, B, C) on a single campus
- 50 machines: 20 CNC machines (FANUC, DMG Mori), 15 robots (ABB, KUKA), 10 conveyors, 5 pumps
- 500 sensors: vibration, temperature, pressure, current, torque, flow rate
- Average sampling rate: 10 Hz (some vibration sensors at 1 kHz for spectral analysis)
The Schema
-- Seed the metadata
INSERT INTO facilities (name, location, facility_type, commissioned_date, status) VALUES
('Building A', 'North Campus, Chicago IL', 'manufacturing', '2019-03-15', 'active'),
('Building B', 'North Campus, Chicago IL', 'manufacturing', '2021-07-01', 'active'),
('Building C', 'North Campus, Chicago IL', 'warehouse', '2022-01-10', 'active');
-- Sample equipment (showing pattern, not all 50)
INSERT INTO equipment (facility_id, name, equipment_type, manufacturer, model,
serial_number, install_date, production_line, status,
operating_params) VALUES
(1, 'CNC-A01', 'cnc', 'FANUC', 'Robodrill a-D21MiB5', 'FN-2024-0891',
'2024-03-15', 'Line 1', 'operational',
'{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}'),
(1, 'CNC-A02', 'cnc', 'DMG Mori', 'DMU 50', 'DM-2023-4521',
'2023-09-01', 'Line 1', 'operational',
'{"max_spindle_rpm": 20000, "tool_capacity": 30, "axes": 5}'),
(1, 'Robot-A01', 'robot', 'ABB', 'IRB 6700', 'ABB-2024-1122',
'2024-06-10', 'Line 2', 'operational',
'{"axes": 6, "payload_kg": 150, "reach_mm": 2650}'),
(2, 'CNC-B01', 'cnc', 'FANUC', 'Robodrill a-D21LiB5ADV', 'FN-2024-1205',
'2024-11-20', 'Line 3', 'operational',
'{"max_spindle_rpm": 24000, "tool_capacity": 21, "axes": 5}');
-- Sensors for CNC-A01 (typical: vibration, temperature, spindle current)
INSERT INTO sensors (equipment_id, name, sensor_type, unit, sampling_rate_hz,
min_range, max_range, calibration_date, is_active, tags) VALUES
(1, 'CNC-A01-VIB-X', 'vibration', 'mm/s', 1000, 0, 50,
'2026-01-15', TRUE, '{"axis": "x", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-VIB-Y', 'vibration', 'mm/s', 1000, 0, 50,
'2026-01-15', TRUE, '{"axis": "y", "monitoring_group": "critical_24x7"}'),
(1, 'CNC-A01-TEMP-SPINDLE', 'temperature', 'celsius', 1, 10, 85,
'2026-02-01', TRUE, '{"location": "spindle_bearing"}'),
(1, 'CNC-A01-CURRENT', 'current', 'ampere', 10, 0, 30,
'2026-02-01', TRUE, '{"phase": "main_spindle"}');
Data Flow
In this plant the data flow proceeds as follows:
- Sensors output analogue/digital signals to edge PLCs (programmable logic controllers).
- Edge PLCs digitise and publish to an MQTT broker via the Sparkplug B protocol.
- Telegraf agents (one per building) subscribe to MQTT, buffer locally, and forward to the central database.
- TimescaleDB receives inserts via the Telegraf PostgreSQL output plugin.
- The ingestion validator (the Python script described earlier) runs as a sidecar, monitoring for unknown sensor IDs.
With 500 sensors averaging 10 Hz, the system handles approximately 5,000 inserts per second during normal operation, with bursts of up to 50,000 per second when high-frequency vibration captures are triggered. TimescaleDB on a single node (16 vCPU, 64 GB RAM, NVMe SSD) handles this load comfortably with batch inserts.
Dashboard Queries
The operations team uses a Grafana dashboard backed by the following queries:
-- Dashboard Panel 1: Plant Overview — current status of all equipment
SELECT
f.name AS building,
e.name AS machine,
e.equipment_type,
e.status AS equipment_status,
-- DISTINCT counts: the three LEFT JOINs below multiply rows per machine
COUNT(DISTINCT s.id) FILTER (WHERE s.is_active) AS active_sensors,
COUNT(DISTINCT ae.id) FILTER (WHERE ae.severity IN ('high', 'critical')) AS critical_anomalies_24h,
MAX(ml.performed_at) AS last_maintenance
FROM equipment e
JOIN facilities f ON f.id = e.facility_id
LEFT JOIN sensors s ON s.equipment_id = e.id
LEFT JOIN anomaly_events ae ON ae.sensor_id = s.id
AND ae.start_time > NOW() - INTERVAL '24 hours'
LEFT JOIN maintenance_logs ml ON ml.equipment_id = e.id
GROUP BY f.name, e.name, e.equipment_type, e.status
ORDER BY critical_anomalies_24h DESC, f.name, e.name;
-- Dashboard Panel 2: Vibration trends for Line 3 CNC machines (last 24h)
SELECT
time_bucket('15 minutes', r.time) AS period,
e.name AS machine,
AVG(r.value) AS avg_vibration,
MAX(r.value) AS peak_vibration
FROM sensor_readings r
JOIN sensors s ON s.id = r.sensor_id
JOIN equipment e ON e.id = s.equipment_id
WHERE e.production_line = 'Line 3'
AND e.equipment_type = 'cnc'
AND s.sensor_type = 'vibration'
AND r.time > NOW() - INTERVAL '24 hours'
GROUP BY period, e.name
ORDER BY period, e.name;
-- Dashboard Panel 3: Equipment needing attention
-- (sensors exceeding 80% of their max range)
SELECT
e.name AS machine,
s.name AS sensor,
s.sensor_type,
s.max_range,
latest.last_value,
ROUND((latest.last_value / s.max_range * 100)::numeric, 1) AS pct_of_max
FROM sensors s
JOIN equipment e ON e.id = s.equipment_id
CROSS JOIN LATERAL (
SELECT value AS last_value
FROM sensor_readings
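WHERE sensor_id = s.id
AND time > NOW() - INTERVAL '1 hour' -- lets the planner skip old chunks
ORDER BY time DESC
LIMIT 1
) latest
WHERE s.is_active = TRUE
AND s.max_range > 0 -- excludes NULL ranges and avoids division by zero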
AND latest.last_value > s.max_range * 0.8
ORDER BY pct_of_max DESC;
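Panel 1 joins sensors, anomaly events, and maintenance logs to each machine in one pass, so every sensor row is repeated once per matching maintenance log (and per anomaly). A plain COUNT then over-counts, while COUNT(DISTINCT ...) does not. A `sqlite3` stand-in reproduces the effect with four sensors and three maintenance logs on one machine; the table shapes are simplified assumptions.

```python
import sqlite3


def fanout_counts() -> tuple[int, int]:
    """Return (plain COUNT, COUNT DISTINCT) of sensors after a double LEFT JOIN."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE equipment (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE sensors (id INTEGER PRIMARY KEY, equipment_id INTEGER);
        CREATE TABLE maintenance_logs (id INTEGER PRIMARY KEY, equipment_id INTEGER);
        INSERT INTO equipment VALUES (1, 'CNC-A01');
        INSERT INTO sensors VALUES (1, 1), (2, 1), (3, 1), (4, 1);
        INSERT INTO maintenance_logs VALUES (1, 1), (2, 1), (3, 1);
    """)
    row = conn.execute("""
        SELECT COUNT(s.id), COUNT(DISTINCT s.id)
        FROM equipment e
        LEFT JOIN sensors s ON s.equipment_id = e.id
        LEFT JOIN maintenance_logs ml ON ml.equipment_id = e.id
        GROUP BY e.id
    """).fetchone()
    conn.close()
    return row
```

The plain count reports 12 sensors (4 × 3 joined rows) for a machine that has 4.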
Anomaly Detection Integration
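When an ML anomaly-detection model flags unusual behaviour, a worker records the event in the anomaly_events table, keyed by sensor_id. It first looks up the sensor's equipment and facility, both to reject unknown sensors and to log the event with full metadata context. A representative Python worker is shown below: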
import logging
import asyncpg
logger = logging.getLogger(__name__)
async def record_anomaly(
    pool: asyncpg.Pool,
    sensor_id: int,
    anomaly_type: str,
    severity: str,
    value_at_detection: float,
    model_version: str,
) -> int:
    """Record an anomaly event with metadata validation."""
    async with pool.acquire() as conn:
        # Validate that the sensor exists and fetch context for logging
        sensor = await conn.fetchrow("""
            SELECT s.name, s.sensor_type, s.max_range,
                   e.name AS equipment, f.name AS facility
            FROM sensors s
            JOIN equipment e ON e.id = s.equipment_id
            JOIN facilities f ON f.id = e.facility_id
            WHERE s.id = $1
        """, sensor_id)
        if not sensor:
            raise ValueError(f"Sensor {sensor_id} not found in metadata")
        anomaly_id = await conn.fetchval("""
            INSERT INTO anomaly_events
                (sensor_id, start_time, anomaly_type, severity,
                 value_at_detection, model_version)
            VALUES ($1, NOW(), $2, $3, $4, $5)
            RETURNING id
        """, sensor_id, anomaly_type, severity, value_at_detection, model_version)
    logger.warning(
        f"Anomaly #{anomaly_id}: {severity} {anomaly_type} on "
        f"{sensor['equipment']}/{sensor['name']} ({sensor['facility']}) "
        f"value={value_at_detection} (max={sensor['max_range']})"
    )
    return anomaly_id
Common Pitfalls
The following errors recur most frequently across the sensor-data architectures the author has reviewed:
| Pitfall | Impact | Solution |
|---|---|---|
| Denormalizing metadata into every time-series row | 10-20x storage bloat, metadata updates require backfilling billions of rows | Store only sensor_id in time-series, JOIN at query time |
| No foreign key validation | Orphaned readings accumulate, 10-20% of data becomes unlinkable | Validate sensor_id at ingestion, run periodic quality checks |
| Single database for everything | Either metadata or time-series queries suffer poor performance | Use TimescaleDB (best of both) or a split architecture |
| Not planning for sensor changes | Historical data misinterpreted after recalibration or replacement | Implement SCD Type 2 for sensor history |
| Ignoring time zones | Time shifts corrupt analysis, especially across multi-site deployments | Always use TIMESTAMPTZ, store in UTC, convert at display time |
| Missing indexes on JOIN columns | Cross-domain queries take minutes instead of milliseconds | Index (sensor_id, time DESC) on time-series, all FKs on metadata |
| No retention policy | Storage costs grow linearly forever, query performance degrades | Tiered retention: raw (30d) → downsampled (1y) → archived (S3) |
| String-based sensor identification | Name changes break links, inconsistent naming across teams | Use integer IDs as primary key, names as human-readable labels |
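The SCD Type 2 remedy in the table keeps every version of a sensor's metadata together with the time it took effect, so a historical reading is interpreted against the version in force at its timestamp. An in-memory sketch of that "as-of" lookup follows; the class and field names are hypothetical, and in PostgreSQL the same lookup is typically a join on `r.time >= valid_from AND (r.time < valid_to OR valid_to IS NULL)`.

```python
from bisect import bisect_right
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class SensorVersion:
    """One SCD Type 2 row: valid from valid_from until the next version starts."""
    valid_from: datetime
    max_range: float
    unit: str


class SensorHistory:
    def __init__(self, versions: list[SensorVersion]):
        self.versions = sorted(versions, key=lambda v: v.valid_from)
        self._starts = [v.valid_from for v in self.versions]

    def as_of(self, t: datetime) -> Optional[SensorVersion]:
        """Version in force at time t, or None if t predates every version."""
        i = bisect_right(self._starts, t)
        return self.versions[i - 1] if i else None
```

After a recalibration on 15 January 2026 that lowers max_range from 50 to 40, a reading from June 2025 is still checked against 50, and one from the recalibration day onward against 40.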
Final Thoughts
Managing metadata and time-series data together is not a luxury; it is a fundamental requirement for any system seeking to derive actionable insights from sensor data. The sensor_id is the bridge between what the sensors are (metadata) and what they are measuring (time-series), and the architecture must make crossing that bridge in both directions straightforward.
For most teams, PostgreSQL with TimescaleDB is the appropriate starting point. It offers native SQL JOINs across metadata and time-series tables, a single connection string, familiar tooling, and excellent performance up to terabyte scale. Once metadata and sensor data are properly connected, feeding the data into modern time-series forecasting models becomes substantially simpler. When the system outgrows that platform, the patterns for InfluxDB integration, Parquet data lakes, and TDengine super tables provide a clear upgrade path.
The principal design principles are as follows:
- Separate but connected: Metadata in relational tables, time-series in optimised storage, linked by sensor_id.
- Sensor registry: Sensors should be treated as first-class entities with rich metadata (type, unit, range, calibration, sampling rate).
- Slowly changing dimensions: Metadata changes should be tracked over time so that historical data can be interpreted correctly.
- Validate at ingestion: A time-series reading should never be inserted without confirmation that the sensor exists in metadata.
- Tiered retention: Raw data (30 days) → downsampled (1 year) → aggregated (indefinite) → archived (cold storage). For the archival tier, an InfluxDB-to-Iceberg pipeline can move older data to S3 at a fraction of the cost.
- Index the bridge: Composite indexes on (sensor_id, time DESC) make cross-domain queries fast.
The complete schema, ingestion pipeline, query patterns, and API design in this guide provide a production-ready blueprint. The recommended sequence is to begin with the PostgreSQL + TimescaleDB pattern, add the sensor registry and validation layer, implement continuous aggregates for downsampling, and construct the API layer with FastAPI. The resulting system will be one in which "show me all vibration anomalies from Building A's CNC machines installed after 2023" is a query that returns results in milliseconds rather than a question that leaves the team unable to respond.
References
- TimescaleDB Documentation — Official docs for hypertables, continuous aggregates, compression, and retention policies
- PostgreSQL ltree Extension — Hierarchical tree-like data type for modeling facility structures
- InfluxDB Documentation — Time-series database documentation including Flux query language
- TDengine Super Table Concepts — Understanding super tables, sub-tables, and tags
- Apache Parquet Format — Columnar storage format specification for data lake architectures
- DuckDB Documentation — In-process analytical database for querying Parquet files
- FastAPI Documentation — Modern Python web framework used in the API design examples
- SQLAlchemy Documentation — Python ORM for metadata table management
- Telegraf Plugin Documentation — Agent for collecting and writing metrics from MQTT, Modbus, and other sources
- MQTT Specification — Lightweight messaging protocol widely used in IoT sensor networks