Data Pipeline
This document describes the data processing pipeline in detail.
Overview
The data pipeline transforms raw train data from the Amtraker API into structured arrival and departure events enriched with GTFS schedule information.
Pipeline Stages
Stage 1: Data Ingestion
The pipeline begins by fetching data from the Amtraker API.
Module: read.py
Functions:
validate_amtraker_data() - Fetches and validates API response
trainresponse_to_polars() - Converts response to Polars DataFrame
read_amtraker_data() - Complete ingestion pipeline
Data Source:
Amtraker API: https://api-v3.amtraker.com/v3/trains
Output: Polars DataFrame with train and station data
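A minimal stdlib-only sketch of this stage, assuming the endpoint returns a JSON object that maps each train number to a list of train objects. `fetch_trains()` and `flatten_response()` are hypothetical names; the real module validates with Pydantic and builds a Polars DataFrame rather than a list of dicts.

```python
import json
import urllib.request

AMTRAKER_URL = "https://api-v3.amtraker.com/v3/trains"


def fetch_trains(url: str = AMTRAKER_URL, timeout: float = 10.0) -> dict:
    """Download the raw payload and decode it as JSON (makes a network call)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def flatten_response(payload: dict) -> list:
    """Turn {train_num: [train, ...]} into one flat list of train objects.

    Assumption: the payload is keyed by train number and each value is a
    list; any other shape is rejected rather than silently ignored.
    """
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object keyed by train number")
    trains = []
    for train_num, entries in payload.items():
        if not isinstance(entries, list):
            raise ValueError(f"train {train_num!r}: expected a list of trains")
        trains.extend(entries)
    return trains


# Usage (requires network access):
# trains = flatten_response(fetch_trains())
```

Rejecting unexpected shapes early keeps a malformed response from flowing silently into the later stages.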
Stage 2: Data Transformation
Raw data is cleaned and restructured for processing.
Module: read.py
Functions:
remove_excess_fields() - Removes unnecessary columns
explode_df() - Expands nested station arrays
remove_bus() - Filters out bus services
split_df_by_provider() - Separates data by transit provider
Transformations:
Remove metadata fields not needed for analysis
Explode station list into individual rows
Filter out non-rail services
Split DataFrame by provider (Amtrak, VIA, Brightline)
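The four transformations can be sketched over plain lists of dicts. These are stdlib stand-ins for `remove_excess_fields()`, `explode_df()`, `remove_bus()`, and `split_df_by_provider()`, not their actual implementations; the fields in `KEEP_FIELDS` and the per-station boolean `bus` flag are assumptions about the API shape.

```python
from collections import defaultdict

# Assumed subset of train-level fields worth keeping downstream.
KEEP_FIELDS = ("trainID", "routeName", "provider", "lastValTS", "stations")


def trim_fields(trains):
    """Drop metadata fields not listed in KEEP_FIELDS."""
    return [{k: t[k] for k in KEEP_FIELDS if k in t} for t in trains]


def explode_stations(trains):
    """Emit one row per (train, station), copying train-level fields onto each row."""
    rows = []
    for train in trains:
        base = {k: v for k, v in train.items() if k != "stations"}
        for station in train.get("stations", []):
            rows.append({**base, **station})
    return rows


def drop_bus_stops(rows):
    """Remove rows served by bus, assuming each station has a boolean 'bus' flag."""
    return [r for r in rows if not r.get("bus", False)]


def split_by_provider(rows):
    """Group rows into {provider: [rows]}."""
    groups = defaultdict(list)
    for r in rows:
        groups[r.get("provider", "unknown")].append(r)
    return dict(groups)
```

Exploding before filtering lets bus segments be removed per stop, so a rail trip with a single bus connection keeps its rail stops.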
Stage 3: GTFS Enrichment
Train data is enriched with GTFS schedule information.
Module: transform.py
Functions:
add_direction_id() - Adds GTFS direction IDs
add_scheduled_metrics() - Adds scheduled headway and travel time
Enrichment Process:
Direction ID: Look up direction (0/1) based on route and headsign
Scheduled Headway: Time gap between consecutive vehicles at a stop
Scheduled Travel Time: Expected time from trip start to each stop
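The direction lookup can be sketched as a dictionary keyed by `(route_id, trip_headsign)` built from GTFS `trips.txt`, whose `route_id`, `trip_headsign`, and `direction_id` columns are defined by the GTFS spec. The train-side row keys `route_id` and `headsign`, and the helper names, are hypothetical.

```python
import csv
import io


def build_direction_lookup(trips_txt: str) -> dict:
    """Map (route_id, trip_headsign) -> direction_id from GTFS trips.txt content.

    direction_id is optional in GTFS, so rows without it are skipped. If two
    trips share a route and headsign, the later row wins.
    """
    lookup = {}
    for row in csv.DictReader(io.StringIO(trips_txt)):
        if row.get("direction_id") in (None, ""):
            continue
        key = (row["route_id"], row.get("trip_headsign") or "")
        lookup[key] = int(row["direction_id"])
    return lookup


def attach_direction_id(rows, lookup):
    """Add direction_id to each row (None when the pair is not in the feed)."""
    return [
        {**r, "direction_id": lookup.get((r["route_id"], r["headsign"]))}
        for r in rows
    ]
```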
Stage 4: Event Generation
Arrival and departure events are generated from the enriched data.
Module: write.py
Functions:
calculate_service_date_from_datetime() - Calculates service date
add_service_dates() - Adds service date columns
write_amtraker_events() - Generates events
Event Types:
ARR - Arrival event (when a train arrives at a station)
DEP - Departure event (when a train departs from a station)
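One way to turn an exploded station row into ARR/DEP events is sketched below. It assumes each station carries a `status` of `Station` (train at the platform) or `Departed`, plus ISO-8601 `arr`/`dep` times; those field names, the status values, and the output keys (`trip_id`, `stop_id`, `event_type`, `event_time`) are illustrative assumptions, not the pipeline's confirmed schema.

```python
from datetime import datetime

# Assumed station statuses: a train has "arrived" once it is at the station or
# has left it, and has "departed" only once it has left.
ARRIVED_STATUSES = {"Station", "Departed"}
DEPARTED_STATUSES = {"Departed"}


def station_to_events(row: dict) -> list:
    """Emit ARR and/or DEP events for one exploded station row."""
    events = []
    checks = (
        ("arr", "ARR", ARRIVED_STATUSES),
        ("dep", "DEP", DEPARTED_STATUSES),
    )
    for time_field, event_type, statuses in checks:
        value = row.get(time_field)
        if row.get("status") in statuses and value:
            events.append({
                "trip_id": row["trainID"],
                "stop_id": row["code"],
                "event_type": event_type,
                # fromisoformat handles offsets like "-05:00" on Python 3.7+.
                "event_time": datetime.fromisoformat(value),
            })
    return events
```

Gating on status rather than on the presence of a time avoids emitting events from estimated times for stops the train has not reached yet.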
Service Date Logic:
The service date is the date the trip started, not the calendar date of the event. Events between midnight and 3:30 AM are assigned to the previous day’s service date.
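The cutoff rule reduces to a comparison against 03:30. This sketch assumes the timestamp is already in the local time zone that defines the service day; `service_date_for()` is a hypothetical name, not the signature of `calculate_service_date_from_datetime()`.

```python
from datetime import date, datetime, time, timedelta

# Events before this local time belong to the previous day's service.
SERVICE_DAY_CUTOFF = time(3, 30)


def service_date_for(ts: datetime) -> date:
    """Return the service date for a local-time event timestamp."""
    if ts.time() < SERVICE_DAY_CUTOFF:
        return ts.date() - timedelta(days=1)
    return ts.date()
```

Using a strict `<` means an event at exactly 03:30 already belongs to the new service day; an event at 01:15 on May 2 maps to May 1.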
Stage 5: Time Filtering
Only new events are processed to avoid duplicates.
Module: timefilter.py
Functions:
filter_events() - Filters by last processed timestamp
get_last_processed() - Retrieves last timestamp from S3
set_last_processed() - Stores current timestamp in S3
Filtering Logic:
Events are filtered based on the lastValTS field, which indicates when the train data was last updated. Only events newer than the last processed timestamp are included.
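A minimal sketch of the filter, with an in-memory object standing in for the S3-backed `get_last_processed()`/`set_last_processed()` pair. `InMemoryCheckpoint` and `filter_new()` are hypothetical names.

```python
from datetime import datetime
from typing import Optional


class InMemoryCheckpoint:
    """Hypothetical stand-in for the S3-backed last-processed timestamp."""

    def __init__(self, initial: Optional[datetime] = None):
        self._value = initial

    def get(self) -> Optional[datetime]:
        return self._value

    def set(self, value: datetime) -> None:
        self._value = value


def filter_new(rows: list, checkpoint: InMemoryCheckpoint) -> list:
    """Keep rows whose lastValTS is strictly newer than the checkpoint.

    If anything is kept, the checkpoint advances to the newest lastValTS kept.
    """
    last = checkpoint.get()
    fresh = [r for r in rows if last is None or r["lastValTS"] > last]
    if fresh:
        checkpoint.set(max(r["lastValTS"] for r in fresh))
    return fresh
```

Design note: the documented `set_last_processed()` stores the current timestamp, whereas this sketch advances the checkpoint to the newest `lastValTS` it kept. Storing wall-clock time can skip a record whose update lands between the fetch and the write; storing the data's own timestamp avoids that.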
Stage 6: Storage
Processed events are stored in AWS S3.
Module: s3_upload.py
Functions:
_compress_and_upload_file() - Compresses and uploads files
upload_todays_events_to_s3() - Uploads daily event files
Storage Format:
JSON format with gzip compression
Organized by provider, year, month, day, and timestamp
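Compression and key construction can be sketched with the standard library alone. The key layout `<provider>/<YYYY>/<MM>/<DD>/<timestamp>.json.gz` is an assumption that follows the organization described above; the naming actually used by `_compress_and_upload_file()` may differ.

```python
import gzip
import json
from datetime import date, datetime


def build_key(provider: str, ts: datetime) -> str:
    """Hypothetical layout: <provider>/<YYYY>/<MM>/<DD>/<timestamp>.json.gz"""
    return f"{provider}/{ts:%Y}/{ts:%m}/{ts:%d}/{ts:%Y%m%dT%H%M%S}.json.gz"


def _json_default(obj):
    """Serialize dates and datetimes as ISO-8601 strings."""
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    raise TypeError(f"not JSON serializable: {type(obj).__name__}")


def compress_events(events: list) -> bytes:
    """Encode events as JSON and gzip the UTF-8 bytes."""
    payload = json.dumps(events, default=_json_default)
    return gzip.compress(payload.encode("utf-8"))
```

The resulting bytes can be passed to an S3 client, for example boto3's `client.put_object(Bucket=..., Key=key, Body=body)`; the bucket name is deployment-specific and not given here.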
Event Schema
Each generated event contains the following fields:
| Field | Type | Description |
|---|---|---|
| | date | The service date for this event |
| | string | The route identifier |
| | string | Unique trip identifier |
| | integer | Direction of travel (0 or 1) |
| | string | Station identifier |
| | integer | Order of stop in the trip |
| | string | Unique vehicle identifier |
| | string | Human-readable vehicle label |
| | string | “ARR” for arrival, “DEP” for departure |
| | datetime | Timestamp of the event |
| | integer | Expected time between vehicles (seconds) |
| | integer | Expected travel time from trip start (seconds) |
GTFS Metrics
Scheduled Headway
The scheduled headway is the expected time gap between consecutive vehicles at a specific stop on a route.
Calculation:
Load GTFS stop_times for the route
Group by stop_id and direction_id
Sort by arrival_time
Calculate difference between consecutive arrivals
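The four steps translate directly into a group, sort, and diff. This sketch assumes `direction_id` has already been joined onto each `stop_times` row from `trips.txt` (GTFS `stop_times.txt` does not carry it), and it parses GTFS times as seconds since the start of the service day, because GTFS allows hours of 24 or more for stops served after midnight. The function names are hypothetical.

```python
from collections import defaultdict


def gtfs_time_to_seconds(hms: str) -> int:
    """Parse GTFS HH:MM:SS; hours of 24 or more mean after midnight."""
    h, m, s = (int(part) for part in hms.strip().split(":"))
    return h * 3600 + m * 60 + s


def scheduled_headways(stop_times) -> dict:
    """Return {(stop_id, direction_id, trip_id): headway seconds, or None}.

    Each row needs stop_id, direction_id, trip_id and arrival_time. The first
    arrival in each (stop_id, direction_id) group has no predecessor, so its
    headway is None. Rows without an arrival_time are skipped.
    """
    groups = defaultdict(list)
    for st in stop_times:
        if not st.get("arrival_time"):
            continue
        key = (st["stop_id"], st["direction_id"])
        groups[key].append((gtfs_time_to_seconds(st["arrival_time"]), st["trip_id"]))

    headways = {}
    for (stop_id, direction_id), arrivals in groups.items():
        arrivals.sort()  # order by scheduled arrival time
        previous = None
        for seconds, trip_id in arrivals:
            gap = None if previous is None else seconds - previous
            headways[(stop_id, direction_id, trip_id)] = gap
            previous = seconds
    return headways
```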
Usage:
Headway is used to analyze service frequency and identify gaps in service.
Scheduled Travel Time
The scheduled travel time is the expected elapsed time from the start of a trip to a specific stop.
Calculation:
Load GTFS stop_times for the trip
Get arrival time at each stop
Calculate difference from first stop’s departure time
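For a single trip, the calculation is each stop's arrival minus the first stop's departure, with stops ordered by `stop_sequence`. This sketch includes a small GTFS time parser that accepts hours of 24 or more; `scheduled_travel_times()` is a hypothetical name.

```python
def gtfs_time_to_seconds(hms: str) -> int:
    """Parse GTFS HH:MM:SS; hours of 24 or more mean after midnight."""
    h, m, s = (int(part) for part in hms.strip().split(":"))
    return h * 3600 + m * 60 + s


def scheduled_travel_times(trip_stop_times) -> dict:
    """Return {stop_sequence: seconds since the first stop's departure} for one trip.

    At the origin the value is its own arrival minus departure, which is 0
    when the two times match.
    """
    ordered = sorted(trip_stop_times, key=lambda st: int(st["stop_sequence"]))
    start = gtfs_time_to_seconds(ordered[0]["departure_time"])
    return {
        int(st["stop_sequence"]): gtfs_time_to_seconds(st["arrival_time"]) - start
        for st in ordered
    }
```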
Usage:
Travel time is used to measure on-time performance and delays.
Provider Configuration
The pipeline supports multiple transit providers:
| Provider | Enabled | Description |
|---|---|---|
| Amtrak | Yes | National passenger railroad service |
| VIA | Yes | VIA Rail Canada |
| Brightline | No | Florida high-speed rail (disabled) |
To enable or disable providers, modify the flags in config.py.
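The flags might look like the following. The dictionary name `ENABLED_PROVIDERS`, its keys, and the `keep_enabled()` helper are hypothetical, since config.py itself is not reproduced here.

```python
# Hypothetical shape for the flags in config.py: one boolean per provider.
ENABLED_PROVIDERS = {
    "Amtrak": True,
    "VIA": True,
    "Brightline": False,
}


def keep_enabled(groups_by_provider: dict, flags: dict = None) -> dict:
    """Keep provider groups whose flag is True; providers missing from flags are dropped."""
    flags = ENABLED_PROVIDERS if flags is None else flags
    return {p: rows for p, rows in groups_by_provider.items() if flags.get(p, False)}
```

Defaulting unknown providers to disabled means a new provider in the API response is ignored until someone turns it on explicitly.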
Data Quality¶
Validation
Pydantic models validate API responses
Missing required fields raise validation errors
Invalid data types are rejected
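The checks listed above can be approximated with a dataclass when Pydantic is unavailable. `StationRecord`, its three fields, and `parse_stations()` are assumptions for illustration. The sketch is stricter than Pydantic's default (lax) mode, which coerces compatible values such as the string `"5"` into an `int` instead of rejecting them. `parse_stations()` also shows invalid records being skipped with a logged warning.

```python
import logging
from dataclasses import dataclass, fields


@dataclass
class StationRecord:
    """Stdlib stand-in for a Pydantic model; the field names are assumptions."""

    code: str
    status: str
    bus: bool

    @classmethod
    def from_dict(cls, raw: dict) -> "StationRecord":
        """Require every field and a matching type; extra keys are ignored."""
        missing = [f.name for f in fields(cls) if f.name not in raw]
        if missing:
            raise ValueError(f"missing required fields: {missing}")
        for f in fields(cls):
            if not isinstance(raw[f.name], f.type):
                raise TypeError(
                    f"{f.name}: expected {f.type.__name__}, "
                    f"got {type(raw[f.name]).__name__}"
                )
        return cls(**{f.name: raw[f.name] for f in fields(cls)})


def parse_stations(raw_rows) -> list:
    """Keep valid records; log and skip invalid ones."""
    valid = []
    for raw in raw_rows:
        try:
            valid.append(StationRecord.from_dict(raw))
        except (ValueError, TypeError) as exc:
            logging.warning("skipping invalid station record: %s", exc)
    return valid
```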
Filtering
Bus services are filtered out
Disabled providers are excluded
Events older than the last processed time are skipped
Error Handling
API failures are logged and the pipeline continues
Invalid records are skipped with warnings
S3 upload failures trigger retries
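A minimal retry wrapper with exponential backoff, assuming upload failures surface as exceptions. `with_retries()` is a hypothetical helper; the injectable `sleep` argument exists only so the delays can be checked without actually waiting.

```python
import time


def with_retries(fn, attempts: int = 3, base_delay: float = 1.0, sleep=time.sleep):
    """Call fn(); after each failure wait base_delay * 2**i seconds and retry.

    The last failure is re-raised so callers can still log or alert on it.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            sleep(base_delay * 2 ** i)
```

In real use `fn` would wrap the upload call, e.g. `with_retries(lambda: upload(path))`, where `upload` is a placeholder. If uploads go through boto3, botocore can also retry on its own via `botocore.config.Config(retries={"max_attempts": ..., "mode": "standard"})`.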