# -*- coding: utf-8 -*-
"""dlt (data load tool) data engineering zoomcamp workshop
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1kLyD3AL-tYf_HqCXYnA3ZLwHGpzbLmoj
# **Install `dlt`⏳**
What is dlt?
* dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.
* You can install it using pip and there's no need to start any backends or containers. You can simply import dlt in your Python script and write a simple pipeline to load data from sources like APIs, databases, files, etc. into a destination of your choice.
Here are a few reasons why you should use dlt:
* Automated maintenance: with schema inference, schema evolution, and alerts, plus short declarative code, maintenance becomes simple.
* Run it where Python runs: You can use dlt on Airflow, serverless functions, notebooks. It doesn't require external APIs, backends or containers, and scales on both micro and large infrastructures.
* User-friendly, declarative interface: dlt provides a user-friendly interface that removes knowledge obstacles for beginners while empowering senior professionals.
Benefits: As a data engineer, dlt offers several benefits:
* Efficient Data Extraction and Loading: dlt simplifies the process of extracting and loading data. It allows you to decorate your data-producing functions with loading or incremental extraction metadata, enabling dlt to extract and load data according to your custom logic. This is particularly useful when dealing with large datasets, as dlt supports scalability through iterators, chunking, and parallelization (a small illustration of such a decorated function follows the install cell below). Read more
* Automated Schema Management: dlt automatically infers a schema from data and loads the data to the destination. It can easily adapt and structure data as it evolves, reducing the time spent on maintenance and development. This ensures data consistency and quality. Read more
* Data Governance Support: dlt pipelines offer robust governance support through pipeline metadata utilization, schema enforcement and curation, and schema change alerts. This promotes data consistency, traceability, and control throughout the data processing lifecycle. Read more
* Flexibility and Scalability: dlt can be used on Airflow, serverless functions, notebooks, and scales on both micro and large infrastructures. It also offers several mechanisms and configuration options to scale up and fine-tune pipelines. Read more
* Post-Loading Transformations: dlt provides several options for transformations after loading the data, including using dbt, the dlt SQL client, or Pandas. This allows you to shape and manipulate the data before or after loading it, allowing you to meet specific requirements and ensure data quality and consistency. Read more
"""
# Commented out IPython magic to ensure Python compatibility.
# %%capture
# !pip install dlt[duckdb] # Install dlt with all the necessary DuckDB dependencies
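"""The bullets above mention decorating data-producing functions with loading metadata.
As a small illustration (not part of the original workshop; the resource name and data below
are made up), a plain generator can be turned into a dlt resource like this:
"""
import dlt

@dlt.resource(name="events", write_disposition="append")
def events():
    # any iterable or generator works; dlt picks up the table name and
    # write disposition from the decorator when the resource is run in a pipeline
    yield from [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

# e.g. dlt.pipeline(destination="duckdb", dataset_name="demo").run(events())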
"""# Part 1: Data Extraction
## Example 1: Extracting API data with a generator
Premise:
For this example, we created a simple HTTP API that returns JSON data page by page, 1000 records per page.
It accepts a parameter called `page`, representing the page number.
If we request a page number beyond the available data, we get an empty response.
To get the pages, we write a loop that asks for pages starting from 1 and increasing, until we receive an empty page.
As we do not know ahead of time how many pages have data and whether they fit in memory, yielding the data so it can be handled page by page scales better than first collecting all pages in memory and then returning them.
"""
import requests
BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
# I call this a paginated getter
# as it's a function that gets data
# and also paginates until there is no more data
# by yielding pages, we "microbatch", which speeds up downstream processing
def paginated_getter():
    page_number = 1

    while True:
        # Set the query parameters
        params = {'page': page_number}

        # Make the GET request to the API
        response = requests.get(BASE_API_URL, params=params)
        response.raise_for_status()  # Raise an HTTPError for bad responses
        page_json = response.json()
        print(f'got page number {page_number} with {len(page_json)} records')

        # if the page has no records, stop iterating
        if page_json:
            yield page_json
            page_number += 1
        else:
            # No more data, break the loop
            break

if __name__ == '__main__':
    # Use the generator to iterate over pages
    for page_data in paginated_getter():
        # Process each page as needed
        print(page_data)
"""## Example 2: The "bad" way to download a file
In this example we download a json lines file.
Since the download is text but we want to work with iterable data structures for loading, we convert the contents to a list of JSON objects.
This is a less than ideal approach because if the file size is unknown, we run the risk of running out of memory. On machines that run multiple jobs, an out of memory error risks killing not just the current job but anything else running on the machine at the time - a situation most data engineers **really really** want to avoid.
"""
import requests
import json
def download_and_read_jsonl(url):
    response = requests.get(url)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    data = response.text.splitlines()
    parsed_data = [json.loads(line) for line in data]
    return parsed_data
# time the download
import time
start = time.time()
url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"
downloaded_data = download_and_read_jsonl(url)
if downloaded_data:
    # Process or print the downloaded data as needed
    print(downloaded_data[:5])  # Print the first 5 entries as an example
# time the download
end = time.time()
print(end - start)
"""## Example 3: Extracting file data with a generator "the best practice way"
"The best practice way" here refers to the most scalable way to do it, but if you are confident scale will not be an issue, then the right way might be the simplest :)
In this example we download a JSONL (JSON Lines) file.
Since each line is a separate JSON document, we can process the file line by line.
We stream the download and yield the rows one by one.
If this file were a single JSON document and not JSONL, we could use the ijson library to break it into rows without loading it all into memory (a sketch follows this example).
"""
import requests
import json
url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"
def stream_download_jsonl(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    for line in response.iter_lines():
        if line:
            yield json.loads(line)
# time the download
import time
start = time.time()
# Use the generator to iterate over rows with minimal memory usage
row_counter = 0
for row in stream_download_jsonl(url):
    print(row)
    row_counter += 1
    if row_counter >= 5:
        break
# time the download
end = time.time()
print(end - start)
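"""As noted above, if the source were one large JSON document (a top-level array) rather than
JSONL, the `ijson` library could iterate it incrementally instead. This is a minimal sketch,
not part of the original workshop: it assumes the endpoint returns a plain JSON array and that
`ijson` is installed (`pip install ijson`); it would not work on the JSONL URL used here.
"""
import ijson
import requests

def stream_download_json_array(url):
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        # for gzip/deflate-encoded responses, let urllib3 decode the raw stream
        response.raw.decode_content = True
        # ijson walks the file-like object and yields each element of the top-level array
        for record in ijson.items(response.raw, "item"):
            yield record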
"""### Loading the generator (any of the above)
We have 3 ways to download the same data. Let's use the fast and reliable way to load some data and inspect it in DuckDB.
In this example, we are using the `dlt` library to do the loading; it will process data from the generators incrementally, following the same memory management paradigm.
We will discuss more details about `dlt` or "data load tool" later.
"""
import dlt
# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
generators_pipeline = dlt.pipeline(destination='duckdb', dataset_name='generators')
# we can load any generator to a table at the pipeline destination as follows:
info = generators_pipeline.run(paginated_getter(),
                               table_name="http_download",
                               write_disposition="replace")
# the outcome metadata is returned by the load and we can inspect it by printing it.
print(info)
# we can load the next generator to the same or to a different table.
info = generators_pipeline.run(stream_download_jsonl(url),
                               table_name="stream_download",
                               write_disposition="replace")
print(info)
# show outcome
import duckdb
conn = duckdb.connect(f"{generators_pipeline.pipeline_name}.duckdb")
# let's see the tables
conn.sql(f"SET search_path = '{generators_pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))
# and the data
print("\n\n\n http_download table below:")
rides = conn.sql("SELECT * FROM http_download").df()
display(rides)
print("\n\n\n stream_download table below:")
passengers = conn.sql("SELECT * FROM stream_download").df()
display(passengers)
# As you can see, the same data was loaded in both cases.
"""# Part 2: Normalisation
## Load nested data with auto normalisation
When converting nested data to tabular formats, to keep fragmentation minimal:
* Nested dictionaries can be flattened into the parent row
* Nested lists, however, need to be expressed as separate tables due to the different granularity (1:n relationship)
And of course, when going from JSON to DB, we want some things standardised:
* Data types such as timestamps should be detected correctly
* Column names should be converted to db-compatible names
* Unnested sub-tables should be linked to parent tables via auto generated keys
For this work, we will use the `dlt` library, which is purpose-made to solve such tasks in a scalable way, for example by using generators.
### Introducing dlt
dlt is a Python library created to help data engineers build simpler, faster and more robust pipelines with minimal effort.
dlt automates much of the tedious work a data engineer would do, and does it in a way that is robust.
dlt can handle things like:
- Schema: Inferring and evolving schema, alerting changes, using schemas as data contracts.
- Typing data, flattening structures, renaming columns to fit database standards.
- Processing a stream of events/rows without filling memory. This includes extraction from generators. In our example we will pass the `data` defined below.
- Loading to a variety of databases or file formats.
Read more about dlt [here](https://dlthub.com/docs/intro).
Now let’s use it to load our nested json to duckdb:
"""
import dlt
import duckdb
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        # nested dictionaries could be flattened
        "coordinates": {  # coordinates__start__lon
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "booked"
        },
        "Passenger_Count": 2,
        # nested lists need to be expressed as separate tables
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
    # ... more data
]
# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
pipeline = dlt.pipeline(destination='duckdb', dataset_name='taxi_rides')
# run with merge write disposition.
# This is so scaffolding is created for the next example,
# where we look at merging data
info = pipeline.run(data,
                    table_name="rides",
                    write_disposition="merge",
                    primary_key="record_hash")
print(info)
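"""Optional (not in the original workshop): we can also peek at the schema dlt inferred for
this load. This is a sketch and assumes the installed dlt version exposes
`pipeline.default_schema` and `Schema.to_pretty_yaml()`.
"""
# print the inferred schema as YAML: tables, columns and detected data types
print(pipeline.default_schema.to_pretty_yaml())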
"""### Inspecting the nested structure, joining the child tables
Let's look at what happened during the load
- By looking at the loaded tables, we can see our json document got flattened and sub-documents got split into separate tables
- We can re-join those child tables to the parent table by using the generated keys `on parent_table._dlt_id = child_table._dlt_parent_id`.
- Data types: if you look at the data types, you will note that the timestamps, which are strings in the JSON, are now typed as timestamps in the database.
"""
# show the outcome
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))
print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)
print("\n\n\n Pasengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)
print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)
# to reflect the relationships between parent and child rows, let's join them
# of course this will have 4 rows due to the two 1:n joins
print("\n\n\n joined table")
joined = conn.sql("""
SELECT *
FROM rides as r
left join rides__passengers as rp
on r._dlt_id = rp._dlt_parent_id
left join rides__stops as rs
on r._dlt_id = rs._dlt_parent_id
""").df()
display(joined)
"""What are we looking at?
- Nested dicts got flattened into the parent row, the structure `{"coordinates":{"start": {"lat": ...}}}` became
`coordinates__start__lat`
- Nested lists got broken out into separate tables with generated columns that would allow us to join the data back when needed.
# Part 3: Incremental loading
## Update nested data
In this example the ratings of the 2 passengers changed. It turns out their payment didn't go through for the earlier ride and they got a bad rating from the driver, so now we have to update their ratings.
As you can see after running the code, their ratings are now lowered.
"""
import dlt
import duckdb
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "cancelled"
        },
        "Passenger_Count": 2,
        "passengers": [
            {"name": "John", "rating": 4.4},
            {"name": "Jack", "rating": 3.6}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
]
# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
pipeline = dlt.pipeline(destination='duckdb', dataset_name='taxi_rides')
# run the pipeline with default settings, and capture the outcome
info = pipeline.run(data,
                    table_name="rides",
                    write_disposition="merge",
                    primary_key='record_hash')
# show the outcome
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))
print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)
print("\n\n\n Pasengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)
print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)
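"""A quick sanity check (not part of the original workshop): because we load with
`write_disposition="merge"` and `primary_key="record_hash"`, re-running the load should update
the existing row rather than append a duplicate. The query below confirms there is still a
single row for this record_hash.
"""
row_count = conn.sql(
    "SELECT count(*) AS n FROM rides WHERE record_hash = 'b00361a396177a9cb410ff61f20015ad'"
).df()
print("\n\n\n rows for this record_hash (expect 1 after merge):")
display(row_count)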
"""# Bonus snippets
## Load to parquet file
"""
# Commented out IPython magic to ensure Python compatibility.
# %%capture
# !pip install dlt[parquet] # Install dlt with all the necessary Parquet dependencies
# !pip install parquet
# !mkdir .dlt
import os
import dlt
import parquet
import json
import glob
# Set the bucket_url. We can also use a local folder
os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = 'file:///content/.dlt/my_folder'
url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"
# Define your pipeline
pipeline = dlt.pipeline(
    pipeline_name='my_pipeline',
    destination='filesystem',
    dataset_name='mydata'
)
# Run the pipeline with the generator we created earlier.
load_info = pipeline.run(stream_download_jsonl(url), table_name="users", loader_file_format="parquet")
print(load_info)
# Get a list of all Parquet files in the specified folder
parquet_files = glob.glob('/content/.dlt/my_folder/mydata/users/*.parquet')
# show parquet files
print("Loaded files: ")
for file in parquet_files:
    print(file)
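"""Optional check (not part of the original workshop): read one of the loaded Parquet files back
to verify its contents. This sketch assumes `pandas` and a Parquet engine such as `pyarrow`
(pulled in by `dlt[parquet]`) are available.
"""
import pandas as pd

if parquet_files:
    # load the first file back into a DataFrame and show a few rows
    df = pd.read_parquet(parquet_files[0])
    print(df.head())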
"""## Load to bigquery
To load to bigquery, we need credentials to bigquery.
- dlt looks for credentials in several places as described in the [credential docs.](https://dlthub.com/docs/general-usage/credentials/configuration)
- In the case of Bigquery you can read the docs [here](https://dlthub.com/docs/dlt-ecosystem/destinations/bigquery) for how to do it.
- If you are running from Colab or a GCP machine, or you are authenticated with the gcloud CLI, you can use the already-available local credentials. We will use the Colab OAuth flow here; an alternative using environment variables is sketched right after this cell.
"""
# Commented out IPython magic to ensure Python compatibility.
# %%capture
# !pip install dlt[bigquery]
# Authenticate to Google BigQuery
from google.colab import auth
auth.authenticate_user()
import os
import dlt
os.environ['GOOGLE_CLOUD_PROJECT'] = 'dlt-dev-external'
# Define your pipeline
pipeline = dlt.pipeline(
    pipeline_name='my_pipeline',
    destination='bigquery',
    dataset_name='dtc'
)
# Run the pipeline
load_info = pipeline.run(stream_download_jsonl(url), table_name="users")
print(load_info)
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
query = """
SELECT *
FROM `dtc.users`
"""
query_job = client.query(query) # Make an API request.
print("The query data:")
for row in query_job:
    # Row values can be accessed by field name or index.
    print(row)
"""## Other demos
Find more demos in this repo, or look on our blog for multiple community demos
* https://github.com/dlt-hub/dlt_demos
* https://dlthub.com/docs/blog
## Docs Links
This course was tailored so that the whole cohort can complete it successfully, so more complex topics were left out. We strongly encourage you to keep learning on your own.
You will find more info about advanced capabilities of dlt here: https://dlthub.com/docs/build-a-pipeline-tutorial
Don't miss the GPT-4 docs helper button - it will help with simple questions.
If you get stuck, consider joining our community for help https://dlthub.com/community
"""