diff --git a/ingestor/.chalice/config.json b/ingestor/.chalice/config.json index d808fc9..a5f1452 100644 --- a/ingestor/.chalice/config.json +++ b/ingestor/.chalice/config.json @@ -110,8 +110,13 @@ "update_time_predictions": { "iam_policy_file": "policy-time-predictions.json", "lambda_timeout": 300 + }, + "update_service_ridership_dashboard": { + "iam_policy_file": "policy-service-ridership-dashboard.json", + "lambda_timeout": 900, + "lambda_memory_size": 1024 } } } } -} +} \ No newline at end of file diff --git a/ingestor/.chalice/policy-service-ridership-dashboard.json b/ingestor/.chalice/policy-service-ridership-dashboard.json new file mode 100644 index 0000000..24c0164 --- /dev/null +++ b/ingestor/.chalice/policy-service-ridership-dashboard.json @@ -0,0 +1,34 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Effect": "Allow", + "Resource": "arn:*:logs:*:*:*" + }, + { + "Effect": "Allow", + "Action": "s3:*", + "Resource": [ + "arn:aws:s3:::tm-gtfs", + "arn:aws:s3:::tm-gtfs/*", + "arn:aws:s3:::tm-service-ridership-dashboard", + "arn:aws:s3:::tm-service-ridership-dashboard/*" + ] + }, + { + "Effect": "Allow", + "Action": [ + "dynamodb:Query" + ], + "Resource": [ + "arn:aws:dynamodb:us-east-1:473352343756:table/Ridership", + "arn:aws:dynamodb:us-east-1:473352343756:table/ScheduledServiceDaily" + ] + } + ] +} \ No newline at end of file diff --git a/ingestor/app.py b/ingestor/app.py index fd0aa96..c96cc56 100644 --- a/ingestor/app.py +++ b/ingestor/app.py @@ -14,6 +14,7 @@ predictions, landing, trip_metrics, + service_ridership_dashboard, ) app = Chalice(app_name="ingestor") @@ -148,3 +149,9 @@ def store_landing_data(event): ridership_data = landing.get_ridership_data() landing.upload_to_s3(json.dumps(trip_metrics_data), json.dumps(ridership_data)) landing.clear_cache() + + +# 9:00 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have bene ingested) +@app.schedule(Cron(30, 9, "*", "*", "?", "*")) +def update_service_ridership_dashboard(): + service_ridership_dashboard.create_service_ridership_dash_json() diff --git a/ingestor/chalicelib/gtfs/ingest.py b/ingestor/chalicelib/gtfs/ingest.py index 4661cd7..7d2854e 100644 --- a/ingestor/chalicelib/gtfs/ingest.py +++ b/ingestor/chalicelib/gtfs/ingest.py @@ -18,7 +18,7 @@ date_range, index_by, is_valid_route_id, - get_services_for_date, + get_service_ids_for_date_to_has_exceptions, get_total_service_minutes, ) from .models import SessionModels, RouteDateTotals @@ -50,6 +50,7 @@ def create_gl_route_date_totals(totals: List[RouteDateTotals]): total_by_hour[i] += total.by_hour[i] total_count = sum(t.count for t in gl_totals) total_service_minutes = sum(t.service_minutes for t in gl_totals) + has_service_exceptions = any((t.has_service_exceptions for t in gl_totals)) return RouteDateTotals( route_id="Green", line_id="Green", @@ -57,22 +58,31 @@ def create_gl_route_date_totals(totals: List[RouteDateTotals]): count=total_count, service_minutes=total_service_minutes, by_hour=total_by_hour, + has_service_exceptions=has_service_exceptions, ) def create_route_date_totals(today: date, models: SessionModels): all_totals = [] - services_for_today = get_services_for_date(models, today) + service_ids_and_exception_status_for_today = get_service_ids_for_date_to_has_exceptions(models, today) for route_id, route in models.routes.items(): if not is_valid_route_id(route_id): continue - trips = [trip for trip in models.trips_by_route_id.get(route_id, []) if trip.service_id in services_for_today] + trips = [ + trip + for trip in models.trips_by_route_id.get(route_id, []) + if trip.service_id in service_ids_and_exception_status_for_today.keys() + ] + has_service_exceptions = any( + (service_ids_and_exception_status_for_today.get(trip.service_id, False) for trip in trips) + ) totals = RouteDateTotals( route_id=route_id, line_id=route.line_id, date=today, count=len(trips), by_hour=bucket_trips_by_hour(trips), + has_service_exceptions=has_service_exceptions, service_minutes=get_total_service_minutes(trips), ) all_totals.append(totals) @@ -99,6 +109,7 @@ def ingest_feed_to_dynamo( "lineId": total.line_id, "count": total.count, "serviceMinutes": total.service_minutes, + "hasServiceExceptions": total.has_service_exceptions, "byHour": {"totals": total.by_hour}, } batch.put_item(Item=item) @@ -112,6 +123,7 @@ def ingest_feeds( force_rebuild_feeds: bool = False, ): for feed in feeds: + feed.use_compact_only() try: if force_rebuild_feeds: print(f"[{feed.key}] Forcing rebuild locally") diff --git a/ingestor/chalicelib/gtfs/models.py b/ingestor/chalicelib/gtfs/models.py index 95d05a7..7f8fec0 100644 --- a/ingestor/chalicelib/gtfs/models.py +++ b/ingestor/chalicelib/gtfs/models.py @@ -27,6 +27,7 @@ class RouteDateTotals: count: int service_minutes: int by_hour: List[int] + has_service_exceptions: bool @property def timestamp(self): diff --git a/ingestor/chalicelib/gtfs/utils.py b/ingestor/chalicelib/gtfs/utils.py index 2801bfe..a5e4381 100644 --- a/ingestor/chalicelib/gtfs/utils.py +++ b/ingestor/chalicelib/gtfs/utils.py @@ -64,8 +64,12 @@ def date_range(start_date: date, end_date: date): now = now + timedelta(days=1) -def get_services_for_date(models: "SessionModels", today: date): - services_for_today = set() +def get_service_ids_for_date_to_has_exceptions(models: "SessionModels", today: date) -> dict[str, bool]: + """ + Reports a dict of service IDs that are active on the given date mapped to a boolean indicating if + there are any exceptions for that service on that date. + """ + services_for_today: dict[str, bool] = {} for service_id in models.calendar_services.keys(): service = models.calendar_services.get(service_id) if not service: @@ -81,15 +85,13 @@ def get_services_for_date(models: "SessionModels", today: date): service.saturday, service.sunday, ][today.weekday()] == ServiceDayAvailability.AVAILABLE + service_exceptions_today = [ex for ex in service_exceptions if ex.date == today] is_removed_by_exception = any( - ( - ex.date == today and ex.exception_type == CalendarServiceExceptionType.REMOVED - for ex in service_exceptions - ) + (ex.exception_type == CalendarServiceExceptionType.REMOVED for ex in service_exceptions_today) ) is_added_by_exception = any( - (ex.date == today and ex.exception_type == CalendarServiceExceptionType.ADDED for ex in service_exceptions) + (ex.exception_type == CalendarServiceExceptionType.ADDED for ex in service_exceptions_today) ) if is_added_by_exception or (in_range and on_sevice_day and not is_removed_by_exception): - services_for_today.add(service_id) + services_for_today[service_id] = len(service_exceptions_today) > 0 return services_for_today diff --git a/ingestor/chalicelib/service_ridership_dashboard/.gitignore b/ingestor/chalicelib/service_ridership_dashboard/.gitignore new file mode 100644 index 0000000..8845ff1 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/.gitignore @@ -0,0 +1 @@ +dash.json \ No newline at end of file diff --git a/ingestor/chalicelib/service_ridership_dashboard/__init__.py b/ingestor/chalicelib/service_ridership_dashboard/__init__.py new file mode 100644 index 0000000..1535671 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/__init__.py @@ -0,0 +1,3 @@ +from .ingest import create_service_ridership_dash_json + +__all__ = ["create_service_ridership_dash_json"] diff --git a/ingestor/chalicelib/service_ridership_dashboard/config.py b/ingestor/chalicelib/service_ridership_dashboard/config.py new file mode 100644 index 0000000..14ec149 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/config.py @@ -0,0 +1,23 @@ +from datetime import date +from pytz import timezone + +# Lower bound for time series and GTFS feeds +PRE_COVID_DATE = date(2020, 2, 24) + +# Date to use as a baseline +START_DATE = date(2018, 1, 1) + +# Boston baby +TIME_ZONE = timezone("US/Eastern") + +# Ignore these +IGNORE_LINE_IDS = ["line-CapeFlyer", "line-Foxboro"] + +# Date ranges with service gaps that we paper over because of major holidays or catastrophes +# rather than doing more complicated special-casing with GTFS services +FILL_DATE_RANGES = [ + (date(2021, 11, 19), date(2021, 11, 26)), # Thanksgiving 2021 + (date(2021, 12, 18), date(2021, 12, 26)), # Christmas 2021 + (date(2022, 12, 18), date(2023, 1, 3)), # Christmas 2022 + (date(2022, 3, 28), date(2022, 3, 29)), # Haymarket garage collapse +] diff --git a/ingestor/chalicelib/service_ridership_dashboard/gtfs.py b/ingestor/chalicelib/service_ridership_dashboard/gtfs.py new file mode 100644 index 0000000..e9ce289 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/gtfs.py @@ -0,0 +1,35 @@ +import boto3 +from typing import Optional +from tempfile import TemporaryDirectory +from mbta_gtfs_sqlite import MbtaGtfsArchive +from mbta_gtfs_sqlite.models import Route, Line + +from ..gtfs.utils import bucket_by, index_by + +from .config import IGNORE_LINE_IDS + +RoutesByLine = dict[Line, Route] + + +def get_routes_by_line(include_only_line_ids: Optional[list[str]]) -> dict[Line, Route]: + s3 = boto3.resource("s3") + archive = MbtaGtfsArchive( + local_archive_path=TemporaryDirectory().name, + s3_bucket=s3.Bucket("tm-gtfs"), + ) + feed = archive.get_latest_feed() + feed.use_compact_only() + feed.download_or_build() + session = feed.create_sqlite_session(compact=True) + lines_by_id = index_by( + session.query(Line).all(), + lambda line: line.line_id, + ) + all_routes_with_line_ids = [ + route + for route in session.query(Route).all() + if route.line_id + and route.line_id not in IGNORE_LINE_IDS + and (not include_only_line_ids or route.line_id in include_only_line_ids) + ] + return bucket_by(all_routes_with_line_ids, lambda route: lines_by_id[route.line_id]) diff --git a/ingestor/chalicelib/service_ridership_dashboard/ingest.py b/ingestor/chalicelib/service_ridership_dashboard/ingest.py new file mode 100644 index 0000000..6c56a79 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/ingest.py @@ -0,0 +1,169 @@ +import json +import click +from datetime import date, datetime, timedelta +from typing import cast, Optional +from pathlib import PurePath + +from .config import ( + START_DATE, + TIME_ZONE, + PRE_COVID_DATE, +) +from .service_levels import get_service_level_entries_by_line_id, ServiceLevelsByDate, ServiceLevelsEntry +from .ridership import ridership_by_line_id, RidershipEntry +from .gtfs import get_routes_by_line +from .service_summaries import summarize_weekly_service_around_date +from .util import date_to_string, date_from_string +from .time_series import get_weekly_median_time_series +from .summary import get_summary_data +from .types import ServiceRegimes, LineData, DashJSON, LineKind +from .s3 import put_dashboard_json_to_s3 + +parent_dir = PurePath(__file__).parent +debug_file_name = parent_dir / "dash.json" + + +def get_line_kind(route_ids: list[str], line_id: str) -> LineKind: + if line_id.startswith("line-Boat"): + return "boat" + if any((r for r in route_ids if r.lower().startswith("cr-"))): + return "regional-rail" + if line_id.startswith("line-SL"): + return "silver" + if line_id in ("line-Red", "line-Orange", "line-Blue", "line-Green"): + return cast(LineKind, line_id.split("-")[1].lower()) + return "bus" + + +def create_service_regimes( + service_levels: ServiceLevelsByDate, + date: date, +) -> ServiceRegimes: + return { + "current": summarize_weekly_service_around_date( + date=date, + service_levels=service_levels, + ), + "oneYearAgo": summarize_weekly_service_around_date( + date=date - timedelta(days=365), + service_levels=service_levels, + ), + "baseline": summarize_weekly_service_around_date( + date=PRE_COVID_DATE, + service_levels=service_levels, + ), + } + + +def create_line_data( + start_date: date, + end_date: date, + service_levels: dict[date, ServiceLevelsEntry], + ridership: dict[date, RidershipEntry], +) -> LineData: + [latest_service_levels_date, *_] = sorted(service_levels.keys(), reverse=True) + service_level_entry = service_levels[latest_service_levels_date] + return { + "id": service_level_entry.line_id, + "shortName": service_level_entry.line_short_name, + "longName": service_level_entry.line_long_name, + "routeIds": service_level_entry.route_ids, + "startDate": date_to_string(start_date), + "lineKind": get_line_kind( + route_ids=service_level_entry.route_ids, + line_id=service_level_entry.line_id, + ), + "ridershipHistory": get_weekly_median_time_series( + entries=ridership, + entry_value_getter=lambda entry: entry.ridership, + start_date=start_date, + max_end_date=end_date, + ), + "serviceHistory": get_weekly_median_time_series( + entries=service_levels, + entry_value_getter=lambda entry: round(sum(entry.service_levels)), + start_date=start_date, + max_end_date=end_date, + ), + "serviceRegimes": create_service_regimes( + service_levels=service_levels, + date=latest_service_levels_date, + ), + } + + +def create_service_ridership_dash_json( + start_date: date = START_DATE, + end_date: date = datetime.now(TIME_ZONE).date(), + write_debug_files: bool = False, + write_to_s3: bool = True, + include_only_line_ids: Optional[list[str]] = None, +): + print( + f"Creating service ridership dashboard JSON for {start_date} to {end_date} " + + f"{'for lines ' + ', '.join(include_only_line_ids) if include_only_line_ids else ''}" + ) + routes_by_line = get_routes_by_line(include_only_line_ids=include_only_line_ids) + service_level_entries = get_service_level_entries_by_line_id( + routes_by_line=routes_by_line, + start_date=start_date, + end_date=end_date, + ) + ridership_entries = ridership_by_line_id( + start_date=start_date, + end_date=end_date, + line_ids=list(service_level_entries.keys()), + ) + line_data_by_line_id = { + line_id: create_line_data( + start_date=start_date, + end_date=end_date, + service_levels=service_level_entries[line_id], + ridership=ridership_entries[line_id], + ) + for line_id in service_level_entries.keys() + if service_level_entries[line_id] + and ridership_entries[line_id] + and len(service_level_entries[line_id]) + and len(ridership_entries[line_id]) + } + summary_data = get_summary_data( + line_data=list(line_data_by_line_id.values()), + start_date=start_date, + end_date=end_date, + ) + dash_json: DashJSON = { + "summaryData": summary_data, + "lineData": line_data_by_line_id, + } + if write_debug_files: + with open(debug_file_name, "w") as f: + json.dump(dash_json, f) + if write_to_s3: + put_dashboard_json_to_s3(today=end_date, dash_json=dash_json) + + +@click.command() +@click.option("--start", default=START_DATE, help="Start date for the dashboard") +@click.option("--end", default=datetime.now(TIME_ZONE).date(), help="End date for the dashboard") +@click.option("--debug", default=False, help="Write debug file", is_flag=True) +@click.option("--s3", default=False, help="Write to S3", is_flag=True) +@click.option("--lines", default=None, help="Include only these line IDs") +def create_service_ridership_dash_json_command( + start: str, + end: str, + debug: bool = False, + s3: bool = False, + lines: Optional[str] = None, +): + create_service_ridership_dash_json( + start_date=date_from_string(start), + end_date=date_from_string(end), + write_debug_files=debug, + write_to_s3=s3, + include_only_line_ids=[f"line-{line}" for line in lines.split(",")] if lines else None, + ) + + +if __name__ == "__main__": + create_service_ridership_dash_json_command() diff --git a/ingestor/chalicelib/service_ridership_dashboard/queries.py b/ingestor/chalicelib/service_ridership_dashboard/queries.py new file mode 100644 index 0000000..b75b0b2 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/queries.py @@ -0,0 +1,48 @@ +import boto3 +from datetime import date +from boto3.dynamodb.conditions import Key +from dynamodb_json import json_util as ddb_json +from typing import TypedDict + +# Create a DynamoDB resource +dynamodb = boto3.resource("dynamodb") + + +class ByHour(TypedDict): + totals: list[int] + + +class ScheduledServiceRow(TypedDict): + routeId: str + date: str + byHour: ByHour + count: int + hasServiceExceptions: bool + lineId: str + serviceMinutes: int + timestamp: int + + +class RidershipRow(TypedDict): + lineId: str + count: int + date: str + timestamp: int + + +def query_scheduled_service(start_date: date, end_date: date, route_id: str) -> list[ScheduledServiceRow]: + table = dynamodb.Table("ScheduledServiceDaily") + date_condition = Key("date").between(start_date.isoformat(), end_date.isoformat()) + route_condition = Key("routeId").eq(route_id) + condition = date_condition & route_condition + response = table.query(KeyConditionExpression=condition) + return ddb_json.loads(response["Items"]) + + +def query_ridership(start_date: date, end_date: date, line_id: str) -> list[RidershipRow]: + table = dynamodb.Table("Ridership") + date_condition = Key("date").between(start_date.isoformat(), end_date.isoformat()) + line_condition = Key("lineId").eq(line_id) + condition = date_condition & line_condition + response = table.query(KeyConditionExpression=condition) + return ddb_json.loads(response["Items"]) diff --git a/ingestor/chalicelib/service_ridership_dashboard/ridership.py b/ingestor/chalicelib/service_ridership_dashboard/ridership.py new file mode 100644 index 0000000..f7f39d7 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/ridership.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass +from datetime import date +from tqdm import tqdm + +from .queries import query_ridership, RidershipRow +from .util import date_from_string + + +@dataclass +class RidershipEntry: + date: date + ridership: float + + +RidershipByDate = dict[date, RidershipEntry] +RidershipByLineId = dict[str, RidershipByDate] + + +def _get_ridership_for_line_id(start_date: date, end_date: date, line_id: str) -> RidershipByDate: + ridership_by_date: RidershipByDate = {} + entries: list[RidershipRow] = query_ridership( + start_date=start_date, + end_date=end_date, + line_id=line_id, + ) + for entry in entries: + date = date_from_string(entry["date"]) + ridership_by_date[date] = RidershipEntry( + date=date, + ridership=entry["count"], + ) + return ridership_by_date + + +def ridership_by_line_id(start_date: date, end_date: date, line_ids: list[str]) -> RidershipByLineId: + ridership_by_line_id: RidershipByLineId = {} + for line_id in (progress := tqdm(line_ids)): + progress.set_description(f"Loading ridership for {line_id}") + entries_for_line_id = _get_ridership_for_line_id( + start_date=start_date, + end_date=end_date, + line_id=line_id, + ) + ridership_by_line_id[line_id] = entries_for_line_id + return ridership_by_line_id diff --git a/ingestor/chalicelib/service_ridership_dashboard/s3.py b/ingestor/chalicelib/service_ridership_dashboard/s3.py new file mode 100644 index 0000000..b0a447e --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/s3.py @@ -0,0 +1,15 @@ +import boto3 +import json +from datetime import date + +from .util import date_to_string +from .types import DashJSON + +bucket = boto3.resource("s3").Bucket("tm-service-ridership-dashboard") + + +def put_dashboard_json_to_s3(today: date, dash_json: DashJSON) -> None: + print("Uploading dashboard JSON to S3") + contents = json.dumps(dash_json) + bucket.put_object(Key=f"{date_to_string(today)}.json", Body=contents) + bucket.put_object(Key="latest.json", Body=contents) diff --git a/ingestor/chalicelib/service_ridership_dashboard/service_levels.py b/ingestor/chalicelib/service_ridership_dashboard/service_levels.py new file mode 100644 index 0000000..43b39e0 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/service_levels.py @@ -0,0 +1,73 @@ +from dataclasses import dataclass +from datetime import date +from tqdm import tqdm + +from .queries import query_scheduled_service, ScheduledServiceRow +from .gtfs import RoutesByLine +from .util import bucket_by, index_by, date_range, date_to_string + + +@dataclass +class ServiceLevelsEntry: + line_id: str + line_short_name: str + line_long_name: str + route_ids: list[str] + service_levels: list[int] + has_service_exceptions: bool + date: date + + +ServiceLevelsByDate = dict[date, ServiceLevelsEntry] +ServiceLevelsByLineId = dict[str, ServiceLevelsByDate] + + +def _divide_by_two_to_get_unidirectional_trip_counts(trip_counts: list[int]): + return [count / 2 for count in trip_counts] + + +def _get_trip_count_by_hour_totals_for_day(rows_for_day: list[ScheduledServiceRow]) -> list[int]: + by_hour_counts_for_day: list[list[int]] = [item["byHour"]["totals"] for item in rows_for_day] + bidirectional_trip_counts = [sum(hour_values) for hour_values in zip(*by_hour_counts_for_day)] + return _divide_by_two_to_get_unidirectional_trip_counts(bidirectional_trip_counts) + + +def _get_has_service_exception(rows_for_day: list[ScheduledServiceRow]) -> bool: + return any(item.get("hasServiceExceptions") for item in rows_for_day) + + +def get_service_level_entries_by_line_id( + routes_by_line: RoutesByLine, + start_date: date, + end_date: date, +) -> ServiceLevelsByLineId: + entries: dict[str, list[ServiceLevelsEntry]] = {} + for line, routes in (progress := tqdm(routes_by_line.items())): + entries.setdefault(line.line_id, []) + progress.set_description(f"Loading service levels for {line.line_id}") + results_by_date_str: dict[str, list[ScheduledServiceRow]] = bucket_by( + [ + row + for route in routes + for row in query_scheduled_service( + start_date=start_date, + end_date=end_date, + route_id=route.route_id, + ) + ], + lambda row: row["date"], + ) + for today in date_range(start_date, end_date): + today_str = date_to_string(today) + all_service_levels_today = results_by_date_str.get(today_str, []) + entry = ServiceLevelsEntry( + date=today, + line_id=line.line_id, + line_short_name=line.line_short_name, + line_long_name=line.line_long_name, + route_ids=[route.route_id for route in routes], + service_levels=_get_trip_count_by_hour_totals_for_day(all_service_levels_today), + has_service_exceptions=_get_has_service_exception(all_service_levels_today), + ) + entries[line.line_id].append(entry) + return {line_id: index_by(entries, lambda e: e.date) for line_id, entries in entries.items()} diff --git a/ingestor/chalicelib/service_ridership_dashboard/service_summaries.py b/ingestor/chalicelib/service_ridership_dashboard/service_summaries.py new file mode 100644 index 0000000..caa7773 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/service_summaries.py @@ -0,0 +1,108 @@ +from typing import Optional +from datetime import date, timedelta + +from .service_levels import ServiceLevelsEntry, ServiceLevelsByDate +from .types import ServiceSummary, ServiceSummaryForDay + + +def _is_matching_service_levels_entry( + entry: ServiceLevelsEntry, + valid_date_range_inclusive: tuple[date, date], + matching_days_of_week: list[int], + require_typical_service: bool, +) -> bool: + valid_start_date, valid_end_date = valid_date_range_inclusive + return ( + valid_start_date <= entry.date <= valid_end_date + and entry.date.weekday() in matching_days_of_week + and (not require_typical_service or not entry.has_service_exceptions) + ) + + +def _get_matching_service_levels_entry( + service_levels: ServiceLevelsByDate, + start_lookback_date: date, + max_lookback_days: int, + matching_days_of_week: list[int], + require_typical_service: bool, +) -> Optional[ServiceLevelsEntry]: + end_lookback_date = start_lookback_date - timedelta(days=max_lookback_days) + for lookback_date in sorted(service_levels.keys(), reverse=True): + if _is_matching_service_levels_entry( + service_levels[lookback_date], + (end_lookback_date, start_lookback_date), + matching_days_of_week, + require_typical_service, + ): + return service_levels[lookback_date] + return None + + +def _is_service_cancelled_on_date( + service_levels: ServiceLevelsByDate, + start_lookback_date: date, + matching_days_of_week: list[int], +) -> bool: + return ( + _get_matching_service_levels_entry( + service_levels=service_levels, + start_lookback_date=start_lookback_date, + matching_days_of_week=matching_days_of_week, + require_typical_service=False, + max_lookback_days=7, + ) + is None + ) + + +def _get_service_levels_summary_dict( + start_lookback_date: date, + service_levels: ServiceLevelsByDate, + matching_days_of_week: list[int], +) -> ServiceSummaryForDay: + if _is_service_cancelled_on_date( + service_levels=service_levels, + start_lookback_date=start_lookback_date, + matching_days_of_week=matching_days_of_week, + ): + return { + "cancelled": True, + "tripsPerHour": None, + "totalTrips": 0, + } + service_levels_entry = _get_matching_service_levels_entry( + start_lookback_date=start_lookback_date, + service_levels=service_levels, + matching_days_of_week=matching_days_of_week, + require_typical_service=True, + max_lookback_days=(1000 * 365), + ) + assert service_levels_entry is not None + return { + "cancelled": False, + "tripsPerHour": service_levels_entry.service_levels, + "totalTrips": round(sum(service_levels_entry.service_levels)), + } + + +def summarize_weekly_service_around_date(date: date, service_levels: ServiceLevelsByDate) -> ServiceSummary: + weekday_service = _get_service_levels_summary_dict( + service_levels=service_levels, + start_lookback_date=date, + matching_days_of_week=list(range(0, 5)), + ) + saturday_service = _get_service_levels_summary_dict( + service_levels=service_levels, + start_lookback_date=date, + matching_days_of_week=[5], + ) + sunday_service = _get_service_levels_summary_dict( + service_levels=service_levels, + start_lookback_date=date, + matching_days_of_week=[6], + ) + return { + "weekday": weekday_service, + "saturday": saturday_service, + "sunday": sunday_service, + } diff --git a/ingestor/chalicelib/service_ridership_dashboard/summary.py b/ingestor/chalicelib/service_ridership_dashboard/summary.py new file mode 100644 index 0000000..be768e3 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/summary.py @@ -0,0 +1,53 @@ +from datetime import date + +from .util import date_to_string +from .types import LineData, SummaryData +from .time_series import ( + merge_weekly_median_time_series, + get_latest_weekly_median_time_series_entry, +) + + +def _line_is_cancelled(line: LineData) -> bool: + return line["serviceRegimes"]["current"]["weekday"]["cancelled"] + + +def _line_has_reduced_service(line: LineData) -> bool: + try: + weekday_service_last_year = line["serviceRegimes"]["oneYearAgo"]["weekday"]["totalTrips"] + weekday_service_present = line["serviceRegimes"]["current"]["weekday"]["totalTrips"] + return weekday_service_present / weekday_service_last_year < (19 / 20) + except ZeroDivisionError: + return False + + +def _line_has_increased_service(line: LineData) -> bool: + try: + weekday_service_last_year = line["serviceRegimes"]["oneYearAgo"]["weekday"]["totalTrips"] + weekday_service_present = line["serviceRegimes"]["current"]["weekday"]["totalTrips"] + return weekday_service_present / weekday_service_last_year > (20 / 19) + except ZeroDivisionError: + return False + + +def get_summary_data(line_data: list[LineData], start_date: date, end_date: date) -> SummaryData: + total_ridership_history = merge_weekly_median_time_series([line["ridershipHistory"] for line in line_data]) + total_service_history = merge_weekly_median_time_series([line["serviceHistory"] for line in line_data]) + total_passengers = get_latest_weekly_median_time_series_entry(total_ridership_history) + total_trips = get_latest_weekly_median_time_series_entry(total_service_history) + total_routes_cancelled = sum(_line_is_cancelled(line) for line in line_data) + total_reduced_service = sum(_line_has_reduced_service(line) for line in line_data) + total_increased_service = sum(_line_has_increased_service(line) for line in line_data) + return { + "totalRidershipHistory": total_ridership_history, + "totalServiceHistory": total_service_history, + "totalRidershipPercentage": 0, # From CRD, remove + "totalServicePercentage": 0, # From CRD, remove + "totalPassengers": total_passengers or 0, + "totalTrips": total_trips or 0, + "totalRoutesCancelled": total_routes_cancelled, + "totalReducedService": total_reduced_service, + "totalIncreasedService": total_increased_service, + "startDate": date_to_string(start_date), + "endDate": date_to_string(end_date), + } diff --git a/ingestor/chalicelib/service_ridership_dashboard/time_series.py b/ingestor/chalicelib/service_ridership_dashboard/time_series.py new file mode 100644 index 0000000..a057ea9 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/time_series.py @@ -0,0 +1,142 @@ +from typing import TypeVar, Callable, Optional +from datetime import date, timedelta + +from .util import date_range, date_to_string +from .config import FILL_DATE_RANGES +from .types import WeeklyMedianTimeSeries + +Entry = TypeVar("Entry") +EntryDict = dict[date, Entry] + + +def _iterate_mondays( + entries: EntryDict, + start_date: date, + max_end_date: date, +): + max_found_date = max(entries.keys()) + end_date = min(max_end_date, max_found_date) + for today in date_range(start_date, end_date): + if today in entries: + yield today, entries[today] + + +def _get_entry_value_for_date( + entries: EntryDict, + date: date, + entry_value_getter: Callable[[Entry], float], +) -> Optional[float]: + if date in entries: + return entry_value_getter(entries[date]) + for previous_date in sorted(entries.keys(), reverse=True): + if previous_date < date: + return entry_value_getter(entries[previous_date]) + return None + + +def _choose_between_previous_and_current_value( + current_value: Optional[float], + previous_value: Optional[float], + today: date, +) -> float: + if current_value is None and previous_value is not None: + return previous_value + is_weekend = today.weekday() >= 5 + if is_weekend and previous_value: + return previous_value + return current_value or 0 + + +def _date_ranges_intersect( + a: tuple[date, date], + b: tuple[date, date], +) -> bool: + return a[0] <= b[1] and b[0] <= a[1] + + +def _find_zero_ranges_in_time_series(time_series: list[float]) -> list[tuple[int, int]]: + zero_ranges = [] + current_range_start = None + for i, value in enumerate(time_series): + if value == 0: + if current_range_start is None: + current_range_start = i + else: + if current_range_start is not None: + zero_ranges.append((current_range_start, i - 1)) + current_range_start = None + if current_range_start is not None: + zero_ranges.append((current_range_start, len(time_series) - 1)) + return zero_ranges + + +def _fill_zero_ranges_in_time_series(time_series: list[float], start_date: date) -> list[float]: + zero_ranges = _find_zero_ranges_in_time_series(time_series) + altered_time_series = time_series.copy() + for range_start_idx, range_end_idx in zero_ranges: + last_non_zero_value = time_series[range_start_idx - 1] if range_start_idx > 0 else 0 + zero_date_range = ( + start_date + timedelta(days=range_start_idx), + start_date + timedelta(days=range_end_idx), + ) + should_fill_special_range = any( + _date_ranges_intersect(zero_date_range, fill_range) for fill_range in FILL_DATE_RANGES + ) + should_fill_small_range = range_end_idx - range_start_idx <= 5 + should_fill = should_fill_special_range or should_fill_small_range + if should_fill: + for i in range(range_start_idx, range_end_idx + 1): + altered_time_series[i] = last_non_zero_value + return altered_time_series + + +def _get_monday_of_week_containing_date(date: date) -> date: + return date - timedelta(days=date.weekday()) + + +def _bucket_by_week(entries: dict[date, Entry]) -> dict[date, list[Entry]]: + buckets: dict[date, list[Entry]] = {} + for today, entry in entries.items(): + week_start = _get_monday_of_week_containing_date(today) + buckets.setdefault(week_start, []) + buckets[week_start].append(entry) + return buckets + + +def get_weekly_median_time_series( + entries: dict[date, Entry], + entry_value_getter: Callable[[Entry], float], + start_date: date, + max_end_date: date, +) -> WeeklyMedianTimeSeries: + weekly_buckets = _bucket_by_week(entries) + weekly_medians: dict[str, float] = {} + for week_start, week_entries in _iterate_mondays(weekly_buckets, start_date, max_end_date): + week_values = [entry_value_getter(entry) for entry in week_entries] + week_values.sort() + weekly_medians[date_to_string(week_start)] = week_values[len(week_values) // 2] + return weekly_medians + + +def merge_weekly_median_time_series(many_series: list[WeeklyMedianTimeSeries]) -> WeeklyMedianTimeSeries: + merged_series: dict[str, float] = {} + for series in many_series: + for week_start, value in series.items(): + merged_series.setdefault(week_start, 0) + merged_series[week_start] += value + return merged_series + + +def get_weekly_median_time_series_entry_for_date(series: WeeklyMedianTimeSeries, date: date) -> Optional[float]: + monday = _get_monday_of_week_containing_date(date) + return series.get(date_to_string(monday)) + + +def get_latest_weekly_median_time_series_entry(series: WeeklyMedianTimeSeries) -> Optional[float]: + latest_date = max(series.keys()) + return series.get(latest_date) + + +def get_earliest_weekly_median_time_series_entry(series: WeeklyMedianTimeSeries) -> Optional[float]: + earliest_date = min(series.keys()) + return series.get(earliest_date) diff --git a/ingestor/chalicelib/service_ridership_dashboard/types.py b/ingestor/chalicelib/service_ridership_dashboard/types.py new file mode 100644 index 0000000..4f6c4c0 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/types.py @@ -0,0 +1,63 @@ +from typing import TypedDict, Literal, Optional + +LineKind = Literal[ + "bus", + "regional-rail", + "silver", + "red", + "orange", + "blue", + "green", + "boat", +] + +WeeklyMedianTimeSeries = dict[str, float] # Map yyyy-mm-dd to numbers + + +class ServiceSummaryForDay(TypedDict): + cancelled: bool + tripsPerHour: Optional[list[int]] + totalTrips: int + + +class ServiceSummary(TypedDict): + weekday: ServiceSummaryForDay + saturday: ServiceSummaryForDay + sunday: ServiceSummaryForDay + + +class ServiceRegimes(TypedDict): + current: ServiceSummary + oneYearAgo: ServiceSummary + baseline: ServiceSummary # This is the pre-covid service level from CRD + + +class LineData(TypedDict): + id: str + shortName: str + longName: str + routeIds: list[str] + startDate: str + lineKind: LineKind + ridershipHistory: WeeklyMedianTimeSeries + serviceHistory: WeeklyMedianTimeSeries + serviceRegimes: ServiceRegimes + + +class SummaryData(TypedDict): + totalRidershipHistory: WeeklyMedianTimeSeries + totalServiceHistory: WeeklyMedianTimeSeries + totalRidershipPercentage: float + totalServicePercentage: float + totalPassengers: float + totalTrips: float + totalRoutesCancelled: int + totalReducedService: int + totalIncreasedService: int + startDate: str + endDate: str + + +class DashJSON(TypedDict): + lineData: dict[str, LineData] + summaryData: SummaryData diff --git a/ingestor/chalicelib/service_ridership_dashboard/util.py b/ingestor/chalicelib/service_ridership_dashboard/util.py new file mode 100644 index 0000000..6804b09 --- /dev/null +++ b/ingestor/chalicelib/service_ridership_dashboard/util.py @@ -0,0 +1,69 @@ +from typing import Tuple +from datetime import datetime, date, timedelta + + +def date_from_string(date_str): + return datetime.strptime(date_str, "%Y-%m-%d").date() + + +def date_to_string(date): + return date.strftime("%Y-%m-%d") + + +def index_by(items, key_getter): + res = {} + if isinstance(key_getter, str): + key_getter_as_str = key_getter + key_getter = lambda dict: dict[key_getter_as_str] # noqa: E731 + for item in items: + res[key_getter(item)] = item + return res + + +def bucket_by(items, key_getter): + res = {} + if isinstance(key_getter, str): + key_getter_as_str = key_getter + key_getter = lambda dict: dict[key_getter_as_str] # noqa: E731 + for item in items: + key = key_getter(item) + key_items = res.setdefault(key, []) + key_items.append(item) + return res + + +def get_ranges_of_same_value(items_dict): + current_value = None + current_keys = None + sorted_items = sorted(items_dict.items(), key=lambda item: item[0]) + for key, value in sorted_items: + if value == current_value: + current_keys.append(key) + else: + if current_keys: + yield current_keys, current_value + current_keys = [key] + current_value = value + if len(current_keys) > 0: + yield current_keys, current_value + + +def get_date_ranges_of_same_value(items_dict): + for dates, value in get_ranges_of_same_value(items_dict): + min_date = min(dates) + max_date = max(dates) + yield (min_date, max_date), value + + +def date_range(start_date: date, end_date: date): + assert start_date <= end_date + now = start_date + while now <= end_date: + yield now + now = now + timedelta(days=1) + + +def date_range_contains(containing: Tuple[date, date], contained: Tuple[date, date]): + (containing_from, containing_to) = containing + (contained_from, contained_to) = contained + return contained_from >= containing_from and contained_to <= containing_to diff --git a/poetry.lock b/poetry.lock index cb9cec9..534c37f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1682,4 +1682,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "~3.11" -content-hash = "71cb884bfed9d45d31d7f4d818f677d0419cfa759380af76ea92cfec2427b4d0" +content-hash = "21174c12d2c64b56619b439aa9e86d5cfd82e59d3609e193dd25a3c8777cda14" diff --git a/pyproject.toml b/pyproject.toml index d9b363a..d494439 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ openpyxl = "^3.1.2" dynamodb-json = "^1.3" datadog_lambda = "5.94.0" tqdm = "^4.66.1" +click = "^8.1.7" [tool.poetry.dev-dependencies] chalice = "^1.31.2"