-
Notifications
You must be signed in to change notification settings - Fork 0
/
edgar_data_request.py
92 lines (83 loc) · 2.8 KB
/
edgar_data_request.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""Module providing a function making call eDataPole REST API."""
import http.client
import json
import urllib.parse
# Keys of the error dict returned when a request fails on the client side.
RSP_STATUS = "response_status"
RSP_REASON = "response_reason"


class edgar_data_request:
    """Client for the eDataPole EDGAR extraction REST endpoint.

    Each public method issues a single plain-HTTP GET against
    ``host:port`` and returns the JSON-decoded response body. When the
    request cannot be completed, a dict of the form
    ``{RSP_STATUS: 999, RSP_REASON: "<error text>"}`` is returned instead
    of raising.
    """

    # Endpoint path; the query string is appended per request.
    _EXTRACT_PATH = "/api/messages/sec/edgar_extract"
    # Status value used in the error dict for client-side failures.
    _CLIENT_ERROR_STATUS = 999

    def __init__(self, host="edatapole.com", port=80):
        """Remember the server address; no connection is opened here."""
        self.host = host
        self.port = port

    def extract_edgar_forms_by_cik(
        self, access_token, cik, months_back, forms, sections=None
    ):
        """Extract EDGAR SEC forms for the company identified by ``cik``.

        Args:
            access_token: Sent to the server as the ``api_key`` query
                parameter.
            cik: Sent as the ``cik`` query parameter.
            months_back: Sent as the ``months_back`` query parameter.
            forms: Sent as the ``forms`` query parameter.
            sections: Sent as the ``sections`` query parameter.

        Returns:
            The JSON-decoded response body, or the error dict described
            on the class when the request fails.

        Raises:
            ValueError: If the response body is not valid JSON
                (``json.JSONDecodeError`` is a subclass).
        """
        return self._extract(
            "cik", cik, access_token, months_back, forms, sections
        )

    def extract_edgar_forms_by_ticker(
        self,
        access_token,
        ticker,
        months_back,
        forms,
        sections=None,
    ):
        """Extract EDGAR SEC forms for the company identified by ``ticker``.

        Args:
            access_token: Sent to the server as the ``api_key`` query
                parameter.
            ticker: Sent as the ``ticker`` query parameter.
            months_back: Sent as the ``months_back`` query parameter.
            forms: Sent as the ``forms`` query parameter.
            sections: Sent as the ``sections`` query parameter.

        Returns:
            The JSON-decoded response body, or the error dict described
            on the class when the request fails.

        Raises:
            ValueError: If the response body is not valid JSON
                (``json.JSONDecodeError`` is a subclass).
        """
        return self._extract(
            "ticker", ticker, access_token, months_back, forms, sections
        )

    def _extract(
        self, id_name, id_value, access_token, months_back, forms, sections
    ):
        """Run one extraction request keyed by ``id_name`` and decode it."""
        # Bound before the ``try`` so the ``finally`` clause is safe even
        # when constructing the connection itself raises.
        conn = None
        try:
            conn = http.client.HTTPConnection(self.host, self.port)
            conn.connect()
            # NOTE(review): a ``sections`` of None is encoded as the literal
            # text "None"; confirm the server expects that rather than the
            # parameter being omitted.
            query = {
                id_name: id_value,
                "months_back": months_back,
                "sections": sections,
                "forms": forms,
                "api_key": access_token,
            }
            path = self._EXTRACT_PATH + "?" + urllib.parse.urlencode(query)
            # Log the request with the credential masked so the API key
            # never reaches stdout.
            masked = dict(query, api_key="REDACTED")
            print(self._EXTRACT_PATH + "?" + urllib.parse.urlencode(masked))
            conn.request("GET", path)
            data = conn.getresponse().read()
        except Exception as e:
            # Boundary handler: callers receive an error dict rather than
            # an exception for any failure while talking to the server.
            print(e)
            return {
                RSP_STATUS: self._CLIENT_ERROR_STATUS,
                RSP_REASON: str(e),
            }
        finally:
            if conn is not None:
                conn.close()
        # Decoding stays outside the ``try``: a non-JSON body propagates
        # as an exception, as it did before.
        return json.loads(data)