Architecture¶
This document describes the system architecture of the Amtrak Ingestion pipeline.
Overview¶
The Amtrak Ingestion system is a serverless data pipeline built on AWS Chalice that processes real-time train data through several stages:
Data Ingestion - Fetch data from external APIs
Data Enrichment - Add GTFS metrics and direction information
Event Generation - Generate arrival/departure events
Data Storage - Store processed data in S3
Data Collation - Aggregate daily data for analysis
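The real-time portion of these stages chains together in a straight line. The sketch below shows that shape with placeholder stage functions; every name and record field here is illustrative, not one of the project's actual module APIs.

```python
# Minimal sketch of the ingestion → enrichment → event generation → storage
# chain. All functions and fields are illustrative stand-ins.

def fetch(api_url: str) -> list[dict]:
    """Stage 1: stand-in for fetching raw train records from the external API."""
    return [{"train_id": "91", "stop": "WAS", "eta": "2024-01-01T12:00:00Z"}]

def enrich(records: list[dict]) -> list[dict]:
    """Stage 2: attach GTFS-derived fields (here, a fixed direction id)."""
    return [{**r, "direction_id": 0} for r in records]

def generate_events(records: list[dict]) -> list[dict]:
    """Stage 3: turn enriched records into arrival/departure events."""
    return [{"type": "arrival", **r} for r in records]

def store(events: list[dict]) -> int:
    """Stage 4: stand-in for the S3 upload; returns the number of events written."""
    return len(events)

def run_pipeline(api_url: str) -> int:
    return store(generate_events(enrich(fetch(api_url))))
```

The daily collation stage runs separately on the stored output rather than in this chain.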
System Components¶
+------------------+     +------------------+     +------------------+
|   Amtraker API   |     |    GTFS Feeds    |     | AWS EventBridge  |
+--------+---------+     +--------+---------+     +--------+---------+
         |                        |                        |
         v                        v                        v
+--------+---------+     +--------+---------+     +--------+---------+
|     read.py      |     |     gtfs.py      |     | Scheduled Tasks  |
|  (Data Reading)  |     | (GTFS Processing)|     | (Cron Triggers)  |
+--------+---------+     +--------+---------+     +--------+---------+
         |                        |                        |
         +------------------------+------------------------+
                                  |
                                  v
                    +-------------+-------------+
                    |          main.py          |
                    |  (Pipeline Orchestration) |
                    +-------------+-------------+
                                  |
                    +-------------+-------------+
                    |                           |
                    v                           v
            +-------+--------+         +--------+-------+
            |  transform.py  |         |    write.py    |
            |  (Enrichment)  |         |  (Event Gen)   |
            +-------+--------+         +--------+-------+
                    |                           |
                    +-------------+-------------+
                                  |
                                  v
                    +-------------+-------------+
                    |          disk.py          |
                    |   (Local File Storage)    |
                    +-------------+-------------+
                                  |
                                  v
                    +-------------+-------------+
                    |       s3_upload.py        |
                    |      (S3 Operations)      |
                    +-------------+-------------+
                                  |
                                  v
                    +-------------+-------------+
                    |          AWS S3           |
                    |    (Data Persistence)     |
                    +---------------------------+
Data Flow¶
Real-time Data Pipeline¶
The real-time pipeline runs every 5 minutes:
Fetch Data: read.py fetches data from the Amtraker API
Validate: Pydantic models validate the API response
Transform: Convert to Polars DataFrame and split by provider
Enrich: transform.py adds GTFS direction IDs and scheduled metrics
Filter: timefilter.py filters to only new events
Generate Events: write.py creates arrival/departure records
Store: Upload compressed JSON to S3
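One way the timestamp-filtering step could work is a watermark comparison: keep only events stamped after the previous run's cutoff. The helper name and field names below are illustrative, not timefilter.py's actual interface.

```python
# Watermark-style filtering: drop anything at or before the last run's cutoff.
# Field names and the helper name are assumptions, not the project's real API.
from datetime import datetime, timezone

def filter_new_events(events: list[dict], watermark: datetime) -> list[dict]:
    """Return only events timestamped strictly after the previous run's cutoff."""
    return [e for e in events if e["timestamp"] > watermark]

watermark = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
events = [
    {"train": "91", "timestamp": datetime(2024, 1, 1, 11, 55, tzinfo=timezone.utc)},
    {"train": "97", "timestamp": datetime(2024, 1, 1, 12, 5, tzinfo=timezone.utc)},
]
new = filter_new_events(events, watermark)  # only the 12:05 event survives
```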
Daily Collation Pipeline¶
The collation pipeline runs daily at 3:00 AM UTC:
Download: Fetch all gzipped JSON files for the previous day from S3
Decompress: Extract JSON data from gzip archives
Process: Parse events and organize by route/direction/stop
Write CSV: Generate CSV files with event data
Compress: Gzip CSV files for storage efficiency
Upload: Store collated data in S3
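The decompress → parse → write-CSV → recompress middle of that run can be done entirely with the standard library on in-memory bytes, as sketched below. The S3 download and upload on either side are omitted, and the event field names are assumptions for illustration.

```python
# Sketch of the middle of the collation run: gunzip JSON blobs, merge the
# events, emit one gzipped CSV. Event fields are illustrative assumptions.
import csv
import gzip
import io
import json

def collate(gz_blobs: list[bytes]) -> bytes:
    """Turn gzipped JSON event blobs into one gzipped CSV payload."""
    events: list[dict] = []
    for blob in gz_blobs:
        events.extend(json.loads(gzip.decompress(blob)))
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=["route", "direction", "stop", "type"])
    writer.writeheader()
    writer.writerows(events)
    return gzip.compress(buf.getvalue().encode())

# One raw blob, as the real-time pipeline would have stored it:
blob = gzip.compress(json.dumps(
    [{"route": "Silver Star", "direction": 0, "stop": "WAS", "type": "arrival"}]
).encode())
csv_gz = collate([blob])
```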
S3 Data Organization¶
Raw Events¶
Real-time event data is stored in a hierarchical structure:
s3://amtrak-performance/
└── Events-live/
└── raw/
└── {Provider}/
└── Year={YYYY}/
└── Month={MM}/
└── Day={DD}/
└── _{HH}_{MM}.json.gz
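An object key for this layout can be derived from a provider name and a UTC timestamp; the tree above fixes the prefix and placeholders, while the helper name below is illustrative.

```python
# Build a raw-event key matching the Events-live/raw/ layout above.
# The helper name is illustrative; the key shape comes from the tree.
from datetime import datetime, timezone

def raw_event_key(provider: str, ts: datetime) -> str:
    return (
        f"Events-live/raw/{provider}/"
        f"Year={ts:%Y}/Month={ts:%m}/Day={ts:%d}/_{ts:%H}_{ts:%M}.json.gz"
    )

key = raw_event_key("Amtrak", datetime(2024, 3, 5, 14, 35, tzinfo=timezone.utc))
# → "Events-live/raw/Amtrak/Year=2024/Month=03/Day=05/_14_35.json.gz"
```

The Year=/Month=/Day= segments follow the Hive partitioning convention, so tools like Athena can prune by date.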
Collated Data¶
Daily collated data is organized by route, direction, and stop:
s3://amtrak-performance/
└── Events-live/
└── daily-{Provider}-data/
└── {route}_{direction}_{stop}/
└── Year={YYYY}/
└── Month={MM}/
└── Day={DD}/
└── events.csv.gz
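The collated layout keys on route, direction, and stop instead of a timestamp. A sketch of the corresponding key builder, with an illustrative helper name:

```python
# Build a collated-data key matching the daily-{Provider}-data/ layout above.
# The helper name is illustrative; the key shape comes from the tree.
from datetime import date

def collated_key(provider: str, route: str, direction: int, stop: str, d: date) -> str:
    return (
        f"Events-live/daily-{provider}-data/{route}_{direction}_{stop}/"
        f"Year={d:%Y}/Month={d:%m}/Day={d:%d}/events.csv.gz"
    )

key = collated_key("Amtrak", "silver-star", 0, "WAS", date(2024, 3, 5))
# → "Events-live/daily-Amtrak-data/silver-star_0_WAS/Year=2024/Month=03/Day=05/events.csv.gz"
```

Note that underscores separate the three components, which assumes route and stop values do not themselves contain underscores.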
GTFS Cache¶
GTFS bundles are cached in S3:
s3://amtrak-performance/
└── GTFS/
├── Amtrak.zip
├── VIA.zip
├── Brightline.zip
└── last_modified.json
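A cache like this typically lets the pipeline skip re-downloading unchanged bundles: compare the value recorded in last_modified.json against the one the feed server currently reports. The JSON shape below is an assumption, and the S3/HTTP calls are stubbed out as plain parameters.

```python
# Decide whether a GTFS bundle needs re-downloading by comparing cached and
# remote Last-Modified values. The last_modified.json shape is an assumption.
import json

def needs_refresh(cache_json: str, feed: str, remote_last_modified: str) -> bool:
    """True if the cached Last-Modified differs from (or is missing for) the feed."""
    cached = json.loads(cache_json)
    return cached.get(feed) != remote_last_modified

cache = json.dumps({"Amtrak.zip": "Wed, 01 May 2024 00:00:00 GMT"})
needs_refresh(cache, "Amtrak.zip", "Wed, 01 May 2024 00:00:00 GMT")  # False
needs_refresh(cache, "VIA.zip", "Tue, 30 Apr 2024 12:00:00 GMT")     # True
```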
Module Responsibilities¶
| Module | Responsibility |
|---|---|
| app.py | Chalice application entry point, HTTP endpoints, scheduled task definitions |
| main.py | Pipeline orchestration, GTFS bundle management, data collation |
| read.py | API data fetching, validation, DataFrame transformations |
| transform.py | Data enrichment with GTFS direction IDs and scheduled metrics |
| write.py | Event generation, service date calculation |
| gtfs.py | GTFS data loading, metrics calculation, direction lookup generation |
| disk.py | Local file storage operations |
| s3_upload.py | S3 upload/download operations |
| timefilter.py | Event filtering by timestamp |
| config.py | Configuration, logging setup, AWS clients |
| constants.py | Constants, enums, field definitions |
Error Handling¶
The pipeline implements several error handling strategies:
API Failures: Log errors and continue with next scheduled run
Invalid Data: Pydantic validation rejects malformed responses
S3 Errors: Retry logic with exponential backoff
GTFS Issues: Fall back to cached data if update fails
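The retry-with-exponential-backoff strategy for S3 errors commonly takes the shape below: retry a callable with doubling delays and re-raise after the final attempt. The attempt count and delay values are illustrative, not the pipeline's actual settings.

```python
# Generic exponential-backoff wrapper. Attempt count and base delay are
# illustrative defaults, not the pipeline's actual configuration.
import time

def with_backoff(fn, attempts: int = 4, base_delay: float = 0.5):
    """Call fn(), retrying with doubling delays; re-raise on the last failure."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
```

An S3 upload would then be wrapped as, e.g., `with_backoff(lambda: client.put_object(...))`.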
Logging¶
All modules use structured logging configured in config.py:
Log format: %(asctime)s | %(levelname)s | %(name)s | %(message)s
Date format: %Y-%m-%d %H:%M:%S
CloudWatch-compatible output
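With the stdlib logging module, the documented format and date format correspond to a basicConfig call like the following (the logger name and level here are illustrative):

```python
# Configure stdlib logging with the documented format and date format.
# Logger name and level are illustrative, not taken from config.py.
import logging

logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("amtrak.pipeline")
logger.info("pipeline run started")
```

On Lambda, anything written this way lands in the function's CloudWatch log stream unchanged.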