Data Pipeline

This document describes the data processing pipeline in detail.

Overview

The data pipeline transforms raw train data from the Amtraker API into structured arrival and departure events enriched with GTFS schedule information.

Pipeline Stages

Stage 1: Data Ingestion

The pipeline begins by fetching data from the Amtraker API.

Module: read.py

Functions:

  • validate_amtraker_data() - Fetches and validates API response

  • trainresponse_to_polars() - Converts response to Polars DataFrame

  • read_amtraker_data() - Complete ingestion pipeline

Data Source:

  • Amtraker API: https://api-v3.amtraker.com/v3/trains

Output: Polars DataFrame with train and station data

Stage 2: Data Transformation

Raw data is cleaned and restructured for processing.

Module: read.py

Functions:

  • remove_excess_fields() - Removes unnecessary columns

  • explode_df() - Expands nested station arrays

  • remove_bus() - Filters out bus services

  • split_df_by_provider() - Separates data by transit provider

Transformations:

  1. Remove metadata fields not needed for analysis

  2. Explode station list into individual rows

  3. Filter out non-rail services

  4. Split DataFrame by provider (Amtrak, VIA, Brightline)
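
The four transformations above can be sketched in plain Python. This is an illustration only: it works on lists of dicts rather than the Polars DataFrame that read.py operates on, and the field names (train_id, provider, is_bus, stations, code) and the function name transform_trains are assumptions, not the Amtraker schema or the module's API.

```python
from collections import defaultdict

# Assumed field names, for illustration only.
KEEP_FIELDS = ("train_id", "provider", "is_bus", "stations")


def transform_trains(trains):
    """Apply the four transformation steps to a list of train dicts."""
    # 1. Drop fields that are not needed for analysis.
    slim = [{key: train[key] for key in KEEP_FIELDS} for train in trains]

    # 2. Explode: one output row per (train, station) pair.
    rows = [
        {
            "train_id": train["train_id"],
            "provider": train["provider"],
            "is_bus": train["is_bus"],
            "station_code": station["code"],
        }
        for train in slim
        for station in train["stations"]
    ]

    # 3. Keep rail services only.
    rail_rows = [row for row in rows if not row["is_bus"]]

    # 4. Split the remaining rows by provider.
    by_provider = defaultdict(list)
    for row in rail_rows:
        by_provider[row["provider"]].append(row)
    return dict(by_provider)


# Made-up sample input: one rail train, one bus, one VIA train.
sample = [
    {"train_id": "5", "provider": "Amtrak", "is_bus": False, "debug": "x",
     "stations": [{"code": "CHI"}, {"code": "DEN"}]},
    {"train_id": "9", "provider": "Amtrak", "is_bus": True, "debug": "x",
     "stations": [{"code": "SEA"}]},
    {"train_id": "60", "provider": "VIA", "is_bus": False, "debug": "x",
     "stations": [{"code": "TRTO"}]},
]
for provider, rows in transform_trains(sample).items():
    print(provider, [row["station_code"] for row in rows])
```

Filtering after the explode step mirrors the order listed above; filtering first would give the same rows with less work.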

Stage 3: GTFS Enrichment

Train data is enriched with GTFS schedule information.

Module: transform.py

Functions:

  • add_direction_id() - Adds GTFS direction IDs

  • add_scheduled_metrics() - Adds scheduled headway and travel time

Enrichment Process:

  1. Direction ID: Look up direction (0/1) based on route and headsign

  2. Scheduled Headway: Time gap between consecutive vehicles at a stop

  3. Scheduled Travel Time: Expected time from trip start to each stop

Stage 4: Event Generation

Arrival and departure events are generated from the enriched data.

Module: write.py

Functions:

  • calculate_service_date_from_datetime() - Calculates service date

  • add_service_dates() - Adds service date columns

  • write_amtraker_events() - Generates events

Event Types:

  • ARR - Arrival event (when train arrives at station)

  • DEP - Departure event (when train departs from station)

Service Date Logic:

The service date is the date on which the trip started, which can differ from the calendar date of the event. Events that occur between midnight and 3:30 AM are assigned to the previous day’s service date.
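
A minimal sketch of this rule follows. The function name service_date_for is hypothetical (it is not the signature of calculate_service_date_from_datetime()), the event time is assumed to already be in the local time zone of the service, and treating exactly 3:30:00 AM as part of the current day is an assumption, since the text does not say which side of the boundary it falls on.

```python
from datetime import date, datetime, time, timedelta

# Cutoff taken from the rule above: events before 3:30 AM belong to the
# previous service day. The handling of exactly 3:30:00 is an assumption.
SERVICE_DAY_CUTOFF = time(3, 30)


def service_date_for(event_time: datetime) -> date:
    """Return the service date for an event timestamp in local time."""
    if event_time.time() < SERVICE_DAY_CUTOFF:
        return event_time.date() - timedelta(days=1)
    return event_time.date()


# A 1:15 AM arrival on 2 May counts toward the 1 May service date.
print(service_date_for(datetime(2024, 5, 2, 1, 15)))
# A 9:00 AM departure on 2 May stays on 2 May.
print(service_date_for(datetime(2024, 5, 2, 9, 0)))
```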

Stage 5: Time Filtering

Only new events are processed to avoid duplicates.

Module: timefilter.py

Functions:

  • filter_events() - Filters by last processed timestamp

  • get_last_processed() - Retrieves last timestamp from S3

  • set_last_processed() - Stores current timestamp in S3

Filtering Logic:

Events are filtered based on the lastValTS field, which indicates when the train data was last updated. Only events newer than the last processed timestamp are included.
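
The comparison can be sketched as below. The function name filter_new_events is hypothetical, and the sketch assumes that lastValTS has already been parsed into timezone-aware datetime values; how the real filter_events() represents timestamps is not shown here.

```python
from datetime import datetime, timezone


def filter_new_events(events, last_processed):
    """Keep only events whose lastValTS is strictly newer than last_processed."""
    return [event for event in events if event["lastValTS"] > last_processed]


# Made-up example: only the second event is newer than the stored timestamp.
last_processed = datetime(2024, 5, 2, 12, 0, tzinfo=timezone.utc)
events = [
    {"trip_id": "a", "lastValTS": datetime(2024, 5, 2, 11, 59, tzinfo=timezone.utc)},
    {"trip_id": "b", "lastValTS": datetime(2024, 5, 2, 12, 1, tzinfo=timezone.utc)},
]
print([event["trip_id"] for event in filter_new_events(events, last_processed)])
```

Using a strict comparison means an event stamped exactly at the last processed time is not emitted a second time.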

Stage 6: Storage

Processed events are stored in AWS S3.

Module: s3_upload.py

Functions:

  • _compress_and_upload_file() - Compresses and uploads files

  • upload_todays_events_to_s3() - Uploads daily event files

Storage Format:

  • JSON format with gzip compression

  • Organized by provider, year, month, day, and timestamp
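
The sketch below shows one way such a layout and compression step could look using only the standard library. The key pattern, file name format, and function names are assumptions for illustration; the actual layout produced by s3_upload.py may differ, and the upload call itself is left out.

```python
import gzip
import json
from datetime import datetime, timezone


def build_object_key(provider: str, ts: datetime) -> str:
    """Build a key of the form provider/year/month/day/timestamp (assumed layout)."""
    return f"{provider}/{ts:%Y}/{ts:%m}/{ts:%d}/{ts:%Y%m%dT%H%M%SZ}.json.gz"


def compress_events(events) -> bytes:
    """Serialize events to JSON and gzip the result."""
    return gzip.compress(json.dumps(events).encode("utf-8"))


ts = datetime(2024, 5, 2, 14, 15, 0, tzinfo=timezone.utc)
events = [{"trip_id": "a", "event_type": "ARR"}]
print(build_object_key("amtrak", ts))
print(len(compress_events(events)), "bytes after compression")
```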

Event Schema

Each generated event contains the following fields:

Field              Type      Description
service_date       date      The service date for this event
route_id           string    The route identifier
trip_id            string    Unique trip identifier
direction_id       integer   Direction of travel (0 or 1)
stop_id            string    Station identifier
stop_sequence      integer   Order of stop in the trip
vehicle_id         string    Unique vehicle identifier
vehicle_label      string    Human-readable vehicle label
event_type         string    “ARR” for arrival, “DEP” for departure
event_time         datetime  Timestamp of the event
scheduled_headway  integer   Expected time between vehicles (seconds)
scheduled_tt       integer   Expected travel time from trip start (seconds)
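
For readers who prefer code to tables, the schema can be mirrored as a Python dataclass. This is a sketch: the class name Event and the sample values are made up, and whether any field may be null in practice is not specified in this document.

```python
from dataclasses import asdict, dataclass
from datetime import date, datetime


@dataclass(frozen=True)
class Event:
    """One arrival or departure event, mirroring the schema fields."""
    service_date: date
    route_id: str
    trip_id: str
    direction_id: int       # 0 or 1
    stop_id: str
    stop_sequence: int
    vehicle_id: str
    vehicle_label: str
    event_type: str         # "ARR" or "DEP"
    event_time: datetime
    scheduled_headway: int  # seconds
    scheduled_tt: int       # seconds


# Made-up sample values, for illustration only.
example = Event(
    service_date=date(2024, 5, 1),
    route_id="route-1",
    trip_id="trip-1",
    direction_id=0,
    stop_id="CHI",
    stop_sequence=1,
    vehicle_id="veh-1",
    vehicle_label="5",
    event_type="DEP",
    event_time=datetime(2024, 5, 1, 14, 0),
    scheduled_headway=86400,
    scheduled_tt=0,
)
print(list(asdict(example)))
```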

GTFS Metrics

Scheduled Headway

The scheduled headway is the expected time gap between consecutive vehicles at a specific stop on a route.

Calculation:

  1. Load GTFS stop_times for the route

  2. Group by stop_id and direction_id

  3. Sort by arrival_time

  4. Calculate difference between consecutive arrivals
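
The steps above can be sketched in plain Python. The sketch assumes each row already carries direction_id (in GTFS that column lives in trips.txt, so a join with stop_times.txt is implied) and that rows are limited to a single route; the function names are hypothetical and this is not the code in transform.py. GTFS times may exceed 24:00:00 for trips that run past midnight, so they are converted to seconds rather than parsed as clock times.

```python
from collections import defaultdict


def gtfs_time_to_seconds(value):
    """Convert a GTFS HH:MM:SS string to seconds; hours may be 24 or more."""
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def scheduled_headways(stop_times):
    """Return {(trip_id, stop_id): headway in seconds} for one route."""
    # Group rows by stop and direction.
    groups = defaultdict(list)
    for row in stop_times:
        groups[(row["stop_id"], row["direction_id"])].append(row)

    headways = {}
    for (stop_id, _direction_id), rows in groups.items():
        # Sort by arrival time, then difference consecutive arrivals.
        ordered = sorted(rows, key=lambda r: gtfs_time_to_seconds(r["arrival_time"]))
        for previous, current in zip(ordered, ordered[1:]):
            headways[(current["trip_id"], stop_id)] = (
                gtfs_time_to_seconds(current["arrival_time"])
                - gtfs_time_to_seconds(previous["arrival_time"])
            )
    return headways


# Made-up rows: three arrivals at the same stop in the same direction.
rows = [
    {"trip_id": "t2", "stop_id": "CHI", "direction_id": 0, "arrival_time": "08:30:00"},
    {"trip_id": "t1", "stop_id": "CHI", "direction_id": 0, "arrival_time": "08:00:00"},
    {"trip_id": "t3", "stop_id": "CHI", "direction_id": 0, "arrival_time": "09:15:00"},
]
print(scheduled_headways(rows))
```

The first arrival in each group has no preceding vehicle, so it gets no headway entry in this sketch.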

Usage:

Headway is used to analyze service frequency and identify gaps in service.

Scheduled Travel Time

The scheduled travel time is the expected elapsed time from the start of a trip to a specific stop.

Calculation:

  1. Load GTFS stop_times for the trip

  2. Get arrival time at each stop

  3. Calculate difference from first stop’s departure time
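
A minimal sketch of this calculation follows. It assumes the stop_times rows for a single trip, with arrival and departure times already converted to seconds; the function name and the arrival_secs and departure_secs keys are hypothetical.

```python
def scheduled_travel_times(trip_stop_times):
    """Return [(stop_id, seconds since the first stop's departure), ...]."""
    ordered = sorted(trip_stop_times, key=lambda r: r["stop_sequence"])
    if not ordered:
        return []
    # The trip starts when the vehicle departs its first stop.
    trip_start = ordered[0]["departure_secs"]
    return [(row["stop_id"], row["arrival_secs"] - trip_start) for row in ordered]


# Made-up trip: departs A at 08:00:00, reaches B at 08:30:00 and C at 09:30:00.
trip = [
    {"stop_id": "A", "stop_sequence": 1, "arrival_secs": 28800, "departure_secs": 28800},
    {"stop_id": "B", "stop_sequence": 2, "arrival_secs": 30600, "departure_secs": 30720},
    {"stop_id": "C", "stop_sequence": 3, "arrival_secs": 34200, "departure_secs": 34200},
]
print(scheduled_travel_times(trip))
```

A list of pairs is returned instead of a dict keyed by stop_id so that a trip visiting the same stop twice keeps both entries.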

Usage:

Travel time is used to measure on-time performance and delays.

Provider Configuration

The pipeline supports multiple transit providers:

Provider    Enabled  Description
Amtrak      Yes      National passenger railroad service
VIA         Yes      VIA Rail Canada
Brightline  No       Florida high-speed rail (disabled)

To enable or disable providers, modify the flags in config.py.
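
The actual flag names in config.py are not shown in this document, so the following is only a sketch of how per-provider flags could be represented and applied. PROVIDER_ENABLED, enabled_providers, and drop_disabled are hypothetical names.

```python
# Hypothetical flag layout; the real names in config.py may differ.
PROVIDER_ENABLED = {
    "Amtrak": True,
    "VIA": True,
    "Brightline": False,
}


def enabled_providers(flags=PROVIDER_ENABLED):
    """Return the names of providers whose flag is set."""
    return [name for name, enabled in flags.items() if enabled]


def drop_disabled(data_by_provider, flags=PROVIDER_ENABLED):
    """Remove entries for providers that are disabled or unknown."""
    return {
        provider: data
        for provider, data in data_by_provider.items()
        if flags.get(provider, False)
    }


print(enabled_providers())
```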

Data Quality

Validation

  • Pydantic models validate API responses

  • Missing required fields raise validation errors

  • Invalid data types are rejected

Filtering

  • Bus services are filtered out

  • Disabled providers are excluded

  • Events that are not newer than the last processed timestamp are skipped

Error Handling

  • API failures are logged and the pipeline continues

  • Invalid records are skipped with warnings

  • S3 upload failures trigger retries
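
The retry behavior is not specified further in this document. As one possible shape, the sketch below retries a callable with exponential backoff; the helper name with_retries, the attempt count, and the delays are all assumptions rather than the pipeline's actual settings.

```python
import time


def with_retries(operation, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(); on failure, wait and retry up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            # Out of attempts: re-raise the last error to the caller.
            if attempt == attempts:
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))


# Made-up upload stand-in that fails twice before succeeding.
calls = {"count": 0}


def flaky_upload():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "uploaded"


# A no-op sleep keeps the example fast.
print(with_retries(flaky_upload, sleep=lambda seconds: None))
```

Passing the sleep function as a parameter keeps the helper easy to exercise without real waiting.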