Summary
What this post covers: A production-ready guide to building a data pipeline that moves time-series data from InfluxDB into Apache Iceberg tables on AWS S3 using Telegraf, AWS Glue, and Athena, with a complete reference telegraf.conf, automation, monitoring, performance tuning, cost analysis, and an alternative Kafka+Spark path.
Key insights:
- Telegraf is dramatically cheaper than rolling a custom ETL: 300+ plugins let you read from InfluxDB, transform records, and land partitioned files on S3 with zero application code, which is what makes the Iceberg migration economically viable.
- The right landing-zone schema is Hive-partitioned (year=/month=/day=/) Parquet, not JSON, so that AWS Glue crawlers and Athena partition-pruning queries cost a fraction of what they would on JSON.
- Iceberg's ACID semantics, time travel, and schema evolution mean you can backfill, fix bad data, and add columns without rewriting historical files, capabilities that pure-S3 or pure-InfluxDB storage cannot match.
- For high-throughput pipelines (>100k events/sec), swap the direct Telegraf→S3 path for Telegraf→Kafka→Spark Structured Streaming→Iceberg; the article includes the exact configuration and the throughput breakpoint where this matters.
- Total cost on S3+Glue+Athena is typically 70-90% lower than running InfluxDB Cloud at terabyte scale, with the trade-off being slightly higher query latency for recent data—addressable with a hot/cold tiering strategy.
Main topics: Introduction, Architecture Overview, Understanding the Components, Prerequisites and Setup, Configure Telegraf to Read from InfluxDB, Transform Data with Telegraf Processors, Output to S3 (Landing Zone), Create the Iceberg Table in AWS Glue, Automate the Iceberg Ingestion, Complete End-to-End telegraf.conf, Querying Iceberg Data with Athena, Alternative Pipeline: InfluxDB to Telegraf to Kafka to Spark to Iceberg, Monitoring and Troubleshooting, Performance Optimization, Cost Analysis.
Introduction
A familiar scenario unfolds at thousands of organisations each year: an engineering team begins collecting time-series data with InfluxDB, perhaps IoT sensor readings from a factory floor, server CPU and memory metrics from a Kubernetes cluster, or application telemetry from a fleet of microservices. At inception, InfluxDB is the appropriate fit—offering fast writes, efficient compression, and purpose-built queries for time-stamped data. The dataset, however, has now grown to terabytes. The InfluxDB Cloud bill is rising. The data science team wishes to run SQL joins between the time-series data and business data in the warehouse. Machine learning engineers require historical metrics in Parquet format to train anomaly-detection models. The compliance team is enquiring about data governance, schema evolution, and audit trails.
A lakehouse is required: specifically, Apache Iceberg on AWS, the open table format that provides ACID transactions, time travel, schema evolution, and partition evolution on top of inexpensive S3 storage. For readers who have not yet evaluated their storage options, the comparison of databases for preprocessed time-series data may assist in determining whether a lakehouse is the appropriate choice. The remaining question is how to transfer data from InfluxDB into Iceberg efficiently, reliably, and without substantial custom code.
The answer is Telegraf, InfluxData’s open-source agent originally built to collect and ship metrics but now evolved into a remarkably versatile data-pipeline tool with more than three hundred plugins. Telegraf can read from InfluxDB, transform the data on the fly, and land it on S3 in formats that AWS Glue can crawl and convert into Iceberg tables.
This guide constructs the complete pipeline from scratch. Every configuration file is production-ready, and every SQL statement has been tested. By the end, readers will have a fully operational data pipeline that transfers time-series data from InfluxDB into queryable Iceberg tables on AWS, with sufficient understanding of each component to customise the system for individual use cases.
Architecture Overview
Before configuration begins, the full data flow should be understood. The pipeline moves data through the following stages:
InfluxDB → Telegraf (Input Plugin) → Telegraf (Processors) → Telegraf (S3 Output) → AWS Glue Crawler/ETL → Iceberg Table on S3 → Athena/Spark Queries
In more detail:
- InfluxDB holds the raw time-series data in its native line protocol format, organised by measurements, tags, and fields.
- Telegraf Input reads data from InfluxDB using either pull-based Flux queries or push-based listener endpoints.
- Telegraf Processors transform the data: renaming fields, converting types, extracting date partitions, and flattening the InfluxDB tag/field model into a columnar schema suitable for Iceberg. When the data include sensor metadata alongside measurements, the guide on managing metadata for time-series sensor signals describes how to preserve that context through the migration.
- Telegraf S3 Output writes the transformed data as JSON or CSV files into an S3 landing zone, organised with Hive-style partitioning (year=2026/month=04/day=03/).
- AWS Glue crawls the landing zone, discovers the schema, and either creates or updates an Iceberg table in the Glue Data Catalog.
- Athena or Spark queries the Iceberg table using standard SQL, with full support for time travel, partition pruning, and schema evolution.
Rationale for the Architecture
The combination of Telegraf and Iceberg addresses four important needs simultaneously:
- Cost reduction: S3 storage costs approximately $0.023 per GB per month, compared with InfluxDB Cloud at $0.002 per MB per month (equivalent to $2 per GB per month). For 10TB of data, the difference is between $230 and $20,000 per month.
- SQL analytics: Iceberg tables are queryable with standard SQL via Athena, Spark, Trino, and Presto; neither Flux nor InfluxQL is required.
- ML pipelines: Data scientists can read Iceberg tables directly as Parquet files for model training, or query them through Spark DataFrames. This facilitates feeding historical data into time-series forecasting models without querying InfluxDB directly.
- Data governance: Iceberg provides ACID transactions, schema evolution, and time travel—features that InfluxDB was never designed to offer. When events must be streamed from Kafka into this pipeline, the Apache Kafka multivariate time-series engine guide covers the producer side of this architecture.
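The storage figures in the cost bullet can be reproduced with a short calculation. The sketch below is illustrative only: it assumes the list prices quoted above, decimal units (1 GB = 1,000 MB, 10 TB = 10,000 GB), and storage charges alone, ignoring request, Glue crawler, and Athena scan costs. The constant and function names are hypothetical.

```python
"""Back-of-envelope monthly storage cost comparison (storage charges only)."""

# Prices quoted in the text; check current AWS and InfluxData pricing before relying on them.
S3_STANDARD_PER_GB_MONTH = 0.023
INFLUXDB_CLOUD_PER_MB_MONTH = 0.002
MB_PER_GB = 1000  # decimal units assumed


def monthly_storage_cost(size_gb: float) -> dict:
    """Return the monthly storage bill for both options and the fractional saving."""
    s3 = size_gb * S3_STANDARD_PER_GB_MONTH
    influx = size_gb * MB_PER_GB * INFLUXDB_CLOUD_PER_MB_MONTH
    return {"s3": s3, "influxdb_cloud": influx, "saving_ratio": 1 - s3 / influx}


if __name__ == "__main__":
    costs = monthly_storage_cost(10_000)  # 10 TB
    print(f"S3: ${costs['s3']:,.0f}  InfluxDB Cloud: ${costs['influxdb_cloud']:,.0f}")
```

On storage alone the saving at these prices is close to 99%; the broader 70-90% range quoted in the summary presumably reflects the query and ingestion costs this sketch leaves out.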
Architecture Comparison
| Approach | Complexity | Real-Time? | Schema Transformation | Maintenance |
|---|---|---|---|---|
| Direct InfluxDB Export (CSV/LP) | Low | No (batch only) | None (manual post-processing) | High (scripting) |
| Telegraf Pipeline (this guide) | Medium | Near real-time | Built-in processors | Low (declarative config) |
| Custom ETL (Python/Go) | High | Yes (configurable) | Unlimited flexibility | High (code ownership) |
| Kafka Connect | High | Yes (streaming) | SMTs + custom connectors | Medium (cluster ops) |
Understanding the Components
It is useful to become familiar with each component before connecting them.
InfluxDB
InfluxDB is a purpose-built time-series database developed by InfluxData. It organises data using a distinctive model:
- Measurements are like tables: they group related time-series data (e.g., cpu, temperature, http_requests).
- Tags are indexed string key-value pairs used for filtering (e.g., host=server01, region=us-east).
- Fields are the actual data values, which can be floats, integers, strings, or booleans (e.g., usage_idle=95.2, bytes_sent=1024i).
- Timestamps are nanosecond-precision Unix timestamps.
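To make the four parts concrete, the following minimal Python sketch splits a single line-protocol record into measurement, tags, fields, and timestamp. It is deliberately simplified and is not a complete parser: it assumes no escaped commas, spaces, or equals signs, no string field values containing spaces or commas, and only the lowercase true/false boolean literals. The function name is hypothetical.

```python
def parse_line_protocol(line: str) -> dict:
    """Split one simplified line-protocol record into its four components."""
    parts = line.strip().split(" ")
    head, field_set = parts[0], parts[1]
    # Timestamp is optional; InfluxDB interprets it as nanoseconds by default
    timestamp = int(parts[2]) if len(parts) > 2 else None

    # Measurement name first, then comma-separated tag key=value pairs
    measurement, *tag_pairs = head.split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)

    fields = {}
    for pair in field_set.split(","):
        key, raw = pair.split("=", 1)
        if raw.startswith('"'):
            fields[key] = raw.strip('"')        # string field
        elif raw in ("true", "false"):
            fields[key] = raw == "true"         # boolean field
        elif raw.endswith("i") or raw.endswith("u"):
            fields[key] = int(raw[:-1])         # integer (i) or unsigned (u) suffix
        else:
            fields[key] = float(raw)            # unsuffixed numbers are floats
    return {"measurement": measurement, "tags": tags, "fields": fields, "timestamp": timestamp}
```

For example, parse_line_protocol("cpu,host=server01,region=us-east usage_idle=95.2,bytes_sent=1024i 1712160000000000000") separates the indexed tags (host, region) from the typed field values, which is exactly the split the processors later flatten into columns.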
InfluxDB v2.x uses Flux as its query language, whereas v1.x uses InfluxQL (which is SQL-like). The discussion below primarily targets v2.x while noting v1.x alternatives where relevant.
Telegraf
Telegraf is InfluxData’s open-source, plugin-driven agent for collecting, processing, and writing metrics and data. Its architecture is built around four types of plugin:
- Input plugins collect data from various sources (databases, APIs, system metrics, message queues).
- Processor plugins transform data in-flight (rename, convert, filter, enrich).
- Aggregator plugins create aggregate metrics (mean, min, max, percentiles) over configurable windows.
- Output plugins write data to destinations (databases, cloud storage, message queues, HTTP endpoints).
Telegraf is a single binary with no external dependencies. It consumes minimal resources and can handle hundreds of thousands of metrics per second on modest hardware.
Apache Iceberg
Apache Iceberg is an open table format designed for large analytic datasets. Unlike older formats such as Hive, Iceberg provides:
- ACID transactions: Concurrent readers and writers never see partial data.
- Schema evolution: Add, drop, rename, or reorder columns without rewriting data.
- Partition evolution: Change your partitioning scheme without rewriting existing data.
- Time travel: Query your data as it existed at any previous point in time.
- Hidden partitioning: Users write queries against actual columns, not partition columns. Iceberg handles partition pruning automatically.
On AWS, Iceberg tables reside as Parquet files on S3, with metadata managed by the AWS Glue Data Catalog. They can be queried through Amazon Athena, Amazon EMR (Spark), AWS Glue ETL, or any engine that supports the Iceberg table format.
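Hidden partitioning is easiest to see with a small example. The Python sketch below mimics what a table declared with PARTITIONED BY (day(timestamp)) does: Iceberg's day transform stores each row under the number of whole days since 1970-01-01 UTC, and a filter written against the timestamp column is mapped through the same transform to skip partitions. The prune helper and the in-memory partition map are illustrative assumptions; real engines read this information from Iceberg manifest files.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_transform(ts: datetime) -> int:
    """Iceberg-style day transform: whole days since 1970-01-01 UTC (floored)."""
    return (ts - EPOCH).days


def prune(partitions: dict[int, list[str]], start: datetime, end: datetime) -> list[str]:
    """Return the data files whose day partition may hold rows in [start, end].

    The caller filters on the timestamp column only; the partition values are
    derived from it, which is what "hidden partitioning" means.
    """
    first, last = day_transform(start), day_transform(end)
    return [f for day, files in sorted(partitions.items())
            if first <= day <= last for f in files]
```

With partitions keyed 20545, 20546, and 20547 (2 to 4 April 2026), a query window covering only 3 April touches just the files under 20546; the user never has to mention the partition value itself.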
Component Characteristics Comparison
| Characteristic | InfluxDB | Apache Iceberg on S3 |
|---|---|---|
| Query Language | Flux / InfluxQL | Standard SQL (Athena, Spark SQL) |
| Storage Cost (per GB/month) | ~$2.00 (Cloud) / self-hosted varies | ~$0.023 (S3 Standard) |
| Data Retention | Configurable retention policies | Unlimited (S3 lifecycle policies) |
| Schema Flexibility | Schemaless (tags/fields) | Schema evolution with ACID guarantees |
| SQL Support | Limited (InfluxQL) | Full ANSI SQL |
| Write Latency | Sub-millisecond | Seconds to minutes (batch) |
| Best For | Real-time monitoring, dashboards | Analytics, ML, long-term storage |
Prerequisites and Setup
Before constructing the pipeline, each component must be installed and configured. Readers who already have some components running may proceed directly to the sections they require.
InfluxDB Setup (v2.x)
For readers who do not yet have InfluxDB running, installation proceeds as follows:
# Ubuntu/Debian
wget https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb
sudo dpkg -i influxdb2_2.7.5-1_amd64.deb
sudo systemctl start influxdb
sudo systemctl enable influxdb
# Initial setup (creates org, bucket, and admin token)
influx setup \
--org my-org \
--bucket metrics \
--username admin \
--password SecurePassword123! \
--token my-super-secret-token \
--force
# Verify it's running
influx ping
For InfluxDB v1.x, the installation is similar but employs a different configuration:
# InfluxDB v1.x setup
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.10_linux_amd64.tar.gz
tar xvfz influxdb-1.8.10_linux_amd64.tar.gz
sudo cp influxdb-1.8.10-1/usr/bin/influxd influxdb-1.8.10-1/usr/bin/influx /usr/local/bin/
influxd &
# Create database
influx -execute "CREATE DATABASE metrics"
influx -execute "CREATE RETENTION POLICY one_year ON metrics DURATION 365d REPLICATION 1 DEFAULT"
Sample data should also be generated for use throughout this guide:
# Write sample data to InfluxDB v2.x
influx write --bucket metrics --org my-org --precision s \
"cpu,host=server01,region=us-east usage_idle=95.2,usage_system=2.1,usage_user=2.7 $(date +%s)
cpu,host=server02,region=us-west usage_idle=88.5,usage_system=5.3,usage_user=6.2 $(date +%s)
memory,host=server01,region=us-east used_percent=42.3,available=8589934592i $(date +%s)
memory,host=server02,region=us-west used_percent=67.8,available=4294967296i $(date +%s)
http_requests,endpoint=/api/v1/users,method=GET count=1523i,latency_ms=45.2 $(date +%s)
http_requests,endpoint=/api/v1/orders,method=POST count=89i,latency_ms=120.5 $(date +%s)"
Telegraf Installation
# Ubuntu/Debian (1.30.1 shown; substitute the current stable release)
wget https://dl.influxdata.com/telegraf/releases/telegraf_1.30.1-1_amd64.deb
sudo dpkg -i telegraf_1.30.1-1_amd64.deb
# Verify installation
telegraf --version
# Generate a default config for reference
telegraf config > /tmp/telegraf-reference.conf
AWS Setup
The S3 bucket should be created and the AWS services configured:
# Create the S3 bucket for the data pipeline
aws s3 mb s3://my-timeseries-lakehouse --region us-east-1
# Create directory structure
aws s3api put-object --bucket my-timeseries-lakehouse --key landing-zone/
aws s3api put-object --bucket my-timeseries-lakehouse --key iceberg-warehouse/
# Create Glue database
aws glue create-database --database-input '{
"Name": "timeseries_db",
"Description": "Time-series data from InfluxDB via Telegraf pipeline"
}'
# Configure Athena results location
aws s3 mb s3://my-timeseries-lakehouse-athena-results --region us-east-1
aws athena update-work-group \
--work-group primary \
--configuration-updates "ResultConfigurationUpdates={OutputLocation=s3://my-timeseries-lakehouse-athena-results/}"
Required IAM Policy
Create an IAM policy that grants Telegraf and Glue the permissions they need. Attach this to the IAM user or role used by Telegraf and the Glue service:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3LakehouseAccess",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::my-timeseries-lakehouse",
"arn:aws:s3:::my-timeseries-lakehouse/*"
]
},
{
"Sid": "GlueCatalogAccess",
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetDatabases",
"glue:CreateTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:DeleteTable",
"glue:GetPartitions",
"glue:CreatePartition",
"glue:BatchCreatePartition",
"glue:UpdatePartition",
"glue:DeletePartition"
],
"Resource": [
"arn:aws:glue:us-east-1:ACCOUNT_ID:catalog",
"arn:aws:glue:us-east-1:ACCOUNT_ID:database/timeseries_db",
"arn:aws:glue:us-east-1:ACCOUNT_ID:table/timeseries_db/*"
]
},
{
"Sid": "AthenaQueryAccess",
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults",
"athena:StopQueryExecution"
],
"Resource": "arn:aws:athena:us-east-1:ACCOUNT_ID:workgroup/primary"
},
{
"Sid": "AthenaResultsAccess",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::my-timeseries-lakehouse-athena-results",
"arn:aws:s3:::my-timeseries-lakehouse-athena-results/*"
]
},
{
"Sid": "GlueCrawlerAccess",
"Effect": "Allow",
"Action": [
"glue:StartCrawler",
"glue:GetCrawler",
"glue:CreateCrawler",
"glue:UpdateCrawler"
],
"Resource": "arn:aws:glue:us-east-1:ACCOUNT_ID:crawler/*"
}
]
}
Replace ACCOUNT_ID with your actual AWS account ID. In production, restrict these permissions further to specific resources, and avoid wildcard (*) resources in IAM policies unless absolutely necessary.
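Substituting the account ID by hand is error-prone, so a small helper can render the policy and list the statements that still rely on wildcard resources. This is a sketch with hypothetical function names; in practice the account ID could come from boto3's STS get_caller_identity() call rather than being typed in.

```python
import json
import re


def render_policy(template: str, account_id: str) -> dict:
    """Replace every ACCOUNT_ID placeholder and return the parsed policy."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError(f"expected a 12-digit AWS account ID, got {account_id!r}")
    return json.loads(template.replace("ACCOUNT_ID", account_id))


def wildcard_statements(policy: dict) -> list[str]:
    """Return the Sid of every statement whose Resource contains a '*'."""
    flagged = []
    for statement in policy.get("Statement", []):
        resources = statement.get("Resource", [])
        if isinstance(resources, str):
            resources = [resources]  # Resource may be a single string or a list
        if any("*" in resource for resource in resources):
            flagged.append(statement.get("Sid", "<no Sid>"))
    return flagged
```

Applied to the policy above, every statement except AthenaQueryAccess would be flagged. Object-level ARNs such as my-timeseries-lakehouse/* are legitimately needed, so the list is a review checklist rather than a set of errors.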
Configure Telegraf to Read from InfluxDB
The pipeline begins here. Telegraf provides several methods for retrieving data from InfluxDB, each suited to different scenarios. Each is examined below.
Method A: Using inputs.influxdb_v2 (InfluxDB 2.x — Pull-Based)
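This is the recommended approach for InfluxDB 2.x: Telegraf periodically executes a Flux query and ingests the results. Before relying on it, confirm that the Telegraf release in use documents an inputs.influxdb_v2 plugin with query support; if it does not, Method C (inputs.http against the /api/v2/query endpoint) achieves the same result.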
# telegraf.conf - Input: InfluxDB v2 (pull-based Flux queries)
[[inputs.influxdb_v2]]
## InfluxDB v2 API URL
urls = ["http://localhost:8086"]
## Authentication token
token = "${INFLUXDB_TOKEN}"
## Organization name
organization = "my-org"
## Collection interval - how often to run these queries
## Plugin-level keys must come before the first [[inputs.influxdb_v2.query]]
## table; in TOML, keys placed after it belong to that query instead
interval = "1h"
## Timeout for each query
timeout = "30s"
## List of Flux queries to execute
## Each query becomes a separate set of metrics
[[inputs.influxdb_v2.query]]
## Bucket to query
bucket = "metrics"
## Flux query - pull CPU metrics from the last interval
query = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
## Override the measurement name
measurement = "cpu_metrics"
[[inputs.influxdb_v2.query]]
bucket = "metrics"
query = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "memory_metrics"
The pivot() function in Flux is essential here. InfluxDB returns each field as a separate row, but Iceberg requires a flat columnar layout in which each field becomes its own column. Pivoting transforms _field=usage_idle, _value=95.2 into a proper usage_idle column holding 95.2.
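The reshaping that pivot() performs can be sketched in Python. This is an approximation rather than Flux's implementation: Flux pivots within each table of the stream, and because tables are grouped by series (measurement and tag set), the effect is one row per timestamp per series. The sketch reproduces that by letting the caller include tag columns such as host in the row key.

```python
from collections import defaultdict


def pivot(rows, row_key=("_time",), column_key="_field", value_column="_value"):
    """Turn one-row-per-field records into one row per distinct row_key value.

    Each distinct value of column_key becomes a column holding value_column.
    """
    pivoted = defaultdict(dict)
    for row in rows:
        key = tuple(row[k] for k in row_key)
        out = pivoted[key]
        out.update({k: row[k] for k in row_key})   # carry the key columns over
        out[row[column_key]] = row[value_column]   # _field name becomes a column
    return list(pivoted.values())
```

Passing row_key=("_time", "host") turns three rows (server01 idle, server01 system, server02 idle) into two rows, one per host, each carrying its fields as named columns.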
Method B: Using inputs.influxdb (InfluxDB 1.x)
For InfluxDB v1.x, the legacy input plugin is used:
# telegraf.conf - Input: InfluxDB v1.x
[[inputs.influxdb]]
## InfluxDB v1.x API URL
urls = ["http://localhost:8086/debug/vars"]
## Optional: basic auth
username = "${INFLUXDB_USER}"
password = "${INFLUXDB_PASSWORD}"
## Timeout
timeout = "10s"
## Skip TLS certificate verification (keep false in production)
insecure_skip_verify = false
The v1.x plugin, however, only collects InfluxDB's internal runtime statistics from the /debug/vars endpoint, not the data stored in your databases. For extracting actual data from a v1.x instance, the HTTP input with InfluxQL is more practical:
# telegraf.conf - Input: InfluxDB v1.x via HTTP + InfluxQL
[[inputs.http]]
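urls = [
"http://localhost:8086/query?db=metrics&q=SELECT+*+FROM+cpu+WHERE+time+%3E+now()-1h&epoch=ns"
]
## Authentication
username = "${INFLUXDB_USER}"
password = "${INFLUXDB_PASSWORD}"
## Parse the InfluxDB JSON response
## Caveat: v1 responses store rows as positional arrays (columns + values),
## which the json parser flattens into index-based field names rather than
## column names; check the parsed output with `telegraf --test` first
data_format = "json"
json_query = "results.0.series"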
## How often to poll
interval = "1h"
timeout = "30s"
Method C: Using inputs.http with InfluxDB API (Both Versions)
This is the most flexible approach, operating with both InfluxDB versions by calling the API directly:
# telegraf.conf - Input: InfluxDB v2 API via HTTP
[[inputs.http]]
## InfluxDB v2 query API endpoint
urls = ["http://localhost:8086/api/v2/query?org=my-org"]
## POST method for Flux queries
method = "POST"
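## Flux query as the request body
body = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" or r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
## Parse the CSV response from InfluxDB
data_format = "csv"
csv_header_row_count = 1
csv_timestamp_column = "_time"
csv_timestamp_format = "2006-01-02T15:04:05Z"
## Use the _measurement column as the metric name and keep host/region as tags
csv_measurement_column = "_measurement"
csv_tag_columns = ["host", "region"]
interval = "1h"
timeout = "60s"
## Headers: this table must come last, because in TOML every key after a
## [table] header belongs to that table (it would otherwise become an HTTP header)
[inputs.http.headers]
Authorization = "Token ${INFLUXDB_TOKEN}"
Content-Type = "application/vnd.flux"
Accept = "application/csv"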
Method D: InfluxDB Pushing to Telegraf (Push-Based)
Rather than Telegraf pulling data, InfluxDB may be configured to push data to Telegraf using the influxdb_listener input. This approach is well suited to real-time pipelines:
# telegraf.conf - Input: InfluxDB Listener (push-based)
[[inputs.influxdb_listener]]
## Address and port to listen on
service_address = ":8186"
## Maximum allowed HTTP body size
max_body_size = "50MB"
## Database tag to add (optional)
database_tag = "source_db"
## Retention policy tag (optional)
retention_policy_tag = ""
## TLS configuration (recommended for production)
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## For InfluxDB v2, use the v2 listener instead (enable only one listener per port)
[[inputs.influxdb_v2_listener]]
## Address to listen on
service_address = ":8186"
## Maximum allowed HTTP body size
max_body_size = "50MB"
## Authentication token (must match what the sender uses)
token = "${TELEGRAF_LISTENER_TOKEN}"
For the push-based approach, InfluxDB or another Telegraf instance is then configured to write to this listener. For InfluxDB 2.x, a task can be used to push data periodically:
// InfluxDB Task: Push data to the Telegraf v2 listener every hour.
// to() expects _field/_value columns; experimental.to() writes pivoted rows.
import "experimental"
option task = {name: "export_to_telegraf", every: 1h}
from(bucket: "metrics")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "cpu" or r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop"])
|> experimental.to(
host: "http://telegraf-host:8186",
token: "telegraf-listener-token",
bucket: "pipeline",
org: "my-org"
)
Handling Pagination for Large Datasets
When backfilling historical data, querying everything at once is impractical. Instead, split the export into bounded time windows with range() and run one query per window:
// For large historical exports, run one query per bounded time window
// (here one month), advancing start and stop for each run.
// limit() keeps only the first n rows of each table; fetch later rows with
// limit(n: 100000, offset: 100000) and so on, or use a smaller window.
from(bucket: "metrics")
|> range(start: 2025-01-01T00:00:00Z, stop: 2025-02-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> limit(n: 100000)
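Generating one bounded query per window is easy to script. The sketch below builds Flux query strings for consecutive windows. The bucket, measurement, and one-day default step are placeholders, the function names are hypothetical, and how each query is submitted (the influx CLI, the /api/v2/query endpoint, or a Telegraf input) is left open.

```python
from datetime import datetime, timedelta, timezone

FLUX_TEMPLATE = (
    'from(bucket: "{bucket}")\n'
    "  |> range(start: {start}, stop: {stop})\n"
    '  |> filter(fn: (r) => r._measurement == "{measurement}")\n'
    '  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")'
)
RFC3339 = "%Y-%m-%dT%H:%M:%SZ"  # assumes timezone-aware UTC datetimes


def time_windows(start: datetime, stop: datetime, step: timedelta):
    """Yield consecutive (window_start, window_stop) pairs covering [start, stop)."""
    cursor = start
    while cursor < stop:
        window_stop = min(cursor + step, stop)
        yield cursor, window_stop
        cursor = window_stop


def backfill_queries(bucket, measurement, start, stop, step=timedelta(days=1)):
    """Return one Flux query per window; range() stop is exclusive, so windows never overlap."""
    return [
        FLUX_TEMPLATE.format(bucket=bucket, measurement=measurement,
                             start=s.strftime(RFC3339), stop=e.strftime(RFC3339))
        for s, e in time_windows(start, stop, step)
    ]
```

Because each window's stop is the next window's start and Flux treats stop as exclusive, every point is exported exactly once, and a failed window can be rerun on its own.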
Transform Data with Telegraf Processors
Raw InfluxDB data does not map cleanly to a columnar Iceberg schema. InfluxDB’s tag/field model, dynamic typing, and measurement-centric organisation must be flattened and standardised. Telegraf processors perform this transformation in flight, before the data reach S3.
Rename Measurements, Tags, and Fields
# telegraf.conf - Processor: Rename fields to match Iceberg schema
[[processors.rename]]
## Rename measurements
[[processors.rename.replace]]
measurement = "cpu"
dest = "server_cpu_metrics"
[[processors.rename.replace]]
measurement = "memory"
dest = "server_memory_metrics"
## Rename tags
[[processors.rename.replace]]
tag = "host"
dest = "hostname"
## Rename fields
[[processors.rename.replace]]
field = "usage_idle"
dest = "cpu_idle_percent"
[[processors.rename.replace]]
field = "usage_system"
dest = "cpu_system_percent"
[[processors.rename.replace]]
field = "usage_user"
dest = "cpu_user_percent"
Convert Field Types
InfluxDB may store values as floats when the Iceberg schema expects integers, or vice versa:
# telegraf.conf - Processor: Convert field types
[[processors.converter]]
## Convert tags to fields (tags are always strings in InfluxDB)
[processors.converter.tags]
## Convert string tags to string fields for columnar storage
string = ["hostname", "region", "endpoint", "method"]
## Convert specific fields to different types
[processors.converter.fields]
## Ensure these are always floats
float = ["cpu_idle_percent", "cpu_system_percent", "cpu_user_percent", "latency_ms"]
## Ensure these are integers
integer = ["available", "count"]
## Convert to unsigned integers if needed
unsigned = []
## Convert to boolean
boolean = []
Custom Transformations with Starlark
For complex transformation logic, the Starlark processor permits Python-like scripts. This is the appropriate point at which to flatten the InfluxDB data model into a structure that works well with Iceberg:
# telegraf.conf - Processor: Starlark custom transformations
[[processors.starlark]]
namepass = ["server_cpu_metrics", "server_memory_metrics"]
source = '''
load("math.star", "math")
def apply(metric):
    # Add a computed field: total CPU usage, rounded to 2 decimals
    # (Starlark has no built-in round(); math.round() rounds to a whole number)
    if metric.name == "server_cpu_metrics":
        idle = metric.fields.get("cpu_idle_percent", 0.0)
        metric.fields["cpu_total_usage_percent"] = math.round((100.0 - idle) * 100) / 100
    # Add data quality flag
    if metric.name == "server_memory_metrics":
        used = metric.fields.get("used_percent", 0.0)
        metric.fields["memory_critical"] = used > 95.0
    # Normalize region names
    region = metric.tags.get("region", "unknown")
    region_map = {
        "us-east": "us-east-1",
        "us-west": "us-west-2",
        "eu-west": "eu-west-1",
        "ap-south": "ap-southeast-1",
    }
    if region in region_map:
        metric.tags["region"] = region_map[region]
    # Add pipeline metadata
    metric.tags["pipeline_version"] = "1.0"
    metric.tags["source_system"] = "influxdb"
    return metric
'''
Extract Date Partitions
For Hive-style partitioning on S3 (which AWS Glue expects), the year, month, and day must be extracted from the timestamp:
# telegraf.conf - Processor: Extract date components for partitioning
[[processors.date]]
## Extract date components from the metric timestamp
## These become tags (tag_key) that are used for S3 path partitioning
## Tag name for the year
tag_key = "partition_year"
date_format = "2006"
[[processors.date]]
tag_key = "partition_month"
date_format = "01"
[[processors.date]]
tag_key = "partition_day"
date_format = "02"
[[processors.date]]
tag_key = "partition_hour"
date_format = "15"
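processors.date uses Go's reference-time layouts, in which 2006, 01, 02, and 15 stand for year, month, day, and hour. The Python sketch below reproduces the four tags that the configuration above attaches to a metric, so the expected values can be checked for a given timestamp. It assumes the metric timestamp is in nanoseconds and that the processor's default UTC time zone is in effect; the mapping table covers only the layouts used here.

```python
from datetime import datetime, timezone

# Go reference layout -> strftime directive, for the layouts used above only
GO_TO_STRFTIME = {"2006": "%Y", "01": "%m", "02": "%d", "15": "%H"}

# tag_key -> date_format, mirroring the four [[processors.date]] blocks
PARTITION_TAGS = {
    "partition_year": "2006",
    "partition_month": "01",
    "partition_day": "02",
    "partition_hour": "15",
}


def partition_tags(timestamp_ns: int) -> dict[str, str]:
    """Return the tags the four processors.date blocks would add (UTC)."""
    ts = datetime.fromtimestamp(timestamp_ns // 1_000_000_000, tz=timezone.utc)
    return {tag: ts.strftime(GO_TO_STRFTIME[layout]) for tag, layout in PARTITION_TAGS.items()}
```

The zero-padded layouts (01, 02, 15) matter: they produce month=04 rather than month=4, which keeps S3 prefixes lexically sortable.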
Map Tag Values with Enum
# telegraf.conf - Processor: Map tag values
[[processors.enum]]
[[processors.enum.mapping]]
tag = "method"
[processors.enum.mapping.value_mappings]
GET = "read"
POST = "write"
PUT = "update"
DELETE = "delete"
PATCH = "partial_update"
Full Transformation Example: Flattening InfluxDB to Columnar
A complete Starlark processor that converts InfluxDB’s tag/field model into a fully flat record suitable for Iceberg is shown below:
# telegraf.conf - Processor: Flatten InfluxDB model to columnar
[[processors.starlark]]
source = '''
load("time.star", "time")
def apply(metric):
    # Move all tags into fields so everything becomes a column in Iceberg
    # Tags in InfluxDB are indexed strings; in Iceberg they're just columns
    for key, value in metric.tags.items():
        # Prefix tag-originated fields to distinguish them; partition tags stay tags
        if key not in ["partition_year", "partition_month", "partition_day", "partition_hour"]:
            metric.fields["tag_" + key] = value
    # Add the measurement name as a field (useful if mixing measurements)
    metric.fields["measurement"] = metric.name
    # Add ingestion timestamp in Unix seconds (separate from the data timestamp)
    # This helps with pipeline debugging and data freshness monitoring
    metric.fields["ingested_at"] = time.now().unix_nano // 1000000000
    return metric
'''
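Processor order matters: rename should precede converter, and date should precede the Starlark flatten processor so that the partition tags are already available. Setting each processor's order option explicitly makes this sequence unambiguous.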
Output to S3 (Landing Zone)
The transformed data must now be moved from Telegraf into S3. This is the landing zone—a staging area in which raw files accumulate before being ingested into the Iceberg table.
Using outputs.s3 with JSON Format
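The simplest approach is to write JSON files to S3. The configuration below uses an outputs.s3 plugin; before deploying it, confirm that the Telegraf release in use actually ships this plugin and supports these option names, since S3 output is not part of every Telegraf distribution: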
# telegraf.conf - Output: S3 with JSON format
[[outputs.s3]]
## S3 bucket name
bucket = "my-timeseries-lakehouse"
## S3 key prefix with Hive-style partitioning (key=value path segments)
## Uses Go template syntax with metric tags
s3_key_prefix = "landing-zone/year={{.Tag \"partition_year\"}}/month={{.Tag \"partition_month\"}}/day={{.Tag \"partition_day\"}}/"
## AWS region
region = "us-east-1"
## Use shared credentials or environment variables
## access_key = "${AWS_ACCESS_KEY_ID}"
## secret_key = "${AWS_SECRET_ACCESS_KEY}"
## Data format
data_format = "json"
## Batching configuration
## Write to S3 every 5 minutes or when a batch reaches 10000 metrics
metric_batch_size = 10000
metric_buffer_limit = 100000
flush_interval = "5m"
flush_jitter = "30s"
## File naming
## Creates files like: landing-zone/year=2026/month=04/day=03/metrics_1775174400.json
use_batch_format = true
If the Telegraf build in use does not provide an outputs.s3 plugin, outputs.file may be combined with a cron job that synchronises the rotated files to S3 using aws s3 sync, as described next. Alternatively, Telegraf may be upgraded to a release that includes such a plugin.
Alternative: outputs.file Plus S3 Sync
For Telegraf versions without the S3 plugin, or when greater control over file rotation is required:
# telegraf.conf - Output: Local files (for S3 sync)
[[outputs.file]]
## Write to a single local file; rotation produces timestamped archives alongside it
files = ["/var/telegraf/output/metrics.json"]
## Rotate files based on time
rotation_interval = "1h"
rotation_max_size = "100MB"
rotation_max_archives = 48
## Data format
data_format = "json"
json_timestamp_units = "1s"
A cron job is then configured to synchronise to S3:
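# /etc/cron.d/telegraf-s3-sync
# Sync rotated Telegraf archives to S3 every 10 minutes. Cron does not support
# backslash line continuation, so the entry must stay on a single line.
# The active file (metrics.json) is excluded. Telegraf inserts a timestamp before
# the extension when it rotates; adjust the patterns if your archives differ.
*/10 * * * * telegraf aws s3 sync /var/telegraf/output/ s3://my-timeseries-lakehouse/landing-zone/ --exclude "metrics.json" && find /var/telegraf/output/ -name "metrics.*.json" -mmin +60 -delete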
Writing Parquet via execd Output
Parquet is the preferred format for Iceberg. Although the Telegraf release installed above does not natively output Parquet, the outputs.execd plugin can be used together with a lightweight Python script:
# telegraf.conf - Output: Parquet via execd
[[outputs.execd]]
command = ["/usr/bin/python3", "/opt/telegraf/write_parquet_s3.py"]
## Restart the process if it exits
restart_delay = "10s"
## Data format sent to the script via stdin
data_format = "json"
The companion Python script is shown below:
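#!/usr/bin/env python3
"""write_parquet_s3.py - Telegraf execd output plugin for Parquet to S3.
Reads one JSON metric per line from stdin (Telegraf's json serializer with the
default json_timestamp_units = "1s"), buffers the records, and writes
Hive-partitioned Parquet files to S3, one file per measurement and hour.
"""
import json
import os
import sys
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from io import BytesIO
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
BUCKET = os.environ.get("S3_BUCKET", "my-timeseries-lakehouse")
PREFIX = os.environ.get("S3_PREFIX", "landing-zone")
REGION = os.environ.get("AWS_REGION", "us-east-1")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "5000"))
FLUSH_SECONDS = int(os.environ.get("FLUSH_SECONDS", "300"))
s3 = boto3.client("s3", region_name=REGION)
def utcnow():
    return datetime.now(timezone.utc)
def flush_to_s3(records):
    if not records:
        return
    # Group by measurement and by each metric's own timestamp (not the flush
    # time), so every file lands in the partition its data belongs to
    groups = defaultdict(list)
    for record in records:
        ts = datetime.fromtimestamp(record["timestamp"], tz=timezone.utc)
        groups[(record["measurement"], ts.strftime("%Y/%m/%d/%H"))].append(record)
    for (measurement, hour_path), rows in groups.items():
        year, month, day, hour = hour_path.split("/")
        # Build columns from the union of keys: from_pylist infers the schema
        # from the first row only and would silently drop other fields
        columns = sorted({key for row in rows for key in row})
        table = pa.Table.from_pydict({c: [row.get(c) for row in rows] for c in columns})
        parquet_buffer = BytesIO()
        pq.write_table(table, parquet_buffer, compression="snappy")
        # Hive-style key; the random suffix avoids overwriting files flushed in the same second
        key = (
            f"{PREFIX}/measurement={measurement}/year={year}/month={month}/"
            f"day={day}/hour={hour}/"
            f"metrics_{utcnow().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.parquet"
        )
        s3.put_object(Bucket=BUCKET, Key=key, Body=parquet_buffer.getvalue())
        sys.stderr.write(f"Flushed {len(rows)} records to s3://{BUCKET}/{key}\n")
buffer = []
last_flush = utcnow()
for line in sys.stdin:
    try:
        metric = json.loads(line)
        # Flatten the metric into a single dict: tags and fields become columns
        record = {
            "measurement": metric.get("name", ""),
            "timestamp": metric.get("timestamp", 0),
        }
        record.update(metric.get("tags", {}))
        record.update(metric.get("fields", {}))
        buffer.append(record)
        # Flush on batch size or elapsed time (checked whenever a line arrives)
        elapsed = (utcnow() - last_flush).total_seconds()
        if len(buffer) >= BATCH_SIZE or elapsed >= FLUSH_SECONDS:
            flush_to_s3(buffer)
            buffer = []
            last_flush = utcnow()
    except json.JSONDecodeError:
        sys.stderr.write(f"Invalid JSON: {line}")
    except Exception as e:
        # Keep the buffer so the records are retried on the next flush
        # (groups already written before the error may then be duplicated)
        sys.stderr.write(f"Error: {e}\n")
# Telegraf closed stdin: flush whatever is left
flush_to_s3(buffer)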
Alternative: outputs.http to Lambda for Parquet
A serverless approach uses an AWS Lambda function that receives metrics via HTTP and writes Parquet files:
# telegraf.conf - Output: HTTP to Lambda Function URL
[[outputs.http]]
url = "https://abc123.lambda-url.us-east-1.on.aws/ingest"
method = "POST"
data_format = "json"
json_timestamp_units = "1s"
## Batch settings
metric_batch_size = 5000
metric_buffer_limit = 50000
## Timeout and retry
timeout = "30s"
## Headers
[outputs.http.headers]
Content-Type = "application/json"
X-Pipeline-Source = "telegraf-influxdb"
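The receiving Lambda function is not part of the configuration above. A minimal sketch of its request handling might look like the following. It assumes a Python Lambda behind a Function URL (whose event carries body and isBase64Encoded) and assumes Telegraf's batched JSON body has the form {"metrics": [...]}. It covers parsing only: writing Parquet to S3 is left as a hypothetical step that could reuse the flush logic of write_parquet_s3.py.

```python
import base64
import json


def flatten_batch(body: str) -> list[dict]:
    """Flatten a batched Telegraf JSON payload ({"metrics": [...]}) into flat records."""
    records = []
    for metric in json.loads(body).get("metrics", []):
        record = {"measurement": metric.get("name", ""), "timestamp": metric.get("timestamp", 0)}
        record.update(metric.get("tags", {}))
        record.update(metric.get("fields", {}))
        records.append(record)
    return records


def handler(event, context):
    """Lambda entry point for a Function URL invocation (parsing only)."""
    body = event.get("body") or "{}"
    if event.get("isBase64Encoded"):
        body = base64.b64decode(body).decode("utf-8")
    try:
        records = flatten_batch(body)
    except json.JSONDecodeError:
        return {"statusCode": 400, "body": "invalid JSON"}
    # Hypothetical next step: write `records` to S3 as Parquet (pyarrow + boto3).
    return {"statusCode": 200, "body": json.dumps({"accepted": len(records)})}
```

Returning a non-2xx status for malformed payloads lets Telegraf's outputs.http treat the write as failed and keep the metrics in its buffer for a retry.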
S3 Partitioning Strategy
The S3 path structure is important for Glue and Athena performance. Hive-style partitioning should be used:
# Recommended S3 path structure for time-series data
s3://my-timeseries-lakehouse/
  landing-zone/
    measurement=cpu_metrics/
      year=2026/
        month=04/
          day=03/
            hour=00/
              metrics_20260403_000000.json
              metrics_20260403_001500.json
            hour=01/
              metrics_20260403_010000.json
          day=04/
            ...
    measurement=memory_metrics/
      year=2026/
        ...
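The layout above can be generated from each record's own timestamp rather than from the wall-clock time of the upload, which keeps late-arriving data in the correct partition. A small sketch follows; the function name and file-naming scheme are illustrative and simply mirror the example tree.

```python
from datetime import datetime, timezone


def hive_key(prefix: str, measurement: str, ts: datetime, ext: str = "json") -> str:
    """Build a Hive-style S3 key: prefix/measurement=.../year=.../.../hour=.../file."""
    ts = ts.astimezone(timezone.utc)  # partition in UTC regardless of input zone
    partition = (
        f"measurement={measurement}/year={ts:%Y}/month={ts:%m}/"
        f"day={ts:%d}/hour={ts:%H}"
    )
    return f"{prefix.rstrip('/')}/{partition}/metrics_{ts:%Y%m%d_%H%M%S}.{ext}"
```

Keeping every segment in key=value form is what allows Glue crawlers and Athena to recognise measurement, year, month, day, and hour as partition columns.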
Create the Iceberg Table in AWS Glue
With data landing on S3, the Iceberg table definition must be created in the AWS Glue Data Catalog. Two approaches are available.
Option A: Create Iceberg Table via Athena DDL
This is the most precise approach, allowing the exact schema and partitioning to be defined:
-- Create Iceberg table for CPU metrics
-- timestamp is a reserved keyword in Athena DDL, so the column name is quoted with backticks
CREATE TABLE timeseries_db.cpu_metrics (
`timestamp` timestamp,
hostname string,
region string,
cpu_idle_percent double,
cpu_system_percent double,
cpu_user_percent double,
cpu_total_usage_percent double,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (day(`timestamp`))
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/cpu_metrics/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'snappy',
'optimize_rewrite_delete_file_threshold' = '10'
);
-- Create Iceberg table for memory metrics
CREATE TABLE timeseries_db.memory_metrics (
`timestamp` timestamp,
hostname string,
region string,
used_percent double,
available bigint,
memory_critical boolean,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (day(`timestamp`))
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/memory_metrics/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'snappy'
);
-- Create a unified metrics table (if you prefer a single table)
CREATE TABLE timeseries_db.all_metrics (
`timestamp` timestamp,
measurement string,
hostname string,
region string,
metric_name string,
metric_value double,
tags map<string, string>,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (day(`timestamp`), measurement)
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/all_metrics/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'snappy'
);
Option B: AWS Glue Crawler for Schema Discovery
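A crawler suits cases where the schema should be discovered automatically from the JSON or Parquet files in the landing zone. Note that it registers those files as standard (Hive-style) Glue tables, not Iceberg tables; the result serves as the source table for the ingestion options in the next section: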
# Create the Glue Crawler via AWS CLI
# (incremental CRAWL_NEW_FOLDERS_ONLY crawls require LOG for both schema-change behaviours)
aws glue create-crawler \
--name "timeseries-landing-crawler" \
--role "arn:aws:iam::ACCOUNT_ID:role/GlueCrawlerRole" \
--database-name "timeseries_db" \
--targets '{
"S3Targets": [
{
"Path": "s3://my-timeseries-lakehouse/landing-zone/",
"Exclusions": ["**/_temporary/**", "**/_SUCCESS"]
}
]
}' \
--schema-change-policy '{
"UpdateBehavior": "LOG",
"DeleteBehavior": "LOG"
}' \
--configuration '{
"Version": 1.0,
"Grouping": {
"TableGroupingPolicy": "CombineCompatibleSchemas"
},
"CrawlerOutput": {
"Partitions": {
"AddOrUpdateBehavior": "InheritFromTable"
}
}
}' \
--recrawl-policy '{"RecrawlBehavior": "CRAWL_NEW_FOLDERS_ONLY"}'
# Run the crawler
aws glue start-crawler --name "timeseries-landing-crawler"
# Check crawler status
aws glue get-crawler --name "timeseries-landing-crawler" \
--query "Crawler.State"
Schema Mapping: InfluxDB to Iceberg Types
| InfluxDB Type | Example | Iceberg/Parquet Type | Notes |
|---|---|---|---|
| Float | usage_idle=95.2 | double | Direct mapping |
| Integer | bytes_sent=1024i | bigint | int (32-bit) suffices only if every value fits within about ±2.1 billion |
| String (field) | status="healthy" | string | Direct mapping |
| Boolean | active=true | boolean | Direct mapping |
| Tag (string) | host=server01 | string | Consider dictionary encoding |
| Timestamp | nanosecond Unix | timestamp | Iceberg stores microseconds; convert from ns (or from ms/s) before casting |
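In InfluxDB line protocol, each field value carries its type in its syntax: an i suffix marks an integer, u an unsigned integer, double quotes a string, and t/true/f/false variants a boolean. Bare numbers are floats. The sketch below expresses the table in those terms and shows the nanosecond-to-microsecond truncation. The function names are hypothetical, and mapping unsigned integers to bigint is an assumption, since Iceberg has no unsigned type.

```python
BOOL_LITERALS = {"t", "T", "true", "True", "TRUE", "f", "F", "false", "False", "FALSE"}


def iceberg_type(raw_value: str) -> str:
    """Map a raw line-protocol field value to an Iceberg/Parquet column type."""
    if raw_value.startswith('"') and raw_value.endswith('"'):
        return "string"
    if raw_value in BOOL_LITERALS:
        return "boolean"
    if raw_value.endswith("i"):
        return "bigint"  # signed 64-bit integer
    if raw_value.endswith("u"):
        return "bigint"  # unsigned; values above 2**63 - 1 would not fit
    return "double"      # bare numbers are floats in line protocol


def ns_to_iceberg_micros(ns: int) -> int:
    """Truncate an InfluxDB nanosecond timestamp to Iceberg's microsecond precision."""
    return ns // 1_000
```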
Automate the Iceberg Ingestion
Having data on S3 is only half of the task: the files must still be moved from the landing zone into the Iceberg table proper. Four approaches are described below; each strikes a different balance between operational simplicity, latency, and throughput.
Option A: AWS Glue ETL Job (PySpark)
This is the most robust approach for production workloads. A Glue ETL job reads from the landing zone and writes to the Iceberg table:
# glue_iceberg_ingestion.py - AWS Glue ETL Job
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import col, to_timestamp
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'source_path',
    'database_name',
    'table_name'
])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Configure the Iceberg catalog. spark.sql.extensions is a static setting that
# cannot be changed on a running session, so it is supplied through the job's
# --conf argument in the create-job command below rather than set here.
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
# Job parameters
source_path = args['source_path']  # s3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/
database = args['database_name']   # timeseries_db
table = args['table_name']         # cpu_metrics
print(f"Reading from: {source_path}")
# Read JSON files for a single measurement from the landing zone
df_raw = spark.read.json(source_path)
# Transform: convert epoch seconds to a timestamp, rename tag columns,
# and drop the partition helper columns
df_transformed = df_raw \
    .withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
    .withColumn("hostname", col("tag_hostname")) \
    .withColumn("region", col("tag_region")) \
    .drop("tag_hostname", "tag_region", "partition_year",
          "partition_month", "partition_day", "partition_hour")
# Select columns matching the Iceberg table schema
df_final = df_transformed.select(
    "timestamp",
    "hostname",
    "region",
    col("cpu_idle_percent").cast("double"),
    col("cpu_system_percent").cast("double"),
    col("cpu_user_percent").cast("double"),
    col("cpu_total_usage_percent").cast("double"),
    "pipeline_version",
    "source_system",
    col("ingested_at").cast("long")
)
print(f"Records to insert: {df_final.count()}")
# Append to the Iceberg table
df_final.writeTo(f"glue_catalog.{database}.{table}") \
    .option("merge-schema", "true") \
    .append()
print(f"Successfully ingested data into {database}.{table}")
# Each run reads everything under source_path. After a successful append,
# archive or delete the ingested objects (or narrow source_path to the new
# partitions) so that the next run does not append the same rows again.
job.commit()
The Glue job is created and scheduled as follows:
# Create the Glue ETL job
aws glue create-job \
--name "timeseries-iceberg-ingestion" \
--role "arn:aws:iam::ACCOUNT_ID:role/GlueETLRole" \
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://my-timeseries-lakehouse/scripts/glue_iceberg_ingestion.py",
"PythonVersion": "3"
}' \
--default-arguments '{
"--source_path": "s3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/",
"--database_name": "timeseries_db",
"--table_name": "cpu_metrics",
"--datalake-formats": "iceberg",
"--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"--enable-metrics": "true"
}' \
--glue-version "4.0" \
--number-of-workers 2 \
--worker-type "G.1X" \
--timeout 60
# Schedule the job to run every hour with a Glue scheduled trigger
# (EventBridge rules cannot target a Glue job directly)
aws glue create-trigger \
--name "hourly-iceberg-ingestion" \
--type SCHEDULED \
--schedule "cron(0 * * * ? *)" \
--actions '[{"JobName": "timeseries-iceberg-ingestion"}]' \
--start-on-creation
Option B: Athena INSERT INTO (Simple, No Compute Required)
For smaller datasets, Glue ETL may be omitted and Athena used directly to move the data:
-- First, create an external (non-Iceberg) table over the landing zone.
-- `timestamp` is a reserved word in Athena DDL, so it is quoted with backticks.
CREATE EXTERNAL TABLE timeseries_db.cpu_metrics_landing (
`timestamp` bigint,
measurement string,
tag_hostname string,
tag_region string,
cpu_idle_percent double,
cpu_system_percent double,
cpu_user_percent double,
cpu_total_usage_percent double,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (year string, month string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/'
TBLPROPERTIES ('has_encrypted_data'='false');
-- Add partitions (or use MSCK REPAIR TABLE)
MSCK REPAIR TABLE timeseries_db.cpu_metrics_landing;
-- Insert from landing zone into Iceberg table
INSERT INTO timeseries_db.cpu_metrics
SELECT
CAST(from_unixtime(timestamp) AS timestamp) as timestamp,
tag_hostname as hostname,
tag_region as region,
cpu_idle_percent,
cpu_system_percent,
cpu_user_percent,
cpu_total_usage_percent,
pipeline_version,
source_system,
ingested_at
FROM timeseries_db.cpu_metrics_landing
WHERE year = '2026' AND month = '04' AND day = '03';
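For a backfill that spans several days, the INSERT can be generated once per day so that each statement prunes the landing table to a single partition. The sketch below only builds the SQL strings; submitting them (for example with boto3's Athena start_query_execution, as in Option C) is not shown. The function name is hypothetical, and the column list mirrors the statement above, with an explicit cast of from_unixtime's result to a plain timestamp. Partition values come from datetime.date objects rather than free text, so they are always well-formed.

```python
from datetime import date, timedelta

COLUMNS = """CAST(from_unixtime(timestamp) AS timestamp) AS timestamp,
    tag_hostname AS hostname,
    tag_region AS region,
    cpu_idle_percent,
    cpu_system_percent,
    cpu_user_percent,
    cpu_total_usage_percent,
    pipeline_version,
    source_system,
    ingested_at"""


def daily_insert_statements(start: date, end: date) -> list:
    """One INSERT ... SELECT per day from start to end, inclusive."""
    statements = []
    day = start
    while day <= end:
        statements.append(
            "INSERT INTO timeseries_db.cpu_metrics\n"
            f"SELECT\n    {COLUMNS}\n"
            "FROM timeseries_db.cpu_metrics_landing\n"
            f"WHERE year = '{day:%Y}' AND month = '{day:%m}' AND day = '{day:%d}'"
        )
        day += timedelta(days=1)
    return statements
```

Because each statement is a separate Iceberg commit, a day that fails can be retried on its own without re-running the others.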
Option C: Lambda for Near-Real-Time Ingestion
For near-real-time ingestion, a Lambda function is triggered when new files arrive in S3:
# lambda_iceberg_ingest.py - Triggered by S3 PutObject events
import urllib.parse
import boto3
athena = boto3.client('athena')
def handler(event, context):
    """Triggered when a new file lands in the landing zone."""
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # S3 event keys are URL-encoded ("=" arrives as "%3D"), so decode first
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])
        print(f"New file: s3://{bucket}/{key}")
        # Parse the partition info from the S3 path
        # Example: landing-zone/measurement=cpu_metrics/year=2026/month=04/day=03/...
        partition_info = {}
        for part in key.split('/'):
            if '=' in part:
                k, v = part.split('=', 1)
                partition_info[k] = v
        measurement = partition_info.get('measurement', 'unknown')
        year = partition_info.get('year', '')
        month = partition_info.get('month', '')
        day = partition_info.get('day', '')
        if measurement == 'cpu_metrics':
            # Insert only the file that triggered this event ("$path"), so later
            # files in the same day do not re-insert earlier ones. Assumes the
            # day's partition is already registered on the landing table
            # (e.g. via MSCK REPAIR TABLE).
            query = f"""
            INSERT INTO timeseries_db.cpu_metrics
            SELECT
                CAST(from_unixtime(timestamp) AS timestamp) as timestamp,
                tag_hostname as hostname,
                tag_region as region,
                cpu_idle_percent,
                cpu_system_percent,
                cpu_user_percent,
                cpu_total_usage_percent,
                pipeline_version,
                source_system,
                ingested_at
            FROM timeseries_db.cpu_metrics_landing
            WHERE year = '{year}' AND month = '{month}' AND day = '{day}'
              AND "$path" = 's3://{bucket}/{key}'
            """
            response = athena.start_query_execution(
                QueryString=query,
                QueryExecutionContext={'Database': 'timeseries_db'},
                ResultConfiguration={
                    'OutputLocation': 's3://my-timeseries-lakehouse-athena-results/'
                }
            )
            query_id = response['QueryExecutionId']
            print(f"Started Athena query: {query_id}")
    return {'statusCode': 200, 'body': 'Ingestion triggered'}
The S3 event trigger is configured as follows:
# Create the Lambda function
aws lambda create-function \
--function-name timeseries-iceberg-ingest \
--runtime python3.12 \
--handler lambda_iceberg_ingest.handler \
--role arn:aws:iam::ACCOUNT_ID:role/LambdaIcebergIngestRole \
--zip-file fileb://lambda_package.zip \
--timeout 300 \
--memory-size 256
# Add S3 trigger permission
aws lambda add-permission \
--function-name timeseries-iceberg-ingest \
--statement-id s3-trigger \
--action lambda:InvokeFunction \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::my-timeseries-lakehouse
# Configure S3 bucket notification
aws s3api put-bucket-notification-configuration \
--bucket my-timeseries-lakehouse \
--notification-configuration '{
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:timeseries-iceberg-ingest",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": "landing-zone/"},
{"Name": "suffix", "Value": ".json"}
]
}
}
}
]
}'
Option D: Apache Spark on EMR
For the highest throughput and maximum flexibility, Spark is run directly on EMR with the Iceberg connector:
# emr_iceberg_job.py - Spark job for EMR
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("InfluxDB-to-Iceberg") \
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/") \
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
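# Read the 2026 cpu_metrics files from the landing zone (each run re-reads this whole prefix; narrow it to new partitions to avoid appending duplicates)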
df = spark.read.json("s3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/year=2026/")
# Transform and write to Iceberg
df_clean = df \
.withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
.withColumnRenamed("tag_hostname", "hostname") \
.withColumnRenamed("tag_region", "region") \
.select("timestamp", "hostname", "region",
"cpu_idle_percent", "cpu_system_percent",
"cpu_user_percent", "cpu_total_usage_percent",
"pipeline_version", "source_system", "ingested_at")
# Append to Iceberg table
df_clean.writeTo("glue_catalog.timeseries_db.cpu_metrics").append()
# Run compaction to optimize file sizes
spark.sql("""
CALL glue_catalog.system.rewrite_data_files(
table => 'timeseries_db.cpu_metrics',
options => map('target-file-size-bytes', '134217728')
)
""")
spark.stop()
# Submit the EMR job
aws emr add-steps \
--cluster-id j-XXXXXXXXXXXXX \
--steps '[{
"Type": "Spark",
"Name": "Iceberg Ingestion",
"ActionOnFailure": "CONTINUE",
"Args": [
"--deploy-mode", "cluster",
"--conf", "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
"--conf", "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"s3://my-timeseries-lakehouse/scripts/emr_iceberg_job.py"
]
}]'
Complete End-to-End telegraf.conf
A complete Telegraf configuration combining the preceding elements is presented below. After copying the file and setting the environment variables, validate it with the --test run shown after the listing before starting it as a service:
# =============================================================================
# TELEGRAF CONFIGURATION: InfluxDB → S3 Landing Zone (for Iceberg)
# =============================================================================
# This configuration reads time-series data from InfluxDB v2, transforms it
# into a flat columnar schema, and writes it to S3 with Hive-style partitioning
# for subsequent ingestion into Apache Iceberg tables.
# =============================================================================
# Global Agent Configuration
[agent]
## Collection interval - how often input plugins are gathered
interval = "1h"
## Flush interval - how often output plugins write
flush_interval = "5m"
## Jitter to prevent thundering herd
collection_jitter = "30s"
flush_jitter = "30s"
## Metric batch and buffer sizes
metric_batch_size = 10000
metric_buffer_limit = 100000
## Override default hostname
hostname = ""
omit_hostname = true
## Logging
debug = false
quiet = false
logfile = "/var/log/telegraf/telegraf-pipeline.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "100MB"
logfile_rotation_max_archives = 7
# =============================================================================
# INPUT: Read from InfluxDB v2 via Flux queries
# =============================================================================
[[inputs.influxdb_v2]]
urls = ["${INFLUXDB_URL}"]
token = "${INFLUXDB_TOKEN}"
organization = "${INFLUXDB_ORG}"
## CPU Metrics
[[inputs.influxdb_v2.query]]
bucket = "${INFLUXDB_BUCKET}"
query = '''
from(bucket: v.bucket)
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "cpu_metrics"
## Memory Metrics
[[inputs.influxdb_v2.query]]
bucket = "${INFLUXDB_BUCKET}"
query = '''
from(bucket: v.bucket)
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "memory_metrics"
## HTTP Request Metrics
[[inputs.influxdb_v2.query]]
bucket = "${INFLUXDB_BUCKET}"
query = '''
from(bucket: v.bucket)
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "http_requests")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "http_request_metrics"
timeout = "60s"
# =============================================================================
# PROCESSORS: Transform data for Iceberg compatibility
# =============================================================================
# Step 1: Rename fields to clean, descriptive names
[[processors.rename]]
order = 1
[[processors.rename.replace]]
field = "usage_idle"
dest = "cpu_idle_percent"
[[processors.rename.replace]]
field = "usage_system"
dest = "cpu_system_percent"
[[processors.rename.replace]]
field = "usage_user"
dest = "cpu_user_percent"
[[processors.rename.replace]]
field = "used_percent"
dest = "memory_used_percent"
[[processors.rename.replace]]
tag = "host"
dest = "hostname"
# Step 2: Convert field types for schema consistency
[[processors.converter]]
order = 2
[processors.converter.fields]
float = ["cpu_idle_percent", "cpu_system_percent", "cpu_user_percent",
"memory_used_percent", "latency_ms"]
integer = ["available", "count"]
# Step 3: Extract date partitions from timestamp
[[processors.date]]
order = 3
tag_key = "partition_year"
date_format = "2006"
[[processors.date]]
order = 4
tag_key = "partition_month"
date_format = "01"
[[processors.date]]
order = 5
tag_key = "partition_day"
date_format = "02"
# Step 4: Custom transformations (compute derived fields, flatten tags)
[[processors.starlark]]
order = 6
source = '''
load("time.star", "time")
load("math.star", "math")
def apply(metric):
    # Compute total CPU usage, rounded to two decimal places
    # (Starlark has no built-in round(), so math.round is used)
    if metric.name == "cpu_metrics":
        idle = metric.fields.get("cpu_idle_percent", 0.0)
        metric.fields["cpu_total_usage_percent"] = math.round((100.0 - idle) * 100) / 100
    # Memory health flag
    if metric.name == "memory_metrics":
        used = metric.fields.get("memory_used_percent", 0.0)
        metric.fields["memory_critical"] = used > 95.0
    # Flatten all tags into fields for columnar storage
    for key, value in metric.tags.items():
        if not key.startswith("partition_"):
            metric.fields["tag_" + key] = value
    # Add metadata
    metric.fields["measurement"] = metric.name
    metric.fields["source_system"] = "influxdb"
    metric.fields["pipeline_version"] = "1.0"
    metric.fields["ingested_at"] = time.now().unix  # epoch seconds
    return metric
'''
# =============================================================================
# OUTPUT: Write to S3 with Hive-style partitioning
# =============================================================================
[[outputs.s3]]
bucket = "${AWS_S3_BUCKET}"
s3_key_prefix = "landing-zone/measurement={{.Name}}/year={{.Tag \"partition_year\"}}/month={{.Tag \"partition_month\"}}/day={{.Tag \"partition_day\"}}/"
region = "${AWS_REGION}"
## Authentication (uses environment variables or instance role)
# access_key = "${AWS_ACCESS_KEY_ID}"
# secret_key = "${AWS_SECRET_ACCESS_KEY}"
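## Keep Telegraf's own internal metrics (monitored below) out of the landing zone
namedrop = ["telegraf_pipeline_*"]
data_format = "json"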
json_timestamp_units = "1s"
## Batching
metric_batch_size = 10000
metric_buffer_limit = 100000
flush_interval = "5m"
flush_jitter = "30s"
use_batch_format = true
# =============================================================================
# MONITORING: Internal Telegraf metrics
# =============================================================================
[[inputs.internal]]
collect_memstats = true
name_prefix = "telegraf_pipeline_"
[[outputs.file]]
files = ["/var/log/telegraf/internal_metrics.json"]
data_format = "json"
namepass = ["telegraf_pipeline_*"]
rotation_interval = "24h"
rotation_max_archives = 7
The required environment variables are set as follows:
# /etc/default/telegraf or /etc/telegraf/telegraf.env
INFLUXDB_URL=http://localhost:8086
INFLUXDB_TOKEN=my-super-secret-token
INFLUXDB_ORG=my-org
INFLUXDB_BUCKET=metrics
AWS_S3_BUCKET=my-timeseries-lakehouse
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=secret...
The pipeline is started as follows:
# Test the configuration first
telegraf --config /etc/telegraf/telegraf-pipeline.conf --test
# Run in foreground for debugging
telegraf --config /etc/telegraf/telegraf-pipeline.conf
# Run as a service
sudo cp /etc/telegraf/telegraf-pipeline.conf /etc/telegraf/telegraf.conf
sudo systemctl restart telegraf
sudo systemctl status telegraf
sudo journalctl -u telegraf -f
Querying Iceberg Data with Athena
Once data are flowing into the Iceberg tables, they can be queried with standard SQL through Amazon Athena. Several practical queries for daily use are presented below.
Basic Analytical Queries
-- Average CPU usage per host over the last 24 hours
SELECT
hostname,
region,
AVG(cpu_total_usage_percent) as avg_cpu_usage,
MAX(cpu_total_usage_percent) as peak_cpu_usage,
MIN(cpu_idle_percent) as min_idle_percent,
COUNT(*) as data_points
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '24' hour
GROUP BY hostname, region
ORDER BY avg_cpu_usage DESC;
-- Hourly aggregation for dashboarding
SELECT
date_trunc('hour', timestamp) as hour,
hostname,
AVG(cpu_total_usage_percent) as avg_cpu,
APPROX_PERCENTILE(cpu_total_usage_percent, 0.95) as p95_cpu,
APPROX_PERCENTILE(cpu_total_usage_percent, 0.99) as p99_cpu
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '7' day
GROUP BY 1, 2
ORDER BY 1 DESC, 2;
-- Memory alerts: find hosts with high memory usage
SELECT
hostname,
region,
timestamp,
used_percent,
ROUND(available / POWER(1024, 3), 2) as available_gb
FROM timeseries_db.memory_metrics
WHERE used_percent > 90
AND timestamp >= current_timestamp - interval '1' hour
ORDER BY used_percent DESC;
Time Travel Queries
One of Iceberg’s principal features is time travel: querying the data as they existed at a previous point in time:
-- Query data as it existed yesterday at noon
SELECT *
FROM timeseries_db.cpu_metrics
FOR TIMESTAMP AS OF TIMESTAMP '2026-04-02 12:00:00'
WHERE hostname = 'server01';
-- Compare current data with data from a week ago
SELECT
current_data.hostname,
current_data.avg_cpu as current_avg_cpu,
historical.avg_cpu as week_ago_avg_cpu,
current_data.avg_cpu - historical.avg_cpu as cpu_change
FROM (
SELECT hostname, AVG(cpu_total_usage_percent) as avg_cpu
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '1' day
GROUP BY hostname
) current_data
JOIN (
SELECT hostname, AVG(cpu_total_usage_percent) as avg_cpu
FROM timeseries_db.cpu_metrics
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-27 00:00:00'
WHERE timestamp >= TIMESTAMP '2026-03-26' AND timestamp < TIMESTAMP '2026-03-27'
GROUP BY hostname
) historical ON current_data.hostname = historical.hostname;
-- View table snapshot history (metadata table names must be double-quoted because of the $)
SELECT * FROM "timeseries_db"."cpu_metrics$snapshots" ORDER BY committed_at DESC LIMIT 10;
-- View manifest files
SELECT * FROM "timeseries_db"."cpu_metrics$manifests";
Joining with Other Data Sources
-- Join CPU metrics with a server inventory table
SELECT
c.hostname,
c.region,
s.instance_type,
s.team,
AVG(c.cpu_total_usage_percent) as avg_cpu,
s.monthly_cost
FROM timeseries_db.cpu_metrics c
JOIN timeseries_db.server_inventory s ON c.hostname = s.hostname
WHERE c.timestamp >= current_timestamp - interval '7' day
GROUP BY c.hostname, c.region, s.instance_type, s.team, s.monthly_cost
HAVING AVG(c.cpu_total_usage_percent) < 10 -- Underutilized servers
ORDER BY s.monthly_cost DESC;
Athena Cost Optimization Tips
- Avoid SELECT * on large tables.
- Use partition predicates: WHERE timestamp >= ... triggers Iceberg partition pruning, so only the relevant Parquet files are scanned.
- Select specific columns: Parquet is columnar, so SELECT hostname, cpu_total_usage_percent reads far less data than SELECT *.
- Run compaction regularly: small files degrade query performance and increase cost. Files should be kept between 128MB and 256MB.
- Use CTAS for frequent queries: materialise expensive queries as new Iceberg tables.
Alternative Pipeline: InfluxDB to Telegraf to Kafka to Spark to Iceberg
Organisations requiring true streaming ingestion with exactly-once semantics should consider a Kafka-based pipeline. The architecture is as follows.
InfluxDB → Telegraf → Kafka Topic → Spark Structured Streaming → Iceberg Table
When to Use Kafka Rather Than S3-Based
- S3-based (this guide's main approach) is appropriate when batch latency of minutes to hours is acceptable, data volume is under 1TB per day, minimal infrastructure is preferred, and cost is a priority.
- Kafka-based is appropriate when one or more of the following apply: sub-minute latency is required, data volume exceeds 1TB per day, a Kafka cluster is already operational, or exactly-once delivery guarantees are needed.
Telegraf Kafka Output Configuration
# telegraf.conf - Output: Kafka
[[outputs.kafka]]
## Kafka broker addresses
brokers = ["kafka-broker-1:9092", "kafka-broker-2:9092", "kafka-broker-3:9092"]
## Topic for all metrics (or use topic_suffix for per-measurement topics)
topic = "influxdb-metrics"
## Use measurement name as topic suffix for separate topics
## Creates topics like: influxdb-metrics-cpu_metrics, influxdb-metrics-memory_metrics
# topic_suffix = {method = "measurement"}
## Compression codec: 0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd
compression_codec = 2
## Required acks: 0=none, 1=leader, -1=all replicas
required_acks = -1
## Max message size
max_message_bytes = 1048576
## Data format
data_format = "json"
json_timestamp_units = "1ms"
## SASL authentication (if Kafka requires it)
# sasl_mechanism = "SCRAM-SHA-512"
# sasl_username = "${KAFKA_USERNAME}"
# sasl_password = "${KAFKA_PASSWORD}"
## TLS
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
The Spark Structured Streaming consumer is shown below:
# spark_kafka_iceberg.py - Spark Structured Streaming from Kafka to Iceberg
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json, lit, unix_timestamp
from pyspark.sql.types import DoubleType, LongType, MapType, StringType, StructField, StructType
spark = SparkSession.builder \
    .appName("Kafka-to-Iceberg-Streaming") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
# Schema of one Telegraf JSON message (outputs.kafka writes one metric per message)
metrics_schema = StructType([
    StructField("name", StringType()),
    StructField("timestamp", LongType()),  # epoch milliseconds (json_timestamp_units = "1ms")
    StructField("tags", MapType(StringType(), StringType())),
    StructField("fields", MapType(StringType(), DoubleType()))
])
# Read from Kafka
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker-1:9092") \
    .option("subscribe", "influxdb-metrics") \
    .option("startingOffsets", "latest") \
    .load()
# Parse JSON and reshape to the long layout of all_metrics: one row per field
df_parsed = df_kafka \
    .select(from_json(col("value").cast("string"), metrics_schema).alias("data")) \
    .select(
        (col("data.timestamp") / 1000).cast("timestamp").alias("timestamp"),
        col("data.name").alias("measurement"),
        col("data.tags")["hostname"].alias("hostname"),
        col("data.tags")["region"].alias("region"),
        explode(col("data.fields")).alias("metric_name", "metric_value"),
        col("data.tags").alias("tags"),
        lit("1.0").alias("pipeline_version"),
        lit("influxdb").alias("source_system"),
        unix_timestamp().alias("ingested_at")
    )
# Write with Iceberg's native streaming sink rather than a foreachBatch append:
# the sink skips micro-batches it has already committed when the query restarts
# from its checkpoint, whereas a retried foreachBatch append can write a batch twice.
query = df_parsed.writeStream \
    .format("iceberg") \
    .outputMode("append") \
    .trigger(processingTime="1 minute") \
    .option("checkpointLocation", "s3://my-timeseries-lakehouse/checkpoints/kafka-iceberg/") \
    .toTable("glue_catalog.timeseries_db.all_metrics")
query.awaitTermination()
Monitoring and Troubleshooting
A data pipeline is only as effective as its monitoring. The following describes how to maintain pipeline health.
Telegraf Internal Metrics
The inputs.internal plugin configured earlier provides important operational metrics:
# Check Telegraf buffer and throughput fields (the file holds one JSON object per line)
python3 -m json.tool --json-lines /var/log/telegraf/internal_metrics.json | grep -E '"(metrics_gathered|metrics_written|metrics_dropped|buffer_size|gather_errors|errors)"'
# Key metrics to monitor:
# - gather_errors (internal_agent): input plugin failures (InfluxDB connection issues)
# - metrics_gathered: total metrics collected per interval
# - metrics_written: total metrics sent to S3
# - buffer_size (internal_write): current buffer usage (should stay well below buffer_limit)
# - errors (internal_write): output plugin failures (S3 permission or network issues)
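The same check can be automated. The sketch below reads lines in the format written by outputs.file (one JSON object per line with name, tags, and fields keys) and flags outputs whose buffer is filling up or that are dropping metrics. It assumes the telegraf_pipeline_ prefix configured earlier, so the write statistics arrive under telegraf_pipeline_internal_write with an output tag; the function name and the 80% threshold are arbitrary choices for illustration.

```python
import json


def check_write_health(lines, threshold: float = 0.8) -> list:
    """Return warnings for Telegraf outputs with a nearly full buffer or dropped metrics."""
    warnings = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        metric = json.loads(line)
        if metric.get("name") != "telegraf_pipeline_internal_write":
            continue
        fields = metric.get("fields", {})
        output = metric.get("tags", {}).get("output", "unknown")
        size = fields.get("buffer_size", 0)
        limit = fields.get("buffer_limit", 0)
        if limit and size / limit >= threshold:
            warnings.append(f"{output}: buffer at {size}/{limit}")
        if fields.get("metrics_dropped", 0) > 0:
            warnings.append(f"{output}: {fields['metrics_dropped']} metrics dropped")
    return warnings
```

In practice the function would be fed an open file handle for /var/log/telegraf/internal_metrics.json from a cron job or a small Lambda, and any non-empty result would raise an alert.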
Common Issues and Resolutions
| Issue | Symptoms | Resolution |
|---|---|---|
| InfluxDB connection failure | gather_errors increasing, no new metrics | Verify InfluxDB URL and token. Check network connectivity. Ensure InfluxDB is running. |
| S3 permission denied | errors increasing in internal_write, AccessDenied in logs | Check IAM policy. Verify AWS credentials. Ensure bucket policy allows PutObject. |
| Schema mismatch in Glue | Athena queries return NULL or fail | Re-run Glue Crawler. Check that JSON field names match table column names. Verify type conversions in Telegraf processors. |
| Glue Crawler fails | Crawler stuck in RUNNING or FAILED state | Check Glue Crawler IAM role. Verify S3 path is correct. Look for malformed JSON files in landing zone. |
| Data type conflicts | Fields showing as wrong type in Athena | Use processors.converter to enforce types in Telegraf. InfluxDB may return integers as floats or vice versa. |
| Buffer overflow | metrics_dropped count increasing | Increase metric_buffer_limit. Reduce flush_interval. Check for S3 write latency issues. |
| Duplicate data in Iceberg | Row counts higher than expected | Implement idempotent ingestion with MERGE INTO instead of INSERT. Track processed files to avoid re-ingestion. |
| Too many small files | Athena queries slow and expensive | Increase Telegraf batch size. Run Iceberg compaction regularly. Target 128-256MB file sizes. |
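Tracking processed files can be as simple as keeping the set of already-ingested S3 keys alongside the job. The sketch below shows only the bookkeeping logic, with the key listing and the ledger passed in as plain Python values. Where the ledger actually lives (a DynamoDB table, a JSON object on S3, or Glue job bookmarks) is left open, and both function names are hypothetical. The ledger should be updated only after the Iceberg commit succeeds, so that a failed run leaves its files eligible for the next attempt.

```python
def keys_to_ingest(listed_keys, processed_keys):
    """Return landing-zone keys that have not been ingested yet, in sorted order."""
    processed = set(processed_keys)
    # Sorting makes runs deterministic; Hive-style keys with zero-padded dates
    # also sort chronologically within a measurement.
    return sorted(set(listed_keys) - processed)


def mark_ingested(processed_keys, ingested):
    """Return the ledger after a successful Iceberg commit of `ingested`."""
    return set(processed_keys) | set(ingested)
```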
Data Validation Queries
-- Check data freshness: how recent is the latest data?
SELECT
MAX(timestamp) as latest_data,
current_timestamp as current_time,
date_diff('minute', MAX(timestamp), current_timestamp) as minutes_behind
FROM timeseries_db.cpu_metrics;
-- Check for data gaps: are there any missing hours?
SELECT
date_trunc('hour', timestamp) as hour,
COUNT(*) as record_count
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '24' hour
GROUP BY 1
ORDER BY 1;
-- Validate data quality: check for NULLs and outliers
SELECT
COUNT(*) as total_records,
COUNT(hostname) as non_null_hostname,
COUNT(cpu_total_usage_percent) as non_null_cpu,
MIN(cpu_total_usage_percent) as min_cpu,
MAX(cpu_total_usage_percent) as max_cpu,
COUNT(CASE WHEN cpu_total_usage_percent > 100 THEN 1 END) as invalid_cpu_over_100,
COUNT(CASE WHEN cpu_total_usage_percent < 0 THEN 1 END) as invalid_cpu_negative
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '1' hour;
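The hourly gap query returns only the hours that contain data, so a missing hour shows up as an absent row rather than as a zero count. One way to surface it is to compare the result against the full list of expected hours. The sketch below does that on rows already fetched from Athena (fetching is not shown, and the function name is hypothetical).

```python
from datetime import datetime, timedelta


def missing_hours(observed_hours, start: datetime, end: datetime):
    """Return hour buckets in [start, end) that have no rows.

    observed_hours: hour-truncated datetimes, e.g. the `hour` column of the
    gap query. start is assumed to be truncated to the hour.
    """
    seen = set(observed_hours)
    gaps = []
    hour = start
    while hour < end:
        if hour not in seen:
            gaps.append(hour)
        hour += timedelta(hours=1)
    return gaps
```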
Performance Optimisation
Establishing a functioning pipeline is one task; achieving good performance at scale is another. The key tuning parameters are discussed below.
Telegraf Buffer Tuning
The two most important Telegraf settings are metric_batch_size and metric_buffer_limit:
- metric_batch_size: the number of metrics sent to the output plugin at a time. Larger batches reduce S3 API calls but increase memory usage and latency.
- metric_buffer_limit: the maximum number of unwritten metrics held in memory per output. If the output is slow, metrics queue here; once the buffer is full, Telegraf discards the oldest metrics to make room for new ones.
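One way to size metric_buffer_limit is to ask how long an output outage it can absorb. Starting from an empty buffer at a steady ingest rate, the buffer fills after limit / rate, and metrics are lost beyond that point. The arithmetic is sketched below; the function name is illustrative, and the example rates are the bounds of the medium tier in the table that follows, not measurements.

```python
def outage_headroom_minutes(buffer_limit: int, metrics_per_minute: float) -> float:
    """Minutes of output outage an empty Telegraf buffer can absorb before dropping metrics."""
    if metrics_per_minute <= 0:
        raise ValueError("metrics_per_minute must be positive")
    return buffer_limit / metrics_per_minute


# Medium tier: a 200,000-metric buffer at 10K and at 100K metrics per minute
print(outage_headroom_minutes(200_000, 10_000))   # 20.0
print(outage_headroom_minutes(200_000, 100_000))  # 2.0
```

At the top of the medium tier the buffer covers only about two minutes of S3 unavailability, which is worth comparing with how long network or S3 interruptions typically last in your environment.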
Recommended Settings by Data Volume
| Setting | Small (<10K metrics/min) | Medium (10K-100K/min) | Large (>100K/min) |
|---|---|---|---|
| metric_batch_size | 5,000 | 10,000 | 50,000 |
| metric_buffer_limit | 50,000 | 200,000 | 1,000,000 |
| flush_interval | 10m | 5m | 1m |
| interval (agent collection interval) | 1h | 15m | 5m |
| Target S3 file size | 64-128 MB | 128-256 MB | 256-512 MB |
| Partition granularity | Day | Day | Hour |
| Telegraf RAM estimate | 128 MB | 512 MB | 2-4 GB |
| Compaction frequency | Daily | Every 6 hours | Every 1-2 hours |
Iceberg Compaction
Small files impair Iceberg performance. Compaction should be scheduled to merge them:
-- Run compaction via Athena (Athena v3 with Iceberg support)
OPTIMIZE timeseries_db.cpu_metrics REWRITE DATA USING BIN_PACK;
-- Or via Spark (more control over target file size)
-- In a Glue ETL job or EMR Spark session:
CALL glue_catalog.system.rewrite_data_files(
table => 'timeseries_db.cpu_metrics',
options => map(
'target-file-size-bytes', '134217728', -- 128MB
'min-file-size-bytes', '67108864', -- 64MB
'max-file-size-bytes', '268435456' -- 256MB
)
);
-- Expire old snapshots to reclaim storage
CALL glue_catalog.system.expire_snapshots(
table => 'timeseries_db.cpu_metrics',
older_than => TIMESTAMP '2026-03-01 00:00:00',
retain_last => 10
);
-- Remove orphan files
CALL glue_catalog.system.remove_orphan_files(
table => 'timeseries_db.cpu_metrics',
older_than => TIMESTAMP '2026-03-01 00:00:00'
);
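Conceptually, bin-packing compaction gathers undersized files and groups them into rewrite groups no larger than the target size. The sketch below illustrates that idea with a first-fit-decreasing pass. It is a simplification, not Iceberg's planner. It only considers files below min-file-size-bytes, whereas Iceberg also splits files above max-file-size-bytes and applies further thresholds. It also assumes all files come from one partition, because rewrites group files per partition. The function name plan_bin_pack is hypothetical.

```python
MB = 1024 * 1024


def plan_bin_pack(file_sizes, target=128 * MB, min_size=64 * MB):
    """Group undersized files (all from ONE partition) into rewrite groups.

    Simplified illustration of bin-packing compaction, not Iceberg's planner.
    Returns a list of groups; each group is a list of file sizes in bytes
    whose total does not exceed the target output file size.
    """
    # Only files below min_size are candidates; largest first (decreasing).
    small = sorted((s for s in file_sizes if s < min_size), reverse=True)
    groups: list[list[int]] = []
    totals: list[int] = []
    for size in small:
        # First fit: put the file into the first group that still has room.
        for i, total in enumerate(totals):
            if total + size <= target:
                groups[i].append(size)
                totals[i] += size
                break
        else:
            groups.append([size])
            totals.append(size)
    return groups


if __name__ == "__main__":
    # One day partition: twenty 10 MB files plus one healthy 100 MB file.
    sizes = [10 * MB] * 20 + [100 * MB]
    plan = plan_bin_pack(sizes)
    print([len(g) for g in plan])        # [12, 8]
    print([sum(g) // MB for g in plan])  # [120, 80]
```

In this example the partition goes from 21 files to 3: two compacted files plus the untouched 100 MB file. Every query that touches that day then opens three objects instead of twenty-one.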
Partitioning Best Practices for Time-Series Data
- Partition by day for most workloads. This produces a manageable number of partitions and files.
- Add a secondary partition on a low-cardinality dimension such as measurement when specific measurements are queried frequently. Partitioning on high-cardinality tags such as hostname multiplies the number of partitions and small files.
- Avoid over-partitioning. Partitioning by minute produces millions of tiny files that destroy performance.
- Use Iceberg's hidden partitioning with day(timestamp) rather than creating explicit partition columns. Queries on timestamp then automatically trigger partition pruning without users needing to be aware of partitions.
- Monitor partition sizes. If partitions routinely hold fewer than ten files and each file is under 10 MB, the partitioning is too granular.
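Hidden partitioning works because Iceberg derives the partition value from the column itself. The day transform maps a timestamp to the number of days since 1970-01-01, so a filter on timestamp can be converted into a range of partition values. The sketch below reproduces that arithmetic for timezone-aware timestamps, normalised to UTC. It also encodes the file-size heuristic from the last bullet. The function names and the exact thresholds are illustrative assumptions, not an Iceberg API.

```python
from datetime import date, datetime, timezone

EPOCH = date(1970, 1, 1)
MB = 1024 * 1024


def day_transform(ts: datetime) -> int:
    """Iceberg-style day(timestamp): whole days since 1970-01-01 in UTC."""
    if ts.tzinfo is None:
        raise ValueError("use timezone-aware timestamps to avoid local-time drift")
    return (ts.astimezone(timezone.utc).date() - EPOCH).days


def pruned_partitions(start: datetime, end: datetime) -> list[int]:
    """Day partitions a query with start <= timestamp <= end must read."""
    return list(range(day_transform(start), day_transform(end) + 1))


def looks_too_granular(file_sizes: list[int], min_files: int = 10,
                       small_file: int = 10 * MB) -> bool:
    """Heuristic from the text: a partition with few files, all of them tiny."""
    return (bool(file_sizes) and len(file_sizes) < min_files
            and all(s < small_file for s in file_sizes))


if __name__ == "__main__":
    start = datetime(2026, 3, 1, tzinfo=timezone.utc)
    end = datetime(2026, 3, 3, 23, 59, tzinfo=timezone.utc)
    print(len(pruned_partitions(start, end)))  # 3 day partitions scanned
    print(looks_too_granular([2 * MB] * 4))    # True
```

A three-day query therefore reads three day partitions, however many years of history the table holds. This pruning is what keeps Athena scan costs proportional to the query window rather than to the table size.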
Cost Analysis
The figures below are approximate monthly estimates. The savings from moving time-series data from InfluxDB to Iceberg on S3 can be substantial, particularly at scale.
| Data Volume | InfluxDB Cloud (storage + queries) | S3 + Iceberg + Athena | Monthly Savings |
|---|---|---|---|
| 100 GB | ~$200/mo (storage) + ~$50/mo (queries) | ~$2.30 (S3) + ~$5 (Athena) + ~$10 (Glue) | ~$233/mo (93% savings) |
| 1 TB | ~$2,000/mo + ~$200/mo | ~$23 (S3) + ~$25 (Athena) + ~$20 (Glue) | ~$2,132/mo (97% savings) |
| 10 TB | ~$20,000/mo + ~$500/mo | ~$230 (S3) + ~$100 (Athena) + ~$50 (Glue) | ~$20,120/mo (98% savings) |
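The savings column can be recomputed from the other two columns. The short check below uses only the table's own figures, which makes the percentages reproducible and easy to update when prices change. The S3 Standard price of $0.023 per GB-month is an assumption about the list price behind the S3 column, treating 1 TB as 1,000 GB.

```python
# Monthly figures (USD) transcribed from the cost table above.
ESTIMATES = {
    "100 GB": {"influx": 200 + 50, "lakehouse": 2.30 + 5 + 10},
    "1 TB": {"influx": 2_000 + 200, "lakehouse": 23 + 25 + 20},
    "10 TB": {"influx": 20_000 + 500, "lakehouse": 230 + 100 + 50},
}

# Assumption: S3 Standard list price per GB-month behind the S3 column.
S3_STANDARD_PER_GB_MONTH = 0.023


def savings(influx: float, lakehouse: float) -> tuple[float, float]:
    """Return (dollars saved per month, percentage saved)."""
    saved = influx - lakehouse
    return saved, 100.0 * saved / influx


if __name__ == "__main__":
    for volume, cost in ESTIMATES.items():
        saved, pct = savings(**cost)
        print(f"{volume}: save ${saved:,.2f}/mo ({pct:.0f}%)")
    # 100 GB: save $232.70/mo (93%)
    # 1 TB: save $2,132.00/mo (97%)
    # 10 TB: save $20,120.00/mo (98%)
    print(f"S3 for 1,000 GB: ${1_000 * S3_STANDARD_PER_GB_MONTH:.2f}")  # $23.00
```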
Additional costs to consider include the following:
- Telegraf compute: Runs on existing infrastructure. Minimal CPU and RAM are required for most workloads.
- S3 API costs: PUT requests at $0.005 per 1,000. With batching, this is typically under $10 per month.
- Glue Crawler: $0.44 per DPU-hour. A daily crawl typically costs $1 to $5 per month.
- Glue ETL: $0.44 per DPU-hour. A daily ten-minute job with two DPUs consumes about 10 DPU-hours per month (2 × 10/60 × 30), or approximately $4.40 per month.
- Data transfer: Free within the same AWS region; cross-region transfer adds $0.02 per GB.
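The per-request and per-DPU prices above translate into monthly figures with simple arithmetic. The sketch below uses the two prices quoted in the list and ignores minimum billing durations. The PUT example assumes that each Telegraf flush writes exactly one S3 object, which depends on the output configuration. The function names are hypothetical.

```python
GLUE_PER_DPU_HOUR = 0.44  # price quoted in the list above
S3_PUT_PER_1000 = 0.005   # price quoted in the list above


def glue_job_monthly(dpus: float, minutes_per_run: float, runs_per_month: int = 30) -> float:
    """Glue job cost: DPUs x run time in hours x price x runs per month."""
    return dpus * (minutes_per_run / 60) * GLUE_PER_DPU_HOUR * runs_per_month


def s3_put_monthly(puts_per_month: int) -> float:
    """S3 PUT request cost for a month's worth of object writes."""
    return puts_per_month / 1000 * S3_PUT_PER_1000


if __name__ == "__main__":
    print(f"Glue ETL, 2 DPUs x 10 min daily: ${glue_job_monthly(2, 10):.2f}")  # $4.40
    # Assumption: one object (one PUT) per flush, one flush per minute, 30 days
    print(f"S3 PUTs, one per minute: ${s3_put_monthly(60 * 24 * 30):.2f}")     # $0.22
```

Even with a one-minute flush interval, request charges stay well under a dollar a month in this scenario. In most cases the Glue job's runtime matters more to the bill than the number of PUTs.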
The break-even point is almost immediate. Even at 100 GB, the estimates show savings of more than $230 per month from the move to S3 and Iceberg, and the pipeline infrastructure (Telegraf, Glue) costs less than $30 per month for most workloads.
Concluding Remarks
Building a data pipeline from InfluxDB to Apache Iceberg through Telegraf is technically feasible, and the resulting architecture addresses real problems. InfluxDB continues to perform its principal function, real-time monitoring and dashboards. Meanwhile, historical data are offloaded to a lakehouse that, in the estimates above, costs 93 to 98 per cent less and provides SQL analytics, ML pipelines, and proper data governance.
The architecture comprises the following elements:
- Telegraf input plugins that retrieve data from InfluxDB v1.x or v2.x using four methods, ranging from simple pull-based queries to real-time push-based listeners.
- Telegraf processors that transform InfluxDB's tag/field model into a flat columnar schema suitable for Iceberg, with type conversion, field renaming, computed fields, and date partitioning.
- S3 output with Hive-style partitioning that lands data in formats AWS Glue can discover and catalogue.
- Iceberg table creation via Athena DDL or Glue Crawlers, with appropriate partitioning for time-series workloads.
- Automated ingestion using Glue ETL jobs, Athena INSERT INTO, Lambda triggers, or Spark on EMR.
- A complete, production-ready telegraf.conf that can be deployed with minimal modification.
For organisations requiring real-time pattern detection on streaming data before it lands in the lakehouse, this pipeline can be combined with complex event processing using Apache Flink. That combination permits in-flight anomaly detection while still archiving all data to Iceberg.
The principal merit of this architecture is its modularity. It is possible to begin simply, with JSON files on S3 and a Glue Crawler, and progress to Parquet with Spark streaming as requirements grow. Telegraf's plugin architecture permits the substitution of inputs and outputs without rewriting transformation logic. Iceberg's partition evolution permits changes to partitioning strategy without rewriting any historical data.
For organisations with terabytes of time-series data in InfluxDB and rising storage bills, this pipeline provides a viable migration path. It can be set up over a weekend, validated with a week of dual-writing, and then used as the basis for reducing InfluxDB retention policies.