-
Notifications
You must be signed in to change notification settings - Fork 0
/
helloworld.py
72 lines (58 loc) · 2.71 KB
/
helloworld.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import requests
from urllib3.exceptions import InsecureRequestWarning
import json
def helloworld():
url = "http://128.31.25.50/api/v1/namespaces/_/actions/helloPy?blocking=true&result=false"
payload = "{\"name\":\"World\"}"
headers = {
'Authorization': 'Basic MjNiYzQ2YjEtNzFmNi00ZWQ1LThjNTQtODE2YWE0ZjhjNTAyOjEyM3pPM3haQ0xyTU42djJCS0sxZFhZRnBYbFBrY2NPRnFtMTJDZEFzTWdSVTRWck5aOWx5R1ZDR3VNREdJd1A=',
'Content-Type': 'application/json'
}
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
response = requests.request("POST", url, headers=headers, data=payload, verify=False)
print(response.text.encode('utf8'))
return response.json()
# return response.text.encode('utf8')
def create(name,payload):
url = "https://128.31.25.50/api/v1/namespaces/_/actions/"+name+"?overwrite=false"
payload=format(name,payload)
headers = {
'Authorization': 'Basic MjNiYzQ2YjEtNzFmNi00ZWQ1LThjNTQtODE2YWE0ZjhjNTAyOjEyM3pPM3haQ0xyTU42djJCS0sxZFhZRnBYbFBrY2NPRnFtMTJDZEFzTWdSVTRWck5aOWx5R1ZDR3VNREdJd1A=',
'Content-Type': 'application/json',
}
response = requests.request("PUT", url, headers=headers, data=payload, verify=False)
print(response.text.encode('utf8'))
return True
def invoke(name,payload):
url = "http://128.31.25.50/api/v1/namespaces/_/actions/"+name+"?blocking=true&result=false"
payload = "{\"name\":\"World\"}"
headers = {
'Authorization': 'Basic MjNiYzQ2YjEtNzFmNi00ZWQ1LThjNTQtODE2YWE0ZjhjNTAyOjEyM3pPM3haQ0xyTU42djJCS0sxZFhZRnBYbFBrY2NPRnFtMTJDZEFzTWdSVTRWck5aOWx5R1ZDR3VNREdJd1A=',
'Content-Type': 'application/json'
}
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
response = requests.request("POST", url, headers=headers, data=payload, verify=False)
print(response.text.encode('utf8'))
return response.json()
def update(name,payload):
url = "https://128.31.25.50/api/v1/namespaces/_/actio0ns/"+name+"?overwrite=true"
payload = format(name, payload)
headers = {
'Authorization': 'Basic MjNiYzQ2YjEtNzFmNi00ZWQ1LThjNTQtODE2YWE0ZjhjNTAyOjEyM3pPM3haQ0xyTU42djJCS0sxZFhZRnBYbFBrY2NPRnFtMTJDZEFzTWdSVTRWck5aOWx5R1ZDR3VNREdJd1A=',
'Content-Type': 'application/json',
}
response = requests.request("PUT", url, headers=headers, data=payload, verify=False)
print(response.text.encode('utf8'))
return True
def format(name,code):
new_entry = {
"namespace": "_",
"name": name,
"exec": {
"kind": "python:default",
"code": code
}
}
return json.dumps(new_entry)
if __name__ == "__main__":
helloworld()