Introduction
Here is a scenario that plays out at thousands of organizations every year: you started collecting time-series data with InfluxDB. Maybe it was IoT sensor readings from a factory floor, server CPU and memory metrics from your Kubernetes cluster, or application telemetry from a fleet of microservices. InfluxDB was the perfect fit back then — fast writes, efficient compression, and purpose-built queries for time-stamped data. But now your data has grown to terabytes. Your InfluxDB Cloud bill is climbing. Your data science team wants to run SQL joins against that time-series data alongside business data in your data warehouse. Your ML engineers need historical metrics in Parquet format to train anomaly detection models. And your compliance team is asking about data governance, schema evolution, and audit trails.
You need a lakehouse. Specifically, you need Apache Iceberg on AWS — the open table format that gives you ACID transactions, time travel, schema evolution, and partition evolution on top of dirt-cheap S3 storage. But how do you get data from InfluxDB into Iceberg efficiently, reliably, and without writing a mountain of custom code?
The answer is Telegraf — InfluxData’s open-source agent that was originally built to collect and ship metrics, but has evolved into a remarkably versatile data pipeline tool with over 300 plugins. Telegraf can read from InfluxDB, transform the data on the fly, and land it on S3 in formats that AWS Glue can crawl and convert into Iceberg tables.
In this guide, we will build the complete pipeline from scratch. Every configuration file is production-ready. Every SQL statement has been tested. By the end, you will have a fully operational data pipeline that moves time-series data from InfluxDB into queryable Iceberg tables on AWS — and you will understand every piece well enough to customize it for your own use case.
Architecture Overview
Before we touch a single configuration file, let’s understand the full data flow. The pipeline moves data through five distinct stages:
InfluxDB → Telegraf (Input Plugin) → Telegraf (Processors) → Telegraf (S3 Output) → AWS Glue Crawler/ETL → Iceberg Table on S3 → Athena/Spark Queries
In more detail:
- InfluxDB holds your raw time-series data in its native line protocol format, organized by measurements, tags, and fields.
- Telegraf Input reads data from InfluxDB using either pull-based Flux queries or push-based listener endpoints.
- Telegraf Processors transform the data: renaming fields, converting types, extracting date partitions, and flattening the InfluxDB tag/field model into a columnar schema suitable for Iceberg.
- Telegraf S3 Output writes the transformed data as JSON or CSV files into an S3 landing zone, organized with Hive-style partitioning (year=2026/month=04/day=03/).
- AWS Glue crawls the landing zone, discovers the schema, and either creates or updates an Iceberg table in the Glue Data Catalog.
- Athena or Spark queries the Iceberg table using standard SQL, with full support for time travel, partition pruning, and schema evolution.
Why This Architecture?
The combination of Telegraf and Iceberg addresses four critical needs simultaneously:
- Cost reduction: S3 storage costs roughly $0.023/GB/month compared to InfluxDB Cloud’s $0.002/MB/month ($2/GB/month). For 10TB of data, that is the difference between $230/month and $20,000/month.
- SQL analytics: Iceberg tables are queryable with standard SQL via Athena, Spark, Trino, and Presto — no Flux or InfluxQL required.
- ML pipelines: Data scientists can read Iceberg tables directly as Parquet files for model training, or query them through Spark DataFrames.
- Data governance: Iceberg provides ACID transactions, schema evolution, and time travel — features that InfluxDB was never designed to offer.
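The arithmetic behind the cost bullet is easy to check (these are the list rates quoted above; actual pricing varies by region and tier):

```python
# Storage cost comparison for 10 TB (10,240 GB), using the rates quoted above
gb = 10 * 1024
s3_monthly = gb * 0.023      # S3 Standard: ~$0.023/GB/month
influx_monthly = gb * 2.00   # InfluxDB Cloud: $0.002/MB/month = $2/GB/month
print(f"S3: ${s3_monthly:,.0f}/month vs InfluxDB Cloud: ${influx_monthly:,.0f}/month")
# → S3: $236/month vs InfluxDB Cloud: $20,480/month
```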
Architecture Comparison
| Approach | Complexity | Real-Time? | Schema Transformation | Maintenance |
|---|---|---|---|---|
| Direct InfluxDB Export (CSV/LP) | Low | No (batch only) | None (manual post-processing) | High (scripting) |
| Telegraf Pipeline (this guide) | Medium | Near real-time | Built-in processors | Low (declarative config) |
| Custom ETL (Python/Go) | High | Yes (configurable) | Unlimited flexibility | High (code ownership) |
| Kafka Connect | High | Yes (streaming) | SMTs + custom connectors | Medium (cluster ops) |
Understanding the Components
Let’s get familiar with each piece of the puzzle before we start connecting them.
InfluxDB
InfluxDB is a purpose-built time-series database developed by InfluxData. It organizes data using a unique model:
- Measurements are like tables — they group related time-series data (e.g., cpu, temperature, http_requests).
- Tags are indexed string key-value pairs used for filtering (e.g., host=server01, region=us-east).
- Fields are the actual data values, which can be floats, integers, strings, or booleans (e.g., usage_idle=95.2, bytes_sent=1024i).
- Timestamps are nanosecond-precision Unix timestamps.
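All four pieces meet in a single line of line protocol. As a rough sketch (the helper function is ours, not part of any InfluxDB client library), here is how a point assembles into `measurement,tags fields timestamp`:

```python
def to_line_protocol(measurement, tags, fields, ts_ns):
    """Render one point as InfluxDB line protocol: measurement,tags fields timestamp."""
    tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    # Integer fields carry an 'i' suffix; floats are written bare.
    field_str = ",".join(
        f"{k}={v}i" if isinstance(v, int) else f"{k}={v}"
        for k, v in sorted(fields.items())
    )
    return f"{measurement},{tag_str} {field_str} {ts_ns}"

line = to_line_protocol(
    "cpu",
    {"host": "server01", "region": "us-east"},
    {"usage_idle": 95.2, "bytes_sent": 1024},
    1712160000000000000,
)
print(line)
# cpu,host=server01,region=us-east bytes_sent=1024i,usage_idle=95.2 1712160000000000000
```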
InfluxDB v2.x uses Flux as its query language, while v1.x uses InfluxQL (SQL-like). In this guide, we will primarily target v2.x but provide v1.x alternatives where relevant.
Telegraf
Telegraf is InfluxData’s open-source, plugin-driven agent for collecting, processing, and writing metrics and data. Its architecture is built around four types of plugins:
- Input plugins collect data from various sources (databases, APIs, system metrics, message queues).
- Processor plugins transform data in-flight (rename, convert, filter, enrich).
- Aggregator plugins create aggregate metrics (mean, min, max, percentiles) over configurable windows.
- Output plugins write data to destinations (databases, cloud storage, message queues, HTTP endpoints).
Telegraf is a single binary with no external dependencies. It consumes minimal resources and can handle hundreds of thousands of metrics per second on modest hardware.
Apache Iceberg
Apache Iceberg is an open table format designed for huge analytic datasets. Unlike older formats like Hive, Iceberg provides:
- ACID transactions: Concurrent readers and writers never see partial data.
- Schema evolution: Add, drop, rename, or reorder columns without rewriting data.
- Partition evolution: Change your partitioning scheme without rewriting existing data.
- Time travel: Query your data as it existed at any previous point in time.
- Hidden partitioning: Users write queries against actual columns, not partition columns. Iceberg handles partition pruning automatically.
On AWS, Iceberg tables live as Parquet files on S3, with metadata managed by the AWS Glue Data Catalog. You can query them through Amazon Athena, Amazon EMR (Spark), AWS Glue ETL, or any engine that supports the Iceberg table format.
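Time travel, for instance, is a one-clause addition to an Athena query. A sketch that builds such a query string (the table name and timestamp are illustrative, and the string would still need to be submitted through Athena):

```python
def time_travel_query(table: str, as_of_iso: str) -> str:
    """Build an Athena SQL query that reads an Iceberg table as of a past instant."""
    return (
        f"SELECT * FROM {table} "
        f"FOR TIMESTAMP AS OF TIMESTAMP '{as_of_iso}'"
    )

print(time_travel_query("timeseries_db.cpu_metrics", "2026-04-01 00:00:00"))
```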
Component Characteristics Comparison
| Characteristic | InfluxDB | Apache Iceberg on S3 |
|---|---|---|
| Query Language | Flux / InfluxQL | Standard SQL (Athena, Spark SQL) |
| Storage Cost (per GB/month) | ~$2.00 (Cloud) / self-hosted varies | ~$0.023 (S3 Standard) |
| Data Retention | Configurable retention policies | Unlimited (S3 lifecycle policies) |
| Schema Flexibility | Schemaless (tags/fields) | Schema evolution with ACID guarantees |
| SQL Support | Limited (InfluxQL) | Full ANSI SQL |
| Write Latency | Sub-millisecond | Seconds to minutes (batch) |
| Best For | Real-time monitoring, dashboards | Analytics, ML, long-term storage |
Prerequisites and Setup
Before we build the pipeline, let’s get every component installed and configured. If you already have some of these running, skip to the parts you need.
InfluxDB Setup (v2.x)
If you don’t have InfluxDB running, install it quickly:
# Ubuntu/Debian
wget https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb
sudo dpkg -i influxdb2_2.7.5-1_amd64.deb
sudo systemctl start influxdb
sudo systemctl enable influxdb
# Initial setup (creates org, bucket, and admin token)
influx setup \
--org my-org \
--bucket metrics \
--username admin \
--password SecurePassword123! \
--token my-super-secret-token \
--force
# Verify it's running
influx ping
For InfluxDB v1.x, the installation is similar but uses different configuration:
# InfluxDB v1.x setup
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.10_linux_amd64.tar.gz
tar xvfz influxdb-1.8.10_linux_amd64.tar.gz
sudo cp influxdb-1.8.10-1/usr/bin/influxd /usr/local/bin/
sudo cp influxdb-1.8.10-1/usr/bin/influx /usr/local/bin/
influxd &
# Create database
influx -execute "CREATE DATABASE metrics"
influx -execute "CREATE RETENTION POLICY one_year ON metrics DURATION 365d REPLICATION 1 DEFAULT"
Let’s also generate some sample data to work with throughout this guide:
# Write sample data to InfluxDB v2.x
influx write --bucket metrics --org my-org --precision s \
"cpu,host=server01,region=us-east usage_idle=95.2,usage_system=2.1,usage_user=2.7 $(date +%s)
cpu,host=server02,region=us-west usage_idle=88.5,usage_system=5.3,usage_user=6.2 $(date +%s)
memory,host=server01,region=us-east used_percent=42.3,available=8589934592i $(date +%s)
memory,host=server02,region=us-west used_percent=67.8,available=4294967296i $(date +%s)
http_requests,endpoint=/api/v1/users,method=GET count=1523i,latency_ms=45.2 $(date +%s)
http_requests,endpoint=/api/v1/orders,method=POST count=89i,latency_ms=120.5 $(date +%s)"
Telegraf Installation
# Ubuntu/Debian (latest stable)
wget https://dl.influxdata.com/telegraf/releases/telegraf_1.30.1-1_amd64.deb
sudo dpkg -i telegraf_1.30.1-1_amd64.deb
# Verify installation
telegraf --version
# Generate a default config for reference
telegraf config > /tmp/telegraf-reference.conf
AWS Setup
Create the S3 bucket and configure AWS services:
# Create the S3 bucket for the data pipeline
aws s3 mb s3://my-timeseries-lakehouse --region us-east-1
# Create directory structure
aws s3api put-object --bucket my-timeseries-lakehouse --key landing-zone/
aws s3api put-object --bucket my-timeseries-lakehouse --key iceberg-warehouse/
# Create Glue database
aws glue create-database --database-input '{
"Name": "timeseries_db",
"Description": "Time-series data from InfluxDB via Telegraf pipeline"
}'
# Configure Athena results location
aws s3 mb s3://my-timeseries-lakehouse-athena-results --region us-east-1
aws athena update-work-group \
--work-group primary \
--configuration-updates "ResultConfigurationUpdates={OutputLocation=s3://my-timeseries-lakehouse-athena-results/}"
Required IAM Policy
Create an IAM policy that grants Telegraf and Glue the permissions they need. Attach this to the IAM user or role used by Telegraf and the Glue service:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3LakehouseAccess",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::my-timeseries-lakehouse",
"arn:aws:s3:::my-timeseries-lakehouse/*"
]
},
{
"Sid": "GlueCatalogAccess",
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetDatabases",
"glue:CreateTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:DeleteTable",
"glue:GetPartitions",
"glue:CreatePartition",
"glue:BatchCreatePartition",
"glue:UpdatePartition",
"glue:DeletePartition"
],
"Resource": [
"arn:aws:glue:us-east-1:ACCOUNT_ID:catalog",
"arn:aws:glue:us-east-1:ACCOUNT_ID:database/timeseries_db",
"arn:aws:glue:us-east-1:ACCOUNT_ID:table/timeseries_db/*"
]
},
{
"Sid": "AthenaQueryAccess",
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults",
"athena:StopQueryExecution"
],
"Resource": "arn:aws:athena:us-east-1:ACCOUNT_ID:workgroup/primary"
},
{
"Sid": "AthenaResultsAccess",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::my-timeseries-lakehouse-athena-results",
"arn:aws:s3:::my-timeseries-lakehouse-athena-results/*"
]
},
{
"Sid": "GlueCrawlerAccess",
"Effect": "Allow",
"Action": [
"glue:StartCrawler",
"glue:GetCrawler",
"glue:CreateCrawler",
"glue:UpdateCrawler"
],
"Resource": "arn:aws:glue:us-east-1:ACCOUNT_ID:crawler/*"
}
]
}
Replace ACCOUNT_ID with your actual AWS account ID. In production, further restrict these permissions to specific resources. Never use * for resources in production IAM policies unless absolutely necessary.
Configure Telegraf to Read from InfluxDB
This is where the pipeline begins. Telegraf offers several methods to pull data from InfluxDB, each suited to different scenarios. Let’s explore all of them.
Method A: Using inputs.influxdb_v2 (InfluxDB 2.x — Pull-Based)
This is the recommended approach for InfluxDB 2.x. Telegraf periodically executes a Flux query and ingests the results.
# telegraf.conf - Input: InfluxDB v2 (pull-based Flux queries)
[[inputs.influxdb_v2]]
## InfluxDB v2 API URL
urls = ["http://localhost:8086"]
## Authentication token
token = "${INFLUXDB_TOKEN}"
## Organization name
organization = "my-org"
## Collection interval - how often to run these queries
## (plugin-level keys must appear BEFORE the [[...query]] subtables,
## otherwise TOML attaches them to the last query instead of the plugin)
interval = "1h"
## Timeout for each query
timeout = "30s"
## List of Flux queries to execute
## Each query becomes a separate set of metrics
[[inputs.influxdb_v2.query]]
## Bucket to query
bucket = "metrics"
## Flux query - pull CPU metrics from the last interval
query = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
## Override the measurement name
measurement = "cpu_metrics"
[[inputs.influxdb_v2.query]]
bucket = "metrics"
query = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "memory_metrics"
The pivot() function in Flux is crucial here. InfluxDB stores each field as a separate row, but for Iceberg we want a flat columnar layout where each field becomes its own column. Pivoting transforms _field=usage_idle, _value=95.2 into usage_idle=95.2 as a proper column.
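To see what the pivot does, here is the same transformation sketched in plain Python: rows keyed by `_field`/`_value` collapse into one row per timestamp with a column per field.

```python
from collections import defaultdict

def pivot(rows):
    """Collapse InfluxDB's one-row-per-field layout into one row per timestamp."""
    out = defaultdict(dict)
    for r in rows:
        out[r["_time"]][r["_field"]] = r["_value"]
    return [{"_time": t, **fields} for t, fields in out.items()]

rows = [
    {"_time": "2026-04-03T00:00:00Z", "_field": "usage_idle", "_value": 95.2},
    {"_time": "2026-04-03T00:00:00Z", "_field": "usage_system", "_value": 2.1},
]
print(pivot(rows))
# [{'_time': '2026-04-03T00:00:00Z', 'usage_idle': 95.2, 'usage_system': 2.1}]
```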
Method B: Using inputs.influxdb (InfluxDB 1.x)
For InfluxDB v1.x, use the legacy input plugin:
# telegraf.conf - Input: InfluxDB v1.x
[[inputs.influxdb]]
## InfluxDB v1.x API URL
urls = ["http://localhost:8086/debug/vars"]
## Optional: basic auth
username = "${INFLUXDB_USER}"
password = "${INFLUXDB_PASSWORD}"
## Timeout
timeout = "10s"
## Skip TLS certificate verification (leave false in production)
insecure_skip_verify = false
However, the v1.x plugin primarily collects InfluxDB internal metrics. For extracting your actual data from a v1.x instance, the HTTP input with InfluxQL is more practical:
# telegraf.conf - Input: InfluxDB v1.x via HTTP + InfluxQL
[[inputs.http]]
urls = [
"http://localhost:8086/query?db=metrics&q=SELECT+*+FROM+cpu+WHERE+time+>+now()-1h&epoch=ns"
]
## Authentication
username = "${INFLUXDB_USER}"
password = "${INFLUXDB_PASSWORD}"
## Parse the InfluxDB JSON response
data_format = "json"
json_query = "results.0.series"
## How often to poll
interval = "1h"
timeout = "30s"
Method C: Using inputs.http with InfluxDB API (Both Versions)
This is the most flexible approach, working with both InfluxDB versions by calling the API directly:
# telegraf.conf - Input: InfluxDB v2 API via HTTP
[[inputs.http]]
## InfluxDB v2 query API endpoint
urls = ["http://localhost:8086/api/v2/query?org=my-org"]
## POST method for Flux queries
method = "POST"
## Headers
[inputs.http.headers]
Authorization = "Token ${INFLUXDB_TOKEN}"
Content-Type = "application/vnd.flux"
Accept = "application/csv"
## Flux query as the request body
body = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" or r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
## Parse the CSV response from InfluxDB
data_format = "csv"
csv_header_row_count = 1
csv_timestamp_column = "_time"
csv_timestamp_format = "2006-01-02T15:04:05Z"
interval = "1h"
timeout = "60s"
Method D: InfluxDB Pushing to Telegraf (Push-Based)
Instead of Telegraf pulling data, you can configure InfluxDB to push data to Telegraf using the influxdb_listener input. This is ideal for real-time pipelines:
# telegraf.conf - Input: InfluxDB Listener (push-based)
[[inputs.influxdb_listener]]
## Address and port to listen on
service_address = ":8186"
## Maximum allowed HTTP body size
max_body_size = "50MB"
## Database tag to add (optional)
database_tag = "source_db"
## Retention policy tag (optional)
retention_policy_tag = ""
## TLS configuration (recommended for production)
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## For InfluxDB v2, use the v2 listener instead (don't enable both on the same port)
[[inputs.influxdb_v2_listener]]
## Address to listen on
service_address = ":8186"
## Maximum allowed HTTP body size
max_body_size = "50MB"
## Authentication token (must match what the sender uses)
token = "${TELEGRAF_LISTENER_TOKEN}"
For the push-based approach, you then configure InfluxDB or another Telegraf instance to write to this listener. For InfluxDB 2.x, you can use a task to periodically push data:
// InfluxDB Task: Push data to Telegraf listener every hour
option task = {name: "export_to_telegraf", every: 1h}
from(bucket: "metrics")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "cpu" or r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> to(
host: "http://telegraf-host:8186",
token: "telegraf-listener-token",
bucket: "pipeline",
org: "my-org"
)
Handling Pagination for Large Datasets
When backfilling historical data, you can’t query everything at once. Use Flux’s range() with windowing:
# For large historical exports, create multiple queries with time windows
# This Flux query processes data in manageable chunks
from(bucket: "metrics")
|> range(start: 2025-01-01T00:00:00Z, stop: 2025-02-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> limit(n: 100000)
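A small helper can emit the window boundaries to substitute into those chunked range() calls. One possible sketch (the window size is illustrative; pick one that keeps each query under your memory and timeout limits):

```python
from datetime import datetime, timedelta

def time_windows(start: datetime, stop: datetime, step: timedelta):
    """Yield (window_start, window_stop) pairs covering [start, stop)."""
    cur = start
    while cur < stop:
        nxt = min(cur + step, stop)
        yield cur, nxt
        cur = nxt

wins = list(time_windows(datetime(2025, 1, 1), datetime(2025, 1, 8), timedelta(days=2)))
for a, b in wins:
    # Each pair becomes range(start: ..., stop: ...) in a separate Flux query
    print(a.isoformat() + "Z", "->", b.isoformat() + "Z")
```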
Transform Data with Telegraf Processors
Raw InfluxDB data doesn’t map cleanly to a columnar Iceberg schema. InfluxDB’s tag/field model, dynamic typing, and measurement-centric organization need to be flattened and standardized. Telegraf processors handle this transformation in-flight, before the data ever touches S3.
Rename Measurements, Tags, and Fields
# telegraf.conf - Processor: Rename fields to match Iceberg schema
[[processors.rename]]
## Rename measurements
[[processors.rename.replace]]
measurement = "cpu"
dest = "server_cpu_metrics"
[[processors.rename.replace]]
measurement = "memory"
dest = "server_memory_metrics"
## Rename tags
[[processors.rename.replace]]
tag = "host"
dest = "hostname"
## Rename fields
[[processors.rename.replace]]
field = "usage_idle"
dest = "cpu_idle_percent"
[[processors.rename.replace]]
field = "usage_system"
dest = "cpu_system_percent"
[[processors.rename.replace]]
field = "usage_user"
dest = "cpu_user_percent"
Convert Field Types
InfluxDB may store values as floats when your Iceberg schema expects integers, or vice versa:
# telegraf.conf - Processor: Convert field types
[[processors.converter]]
## Convert tags to fields (tags are always strings in InfluxDB)
[processors.converter.tags]
## Convert string tags to string fields for columnar storage
string = ["hostname", "region", "endpoint", "method"]
## Convert specific fields to different types
[processors.converter.fields]
## Ensure these are always floats
float = ["cpu_idle_percent", "cpu_system_percent", "cpu_user_percent", "latency_ms"]
## Ensure these are integers
integer = ["available", "count"]
## Convert to unsigned integers if needed
unsigned = []
## Convert to boolean
boolean = []
Custom Transformations with Starlark
For complex transformation logic, the Starlark processor lets you write Python-like scripts. This is where you flatten the InfluxDB data model into a structure that works well with Iceberg:
# telegraf.conf - Processor: Starlark custom transformations
[[processors.starlark]]
namepass = ["server_cpu_metrics", "server_memory_metrics"]
source = '''
def apply(metric):
    # Add a computed field: total CPU usage
    if metric.name == "server_cpu_metrics":
        idle = metric.fields.get("cpu_idle_percent", 0.0)
        metric.fields["cpu_total_usage_percent"] = round(100.0 - idle, 2)

    # Add data quality flag
    if metric.name == "server_memory_metrics":
        used = metric.fields.get("used_percent", 0.0)
        if used > 95.0:
            metric.fields["memory_critical"] = True
        else:
            metric.fields["memory_critical"] = False

    # Normalize region names
    region = metric.tags.get("region", "unknown")
    region_map = {
        "us-east": "us-east-1",
        "us-west": "us-west-2",
        "eu-west": "eu-west-1",
        "ap-south": "ap-southeast-1"
    }
    if region in region_map:
        metric.tags["region"] = region_map[region]

    # Add pipeline metadata
    metric.tags["pipeline_version"] = "1.0"
    metric.tags["source_system"] = "influxdb"
    return metric
'''
Extract Date Partitions
For Hive-style partitioning on S3 (which AWS Glue expects), we need to extract year, month, and day from the timestamp:
# telegraf.conf - Processor: Extract date components for partitioning
[[processors.date]]
## Extract date components from the metric timestamp
## These become fields that we'll use for S3 path partitioning
## Tag name for the year
tag_key = "partition_year"
date_format = "2006"
[[processors.date]]
tag_key = "partition_month"
date_format = "01"
[[processors.date]]
tag_key = "partition_day"
date_format = "02"
[[processors.date]]
tag_key = "partition_hour"
date_format = "15"
Map Tag Values with Enum
# telegraf.conf - Processor: Map tag values
[[processors.enum]]
[[processors.enum.mapping]]
tag = "method"
[processors.enum.mapping.value_mappings]
GET = "read"
POST = "write"
PUT = "update"
DELETE = "delete"
PATCH = "partial_update"
Full Transformation Example: Flattening InfluxDB to Columnar
Here is a complete Starlark processor that converts InfluxDB’s tag/field model into a fully flat record suitable for Iceberg:
# telegraf.conf - Processor: Flatten InfluxDB model to columnar
[[processors.starlark]]
source = '''
load("time", "time")

def apply(metric):
    # Move all tags into fields so everything becomes a column in Iceberg
    # Tags in InfluxDB are indexed strings; in Iceberg they're just columns
    for key, value in metric.tags.items():
        # Prefix tag-originated fields to distinguish them
        if key not in ["partition_year", "partition_month", "partition_day", "partition_hour"]:
            metric.fields["tag_" + key] = value
    # Add the measurement name as a field (useful if mixing measurements)
    metric.fields["measurement"] = metric.name
    # Add ingestion timestamp (separate from the data timestamp)
    # This helps with pipeline debugging and data freshness monitoring
    metric.fields["ingested_at"] = time.now().unix_nano // 1000000000
    return metric
'''
Processor ordering matters: run rename before converter, and put date before the Starlark flatten processor so that the partition tags are already available.
Output to S3 (Landing Zone)
Now we need to get the transformed data from Telegraf into S3. This is the landing zone — a staging area where raw files accumulate before being ingested into the Iceberg table.
Using outputs.s3 with JSON Format
The simplest approach is writing JSON files to S3. The built-in outputs.s3 plugin (available in Telegraf 1.28+) handles this natively:
# telegraf.conf - Output: S3 with JSON format
[[outputs.s3]]
## S3 bucket name
bucket = "my-timeseries-lakehouse"
## S3 key prefix with Hive-style partitioning
## Uses Go template syntax with metric tags
s3_key_prefix = "landing-zone/{{.Tag \"partition_year\"}}/{{.Tag \"partition_month\"}}/{{.Tag \"partition_day\"}}/"
## AWS region
region = "us-east-1"
## Use shared credentials or environment variables
## access_key = "${AWS_ACCESS_KEY_ID}"
## secret_key = "${AWS_SECRET_ACCESS_KEY}"
## Data format
data_format = "json"
## Batching configuration
## Write to S3 every 5 minutes or when buffer reaches 10000 metrics
metric_batch_size = 10000
metric_buffer_limit = 100000
flush_interval = "5m"
flush_jitter = "30s"
## File naming
## Creates files like: landing-zone/year=2026/month=04/day=03/metrics_1712160000.json
use_batch_format = true
If your Telegraf version predates the outputs.s3 plugin, you can use outputs.file combined with a cron job that syncs files to S3 using aws s3 sync. Alternatively, upgrade Telegraf to the latest version.
Alternative: outputs.file + S3 Sync
For Telegraf versions without the S3 plugin, or when you want more control over file rotation:
# telegraf.conf - Output: Local files (for S3 sync)
[[outputs.file]]
## Write to a local directory organized by date
files = ["/var/telegraf/output/metrics.json"]
## Rotate files based on time
rotation_interval = "1h"
rotation_max_size = "100MB"
rotation_max_archives = 48
## Data format
data_format = "json"
json_timestamp_units = "1s"
Then set up a cron job to sync to S3:
# /etc/cron.d/telegraf-s3-sync
# Sync local Telegraf output to S3 every 10 minutes
*/10 * * * * telegraf aws s3 sync /var/telegraf/output/ s3://my-timeseries-lakehouse/landing-zone/ \
--exclude "*.json" \
--include "*.json-*" \
&& find /var/telegraf/output/ -name "*.json-*" -mmin +60 -delete
Writing Parquet via execd Output
Parquet is the preferred format for Iceberg. While Telegraf doesn’t natively output Parquet, you can use the outputs.execd plugin with a lightweight Python script:
# telegraf.conf - Output: Parquet via execd
[[outputs.execd]]
command = ["/usr/bin/python3", "/opt/telegraf/write_parquet_s3.py"]
## Restart the process if it exits
restart_delay = "10s"
## Data format sent to the script via stdin
data_format = "json"
And the companion Python script:
#!/usr/bin/env python3
"""write_parquet_s3.py - Telegraf execd output plugin for Parquet to S3"""
import sys
import json
import os
from datetime import datetime
from io import BytesIO
import pyarrow as pa
import pyarrow.parquet as pq
import boto3
BUCKET = os.environ.get("S3_BUCKET", "my-timeseries-lakehouse")
PREFIX = os.environ.get("S3_PREFIX", "landing-zone")
REGION = os.environ.get("AWS_REGION", "us-east-1")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "5000"))
FLUSH_SECONDS = int(os.environ.get("FLUSH_SECONDS", "300"))
s3 = boto3.client("s3", region_name=REGION)
buffer = []
last_flush = datetime.utcnow()
def flush_to_s3(records):
    if not records:
        return
    # Build a PyArrow table from the records
    table = pa.Table.from_pylist(records)
    # Write to Parquet in memory
    parquet_buffer = BytesIO()
    pq.write_table(table, parquet_buffer, compression="snappy")
    parquet_buffer.seek(0)
    # Generate S3 key with Hive-style partitioning
    now = datetime.utcnow()
    key = (
        f"{PREFIX}/year={now.year}/month={now.month:02d}/"
        f"day={now.day:02d}/hour={now.hour:02d}/"
        f"metrics_{now.strftime('%Y%m%d_%H%M%S')}.parquet"
    )
    s3.put_object(Bucket=BUCKET, Key=key, Body=parquet_buffer.getvalue())
    sys.stderr.write(f"Flushed {len(records)} records to s3://{BUCKET}/{key}\n")

for line in sys.stdin:
    try:
        metric = json.loads(line.strip())
        # Flatten the metric into a single dict
        record = {
            "measurement": metric.get("name", ""),
            "timestamp": metric.get("timestamp", 0),
        }
        record.update(metric.get("tags", {}))
        record.update(metric.get("fields", {}))
        buffer.append(record)
        # Flush on batch size or time
        elapsed = (datetime.utcnow() - last_flush).total_seconds()
        if len(buffer) >= BATCH_SIZE or elapsed >= FLUSH_SECONDS:
            flush_to_s3(buffer)
            buffer = []
            last_flush = datetime.utcnow()
    except json.JSONDecodeError:
        sys.stderr.write(f"Invalid JSON: {line}\n")
    except Exception as e:
        sys.stderr.write(f"Error: {e}\n")

# Flush remaining records on exit
flush_to_s3(buffer)
Alternative: outputs.http to Lambda for Parquet
A serverless approach uses an AWS Lambda function to receive metrics via HTTP and write Parquet files:
# telegraf.conf - Output: HTTP to Lambda Function URL
[[outputs.http]]
url = "https://abc123.lambda-url.us-east-1.on.aws/ingest"
method = "POST"
data_format = "json"
json_timestamp_units = "1s"
## Batch settings
metric_batch_size = 5000
metric_buffer_limit = 50000
## Timeout and retry
timeout = "30s"
## Headers
[outputs.http.headers]
Content-Type = "application/json"
X-Pipeline-Source = "telegraf-influxdb"
S3 Partitioning Strategy
The S3 path structure is critical for Glue and Athena performance. Use Hive-style partitioning:
# Recommended S3 path structure for time-series data
s3://my-timeseries-lakehouse/
  landing-zone/
    measurement=cpu_metrics/
      year=2026/
        month=04/
          day=03/
            hour=00/
              metrics_20260403_000000.json
              metrics_20260403_001500.json
            hour=01/
              metrics_20260403_010000.json
          day=04/
            ...
    measurement=memory_metrics/
      year=2026/
        ...
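Generating keys in this shape is a one-liner. A sketch of the path-building logic (the prefix and filename are illustrative):

```python
from datetime import datetime, timezone

def s3_key(measurement: str, ts: datetime, filename: str) -> str:
    """Build a Hive-style partitioned S3 key: measurement=<m>/year=YYYY/month=MM/day=DD/hour=HH/<file>."""
    return (
        f"landing-zone/measurement={measurement}/"
        f"year={ts.year}/month={ts.month:02d}/day={ts.day:02d}/hour={ts.hour:02d}/"
        f"{filename}"
    )

ts = datetime(2026, 4, 3, 0, 15, tzinfo=timezone.utc)
print(s3_key("cpu_metrics", ts, "metrics_20260403_001500.json"))
# landing-zone/measurement=cpu_metrics/year=2026/month=04/day=03/hour=00/metrics_20260403_001500.json
```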
Create the Iceberg Table in AWS Glue
With data landing on S3, we need to create the Iceberg table definition in the AWS Glue Data Catalog. There are two approaches.
Option A: Create Iceberg Table via Athena DDL
This is the most precise approach — you define the exact schema and partitioning you want:
-- Create Iceberg table for CPU metrics
CREATE TABLE timeseries_db.cpu_metrics (
timestamp timestamp,
hostname string,
region string,
cpu_idle_percent double,
cpu_system_percent double,
cpu_user_percent double,
cpu_total_usage_percent double,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (day(timestamp))
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/cpu_metrics/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'snappy',
'optimize_rewrite_delete_file_threshold' = '10'
);
-- Create Iceberg table for memory metrics
CREATE TABLE timeseries_db.memory_metrics (
timestamp timestamp,
hostname string,
region string,
used_percent double,
available bigint,
memory_critical boolean,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (day(timestamp))
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/memory_metrics/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'snappy'
);
-- Create a unified metrics table (if you prefer a single table)
CREATE TABLE timeseries_db.all_metrics (
timestamp timestamp,
measurement string,
hostname string,
region string,
metric_name string,
metric_value double,
tags map<string, string>,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (day(timestamp), measurement)
LOCATION 's3://my-timeseries-lakehouse/iceberg-warehouse/all_metrics/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'snappy'
);
Option B: AWS Glue Crawler for Schema Discovery
If you want Glue to auto-discover the schema from the JSON/Parquet files in the landing zone:
# Create the Glue Crawler via AWS CLI
aws glue create-crawler \
--name "timeseries-landing-crawler" \
--role "arn:aws:iam::ACCOUNT_ID:role/GlueCrawlerRole" \
--database-name "timeseries_db" \
--targets '{
"S3Targets": [
{
"Path": "s3://my-timeseries-lakehouse/landing-zone/",
"Exclusions": ["**/_temporary/**", "**/_SUCCESS"]
}
]
}' \
--schema-change-policy '{
"UpdateBehavior": "UPDATE_IN_DATABASE",
"DeleteBehavior": "LOG"
}' \
--configuration '{
"Version": 1.0,
"Grouping": {
"TableGroupingPolicy": "CombineCompatibleSchemas"
},
"CrawlerOutput": {
"Partitions": {
"AddOrUpdateBehavior": "InheritFromTable"
}
}
}' \
--recrawl-policy '{"RecrawlBehavior": "CRAWL_NEW_FOLDERS_ONLY"}'
# Run the crawler
aws glue start-crawler --name "timeseries-landing-crawler"
# Check crawler status
aws glue get-crawler --name "timeseries-landing-crawler" \
--query "Crawler.State"
Schema Mapping: InfluxDB to Iceberg Types
| InfluxDB Type | Example | Iceberg/Parquet Type | Notes |
|---|---|---|---|
| Float | `usage_idle=95.2` | `double` | Direct mapping |
| Integer | `bytes_sent=1024i` | `bigint` | Use `int` for values under 2B |
| String (field) | `status="healthy"` | `string` | Direct mapping |
| Boolean | `active=true` | `boolean` | Direct mapping |
| Tag (string) | `host=server01` | `string` | Consider dictionary encoding |
| Timestamp | nanosecond Unix | `timestamp` | Convert from ns to ms or s |
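The table can be expressed as a small classifier over line-protocol field literals. A sketch (it covers only the value shapes in the table, not every line-protocol edge case):

```python
def iceberg_type(raw: str) -> str:
    """Map a line-protocol field value literal to its Iceberg/Parquet column type."""
    if raw in ("true", "false", "t", "f", "T", "F"):
        return "boolean"
    if raw.startswith('"') and raw.endswith('"'):
        return "string"
    if raw.endswith("i") and raw[:-1].lstrip("-").isdigit():
        return "bigint"   # InfluxDB integers carry an 'i' suffix
    return "double"       # bare numerics are floats in line protocol

for sample in ['usage_idle=95.2', 'bytes_sent=1024i', 'status="healthy"', 'active=true']:
    name, value = sample.split("=", 1)
    print(name, "->", iceberg_type(value))
```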
Automate the Iceberg Ingestion
Having data on S3 is only half the job. We need to move it from the landing zone into the actual Iceberg table. Here are four approaches, from simplest to most sophisticated.
Option A: AWS Glue ETL Job (PySpark)
This is the most robust approach for production workloads. A Glue ETL job reads from the landing zone and writes to the Iceberg table:
# glue_iceberg_ingestion.py - AWS Glue ETL Job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, to_timestamp, current_timestamp, lit
from pyspark.sql.types import *
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'source_path',
'database_name',
'table_name'
])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Configure Iceberg
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
spark.conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Read from landing zone
source_path = args['source_path'] # s3://my-timeseries-lakehouse/landing-zone/
database = args['database_name'] # timeseries_db
table = args['table_name'] # cpu_metrics
print(f"Reading from: {source_path}")
# Read JSON files from landing zone
df_raw = spark.read.json(source_path)
# Transform: convert timestamp, clean up columns
df_transformed = df_raw \
.withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
.withColumn("hostname", col("tag_hostname")) \
.withColumn("region", col("tag_region")) \
.withColumn("load_timestamp", current_timestamp()) \
.drop("tag_hostname", "tag_region", "partition_year",
"partition_month", "partition_day", "partition_hour")
# Select columns matching the Iceberg table schema
df_final = df_transformed.select(
"timestamp",
"hostname",
"region",
col("cpu_idle_percent").cast("double"),
col("cpu_system_percent").cast("double"),
col("cpu_user_percent").cast("double"),
col("cpu_total_usage_percent").cast("double"),
"pipeline_version",
"source_system",
col("ingested_at").cast("long")
)
print(f"Records to insert: {df_final.count()}")
# Write to Iceberg table using APPEND mode
df_final.writeTo(f"glue_catalog.{database}.{table}") \
.option("merge-schema", "true") \
.append()
print(f"Successfully ingested data into {database}.{table}")
# Optional: Clean up processed files from landing zone
# This prevents re-processing on the next run
# Uncomment if you want automatic cleanup:
# import boto3
# s3 = boto3.resource('s3')
# bucket = s3.Bucket('my-timeseries-lakehouse')
# bucket.objects.filter(Prefix='landing-zone/processed/').delete()
job.commit()
Create and schedule the Glue job:
# Create the Glue ETL job
aws glue create-job \
--name "timeseries-iceberg-ingestion" \
--role "arn:aws:iam::ACCOUNT_ID:role/GlueETLRole" \
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://my-timeseries-lakehouse/scripts/glue_iceberg_ingestion.py",
"PythonVersion": "3"
}' \
--default-arguments '{
"--source_path": "s3://my-timeseries-lakehouse/landing-zone/",
"--database_name": "timeseries_db",
"--table_name": "cpu_metrics",
"--datalake-formats": "iceberg",
"--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"--enable-metrics": "true"
}' \
--glue-version "4.0" \
--number-of-workers 2 \
--worker-type "G.1X" \
--timeout 60
# Schedule the job to run every hour via EventBridge
aws events put-rule \
--name "hourly-iceberg-ingestion" \
--schedule-expression "rate(1 hour)" \
--state ENABLED
aws events put-targets \
--rule "hourly-iceberg-ingestion" \
--targets '[{
"Id": "glue-job-target",
"Arn": "arn:aws:glue:us-east-1:ACCOUNT_ID:job/timeseries-iceberg-ingestion",
"RoleArn": "arn:aws:iam::ACCOUNT_ID:role/EventBridgeGlueRole"
}]'
Option B: Athena INSERT INTO (Simple, No Compute Needed)
For smaller datasets, you can skip Glue ETL entirely and use Athena to move data:
-- First, create a temporary table pointing to the landing zone
CREATE EXTERNAL TABLE timeseries_db.cpu_metrics_landing (
`timestamp` bigint,
measurement string,
tag_hostname string,
tag_region string,
cpu_idle_percent double,
cpu_system_percent double,
cpu_user_percent double,
cpu_total_usage_percent double,
pipeline_version string,
source_system string,
ingested_at bigint
)
PARTITIONED BY (year string, month string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/'
TBLPROPERTIES ('has_encrypted_data'='false');
-- Add partitions (or use MSCK REPAIR TABLE)
MSCK REPAIR TABLE timeseries_db.cpu_metrics_landing;
-- Insert from landing zone into Iceberg table
INSERT INTO timeseries_db.cpu_metrics
SELECT
from_unixtime(timestamp) as timestamp,
tag_hostname as hostname,
tag_region as region,
cpu_idle_percent,
cpu_system_percent,
cpu_user_percent,
cpu_total_usage_percent,
pipeline_version,
source_system,
ingested_at
FROM timeseries_db.cpu_metrics_landing
WHERE year = '2026' AND month = '04' AND day = '03';
Option C: Lambda for Near-Real-Time Ingestion
For near-real-time ingestion, trigger a Lambda function when new files land on S3:
# lambda_iceberg_ingest.py - Triggered by S3 PutObject events
import json
import boto3
import time
athena = boto3.client('athena')
def handler(event, context):
    """Triggered when a new file lands in the landing zone."""
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        print(f"New file: s3://{bucket}/{key}")

        # Parse the partition info from the S3 path
        # Example: landing-zone/measurement=cpu_metrics/year=2026/month=04/day=03/...
        parts = key.split('/')
        partition_info = {}
        for part in parts:
            if '=' in part:
                k, v = part.split('=', 1)
                partition_info[k] = v

        measurement = partition_info.get('measurement', 'unknown')
        year = partition_info.get('year', '')
        month = partition_info.get('month', '')
        day = partition_info.get('day', '')

        if measurement == 'cpu_metrics':
            # Run Athena INSERT INTO query
            query = f"""
            INSERT INTO timeseries_db.cpu_metrics
            SELECT
                from_unixtime(timestamp) as timestamp,
                tag_hostname as hostname,
                tag_region as region,
                cpu_idle_percent,
                cpu_system_percent,
                cpu_user_percent,
                cpu_total_usage_percent,
                pipeline_version,
                source_system,
                ingested_at
            FROM timeseries_db.cpu_metrics_landing
            WHERE year = '{year}' AND month = '{month}' AND day = '{day}'
            """
            response = athena.start_query_execution(
                QueryString=query,
                QueryExecutionContext={'Database': 'timeseries_db'},
                ResultConfiguration={
                    'OutputLocation': 's3://my-timeseries-lakehouse-athena-results/'
                }
            )
            query_id = response['QueryExecutionId']
            print(f"Started Athena query: {query_id}")

    return {'statusCode': 200, 'body': 'Ingestion triggered'}
Set up the S3 event trigger:
# Create the Lambda function
aws lambda create-function \
--function-name timeseries-iceberg-ingest \
--runtime python3.12 \
--handler lambda_iceberg_ingest.handler \
--role arn:aws:iam::ACCOUNT_ID:role/LambdaIcebergIngestRole \
--zip-file fileb://lambda_package.zip \
--timeout 300 \
--memory-size 256
# Add S3 trigger permission
aws lambda add-permission \
--function-name timeseries-iceberg-ingest \
--statement-id s3-trigger \
--action lambda:InvokeFunction \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::my-timeseries-lakehouse
# Configure S3 bucket notification
aws s3api put-bucket-notification-configuration \
--bucket my-timeseries-lakehouse \
--notification-configuration '{
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:ACCOUNT_ID:function:timeseries-iceberg-ingest",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": "landing-zone/"},
{"Name": "suffix", "Value": ".json"}
]
}
}
}
]
}'
Option D: Apache Spark on EMR
For the highest throughput and most flexibility, run Spark directly on EMR with the Iceberg connector:
# emr_iceberg_job.py - Spark job for EMR
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("InfluxDB-to-Iceberg") \
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/") \
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
# Read new files from landing zone
df = spark.read.json("s3://my-timeseries-lakehouse/landing-zone/measurement=cpu_metrics/year=2026/")
# Transform and write to Iceberg
df_clean = df \
.withColumn("timestamp", to_timestamp(col("timestamp").cast("long"))) \
.withColumnRenamed("tag_hostname", "hostname") \
.withColumnRenamed("tag_region", "region") \
.select("timestamp", "hostname", "region",
"cpu_idle_percent", "cpu_system_percent",
"cpu_user_percent", "cpu_total_usage_percent",
"pipeline_version", "source_system", "ingested_at")
# Append to Iceberg table
df_clean.writeTo("glue_catalog.timeseries_db.cpu_metrics").append()
# Run compaction to optimize file sizes
spark.sql("""
CALL glue_catalog.system.rewrite_data_files(
table => 'timeseries_db.cpu_metrics',
options => map('target-file-size-bytes', '134217728')
)
""")
spark.stop()
# Submit the EMR job
aws emr add-steps \
--cluster-id j-XXXXXXXXXXXXX \
--steps '[{
"Type": "Spark",
"Name": "Iceberg Ingestion",
"ActionOnFailure": "CONTINUE",
"Args": [
"--deploy-mode", "cluster",
"--conf", "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
"--conf", "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"s3://my-timeseries-lakehouse/scripts/emr_iceberg_job.py"
]
}]'
Complete End-to-End telegraf.conf
Here is the full, production-ready Telegraf configuration that ties together everything we have discussed. Copy this file, update the environment variables, and you have a working pipeline:
# =============================================================================
# TELEGRAF CONFIGURATION: InfluxDB → S3 Landing Zone (for Iceberg)
# =============================================================================
# This configuration reads time-series data from InfluxDB v2, transforms it
# into a flat columnar schema, and writes it to S3 with Hive-style partitioning
# for subsequent ingestion into Apache Iceberg tables.
# =============================================================================
# Global Agent Configuration
[agent]
## Collection interval - how often input plugins are gathered
interval = "1h"
## Flush interval - how often output plugins write
flush_interval = "5m"
## Jitter to prevent thundering herd
collection_jitter = "30s"
flush_jitter = "30s"
## Metric batch and buffer sizes
metric_batch_size = 10000
metric_buffer_limit = 100000
## Override default hostname
hostname = ""
omit_hostname = true
## Logging
debug = false
quiet = false
logfile = "/var/log/telegraf/telegraf-pipeline.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "100MB"
logfile_rotation_max_archives = 7
# =============================================================================
# INPUT: Read from InfluxDB v2 via Flux queries
# =============================================================================
[[inputs.influxdb_v2]]
urls = ["${INFLUXDB_URL}"]
token = "${INFLUXDB_TOKEN}"
organization = "${INFLUXDB_ORG}"
## CPU Metrics
[[inputs.influxdb_v2.query]]
bucket = "${INFLUXDB_BUCKET}"
query = '''
from(bucket: v.bucket)
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "cpu_metrics"
## Memory Metrics
[[inputs.influxdb_v2.query]]
bucket = "${INFLUXDB_BUCKET}"
query = '''
from(bucket: v.bucket)
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "memory_metrics"
## HTTP Request Metrics
[[inputs.influxdb_v2.query]]
bucket = "${INFLUXDB_BUCKET}"
query = '''
from(bucket: v.bucket)
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "http_requests")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_measurement"])
'''
measurement = "http_request_metrics"
timeout = "60s"
# =============================================================================
# PROCESSORS: Transform data for Iceberg compatibility
# =============================================================================
# Step 1: Rename fields to clean, descriptive names
[[processors.rename]]
order = 1
[[processors.rename.replace]]
field = "usage_idle"
dest = "cpu_idle_percent"
[[processors.rename.replace]]
field = "usage_system"
dest = "cpu_system_percent"
[[processors.rename.replace]]
field = "usage_user"
dest = "cpu_user_percent"
[[processors.rename.replace]]
field = "used_percent"
dest = "memory_used_percent"
[[processors.rename.replace]]
tag = "host"
dest = "hostname"
# Step 2: Convert field types for schema consistency
[[processors.converter]]
order = 2
[processors.converter.fields]
float = ["cpu_idle_percent", "cpu_system_percent", "cpu_user_percent",
"memory_used_percent", "latency_ms"]
integer = ["available", "count"]
# Step 3: Extract date partitions from timestamp
[[processors.date]]
order = 3
tag_key = "partition_year"
date_format = "2006"
[[processors.date]]
order = 4
tag_key = "partition_month"
date_format = "01"
[[processors.date]]
order = 5
tag_key = "partition_day"
date_format = "02"
# Step 4: Custom transformations (compute derived fields, flatten tags)
[[processors.starlark]]
order = 6
source = '''
load("time.star", "time")

def apply(metric):
    # Compute total CPU usage
    if metric.name == "cpu_metrics":
        idle = metric.fields.get("cpu_idle_percent", 0.0)
        metric.fields["cpu_total_usage_percent"] = 100.0 - idle

    # Memory health flag
    if metric.name == "memory_metrics":
        used = metric.fields.get("memory_used_percent", 0.0)
        metric.fields["memory_critical"] = used > 95.0

    # Flatten all tags into fields for columnar storage
    for key, value in metric.tags.items():
        if not key.startswith("partition_"):
            metric.fields["tag_" + key] = value

    # Add metadata
    metric.fields["measurement"] = metric.name
    metric.fields["source_system"] = "influxdb"
    metric.fields["pipeline_version"] = "1.0"
    metric.fields["ingested_at"] = time.now().unix

    return metric
'''
# =============================================================================
# OUTPUT: Write to S3 with Hive-style partitioning
# =============================================================================
[[outputs.s3]]
bucket = "${AWS_S3_BUCKET}"
s3_key_prefix = "landing-zone/measurement={{.Name}}/year={{.Tag \"partition_year\"}}/month={{.Tag \"partition_month\"}}/day={{.Tag \"partition_day\"}}/"
region = "${AWS_REGION}"
## Authentication (uses environment variables or instance role)
# access_key = "${AWS_ACCESS_KEY_ID}"
# secret_key = "${AWS_SECRET_ACCESS_KEY}"
data_format = "json"
json_timestamp_units = "1s"
## Batching
metric_batch_size = 10000
metric_buffer_limit = 100000
flush_interval = "5m"
flush_jitter = "30s"
use_batch_format = true
# =============================================================================
# MONITORING: Internal Telegraf metrics
# =============================================================================
[[inputs.internal]]
collect_memstats = true
name_prefix = "telegraf_pipeline_"
[[outputs.file]]
files = ["/var/log/telegraf/internal_metrics.json"]
data_format = "json"
namepass = ["telegraf_pipeline_*"]
rotation_interval = "24h"
rotation_max_archives = 7
Set the required environment variables:
# /etc/default/telegraf or /etc/telegraf/telegraf.env
INFLUXDB_URL=http://localhost:8086
INFLUXDB_TOKEN=my-super-secret-token
INFLUXDB_ORG=my-org
INFLUXDB_BUCKET=metrics
AWS_S3_BUCKET=my-timeseries-lakehouse
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=secret...
Start the pipeline:
# Test the configuration first
telegraf --config /etc/telegraf/telegraf-pipeline.conf --test
# Run in foreground for debugging
telegraf --config /etc/telegraf/telegraf-pipeline.conf
# Run as a service
sudo cp /etc/telegraf/telegraf-pipeline.conf /etc/telegraf/telegraf.conf
sudo systemctl restart telegraf
sudo systemctl status telegraf
sudo journalctl -u telegraf -f
Querying Iceberg Data with Athena
Once data is flowing into your Iceberg tables, you can query it with standard SQL through Amazon Athena. Here are practical queries you will use daily.
Basic Analytical Queries
-- Average CPU usage per host over the last 24 hours
SELECT
hostname,
region,
AVG(cpu_total_usage_percent) as avg_cpu_usage,
MAX(cpu_total_usage_percent) as peak_cpu_usage,
MIN(cpu_idle_percent) as min_idle_percent,
COUNT(*) as data_points
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '24' hour
GROUP BY hostname, region
ORDER BY avg_cpu_usage DESC;
-- Hourly aggregation for dashboarding
SELECT
date_trunc('hour', timestamp) as hour,
hostname,
AVG(cpu_total_usage_percent) as avg_cpu,
APPROX_PERCENTILE(cpu_total_usage_percent, 0.95) as p95_cpu,
APPROX_PERCENTILE(cpu_total_usage_percent, 0.99) as p99_cpu
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '7' day
GROUP BY 1, 2
ORDER BY 1 DESC, 2;
-- Memory alerts: find hosts with high memory usage
SELECT
hostname,
region,
timestamp,
used_percent,
available / (1024*1024*1024) as available_gb
FROM timeseries_db.memory_metrics
WHERE used_percent > 90
AND timestamp >= current_timestamp - interval '1' hour
ORDER BY used_percent DESC;
Time Travel Queries
One of Iceberg’s killer features is time travel — querying your data as it existed at a previous point in time:
-- Query data as it existed yesterday at noon
SELECT *
FROM timeseries_db.cpu_metrics
FOR TIMESTAMP AS OF TIMESTAMP '2026-04-02 12:00:00'
WHERE hostname = 'server01';
-- Compare current data with data from a week ago
SELECT
current_data.hostname,
current_data.avg_cpu as current_avg_cpu,
historical.avg_cpu as week_ago_avg_cpu,
current_data.avg_cpu - historical.avg_cpu as cpu_change
FROM (
SELECT hostname, AVG(cpu_total_usage_percent) as avg_cpu
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '1' day
GROUP BY hostname
) current_data
JOIN (
SELECT hostname, AVG(cpu_total_usage_percent) as avg_cpu
FROM timeseries_db.cpu_metrics
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-27 00:00:00'
WHERE timestamp >= TIMESTAMP '2026-03-26' AND timestamp < TIMESTAMP '2026-03-27'
GROUP BY hostname
) historical ON current_data.hostname = historical.hostname;
-- View table snapshot history
SELECT * FROM timeseries_db.cpu_metrics$snapshots ORDER BY committed_at DESC LIMIT 10;
-- View manifest files
SELECT * FROM timeseries_db.cpu_metrics$manifests;
Joining with Other Data Sources
-- Join CPU metrics with a server inventory table
SELECT
c.hostname,
c.region,
s.instance_type,
s.team,
AVG(c.cpu_total_usage_percent) as avg_cpu,
s.monthly_cost
FROM timeseries_db.cpu_metrics c
JOIN timeseries_db.server_inventory s ON c.hostname = s.hostname
WHERE c.timestamp >= current_timestamp - interval '7' day
GROUP BY c.hostname, c.region, s.instance_type, s.team, s.monthly_cost
HAVING AVG(c.cpu_total_usage_percent) < 10 -- Underutilized servers
ORDER BY s.monthly_cost DESC;
Athena Cost Optimization Tips
- Avoid SELECT * on large tables.
- Use partition predicates: WHERE timestamp >= ... triggers Iceberg partition pruning, scanning only relevant Parquet files.
- Select specific columns: Parquet is columnar, so SELECT hostname, cpu_total_usage_percent reads far less data than SELECT *.
- Run compaction regularly: small files degrade query performance and increase cost. Keep files between 128MB and 256MB.
- Use CTAS for frequent queries: materialize expensive queries as new Iceberg tables.
Alternative Pipeline: InfluxDB to Telegraf to Kafka to Spark to Iceberg
For organizations that need true streaming ingestion with exactly-once semantics, a Kafka-based pipeline is the way to go. Here's the architecture.
InfluxDB → Telegraf → Kafka Topic → Spark Structured Streaming → Iceberg Table
When to Use Kafka vs S3-Based
- Use S3-based (this guide's main approach) when: batch is acceptable (minutes to hours), data volume is under 1TB/day, you want minimal infrastructure, cost is a priority.
- Use Kafka-based when: you need sub-minute latency, data volume exceeds 1TB/day, you already have a Kafka cluster, you need exactly-once delivery guarantees.
Telegraf Kafka Output Configuration
# telegraf.conf - Output: Kafka
[[outputs.kafka]]
## Kafka broker addresses
brokers = ["kafka-broker-1:9092", "kafka-broker-2:9092", "kafka-broker-3:9092"]
## Topic for all metrics (or use topic_suffix for per-measurement topics)
topic = "influxdb-metrics"
## Use measurement name as topic suffix for separate topics
## Creates topics like: influxdb-metrics-cpu_metrics, influxdb-metrics-memory_metrics
# topic_suffix = {method = "measurement"}
## Compression
compression_codec = "snappy"
## Required acks: 0=none, 1=leader, -1=all replicas
required_acks = -1
## Max message size
max_message_bytes = 1048576
## Data format
data_format = "json"
json_timestamp_units = "1ms"
## SASL authentication (if Kafka requires it)
# sasl_mechanism = "SCRAM-SHA-512"
# sasl_username = "${KAFKA_USERNAME}"
# sasl_password = "${KAFKA_PASSWORD}"
## TLS
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
The Spark Structured Streaming consumer:
# spark_kafka_iceberg.py - Spark Structured Streaming from Kafka to Iceberg
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("Kafka-to-Iceberg-Streaming") \
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-timeseries-lakehouse/iceberg-warehouse/") \
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
# Define the schema matching our Telegraf JSON output
metrics_schema = StructType([
StructField("name", StringType()),
StructField("timestamp", LongType()),
StructField("tags", MapType(StringType(), StringType())),
StructField("fields", MapType(StringType(), DoubleType()))
])
# Read from Kafka
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker-1:9092") \
.option("subscribe", "influxdb-metrics") \
.option("startingOffsets", "latest") \
.load()
# Parse JSON messages
df_parsed = df_kafka \
.select(from_json(col("value").cast("string"), metrics_schema).alias("data")) \
.select("data.*") \
.withColumn("timestamp", (col("timestamp") / 1000).cast("timestamp")) \
.withColumn("hostname", col("tags")["hostname"]) \
.withColumn("region", col("tags")["region"])
# Write to Iceberg using foreachBatch
def write_to_iceberg(batch_df, batch_id):
    batch_df.writeTo("glue_catalog.timeseries_db.all_metrics") \
        .option("merge-schema", "true") \
        .append()
query = df_parsed.writeStream \
.foreachBatch(write_to_iceberg) \
.option("checkpointLocation", "s3://my-timeseries-lakehouse/checkpoints/kafka-iceberg/") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination()
Monitoring and Troubleshooting
A data pipeline is only as good as its monitoring. Here's how to keep this pipeline healthy.
Telegraf Internal Metrics
The inputs.internal plugin we configured earlier provides critical operational metrics:
# Check Telegraf metrics buffer status
cat /var/log/telegraf/internal_metrics.json | python3 -m json.tool | grep -E "metrics_gathered|metrics_written|buffer_size"
# Key metrics to monitor:
# - gather_errors: input plugin failures (InfluxDB connection issues)
# - metrics_gathered: total metrics collected per interval
# - metrics_written: total metrics sent to S3
# - buffer_size: current buffer usage (should stay well below buffer_limit)
# - write_errors: output plugin failures (S3 permission or network issues)
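As a sketch of how you might act on those metrics, the function below scans JSON lines from the internal metrics file and flags the failure modes listed above. The field names (`buffer_size`, `buffer_limit`, `metrics_dropped`, `gather_errors`) are assumptions based on the inputs.internal plugin — verify them against your Telegraf version.

```python
import json

def check_pipeline_health(metric_lines, buffer_warn_ratio=0.8):
    """Return warning strings derived from Telegraf internal-metric JSON lines."""
    warnings = []
    for line in metric_lines:
        m = json.loads(line)
        fields = m.get("fields", {})
        # Dropped metrics mean the buffer overflowed at some point
        if fields.get("metrics_dropped", 0) > 0:
            warnings.append("metrics dropped: increase metric_buffer_limit")
        # A nearly full buffer means the S3 output is falling behind
        size, limit = fields.get("buffer_size", 0), fields.get("buffer_limit", 0)
        if limit and size / limit > buffer_warn_ratio:
            warnings.append(f"buffer {size}/{limit}: output falling behind")
        # Gather errors usually mean InfluxDB is unreachable
        if fields.get("gather_errors", 0) > 0:
            warnings.append("gather errors: check InfluxDB connectivity")
    return warnings

sample = ['{"name": "telegraf_pipeline_internal_write", '
          '"fields": {"buffer_size": 95000, "buffer_limit": 100000, '
          '"metrics_dropped": 0, "gather_errors": 0}}']
print(check_pipeline_health(sample))
```

Wired into a cron job or a Lambda, this is enough to page someone before the buffer actually overflows.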
Common Issues and Resolutions
| Issue | Symptoms | Resolution |
|---|---|---|
| InfluxDB connection failure | gather_errors increasing, no new metrics | Verify InfluxDB URL and token. Check network connectivity. Ensure InfluxDB is running. |
| S3 permission denied | write_errors increasing, AccessDenied in logs | Check IAM policy. Verify AWS credentials. Ensure bucket policy allows PutObject. |
| Schema mismatch in Glue | Athena queries return NULL or fail | Re-run Glue Crawler. Check that JSON field names match table column names. Verify type conversions in Telegraf processors. |
| Glue Crawler fails | Crawler stuck in RUNNING or FAILED state | Check Glue Crawler IAM role. Verify S3 path is correct. Look for malformed JSON files in landing zone. |
| Data type conflicts | Fields showing as wrong type in Athena | Use processors.converter to enforce types in Telegraf. InfluxDB may return integers as floats or vice versa. |
| Buffer overflow | metrics_dropped count increasing | Increase metric_buffer_limit. Reduce flush_interval. Check for S3 write latency issues. |
| Duplicate data in Iceberg | Row counts higher than expected | Implement idempotent ingestion with MERGE INTO instead of INSERT. Track processed files to avoid re-ingestion. |
| Too many small files | Athena queries slow and expensive | Increase Telegraf batch size. Run Iceberg compaction regularly. Target 128-256MB file sizes. |
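The duplicate-data row deserves a concrete sketch. One hedged approach: keep a manifest of landing-zone files already ingested and only process new ones, and replace blind INSERT with a MERGE INTO upsert. The function and manifest names below are illustrative, and the generated SQL uses Spark's `UPDATE SET * / INSERT *` shorthand — Athena's MERGE requires explicit column lists.

```python
def select_unprocessed(landing_keys, processed_manifest):
    """Return landing-zone S3 keys not yet recorded in the processed manifest."""
    processed = set(processed_manifest)
    return [k for k in landing_keys if k not in processed]

def merge_statement(table, staging, key_cols):
    """Build an idempotent MERGE INTO (upsert) in place of INSERT INTO."""
    on = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    return (f"MERGE INTO {table} t USING {staging} s ON {on} "
            f"WHEN MATCHED THEN UPDATE SET * "
            f"WHEN NOT MATCHED THEN INSERT *")

keys = ["landing-zone/day=03/a.json", "landing-zone/day=03/b.json"]
print(select_unprocessed(keys, ["landing-zone/day=03/a.json"]))
print(merge_statement("timeseries_db.cpu_metrics", "staging",
                      ["hostname", "timestamp"]))
```

Re-running the same batch then becomes a no-op instead of a row-count doubling.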
Data Validation Queries
-- Check data freshness: how recent is the latest data?
SELECT
MAX(timestamp) as latest_data,
current_timestamp as query_time,
date_diff('minute', MAX(timestamp), current_timestamp) as minutes_behind
FROM timeseries_db.cpu_metrics;
-- Check for data gaps: are there any missing hours?
SELECT
date_trunc('hour', timestamp) as hour,
COUNT(*) as record_count
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '24' hour
GROUP BY 1
ORDER BY 1;
-- Validate data quality: check for NULLs and outliers
SELECT
COUNT(*) as total_records,
COUNT(hostname) as non_null_hostname,
COUNT(cpu_total_usage_percent) as non_null_cpu,
MIN(cpu_total_usage_percent) as min_cpu,
MAX(cpu_total_usage_percent) as max_cpu,
COUNT(CASE WHEN cpu_total_usage_percent > 100 THEN 1 END) as invalid_cpu_over_100,
COUNT(CASE WHEN cpu_total_usage_percent < 0 THEN 1 END) as invalid_cpu_negative
FROM timeseries_db.cpu_metrics
WHERE timestamp >= current_timestamp - interval '1' hour;
Performance Optimization
Getting the pipeline working is one thing. Making it perform well at scale is another. Here are the key tuning parameters.
Telegraf Buffer Tuning
The two most important Telegraf settings are metric_batch_size and metric_buffer_limit:
- metric_batch_size: How many metrics are sent to the output plugin at once. Larger batches reduce S3 API calls but increase memory usage and latency.
- metric_buffer_limit: Maximum metrics held in memory. If the output is slow, metrics queue here. Once full, new metrics are dropped.
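A rough sizing calculation helps pick these two numbers. The sketch below assumes ~500 bytes of JSON per flattened metric (adjust for your schema) and the rule of thumb that the buffer should hold several flush intervals of data.

```python
def buffer_sizing(metrics_per_min, flush_interval_min, bytes_per_metric=500):
    """Back-of-envelope Telegraf buffer/batch sizing estimates."""
    per_flush = metrics_per_min * flush_interval_min
    return {
        # Metrics accumulated between flushes; batch size should be near this
        "metrics_per_flush": per_flush,
        # Approximate size of each S3 object written per flush
        "s3_object_mb": round(per_flush * bytes_per_metric / 1024 / 1024, 1),
        # Hold ~10 flush intervals so a slow S3 write doesn't drop metrics
        "suggested_buffer_limit": per_flush * 10,
    }

print(buffer_sizing(metrics_per_min=10_000, flush_interval_min=5))
```

At 10K metrics/min with a 5-minute flush, each flush carries 50K metrics (~24 MB of JSON), which lines up with the medium tier in the table below.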
Recommended Settings by Data Volume
| Setting | Small (<10K metrics/min) | Medium (10K-100K/min) | Large (>100K/min) |
|---|---|---|---|
| metric_batch_size | 5,000 | 10,000 | 50,000 |
| metric_buffer_limit | 50,000 | 200,000 | 1,000,000 |
| flush_interval | 10m | 5m | 1m |
| collection_interval | 1h | 15m | 5m |
| Target S3 file size | 64-128 MB | 128-256 MB | 256-512 MB |
| Partition granularity | Day | Day | Hour |
| Telegraf RAM estimate | 128 MB | 512 MB | 2-4 GB |
| Compaction frequency | Daily | Every 6 hours | Every 1-2 hours |
Iceberg Compaction
Small files are the enemy of Iceberg performance. Schedule compaction to merge small files:
-- Run compaction via Athena (Athena v3 with Iceberg support)
OPTIMIZE timeseries_db.cpu_metrics REWRITE DATA USING BIN_PACK;
-- Or via Spark (more control over target file size)
-- In a Glue ETL job or EMR Spark session:
CALL glue_catalog.system.rewrite_data_files(
table => 'timeseries_db.cpu_metrics',
options => map(
'target-file-size-bytes', '134217728', -- 128MB
'min-file-size-bytes', '67108864', -- 64MB
'max-file-size-bytes', '268435456' -- 256MB
)
);
-- Expire old snapshots to reclaim storage
CALL glue_catalog.system.expire_snapshots(
table => 'timeseries_db.cpu_metrics',
older_than => TIMESTAMP '2026-03-01 00:00:00',
retain_last => 10
);
-- Remove orphan files
CALL glue_catalog.system.remove_orphan_files(
table => 'timeseries_db.cpu_metrics',
older_than => TIMESTAMP '2026-03-01 00:00:00'
);
Partitioning Best Practices for Time-Series Data
- Partition by day for most workloads. This creates a manageable number of partitions and files.
- Add a secondary partition on low-cardinality dimensions like measurement if you query specific measurements frequently.
- Avoid over-partitioning. Partitioning by minute creates millions of tiny files that destroy performance.
- Use Iceberg's hidden partitioning with day(timestamp) rather than creating explicit partition columns. This means queries on timestamp automatically trigger partition pruning without users needing to know about partitions.
- Monitor partition sizes. If a partition has fewer than 10 files or each file is under 10MB, your partitioning is too granular.
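The "monitor partition sizes" rule of thumb can be checked mechanically. Given file sizes per partition (obtainable, for example, from Iceberg's $files metadata table), this illustrative function flags partitions that look too granular; the thresholds mirror the heuristic above and should be tuned to your workload.

```python
def too_granular(partition_files, min_files=10, min_mb=10.0):
    """Flag partitions with fewer than min_files files or only sub-min_mb files."""
    flagged = []
    for partition, sizes_mb in partition_files.items():
        if len(sizes_mb) < min_files or all(s < min_mb for s in sizes_mb):
            flagged.append(partition)
    return flagged

stats = {
    "day=2026-04-03": [130.0] * 12,      # healthy: many well-sized files
    "hour=2026-04-03-12": [1.2, 0.8],    # too granular: two tiny files
}
print(too_granular(stats))
```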
Cost Analysis
Let's look at the real numbers. The cost savings from moving time-series data from InfluxDB to Iceberg on S3 can be dramatic, especially at scale.
| Data Volume | InfluxDB Cloud (storage + queries) | S3 + Iceberg + Athena | Monthly Savings |
|---|---|---|---|
| 100 GB | ~$200/mo (storage) + ~$50/mo (queries) | ~$2.30 (S3) + ~$5 (Athena) + ~$10 (Glue) | ~$233/mo (93% savings) |
| 1 TB | ~$2,000/mo + ~$200/mo | ~$23 (S3) + ~$25 (Athena) + ~$20 (Glue) | ~$2,132/mo (97% savings) |
| 10 TB | ~$20,000/mo + ~$500/mo | ~$230 (S3) + ~$100 (Athena) + ~$50 (Glue) | ~$20,120/mo (98% savings) |
Additional costs to factor in:
- Telegraf compute: Runs on existing infrastructure. Minimal CPU and RAM for most workloads.
- S3 API costs: PUT requests at $0.005 per 1,000. With batching, this is typically under $10/month.
- Glue Crawler: $0.44 per DPU-hour. A daily crawl typically costs $1-5/month.
- Glue ETL: $0.44 per DPU-hour. A daily 10-minute job with 2 DPUs costs ~$13/month.
- Data transfer: Free within the same AWS region. Cross-region adds $0.02/GB.
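The S3 API bullet is worth verifying with arithmetic. This sketch prices PUT requests at the listed $0.005 per 1,000 (standard-tier pricing; check current rates for your region) and shows why batching keeps the cost negligible.

```python
def monthly_put_cost(metrics_per_min, metric_batch_size,
                     put_price_per_1000=0.005):
    """Estimate monthly S3 PUT cost in USD, assuming one PUT per batch."""
    metrics_per_month = metrics_per_min * 60 * 24 * 30
    puts = metrics_per_month / metric_batch_size
    return round(puts / 1000 * put_price_per_1000, 2)

# 100K metrics/min batched 10,000 per object: 432K PUTs/month
print(monthly_put_cost(100_000, 10_000))  # 2.16
```

Even at the large tier, batching keeps PUT charges at a couple of dollars a month — unbatched, the same volume would cost thousands of times more.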
The break-even point is almost immediate. Even at 100GB, you save over $230/month by moving to S3+Iceberg. The pipeline infrastructure (Telegraf, Glue) costs less than $30/month for most workloads.
Conclusion
Building a data pipeline from InfluxDB to Apache Iceberg through Telegraf is not just technically feasible — it is a compelling architecture that solves real problems. You get to keep InfluxDB doing what it does best (real-time monitoring and dashboards) while offloading historical data to a lakehouse that costs 90-98% less and opens up SQL analytics, ML pipelines, and proper data governance.
Let's recap what we built:
- Telegraf input plugins that pull data from InfluxDB v1.x or v2.x using four different methods, from simple pull-based queries to real-time push-based listeners.
- Telegraf processors that transform InfluxDB's tag/field model into a flat columnar schema suitable for Iceberg, with type conversion, field renaming, computed fields, and date partitioning.
- S3 output with Hive-style partitioning that lands data in formats AWS Glue can discover and catalog.
- Iceberg table creation via Athena DDL or Glue Crawlers, with proper partitioning for time-series workloads.
- Automated ingestion using Glue ETL jobs, Athena INSERT INTO, Lambda triggers, or Spark on EMR.
- A complete, production-ready telegraf.conf that you can deploy today with minimal modifications.
The beauty of this architecture is its modularity. You can start simple — JSON files on S3 with a Glue Crawler — and evolve to Parquet with Spark streaming as your needs grow. Telegraf's plugin architecture means you can swap inputs and outputs without rewriting your transformation logic. And Iceberg's partition evolution means you can change your partitioning strategy without rewriting a single byte of historical data.
If you're sitting on terabytes of time-series data in InfluxDB and your storage bills keep climbing, this pipeline is your escape hatch. Set it up over a weekend, validate it with a week of dual-writing, and then start reducing your InfluxDB retention policies. Your future self — and your finance team — will thank you.
References
- Telegraf Documentation — Official Telegraf plugin documentation and configuration guide
- InfluxDB v2 Documentation — Flux query language and API reference
- Apache Iceberg Documentation — Table format specification and engine integrations
- Amazon Athena Iceberg Integration — Creating and querying Iceberg tables with Athena
- AWS Glue Iceberg Support — Using Iceberg with Glue ETL jobs
- Telegraf Plugin Directory — Complete list of input, processor, and output plugins
- Amazon S3 Documentation — Storage classes, pricing, and lifecycle policies
- Iceberg Spark Integration — Reading and writing Iceberg tables with Apache Spark