one_day_to_parquet.py
import csv
import datetime
import logging
import tempfile
from datetime import timedelta

import duckdb
from django.utils import timezone  # noqa

from apps.greencheck.models import checks  # noqa
from apps.greencheck.object_storage import object_storage_bucket  # noqa

logger = logging.getLogger(__name__)

infra_bucket = object_storage_bucket("internal-infra")


def csv_of_checks_for_day(day: datetime.date, csv_path: str) -> bool:
    """
    Run a query extracting the checks for the given day, and write the
    results to a date-stamped CSV file at `csv_path`.

    Returns True if any checks were written, and False otherwise.
    """
    logger.info(f"Starting export for {day}")
    start_time = timezone.now()

    one_day_forward = timedelta(days=1)
    one_day_back = timedelta(days=-1)

    # bound the query window to the target day: strictly after the
    # previous day, and strictly before the following one
    queryset = checks.Greencheck.objects.filter(
        date__gt=day + one_day_back, date__lt=day + one_day_forward
    )

    # return early if we have no results from our greencheck query.
    # (an iterator is always truthy, so ask the queryset instead)
    if not queryset.exists():
        return False

    # values_list() yields one tuple per row, and iterator() streams the
    # results rather than loading the whole day of checks into memory
    res = queryset.values_list().iterator()

    logger.info(f"Writing query results to {csv_path}")
    # newline="" is what the csv module expects for files it writes to
    with open(csv_path, "w", newline="") as f:
        list_writer = csv.writer(f)
        for check in res:
            list_writer.writerow(check)

    logger.info(f"Finished export for {day}")
    end_time = timezone.now()
    logger.info(end_time)
    time_span = end_time - start_time
    logger.info(f"Took {time_span.seconds} seconds")
    return True
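
# For illustration: for day = datetime.date(2023, 1, 15), the filter above
# resolves to date__gt=2023-01-14 and date__lt=2023-01-16 (a worked example,
# not output from the script).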


def convert_csv_to_parquet(csv_path: str, parquet_path: str) -> bool:
    """
    Load the CSV file at `csv_path` (typically `prefix/YYYY_MM_DD.csv`),
    and write a zstd-compressed parquet file to `parquet_path`
    (typically `prefix/YYYY_MM_DD.zstd.parquet`).
    """
    start_time = timezone.now()
    logger.info(f"Starting conversion to parquet for {csv_path}")
    logger.info(start_time)

    # set up our in-memory database with duckdb
    con = duckdb.connect(database=":memory:")

    # run our query to convert the day of checks to a compressed parquet file
    con.execute(
        (
            f"COPY (SELECT * from '{csv_path}') "
            f"TO '{parquet_path}' (FORMAT 'PARQUET', CODEC 'ZSTD')"
        )
    )

    logger.info(f"Finished conversion to parquet for {csv_path}")
    end_time = timezone.now()
    logger.info(end_time)
    time_span = end_time - start_time
    logger.info(f"Took {time_span.seconds} seconds")
    return True
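
# A minimal sketch for sanity-checking the parquet file this produces, using
# duckdb's ability to query a parquet file by path (hypothetical path, not
# part of the original script):
#
#   con = duckdb.connect(database=":memory:")
#   row_count = con.execute(
#       "SELECT COUNT(*) FROM '/tmp/2023-01-15.zstd.parquet'"
#   ).fetchone()[0]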


def upload_to_object_storage(parquet_path: str, upload_path: str) -> bool:
    """
    Upload the parquet file at `parquet_path` (typically
    `prefix/YYYY_MM_DD.zstd.parquet`) to object storage, under the
    key given by `upload_path`.
    """
    start_time = timezone.now()
    logger.info(f"Start uploading at {start_time}")

    infra_bucket.upload_file(parquet_path, upload_path)

    end_time = timezone.now()
    time_span = end_time - start_time
    logger.info(f"Finished uploading at {end_time}")
    logger.info(f"Took {time_span.seconds} seconds")
    return True


def backup_day_to_parquet(target_date: datetime.date):
    """
    Accept a target date to back up, then extract all the greenchecks
    for that given day, and back them up to object storage as a
    compressed parquet file.
    """
    # use a temporary directory, to clean up after ourselves
    # and avoid clashes
    with tempfile.TemporaryDirectory() as tmpdir:
        date_string = target_date.strftime("%Y-%m-%d")
        greencheck_table = "greencheck"

        csv_path = f"{tmpdir}/{date_string}.local_file.csv"
        parquet_path = f"{tmpdir}/{date_string}.zstd.parquet"
        # e.g. parquet/days/2023-01-15.greencheck.zstd.parquet
        upload_path = f"parquet/days/{date_string}.{greencheck_table}.zstd.parquet"

        csv_generated, parquet_created = None, None

        # only proceed with the following steps if we get a truthy
        # result from the earlier steps
        csv_generated = csv_of_checks_for_day(target_date, csv_path)
        if csv_generated:
            parquet_created = convert_csv_to_parquet(csv_path, parquet_path)
        if parquet_created:
            upload_to_object_storage(parquet_path, upload_path)
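

# A minimal usage sketch (an assumed invocation, not part of the original
# script): back up yesterday's checks. This presumes Django settings are
# already configured, e.g. when run from `python manage.py shell`.
#
#   yesterday = timezone.now().date() - timedelta(days=1)
#   backup_day_to_parquet(yesterday)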