Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add incremental flag for prod target in el.py #16

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ tasks:
bi:
cmds:
- npm run dev --prefix ./reports
clean:
cmds:
- rm -f ./data/*.json.gz 2>/dev/null; exit 0
76 changes: 46 additions & 30 deletions el.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import argparse
import os
from datetime import date, datetime, timedelta
from datetime import datetime, timedelta

import duckdb
import requests
from halo import Halo
from tqdm import tqdm


def validate_date(date_str):
def validate_date(datetime_str):
try:
input_date = datetime.strptime(date_str, "%Y-%m-%d").date()
current_date = date.today()
if input_date < date(2023, 1, 1) or input_date > current_date:
input_datetime = datetime.strptime(datetime_str, "%Y-%m-%d-%H")
current_datetime = datetime.now()
if input_datetime < datetime(2015, 1, 1) or input_datetime > current_datetime:
raise argparse.ArgumentTypeError(
f"Date {date_str} is outside the acceptable range."
f"Datetime {datetime_str} is outside the acceptable range."
)
return input_date
return input_datetime
except ValueError:
raise argparse.ArgumentTypeError(
f"Invalid date format: {date_str}. Please use the format YYYY-MM-DD."
f"Invalid datetime format: {datetime_str}. \
Please use the format YYYY-MM-DD-HH."
)


Expand Down Expand Up @@ -54,6 +55,25 @@ def download_data(active_datetime):
print(f"🎉 Hooray! {url_datetime} already exists. Skipping download.")


def extract_data(start_datetime, end_datetime):
total_hours = int((end_datetime - start_datetime).total_seconds() / 3600)
progress_bar = tqdm(total=total_hours)

active_datetime = start_datetime

while active_datetime <= end_datetime:
download_data(active_datetime)
if args.incremental and args.prod:
load_data()
os.remove(
f"data/{datetime.strftime(active_datetime, '%Y-%m-%d-%-H')}.json.gz"
)
active_datetime += timedelta(hours=1)
progress_bar.update(1)

progress_bar.close()


def load_data():
if args.check:
data_path = "data-test"
Expand Down Expand Up @@ -106,22 +126,24 @@ def load_data():
"""
)
con.close()
spinner.succeed("🦆 Loading data into DuckDB... Done!")
if args.prod:
spinner.succeed("🦆☁️ Loading data into MotherDuck... Done!")
else:
spinner.succeed("🦆💾 Loading data into DuckDB... Done!")


parser = argparse.ArgumentParser()
# TODO Allow hourly granularity
parser.add_argument(
"start_date",
"start_datetime",
help="The start date of the range",
default=str(date.today() - timedelta(days=1)),
default=datetime.strftime(datetime.now() - timedelta(hours=1), "%Y-%m-%d-%H"),
nargs="?",
type=validate_date,
)
parser.add_argument(
"end_date",
help="The end date of the range",
default=str(date.today()),
"end_datetime",
help="The end date of the range (inclusive)",
default=datetime.strftime(datetime.now(), "%Y-%m-%d-%H"),
nargs="?",
type=validate_date,
)
Expand All @@ -146,6 +168,13 @@ def load_data():
default=False,
action="store_true",
)
parser.add_argument(
"-i",
"--incremental",
help="Run in incremental load mode, only works with a production target",
default=False,
action="store_true",
)
parser.add_argument(
"-c",
"--check",
Expand All @@ -156,20 +185,7 @@ def load_data():
args = parser.parse_args()

if args.extract:
start_datetime = datetime.combine(args.start_date, datetime.min.time())
end_datetime = datetime.combine(args.end_date, datetime.min.time())

total_hours = int((end_datetime - start_datetime).total_seconds() / 3600)
progress_bar = tqdm(total=total_hours)

active_datetime = start_datetime

while active_datetime <= end_datetime:
download_data(active_datetime)
progress_bar.update(1)
active_datetime += timedelta(hours=1)

progress_bar.close()
extract_data(args.start_datetime, args.end_datetime)

if args.load:
if args.load and not args.incremental:
load_data()