# app.py (forked from stanfordnmbl/opencap-core)
import glob
import json
import logging
import os
import shutil
import time
import traceback

import numpy as np
import requests

from utils import getDataDirectory, checkTime, checkResourceUsage
from utilsAPI import getAPIURL, getWorkerType
from utilsAuth import getToken
from utilsServer import processTrial, runTestSession
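
# This worker polls the API for queued trials, processes each one with
# processTrial, and patches the trial's status back to the API when it finishes.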
logging.basicConfig(level=logging.INFO)

API_TOKEN = getToken()
API_URL = getAPIURL()
workerType = getWorkerType()

# if true, will delete entire data directory when finished with a trial
isDocker = True

# get start time
t = time.localtime()
initialStatusCheck = False
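
# initialStatusCheck starts False so that a test session runs on the first pass
# through the loop, before the 30-minute timer has elapsed.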
while True:

    # Run a test trial at a given frequency to check the status of the machine.
    # Stop the machine if it fails.
    if checkTime(t, minutesElapsed=30) or not initialStatusCheck:
        runTestSession(isDocker=isDocker)
        t = time.localtime()
        initialStatusCheck = True

    # workerType = 'calibration' -> just processes calibration and neutral
    # workerType = 'all' -> processes all types of trials
    # no query string -> defaults to 'all'
    queue_path = "trials/dequeue/?workerType=" + workerType
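    # Example: with workerType == 'all', the request below is
    #   GET {API_URL}trials/dequeue/?workerType=all
    # with the worker's token in the Authorization header.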
    try:
        r = requests.get("{}{}".format(API_URL, queue_path),
                         headers={"Authorization": "Token {}".format(API_TOKEN)})
    except Exception as e:
        traceback.print_exc()
        time.sleep(15)
        continue

    if r.status_code == 404:
        logging.info("...pulling " + workerType + " trials.")
        time.sleep(1)
        continue

    if np.floor(r.status_code / 100) == 5:  # 5xx codes are server faults
        logging.info("API unresponsive. Status code = {:.0f}.".format(r.status_code))
        time.sleep(5)
        continue
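
    # Any other status code is assumed to carry a dequeued trial; parse it below.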
    # Check resource usage
    resourceUsage = checkResourceUsage()
    logging.info(json.dumps(resourceUsage))
    logging.info(r.text)

    trial = r.json()
    trial_url = "{}{}{}/".format(API_URL, "trials/", trial["id"])
    logging.info(trial_url)
    logging.info(trial)
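
    # trial_url is the detail endpoint for this trial; status updates below are
    # PATCHed to it.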
    if len(trial["videos"]) == 0:
        error_msg = {}
        error_msg['error_msg'] = 'No videos uploaded. Ensure phones are connected and you have a stable internet connection.'
        error_msg['error_msg_dev'] = 'No videos uploaded.'
        r = requests.patch(trial_url, data={"status": "error", "meta": json.dumps(error_msg)},
                           headers={"Authorization": "Token {}".format(API_TOKEN)})
        continue

    if any([v["video"] is None for v in trial["videos"]]):
        r = requests.patch(trial_url, data={"status": "error"},
                           headers={"Authorization": "Token {}".format(API_TOKEN)})
        continue
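
    # Map the trial name to a processing type: 'calibration' and 'neutral'
    # (renamed to 'static') are special cases; everything else is 'dynamic'.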
    trial_type = "dynamic"
    if trial["name"] == "calibration":
        trial_type = "calibration"
    if trial["name"] == "neutral":
        trial["name"] = "static"
        trial_type = "static"

    logging.info("processTrial({},{},trial_type={})".format(trial["session"], trial["id"], trial_type))

    try:
        processTrial(trial["session"], trial["id"], trial_type=trial_type, isDocker=isDocker)
        # Note: a result needs to be posted for the API to know we finished, but
        # results are now posted automatically through processTrial.
        r = requests.patch(trial_url, data={"status": "done"},
                           headers={"Authorization": "Token {}".format(API_TOKEN)})
        logging.info('0.5s pause if need to restart.')
        time.sleep(0.5)
    except:
        r = requests.patch(trial_url, data={"status": "error"},
                           headers={"Authorization": "Token {}".format(API_TOKEN)})
        traceback.print_exc()

    # Clean data directory
    if isDocker:
        folders = glob.glob(os.path.join(getDataDirectory(isDocker=True), 'Data', '*'))
        for f in folders:
            shutil.rmtree(f)
            logging.info('deleting ' + f)