-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraw_predict.py
54 lines (45 loc) · 1.42 KB
/
raw_predict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import requests
import json
import boto3
import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
import datetime
# The DAG backfills from two days before "now".
start_date = airflow.utils.dates.days_ago(2)

# Operator-level defaults shared by every task in this DAG.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email=["airflow@example.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=datetime.timedelta(minutes=5),
)
def json_scraper(url, file_name, bucket):
    """Fetch JSON from *url*, write it to *file_name*, and upload it to S3.

    Args:
        url: HTTP endpoint that returns a JSON payload.
        file_name: Local path where the pretty-printed JSON is written.
        bucket: S3 bucket name; the object key is ``predictit/<file_name>``.

    Raises:
        requests.HTTPError: if the endpoint responds with a non-2xx status.
        requests.Timeout: if the endpoint does not respond within 30s.
    """
    # A timeout is mandatory: requests waits forever by default, which would
    # pin an Airflow worker slot indefinitely on a hung endpoint.
    response = requests.get(url, timeout=30)
    # Fail fast on HTTP errors instead of surfacing a confusing JSON decode
    # error from an HTML error page further down.
    response.raise_for_status()
    json_data = response.json()
    with open(file_name, 'w', encoding='utf-8') as json_file:
        json.dump(json_data, json_file, ensure_ascii=False, indent=4)
    s3 = boto3.client('s3')
    s3.upload_file(file_name, bucket, f"predictit/{file_name}")
# Daily DAG: pull the full PredictIt market-data feed and land it in S3.
with DAG(
    "raw_predictit",
    default_args=default_args,
    description="",
    schedule_interval=datetime.timedelta(days=1),
    start_date=start_date,
    catchup=False,
    tags=["Niranjan-Rao"],
) as dag:
    # Download the market feed JSON and upload it under predictit/ in S3.
    # NOTE: dag=dag is unnecessary inside the `with DAG(...)` context —
    # operators created here are attached to the DAG automatically.
    extract_predictit = PythonOperator(
        task_id='extract_predictit',
        python_callable=json_scraper,
        op_kwargs={
            'url': "https://www.predictit.org/api/marketdata/all/",
            'file_name': 'predicit_markets.json',
            'bucket': "data-mbfr"},
    )
    # Marker task signalling the raw extract completed.
    ready = DummyOperator(task_id='ready')

    # BUG FIX: the original used ">>>", which is a SyntaxError in Python.
    # Airflow task dependencies are declared with the ">>" bitshift operator.
    extract_predictit >> ready