forked from alvarop/dkbc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dkbc.py
167 lines (125 loc) · 5.21 KB
/
dkbc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import http.client
import json
import os
import re
import time
import urllib
import urllib.parse
from pprint import pprint

import requests
import yaml
DEFAULT_API_URL = "api.digikey.com"


class DKBC:
    """Small client for the Digi-Key barcode and keyword-search endpoints.

    Credentials and tokens live in a YAML config file. The keys read by this
    class are ``client-id``, ``client-secret``, ``access-token``,
    ``refresh-token`` and ``token-expiration`` (a Unix timestamp). When the
    stored expiration time has passed, the token pair is refreshed and the
    config file is rewritten before the next request is sent.
    """

    # Input made only of ASCII digits is treated as a 1D (linear) barcode;
    # everything else is handled as a 2D barcode.
    _BARCODE_1D_RE = re.compile(r"^[0-9]+$")

    def __init__(self, api_url=DEFAULT_API_URL, cfg_path="config.yml", debug=False):
        """Load the config file and refresh the token if it has expired.

        Args:
            api_url: Host name of the API server (no scheme, no path).
            cfg_path: Path of the YAML config file.
            debug: When true, print details of the token-refresh request.

        Raises:
            FileNotFoundError: If ``cfg_path`` is not an existing file.
        """
        self.cfg = None
        self.cfg_path = cfg_path
        self.api_url = api_url
        self.debug = debug

        self.__update_config()

    def __update_config(self):
        """Return the config, loading it on first use and refreshing stale tokens.

        Raises:
            FileNotFoundError: If the config has not been loaded yet and
                ``self.cfg_path`` is not an existing file.
        """
        if self.cfg is None:
            if not os.path.isfile(self.cfg_path):
                raise FileNotFoundError('Uh oh!! "{}" not found'.format(self.cfg_path))
            with open(self.cfg_path, "r") as ymlfile:
                self.cfg = yaml.safe_load(ymlfile)

        if int(self.cfg["token-expiration"]) < time.time():
            print("Token has expired, refreshing")
            self.__refresh_token()

        return self.cfg

    def __refresh_token(self):
        """Exchange the stored refresh token for a new access/refresh token pair.

        On HTTP 200 the previous config is saved to ``old_config.yml`` and the
        updated config is written back to ``self.cfg_path``. On any other
        outcome ``ERROR`` is printed and the stored tokens are left unchanged.
        """
        # https://api-portal.digikey.com/app_overview
        # Refresh-token grant: the form fields are the stored refresh token,
        # the application's client id and secret, and the grant type.
        post_request = {
            "refresh_token": self.cfg["refresh-token"],
            "client_id": self.cfg["client-id"],
            "client_secret": self.cfg["client-secret"],
            "grant_type": "refresh_token",
        }

        request_url = "https://" + self.api_url + "/v1/oauth2/token"
        r = requests.post(request_url, data=post_request)

        # An error response is not guaranteed to carry a JSON body; a parse
        # failure must fall through to the error branch instead of raising.
        try:
            response = r.json()
        except ValueError:
            response = None

        if self.debug:
            print("Making request to " + request_url)
            print(r.status_code)
            print(response)

        if r.status_code == 200 and response is not None:
            # Keep a copy of the previous config before overwriting it.
            with open("old_config.yml", "w") as outfile:
                yaml.dump(self.cfg, outfile, default_flow_style=False)

            self.cfg["refresh-token"] = response["refresh_token"]
            self.cfg["access-token"] = response["access_token"]
            self.cfg["token-expiration"] = int(time.time()) + int(
                response["expires_in"]
            )

            with open(self.cfg_path, "w") as outfile:
                yaml.dump(self.cfg, outfile, default_flow_style=False)
        else:
            print("ERROR")

    def _request_json(self, method, path, body=None):
        """Send one authenticated HTTPS request and return the decoded JSON body.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            path: Request path, already percent-encoded where needed.
            body: Optional request body as bytes.

        Returns:
            The response body parsed as JSON.
        """
        self.__update_config()

        headers = {
            "x-DIGIKEY-client-id": self.cfg["client-id"],
            "authorization": "Bearer " + self.cfg["access-token"],
            "content-type": "application/json",
            "accept": "application/json",
        }

        conn = http.client.HTTPSConnection(self.api_url)
        try:
            conn.request(method, path, body, headers)
            res = conn.getresponse()
            data = json.loads(res.read().decode("utf-8"))
        finally:
            # Always release the socket, even when the request or parsing fails.
            conn.close()

        if isinstance(data, dict) and data.get("httpMessage") == "Unauthorized":
            print("Unauthorized! Need to refresh token.")

        return data

    def process_barcode(self, barcode):
        """Look up a barcode, choosing the 1D or 2D endpoint from its content.

        Args:
            barcode: Scanned barcode text.

        Returns:
            The decoded JSON response of the selected endpoint.
        """
        if self._BARCODE_1D_RE.match(barcode):
            return self.process_1d_barcode(barcode)
        return self.process_2d_barcode(barcode)

    def process_1d_barcode(self, barcode):
        """Look up a 1D barcode via ``/Barcoding/v3/ProductBarcodes/``.

        Args:
            barcode: Barcode text; it is percent-encoded into the URL path.

        Returns:
            The decoded JSON response.
        """
        # safe="" also encodes "/", so the value stays a single path segment.
        return self._request_json(
            "GET",
            "/Barcoding/v3/ProductBarcodes/" + urllib.parse.quote(barcode, safe=""),
        )

    def process_2d_barcode(self, barcode):
        """Look up a 2D barcode via ``/Barcoding/v3/Product2DBarcodes/``.

        Args:
            barcode: Barcode text; it is percent-encoded into the URL path.

        Returns:
            The decoded JSON response.
        """
        # safe="" also encodes "/", so the value stays a single path segment.
        return self._request_json(
            "GET",
            "/Barcoding/v3/Product2DBarcodes/" + urllib.parse.quote(barcode, safe=""),
        )

    def get_part_details(self, part_no):
        """Run a keyword search for ``part_no`` via ``/Search/v3/Products/Keyword``.

        Args:
            part_no: Keyword text to search for.

        Returns:
            The decoded JSON response.
        """
        # json.dumps escapes quotes, backslashes and control characters, so
        # any part number yields a well-formed request body.
        payload = json.dumps({"Keywords": part_no, "RecordCount": "50"})
        return self._request_json(
            "POST", "/Search/v3/Products/Keyword", payload.encode("utf-8")
        )