From 40b1a318a18352e8c7916cf85dfbb2a68f9a7f73 Mon Sep 17 00:00:00 2001 From: jared Date: Mon, 11 Feb 2019 15:39:41 -0600 Subject: [PATCH 1/2] Add vscode, secrets, and app_secret to gitignore --- .gitignore | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.gitignore b/.gitignore index 5c64cda..ee64e0a 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,12 @@ ENV/ client_secret.json config.py + +#VS Code +/.vscode + +#secrets +/.secrets + +#dev config +app_secret.config \ No newline at end of file From f0ef2d228f475fd27f10d2b6715ec8e961413d53 Mon Sep 17 00:00:00 2001 From: Hil Liao Date: Mon, 11 Feb 2019 21:29:34 -0800 Subject: [PATCH 2/2] refactored Google Cloud BigQuery dependent code to fit.py; the rest remains in main.py. Added Postman tests for both. --- app.yaml | 3 +- fit.py | 451 +++++++++++++++ fit_api.postman_collection.json | 970 +++++++++++++++++++------------- main.py | 441 +-------------- 4 files changed, 1033 insertions(+), 832 deletions(-) create mode 100644 fit.py diff --git a/app.yaml b/app.yaml index a89c2c8..cc4ff52 100644 --- a/app.yaml +++ b/app.yaml @@ -1,6 +1,7 @@ runtime: python env: flex -entrypoint: gunicorn --timeout 3600 --workers 4 --threads 12 -b :$PORT main:app +# --threads 48 fails with ERROR: (gcloud.app.deploy) Error Response: [13] An internal error occurred during deployment. You may need to delete this version manually. 
+entrypoint: gunicorn --timeout 3600 --workers 8 --threads 10 -b :$PORT fit:app runtime_config: python_version: 2 diff --git a/fit.py b/fit.py new file mode 100644 index 0000000..fe283c3 --- /dev/null +++ b/fit.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python +import json +from threading import Thread + +import googleapiclient.errors +import httplib2 +import pytz +from bottle import * +from google.cloud import datastore +from google.cloud import error_reporting +from google.cloud import storage +from googleapiclient.discovery import build +from oauth2client import client + +import backend + +# bottle web framework init +app = Bottle() +application = app + +# Google Cloud Stackdriver Debugger https://cloud.google.com/debugger/docs/setup/python +try: + import googleclouddebugger + + googleclouddebugger.enable() + print("Google Cloud Debugger enabled") +except ImportError as e: + print >> sys.stderr, "Failed to load Google Cloud Debugger for Python 2: {}".format(e) + + +@app.get('/') +def default_get(): + redirect('/v1') + + +@app.get('/oauth2callback') +def oauth2callback(): + urlparts = request.urlparts + redirect_uri = "{}://{}{}".format(urlparts.scheme, urlparts.netloc, urlparts.path) + timezone = request.query.get('state', None) + + flow = client.flow_from_clientsecrets( + backend.client_secret_file, + scope=["profile", "email", 'https://www.googleapis.com/auth/fitness.activity.read', + 'https://www.googleapis.com/auth/fitness.body.read'], + redirect_uri=redirect_uri) + flow.params['access_type'] = 'offline' + flow.params['prompt'] = 'consent' + creds = flow.step2_exchange(code=request.query.code) + http_auth = creds.authorize(httplib2.Http()) + user_info_service = build('oauth2', 'v2', http=http_auth) + get_user_task = user_info_service.userinfo().get() + ds = datastore.Client() + u = get_user_task.execute() + + # insert to Cloud Datastore + entity = datastore.Entity(key=ds.key(backend.DATASTORE_KIND, u['email'])) + now = datetime.utcnow() + entity.update({ +
'refresh_token': creds.refresh_token, + 'google_id': u['id'], + 'gender': u.get('gender'), + 'picture': u['picture'], + 'timezone': unicode(timezone), + 'last_updated': now + }) + ds.put(entity) + response.content_type = 'application/json' + + # required to serialize entity + entity['last_updated'] = now.strftime('%Y-%m-%d %H:%M:%S %Z') + return json.dumps(entity.items()) + + +@app.post('/v1/users/<username>/steps') +def insert_steps(username): + error = check_headers_apikey() + if error: + return error + steps = get_steps(username) + if isinstance(steps, HTTPError) or isinstance(steps, HTTPResponse): + return steps + + insert_result = { + 'inserted_count': backend.insert_steps(username, steps), + 'steps': steps + } + + response.content_type = 'application/json' + return insert_result + + +@app.get('/v1/users/<username>/steps') +def get_steps(username): + error = check_headers_apikey() + if error: + return error + http_auth, timezone = get_google_http_auth_n_user_timezone(username) + end_time_millis, start_date, error = extract_header_dates() + + if error: + if isinstance(error, HTTPError): + return error + else: + return HTTPResponse({ + 'code': httplib.BAD_REQUEST, + 'error': str(error)}, httplib.BAD_REQUEST) + else: + try: + # end_time_millis in headers data is optional + if end_time_millis is None: + end_time_millis = backend.current_milli_time() + + steps = backend.get_daily_steps(http_auth, start_date['year'], start_date['month'], start_date['day'], + end_time_millis, local_timezone=timezone) + response.content_type = 'application/json' + return steps + except client.HttpAccessTokenRefreshError as err: + return HTTPError(httplib.UNAUTHORIZED, "Refresh token invalid: " + str(err)) + except googleapiclient.errors.HttpError as err: + return HTTPError(err.resp.status, "Google API HttpError: " + str(err)) + + +@app.get('/v1') +def main(): + return static_file("post.html", ".") + + +@app.post('/v1/auth') +def google_auth(): + parts = request.urlparts + redirect_uri = 
"{}://{}/oauth2callback".format(parts.scheme, parts.netloc) + + flow = client.flow_from_clientsecrets( + backend.client_secret_file, + scope=["profile", "email", 'https://www.googleapis.com/auth/fitness.activity.read', + 'https://www.googleapis.com/auth/fitness.body.read'], + redirect_uri=redirect_uri) + flow.params['access_type'] = 'offline' + flow.params['prompt'] = 'consent' + error = check_forms_apikey() + if error: + return error + timezone = request.forms['timezone'] + auth_uri = flow.step1_get_authorize_url(state=timezone) + redirect(auth_uri) + + +def check_headers_apikey(): + if 'apikey' not in request.headers or request.headers['apikey'] != backend.API_key: + return HTTPError(httplib.UNAUTHORIZED, "invalid API key in {}".format("request.headers['apikey']")) + + +def check_forms_apikey(): + if 'apikey' not in request.forms or request.forms['apikey'] != backend.API_key: + return HTTPError(httplib.UNAUTHORIZED, "invalid API key in {}".format("request.forms['apikey']")) + + +@app.post('/v1/users/<username>/activities') +def insert_user_activities(username): + error = check_headers_apikey() + if error: + return error + activities = get_user_activities(username) + if isinstance(activities, HTTPError) or isinstance(activities, HTTPResponse): + return activities + + insert_result = { + 'inserted_count': backend.insert_activities(username, activities), + 'activities': activities + } + + response.content_type = 'application/json' + return insert_result + + +@app.get('/v1/users/<username>/activities') +def get_user_activities(username): + error = check_headers_apikey() + if error: + return error + http_auth, timezone = get_google_http_auth_n_user_timezone(username) + end_time_millis, start_date, error = extract_header_dates() + + if error: + if isinstance(error, HTTPError): + return error + else: + return HTTPResponse({ + 'code': httplib.BAD_REQUEST, + 'error': str(error)}, httplib.BAD_REQUEST) + else: + try: + # end_time_millis in headers data is optional + if end_time_millis is None: 
+ end_time_millis = backend.current_milli_time() + + activities = backend.get_daily_activities(http_auth, start_date['year'], start_date['month'], + start_date['day'], end_time_millis, local_timezone=timezone) + response.content_type = 'application/json' + return activities + except client.HttpAccessTokenRefreshError as err: + return HTTPError(httplib.UNAUTHORIZED, "Refresh token invalid: " + str(err)) + except googleapiclient.errors.HttpError as err: + return HTTPError(err.resp.status, "Google API HttpError: " + str(err)) + + +def extract_header_dates(): + """ + Extract headers of start_year, start_month, start_day, and end_time_millis + where the start_* are local date and end_time_millis is the Unix Epoch time in milliseconds + :return: end time in Unix Epoch time in milliseconds, start date dictionary, error if any + """ + # parse headers data in request body + start_date = {'year': request.headers.get('start_year', None), 'month': request.headers.get('start_month', None), + 'day': request.headers.get('start_day', None)} + end_time_millis = request.headers.get('end_time_millis', None) + if end_time_millis is not None: + try: + end_time_millis = int(end_time_millis) + except ValueError as e: + return None, None, HTTPError(httplib.BAD_REQUEST, + 'Failed to convert end_time_millis in request.headers to int: ' + str(e)) + if start_date['year'] is None or start_date['month'] is None or start_date['day'] is None: + return None, None, HTTPError(httplib.BAD_REQUEST, "headers did not contain start_year, start_month, start_day") + else: + start_date['year'] = int(start_date['year']) + start_date['month'] = int(start_date['month']) + start_date['day'] = int(start_date['day']) + + return end_time_millis, start_date, None + + +@app.post('/v1/users/<username>/heart') +def insert_heart_rate(username): + error = check_headers_apikey() + if error: + return error + http_auth, timezone = get_google_http_auth_n_user_timezone(username) + end_time_millis, start_date, error = 
extract_header_dates() + + if error: + if isinstance(error, HTTPError): + return error + else: + return HTTPResponse({ + 'code': httplib.BAD_REQUEST, + 'error': str(error)}, httplib.BAD_REQUEST) + else: + try: + # end_time_millis in headers data is optional + if end_time_millis is None: + end_time_millis = backend.current_milli_time() + + result = backend.get_and_insert_heart_rate(http_auth, username, start_date['year'], start_date['month'], + start_date['day'], end_time_millis, local_timezone=timezone) + response.content_type = 'application/json' + return result + except client.HttpAccessTokenRefreshError as err: + return HTTPError(httplib.UNAUTHORIZED, "Refresh token invalid: " + str(err)) + except googleapiclient.errors.HttpError as err: + return HTTPError(err.resp.status, "Google API HttpError: " + str(err)) + + +def get_google_http_auth_n_user_timezone(username): + with open(backend.client_secret_file) as f: + client_secret_json = json.load(f) + client_id = client_secret_json['web']['client_id'] + client_secret = client_secret_json['web']['client_secret'] + ds = datastore.Client() + key = ds.key('credentials', username) + user = ds.get(key) + assert user.key.id_or_name == username + refresh_token = user['refresh_token'] + timezone = user['timezone'] + creds = client.GoogleCredentials(None, client_id, client_secret, refresh_token, None, + "https://accounts.google.com/o/oauth2/token", "Python") + http_auth = creds.authorize(httplib2.Http()) + return http_auth, timezone + + +@app.post('/v1/insert_daily_fitness') +def insert_daily_fitness_data_ondemand(): + """ + The query string needs to contain a list of users in the form of ?users=hil@gmail.com,estes@gmail.com,paes@gmail.com + :return: + """ + users_param = 'users' + if users_param not in request.query: + return HTTPError(httplib.BAD_REQUEST, + '{} does not exist in query string parameters; specify ?{}=user1@gmail.com,user2@company.com'.format( + users_param, users_param)) + usernames = 
request.query[users_param].split(',') +
+ return insert_daily_fitness_data_impl(usernames) + + +@app.get('/v1/insert_daily_fitness') +def insert_daily_fitness_data(): + """ + callable only from App Engine cron jobs + :return: + """ + # validating request is from App Engine cron jobs + app_engine_cron_header = 'X-Appengine-Cron' + if app_engine_cron_header not in request.headers: + return HTTPError(httplib.UNAUTHORIZED, + 'Endpoint can only be invoked from Google App Engine cron jobs per https://cloud.google.com/appengine/docs/flexible/python/scheduling-jobs-with-cron-yaml') + + ds = datastore.Client() + query = ds.query(kind=backend.DATASTORE_KIND) + query.keys_only() + usernames = list(query.fetch()) + usernames = [u.key.id_or_name for u in usernames] + + return insert_daily_fitness_data_impl(usernames) + + +def insert_daily_fitness_data_impl(usernames, bucket_name=backend.DEFAULT_BUCKET): + """ + Call Google Fitness API for users in the Cloud Datastore credentials kind, save the responses in Cloud Storage, + insert the fitness data to Cloud BigQuery. 
+ key is retry[username][category]['countdown'] + if value >= 0, retry down to value -1 or set value to -2 for non-recoverable errors + if value is None, op has succeeded + :param usernames: a list of usernames to call Google Fitness API with + :param bucket_name: save responses from Google Fitness API to a Google Cloud Storage bucket + :return: The results of getting from Google Fitness API and inserting to Cloud BigQuery + """ + retry = {} + threads = [] + + for username in usernames: + t = Thread(target=insert_daily_fitness_data_thread, args=(bucket_name, retry, username)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + is_error = False + response.content_type = 'application/json' + for username, category in retry.iteritems(): + for cat, cat_result in category.iteritems(): + if 'error' in cat_result: + is_error = True + break + if is_error: + return HTTPResponse(retry, httplib.INTERNAL_SERVER_ERROR) + else: + return retry + + +def insert_daily_fitness_data_thread(bucket_name, retry, username): + error_reporting_client = error_reporting.Client() + http_context = error_reporting.HTTPContext(method='GET', url='/v1/insert_daily_fitness', + user_agent='cron job for user {}'.format(username)) + storage_client = storage.Client() + bucket = storage_client.get_bucket(bucket_name) + http_auth, timezone = get_google_http_auth_n_user_timezone(username) + # get today's local date - 1 day + yesterday_local = datetime.now(pytz.timezone(timezone)) - timedelta(days=1) + yesterday_local_str = yesterday_local.strftime(backend.DATE_FORMAT) + df = backend.UserDataFlow(username, http_auth, yesterday_local.year, + yesterday_local.month, + yesterday_local.day, backend.current_milli_time(), timezone) + retry[username] = {} + categories = {'heartrate', 'activities', 'steps'} + for category in categories: + retry[username][category] = {} + # countdown is the number of retries + retry[username][category]['countdown'] = 1 + gs_path_get = '{}/{}/{}.json'.format(username, 
yesterday_local_str, category) + gs_path_insert = '{}/{}/{}_inserted_count.json'.format(username, yesterday_local_str, category) + get_result = None + insert_result = None + + # start of the retry logic + while retry[username][category]['countdown'] >= 0: + try: + if category == 'heartrate': + # get and insert heart rate data + insert_result = df.get_and_post_heart_rate() + get_result = insert_result['heart_datasets'] + elif category == 'activities': + # get and insert activities data + get_result = df.get_activities() + insert_result = df.post_activities() + elif category == 'steps': + # get and insert step counts + get_result = df.get_steps() + insert_result = df.post_steps() + # set to None upon success of getting API data and inserting to BigQuery + retry[username][category]['countdown'] = None + except client.HttpAccessTokenRefreshError as err: + http_context.responseStatusCode = httplib.UNAUTHORIZED + user_token_err = '{} has invalid refresh token'.format(username) + error_reporting_client.report_exception(http_context=http_context, + user=user_token_err) + retry[username][category]['error'] = "{}: {}".format(user_token_err, err) + # can't recover; abandon retry + retry[username][category]['countdown'] = -2 + except googleapiclient.errors.HttpError as err: + http_context.responseStatusCode = err.resp.status + error_reporting_client.report_exception(http_context=http_context, + user='Google API HttpError for user {}'.format(username)) + retry[username][category]['error'] = str(err) + if err.resp.status in ( + httplib.BAD_REQUEST, httplib.UNAUTHORIZED, httplib.NOT_FOUND, httplib.FORBIDDEN): + # can't recover; abandon retry + retry[username][category]['countdown'] = -2 + except Exception as err: + # https://googleapis.github.io/google-cloud-python/latest/error-reporting/usage.html + error_reporting_client.report_exception(http_context=http_context, + user='get and insert {} data for {} failed'.format(category, + username)) + retry[username][category]['error'] = 
str(err) + + # if retry for user on category isn't None, recoverable failure happened, decrement the retry count + if retry[username][category]['countdown'] is not None: + retry[username][category]['countdown'] -= 1 + else: + # exiting while loop because None >= 0 is False + pass + + # per category, putting the get, insert results on Cloud Storage upon success + if retry[username][category]['countdown'] is None: + retry[username][category]['gs://'] = [] + blob_get_result = bucket.blob(gs_path_get) + blob_get_result.upload_from_string(json.dumps(get_result)) + retry[username][category]['gs://'].append("{}/{}".format(bucket_name, gs_path_get)) + blob_insert_result = bucket.blob(gs_path_insert) + blob_insert_result.upload_from_string(json.dumps(insert_result)) + retry[username][category]['gs://'].append("{}/{}".format(bucket_name, gs_path_insert)) + + retry[username][category].pop('countdown') + + +port = int(os.environ.get('PORT', 8080)) +prefix = os.environ.get('PREFIX', None) +if prefix: + app.mount(prefix=prefix, app=app) + +if __name__ == "__main__": + try: + try: + app.run(host='0.0.0.0', port=port, debug=True, server='gunicorn', workers=2, timeout=1200) + except ImportError: + app.run(host='0.0.0.0', port=port, debug=True) + except Exception as e: + print >> sys.stderr, "error: {}".format(e) diff --git a/fit_api.postman_collection.json b/fit_api.postman_collection.json index 6157494..1f3aec0 100644 --- a/fit_api.postman_collection.json +++ b/fit_api.postman_collection.json @@ -6,417 +6,605 @@ }, "item": [ { - "name": "heart rate", - "request": { - "method": "POST", - "header": [ - { - "key": "apikey", - "value": "{{apikey}}", - "type": "text" - }, - { - "key": "start_year", - "value": "2019", - "type": "text" - }, - { - "key": "start_month", - "value": "2", - "type": "text" - }, - { - "key": "start_day", - "value": "7", - "type": "text" - }, - { - "key": "end_time_millis", - "value": "1549605600000", - "type": "text" - } - ], - "body": { - "mode": "formdata", - 
"formdata": [ - { - "key": "apikey", - "value": "secret", - "type": "text", - "disabled": true + "name": "MySQL dependent", + "item": [ + { + "name": "get sum of activity minutes", + "request": { + "method": "GET", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text", + "disabled": true + }, + { + "key": "start_year", + "value": "2019", + "type": "text", + "disabled": true + }, + { + "key": "start_month", + "value": "2", + "type": "text", + "disabled": true + }, + { + "key": "start_day", + "value": "5", + "type": "text", + "disabled": true + }, + { + "key": "end_time_millis", + "value": "1548489599000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "raw", + "raw": "" }, - { - "key": "start_year", - "value": "2019", - "type": "text", - "disabled": true - }, - { - "key": "start_month", - "value": "1", - "type": "text", - "disabled": true - }, - { - "key": "start_day", - "value": "24", - "type": "text", - "disabled": true - }, - { - "key": "end_time_millis", - "value": "1548468932000", - "type": "text", - "disabled": true + "url": { + "raw": "{{host_url}}/activity_for_user/{{hil_gmail}}?key=secret", + "host": [ + "{{host_url}}" + ], + "path": [ + "activity_for_user", + "{{hil_gmail}}" + ], + "query": [ + { + "key": "key", + "value": "secret" + } + ] } - ] - }, - "url": { - "raw": "{{host_url}}/v1/users/{{hil_gmail}}/heart", - "host": [ - "{{host_url}}" - ], - "path": [ - "v1", - "users", - "{{hil_gmail}}", - "heart" - ] - }, - "description": "call Google Fitness API for user's heart rate bmp numbers and \n insert them to a BigQuery table except existing_rows of recordedTimeNanos" - }, - "response": [] - }, - { - "name": "get steps", - "protocolProfileBehavior": { - "disableBodyPruning": true - }, - "request": { - "method": "GET", - "header": [ - { - "key": "apikey", - "value": "{{apikey}}", - "type": "text" - }, - { - "key": "start_year", - "value": "2019", - "type": "text" }, - { - "key": "start_month", - "value": "2", 
- "type": "text" - }, - { - "key": "start_day", - "value": "7", - "type": "text" - }, - { - "key": "end_time_millis", - "value": "1549907482000", - "type": "text", - "disabled": true - } - ], - "body": { - "mode": "formdata", - "formdata": [ - { - "key": "apikey", - "value": "secret", - "type": "text", - "disabled": true - } - ] - }, - "url": { - "raw": "{{host_url}}/v1/users/{{jared_email}}/steps", - "host": [ - "{{host_url}}" - ], - "path": [ - "v1", - "users", - "{{jared_email}}", - "steps" - ] + "response": [] }, - "description": "call Google fitness API to get user's steps" - }, - "response": [] - }, - { - "name": "activities fitness API->BigQuery", - "request": { - "method": "POST", - "header": [ - { - "key": "apikey", - "value": "{{apikey}}", - "type": "text" - }, - { - "key": "start_year", - "value": "2019", - "type": "text" - }, - { - "key": "start_month", - "value": "1", - "type": "text" - }, - { - "key": "start_day", - "value": "19", - "type": "text" - }, - { - "key": "end_time_millis", - "value": "1547971200000", - "type": "text" - } - ], - "body": { - "mode": "formdata", - "formdata": [ - { - "key": "", - "value": "secret", - "type": "text", - "disabled": true + { + "name": "get all users", + "request": { + "method": "GET", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text", + "disabled": true + }, + { + "key": "start_year", + "value": "2019", + "type": "text", + "disabled": true + }, + { + "key": "start_month", + "value": "2", + "type": "text", + "disabled": true + }, + { + "key": "start_day", + "value": "5", + "type": "text", + "disabled": true + }, + { + "key": "end_time_millis", + "value": "1548489599000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "raw", + "raw": "" + }, + "url": { + "raw": "{{host_url}}/users?key=secret", + "host": [ + "{{host_url}}" + ], + "path": [ + "users" + ], + "query": [ + { + "key": "key", + "value": "secret" + } + ] } - ] - }, - "url": { - "raw": 
"{{host_url}}/v1/users/{{hil_gmail}}/activities", - "host": [ - "{{host_url}}" - ], - "path": [ - "v1", - "users", - "{{hil_gmail}}", - "activities" - ] - }, - "description": "call Google Fitness API for user's activities and insert them to a BigQuery table except local datetime today's activities.\nReason: updating or delete today's activities will cause streaming buffer error in BigQuery" - }, - "response": [] - }, - { - "name": "activities from fitness API", - "request": { - "method": "GET", - "header": [ - { - "key": "apikey", - "value": "{{apikey}}", - "type": "text" - }, - { - "key": "start_year", - "value": "2019", - "type": "text" }, - { - "key": "start_month", - "value": "2", - "type": "text" - }, - { - "key": "start_day", - "value": "7", - "type": "text" - }, - { - "key": "end_time_millis", - "value": "1548489599000", - "type": "text", - "disabled": true - } - ], - "body": { - "mode": "raw", - "raw": "" + "response": [] }, - "url": { - "raw": "{{host_url}}/v1/users/{{hil_gmail}}/activities", - "host": [ - "{{host_url}}" - ], - "path": [ - "v1", - "users", - "{{hil_gmail}}", - "activities" - ] + { + "name": "last day steps", + "request": { + "method": "GET", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text", + "disabled": true + }, + { + "key": "start_year", + "value": "2019", + "type": "text", + "disabled": true + }, + { + "key": "start_month", + "value": "2", + "type": "text", + "disabled": true + }, + { + "key": "start_day", + "value": "5", + "type": "text", + "disabled": true + }, + { + "key": "end_time_millis", + "value": "1548489599000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "raw", + "raw": "" + }, + "url": { + "raw": "{{host_url}}/steps_for_user/last_day/{{hil_gmail}}?key=secret", + "host": [ + "{{host_url}}" + ], + "path": [ + "steps_for_user", + "last_day", + "{{hil_gmail}}" + ], + "query": [ + { + "key": "key", + "value": "secret" + } + ] + } + }, + "response": [] }, - "description": 
"call Google fitness API to get user's activities" - }, - "response": [] + { + "name": "step leaderboard", + "request": { + "method": "GET", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text", + "disabled": true + }, + { + "key": "start_year", + "value": "2019", + "type": "text", + "disabled": true + }, + { + "key": "start_month", + "value": "2", + "type": "text", + "disabled": true + }, + { + "key": "start_day", + "value": "5", + "type": "text", + "disabled": true + }, + { + "key": "end_time_millis", + "value": "1548489599000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "raw", + "raw": "" + }, + "url": { + "raw": "{{host_url}}/step_leaderboard?key=secret", + "host": [ + "{{host_url}}" + ], + "path": [ + "step_leaderboard" + ], + "query": [ + { + "key": "key", + "value": "secret" + } + ] + } + }, + "response": [] + } + ], + "description": "prior Github project's HTTP rest methods which depend on MySQL" }, { - "name": "insert steps", - "request": { - "method": "POST", - "header": [ - { - "key": "apikey", - "value": "secret", - "type": "text" - }, - { - "key": "start_year", - "value": "2019", - "type": "text" + "name": "Google Cloud BigQuery dependent", + "item": [ + { + "name": "heart rate", + "request": { + "method": "POST", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text" + }, + { + "key": "start_year", + "value": "2019", + "type": "text" + }, + { + "key": "start_month", + "value": "2", + "type": "text" + }, + { + "key": "start_day", + "value": "7", + "type": "text" + }, + { + "key": "end_time_millis", + "value": "1549605600000", + "type": "text" + } + ], + "body": { + "mode": "formdata", + "formdata": [ + { + "key": "apikey", + "value": "secret", + "type": "text", + "disabled": true + }, + { + "key": "start_year", + "value": "2019", + "type": "text", + "disabled": true + }, + { + "key": "start_month", + "value": "1", + "type": "text", + "disabled": true + }, + { + "key": 
"start_day", + "value": "24", + "type": "text", + "disabled": true + }, + { + "key": "end_time_millis", + "value": "1548468932000", + "type": "text", + "disabled": true + } + ] + }, + "url": { + "raw": "{{host_url}}/v1/users/{{hil_gmail}}/heart", + "host": [ + "{{host_url}}" + ], + "path": [ + "v1", + "users", + "{{hil_gmail}}", + "heart" + ] + }, + "description": "call Google Fitness API for user's heart rate bmp numbers and \n insert them to a BigQuery table except existing_rows of recordedTimeNanos" }, - { - "key": "start_month", - "value": "2", - "type": "text" + "response": [] + }, + { + "name": "get steps", + "protocolProfileBehavior": { + "disableBodyPruning": true }, - { - "key": "start_day", - "value": "4", - "type": "text" + "request": { + "method": "GET", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text" + }, + { + "key": "start_year", + "value": "2019", + "type": "text" + }, + { + "key": "start_month", + "value": "2", + "type": "text" + }, + { + "key": "start_day", + "value": "7", + "type": "text" + }, + { + "key": "end_time_millis", + "value": "1549907482000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "formdata", + "formdata": [ + { + "key": "apikey", + "value": "secret", + "type": "text", + "disabled": true + } + ] + }, + "url": { + "raw": "{{host_url}}/v1/users/{{hil_gmail}}/steps", + "host": [ + "{{host_url}}" + ], + "path": [ + "v1", + "users", + "{{hil_gmail}}", + "steps" + ] + }, + "description": "call Google fitness API to get user's steps" }, - { - "key": "end_time_millis", - "value": "1547971200000", - "type": "text", - "disabled": true - } - ], - "body": { - "mode": "formdata", - "formdata": [ - { - "key": "apikey", - "value": "secret", - "type": "text", - "disabled": true - } - ] + "response": [] }, - "url": { - "raw": "{{host_url}}/v1/users/{{hil_gmail}}/steps", - "host": [ - "{{host_url}}" - ], - "path": [ - "v1", - "users", - "{{hil_gmail}}", - "steps" - ] - }, - "description": "call 
Google fitness API to get user's steps and insert step counts to BigQuery except local datetime today's steps.\nReason: updating or delete today's steps will cause streaming buffer error in BigQuery" - }, - "response": [] - }, - { - "name": "Cron job endpoint", - "request": { - "method": "POST", - "header": [ - { - "key": "X-Appengine-Cron", - "value": "true", - "type": "text" + { + "name": "activities fitness API->BigQuery", + "request": { + "method": "POST", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text" + }, + { + "key": "start_year", + "value": "2019", + "type": "text" + }, + { + "key": "start_month", + "value": "1", + "type": "text" + }, + { + "key": "start_day", + "value": "19", + "type": "text" + }, + { + "key": "end_time_millis", + "value": "1547971200000", + "type": "text" + } + ], + "body": { + "mode": "formdata", + "formdata": [ + { + "key": "", + "value": "secret", + "type": "text", + "disabled": true + } + ] + }, + "url": { + "raw": "{{host_url}}/v1/users/{{hil_gmail}}/activities", + "host": [ + "{{host_url}}" + ], + "path": [ + "v1", + "users", + "{{hil_gmail}}", + "activities" + ] + }, + "description": "call Google Fitness API for user's activities and insert them to a BigQuery table except local datetime today's activities.\nReason: updating or delete today's activities will cause streaming buffer error in BigQuery" }, - { - "key": "end_time_millis", - "value": "1548489599000", - "type": "text", - "disabled": true - } - ], - "body": { - "mode": "raw", - "raw": "" - }, - "url": { - "raw": "{{host_url}}/v1/insert_daily_fitness?{{test_users}}", - "host": [ - "{{host_url}}" - ], - "path": [ - "v1", - "insert_daily_fitness" - ], - "query": [ - { - "key": "{{test_users}}", - "value": null - } - ] + "response": [] }, - "description": "Call the on demand endpoint to similate cron job runninng. Cron job does not need query string. The query string of users is for debugging." 
- }, - "response": [] - }, - { - "name": "get user's single activity from MySQL", - "request": { - "method": "GET", - "header": [ - { - "key": "apikey", - "value": "{{apikey}}", - "type": "text", - "disabled": true - }, - { - "key": "start_year", - "value": "2019", - "type": "text", - "disabled": true - }, - { - "key": "start_month", - "value": "2", - "type": "text", - "disabled": true + { + "name": "activities from fitness API", + "request": { + "method": "GET", + "header": [ + { + "key": "apikey", + "value": "{{apikey}}", + "type": "text" + }, + { + "key": "start_year", + "value": "2019", + "type": "text" + }, + { + "key": "start_month", + "value": "2", + "type": "text" + }, + { + "key": "start_day", + "value": "7", + "type": "text" + }, + { + "key": "end_time_millis", + "value": "1548489599000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "raw", + "raw": "" + }, + "url": { + "raw": "{{host_url}}/v1/users/{{hil_gmail}}/activities", + "host": [ + "{{host_url}}" + ], + "path": [ + "v1", + "users", + "{{hil_gmail}}", + "activities" + ] + }, + "description": "call Google fitness API to get user's activities" }, - { - "key": "start_day", - "value": "5", - "type": "text", - "disabled": true + "response": [] + }, + { + "name": "insert steps", + "request": { + "method": "POST", + "header": [ + { + "key": "apikey", + "value": "secret", + "type": "text" + }, + { + "key": "start_year", + "value": "2019", + "type": "text" + }, + { + "key": "start_month", + "value": "2", + "type": "text" + }, + { + "key": "start_day", + "value": "4", + "type": "text" + }, + { + "key": "end_time_millis", + "value": "1547971200000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "formdata", + "formdata": [ + { + "key": "apikey", + "value": "secret", + "type": "text", + "disabled": true + } + ] + }, + "url": { + "raw": "{{host_url}}/v1/users/{{hil_gmail}}/steps", + "host": [ + "{{host_url}}" + ], + "path": [ + "v1", + "users", + "{{hil_gmail}}", + 
"steps" + ] + }, + "description": "call Google fitness API to get user's steps and insert step counts to BigQuery except local datetime today's steps.\nReason: updating or delete today's steps will cause streaming buffer error in BigQuery" }, - { - "key": "end_time_millis", - "value": "1548489599000", - "type": "text", - "disabled": true - } - ], - "body": { - "mode": "raw", - "raw": "" + "response": [] }, - "url": { - "raw": "{{host_url}}/activity_for_user_details/{{hil_gmail}}?key=secret", - "host": [ - "{{host_url}}" - ], - "path": [ - "activity_for_user_details", - "{{hil_gmail}}" - ], - "query": [ - { - "key": "key", - "value": "secret" - } - ] + { + "name": "Cron job endpoint", + "request": { + "method": "POST", + "header": [ + { + "key": "X-Appengine-Cron", + "value": "true", + "type": "text" + }, + { + "key": "end_time_millis", + "value": "1548489599000", + "type": "text", + "disabled": true + } + ], + "body": { + "mode": "raw", + "raw": "" + }, + "url": { + "raw": "{{host_url}}/v1/insert_daily_fitness?{{test_users}}", + "host": [ + "{{host_url}}" + ], + "path": [ + "v1", + "insert_daily_fitness" + ], + "query": [ + { + "key": "{{test_users}}", + "value": null + } + ] + }, + "description": "Call the on demand endpoint to similate cron job runninng. Cron job does not need query string. The query string of users is for debugging." 
+ }, + "response": [] } - }, - "response": [] + ], + "description": "new /v1 methods that depend on Google Cloud BigQuery" } ] } \ No newline at end of file diff --git a/main.py b/main.py index 12483f0..2b102dc 100644 --- a/main.py +++ b/main.py @@ -3,14 +3,8 @@ from collections import OrderedDict import bottle_mysql -import googleapiclient.errors import httplib2 -import pytz -from threading import Thread from bottle import * -from google.cloud import datastore -from google.cloud import error_reporting -from google.cloud import storage from googleapiclient.discovery import build from oauth2client import client @@ -30,15 +24,6 @@ dbname=backend.config.get('database_config', 'dbname')) app.install(plugin) -# Google Cloud Stackdriver Debugger https://cloud.google.com/debugger/docs/setup/python -try: - import googleclouddebugger - - googleclouddebugger.enable() - print("Google Cloud Debugger enabled") -except ImportError as e: - print >> sys.stderr, "Failed to load Google Cloud Debugger for Python 2: ".format(e) - def require_key(): key = request.query.get('key', '') @@ -87,134 +72,6 @@ def default_get(db): return json.dumps(dict(steps), sort_keys=True, indent=4) -@app.get('/oauth2callback') -def oauth2callback(db): - urlparts = request.urlparts - redirect_uri = "{}://{}{}".format(urlparts.scheme, urlparts.netloc, urlparts.path) - timezone = request.query.get('state', None) - - flow = client.flow_from_clientsecrets( - backend.client_secret_file, - scope=["profile", "email", 'https://www.googleapis.com/auth/fitness.activity.read', - 'https://www.googleapis.com/auth/fitness.body.read'], - redirect_uri=redirect_uri) - flow.params['access_type'] = 'offline' - flow.params['prompt'] = 'consent' - creds = flow.step2_exchange(code=request.query.code) - http_auth = creds.authorize(httplib2.Http()) - user_info_service = build('oauth2', 'v2', http=http_auth) - get_user_task = user_info_service.userinfo().get() - ds = datastore.Client() - u = get_user_task.execute() - - # insert 
to cloud SQL - db.execute( - "REPLACE INTO google_fit SET username=%s, google_id=%s, full_name=%s, gender=%s, image_url=%s, email=%s, refresh_token=%s", - (u['email'], u['id'], u['name'], u.get('gender'), u['picture'], u['email'], creds.refresh_token)) - - # insert to Cloud Datastore - entity = datastore.Entity(key=ds.key(backend.DATASTORE_KIND, u['email'])) - now = datetime.utcnow() - entity.update({ - 'refresh_token': creds.refresh_token, - 'google_id': u['id'], - 'gender': u.get('gender'), - 'picture': u['picture'], - 'timezone': unicode(timezone), - 'last_updated': now - }) - ds.put(entity) - response.content_type = 'application/json' - - # required to serialize entity - entity['last_updated'] = now.strftime('%Y-%m-%d %H:%M:%S %Z') - return json.dumps(entity.items()) - - -@app.post('/v1/users//steps') -def insert_steps(username): - error = check_headers_apikey() - if error: - return error - steps = get_steps(username) - if isinstance(steps, HTTPError) or isinstance(steps, HTTPResponse): - return steps - - insert_result = { - 'inserted_count': backend.insert_steps(username, steps), - 'steps': steps - } - - response.content_type = 'application/json' - return insert_result - - -@app.get('/v1/users//steps') -def get_steps(username): - error = check_headers_apikey() - if error: - return error - http_auth, timezone = get_google_http_auth_n_user_timezone(username) - end_time_millis, start_date, error = extract_header_dates() - - if error: - if isinstance(error, HTTPError): - return error - else: - return HTTPResponse({ - 'code': httplib.BAD_REQUEST, - 'error': str(error)}, httplib.BAD_REQUEST) - else: - try: - # end_time_millis in headers data is optional - if end_time_millis is None: - end_time_millis = backend.current_milli_time() - - steps = backend.get_daily_steps(http_auth, start_date['year'], start_date['month'], start_date['day'], - end_time_millis, local_timezone=timezone) - response.content_type = 'application/json' - return steps - except 
client.HttpAccessTokenRefreshError as err: - return HTTPError(httplib.UNAUTHORIZED, "Refresh token invalid: " + str(err)) - except googleapiclient.errors.HttpError as err: - return HTTPError(err.resp.status, "Google API HttpError: " + str(err)) - - -@app.get('/v1') -def main(): - return static_file("post.html", ".") - - -@app.post('/v1/auth') -def google_auth(): - parts = request.urlparts - redirect_uri = "{}://{}/oauth2callback".format(parts.scheme, parts.netloc) - - flow = client.flow_from_clientsecrets( - backend.client_secret_file, - scope=["profile", "email", 'https://www.googleapis.com/auth/fitness.activity.read', - 'https://www.googleapis.com/auth/fitness.body.read'], - redirect_uri=redirect_uri) - flow.params['access_type'] = 'offline' - flow.params['prompt'] = 'consent' - error = check_forms_apikey() - if error: - return error - timezone = request.forms['timezone'] - auth_uri = flow.step1_get_authorize_url(state=timezone) - redirect(auth_uri) - - -def check_headers_apikey(): - if 'apikey' not in request.headers or request.headers['apikey'] != backend.API_key: - return HTTPError(httplib.UNAUTHORIZED, "invalid API key in {}".format("request.headers['apikey']")) - - -def check_forms_apikey(): - if 'apikey' not in request.forms or request.forms['apikey'] != backend.API_key: - return HTTPError(httplib.UNAUTHORIZED, "invalid API key in {}".format("request.forms['apikey']")) - - @app.get('/steps_for_user/') def steps_for_user(name, db): require_key() @@ -269,97 +126,6 @@ def query_activities(db, name): return activities -@app.post('/v1/users//activities') -def insert_user_activities(username): - error = check_headers_apikey() - if error: - return error - activities = get_user_activities(username) - if isinstance(activities, HTTPError) or isinstance(activities, HTTPResponse): - return activities - - insert_result = { - 'inserted_count': backend.insert_activities(username, activities), - 'activities': activities - } - - response.content_type = 'application/json' - 
return insert_result - - -@app.get('/v1/users//activities') -def get_user_activities(username): - error = check_headers_apikey() - if error: - return error - http_auth, timezone = get_google_http_auth_n_user_timezone(username) - end_time_millis, start_date, error = extract_header_dates() - - if error: - if isinstance(error, HTTPError): - return error - else: - return HTTPResponse({ - 'code': httplib.BAD_REQUEST, - 'error': str(error)}, httplib.BAD_REQUEST) - else: - try: - # end_time_millis in headers data is optional - if end_time_millis is None: - end_time_millis = backend.current_milli_time() - - activities = backend.get_daily_activities(http_auth, start_date['year'], start_date['month'], - start_date['day'], end_time_millis, local_timezone=timezone) - response.content_type = 'application/json' - return activities - except client.HttpAccessTokenRefreshError as err: - return HTTPError(httplib.UNAUTHORIZED, "Refresh token invalid: " + str(err)) - except googleapiclient.errors.HttpError as err: - return HTTPError(err.resp.status, "Google API HttpError: " + str(err)) - - -def extract_header_dates(): - """ - Extract headers of start_year, start_month, start_day, and end_time_millis - where the start_* are local date and end_time_millis is the Unix Epoch time in milliseconds - :return: end time in Unix Epoch time in milliseconds, start date dictionary, error if any - """ - # parse headers data in request body - start_date = {'year': request.headers.get('start_year', None), 'month': request.headers.get('start_month', None), - 'day': request.headers.get('start_day', None)} - end_time_millis = request.headers.get('end_time_millis', None) - if end_time_millis is not None: - try: - end_time_millis = int(end_time_millis) - except ValueError as e: - return None, None, HTTPError(httplib.BAD_REQUEST, - 'Failed to convert end_time_millis in request.headers to int: ' + str(e)) - if start_date['year'] is None or start_date['month'] is None or start_date['day'] is None: - return 
None, None, HTTPError(httplib.BAD_REQUEST, "headers did not contain start_year, start_month, start_day") - else: - start_date['year'] = int(start_date['year']) - start_date['month'] = int(start_date['month']) - start_date['day'] = int(start_date['day']) - - return end_time_millis, start_date, None - - -# bug: only 1 activity per day returned. correct result has multiple activities per day. -# Fix: refer to @app.get('/users//activities') -@app.get('/activity_for_user_details/') -def activity_for_user_details(name, db): - require_key() - print(name) - db.execute( - "SELECT a.day, ROUND(a.length_ms / 1000 / 60) AS minutes, t.name as activity_type FROM activity a INNER JOIN activity_types t ON a.activity_type=t.id WHERE a.username=%s AND a.activity_type NOT IN {}".format( - bad_activities), (name,)) - result = dict( - [(r['day'], {"minutes": int(r['minutes']), "activity_type": r['activity_type']}) for r in db.fetchall()]) - print(result) - response.content_type = 'application/json' - return json.dumps(result, sort_keys=True, indent=4) - - @app.get('/steps_for_user/last_week/') def steps_for_user_last_week(name, db): require_key() @@ -449,211 +215,6 @@ def set_goal(name, goal, db): return "Goal set" -@app.post('/v1/users//heart') -def insert_heart_rate(username): - error = check_headers_apikey() - if error: - return error - http_auth, timezone = get_google_http_auth_n_user_timezone(username) - end_time_millis, start_date, error = extract_header_dates() - - if error: - if isinstance(error, HTTPError): - return error - else: - return HTTPResponse({ - 'code': httplib.BAD_REQUEST, - 'error': str(error)}, httplib.BAD_REQUEST) - else: - try: - # end_time_millis in form data is optional - if end_time_millis is None: - result = backend.get_and_insert_heart_rate(http_auth, username, - start_date['year'], start_date['month'], start_date['day'], - local_timezone=timezone) - else: - result = backend.get_and_insert_heart_rate(http_auth, username, - start_date['year'], 
start_date['month'], start_date['day'], - end_time_millis, local_timezone=timezone) - response.content_type = 'application/json' - return result - except client.HttpAccessTokenRefreshError as err: - return HTTPError(httplib.UNAUTHORIZED, "Refresh token invalid: " + str(err)) - except googleapiclient.errors.HttpError as err: - return HTTPError(err.resp.status, "Google API HttpError: " + str(err)) - - -def get_google_http_auth_n_user_timezone(username): - with open(backend.client_secret_file) as f: - client_secret_json = json.load(f) - client_id = client_secret_json['web']['client_id'] - client_secret = client_secret_json['web']['client_secret'] - ds = datastore.Client() - key = ds.key('credentials', username) - user = ds.get(key) - assert user.key.id_or_name == username - refresh_token = user['refresh_token'] - timezone = user['timezone'] - creds = client.GoogleCredentials(None, client_id, client_secret, refresh_token, None, - "https://accounts.google.com/o/oauth2/token", "Python") - http_auth = creds.authorize(httplib2.Http()) - return http_auth, timezone - - -@app.post('/v1/insert_daily_fitness') -def insert_daily_fitness_data_ondemand(): - """ - The query string needs to contain a list of users in the form of ?users=hil@gmail.com,estes@gmail.com,paes@gmail.com - :return: - """ - users_param = 'users' - if users_param not in request.query: - return HTTPError(httplib.BAD_REQUEST, - '{} does not exist in query string parameters; specify ?{}=user1@gmail.com,user2@company.com'.format( - users_param)) - usernames = request.query[users_param].split(',') - - return insert_daily_fitness_data_impl(usernames) - - -# callable only from App Engine cron jobs -@app.get('/v1/insert_daily_fitness') -def insert_daily_fitness_data(): - # validating request is from App Engine cron jobs - app_engine_cron_header = 'X-Appengine-Cron' - if app_engine_cron_header not in request.headers: - return HTTPError(httplib.UNAUTHORIZED, - 'Endpoint can only be invoked from Google App Engine cron 
jobs per https://cloud.google.com/appengine/docs/flexible/python/scheduling-jobs-with-cron-yaml') - - ds = datastore.Client() - query = ds.query(kind=backend.DATASTORE_KIND) - query.keys_only() - usernames = list(query.fetch()) - usernames = [u.key.id_or_name for u in usernames] - - return insert_daily_fitness_data_impl(usernames) - - -def insert_daily_fitness_data_impl(usernames, bucket_name=backend.DEFAULT_BUCKET): - """ - Call Google Fitness API for users in the Cloud Datastore credentials kind, save the responses in Cloud Storage, - insert the fitness data to Cloud BigQuery. - key is retry[username][category]['countdown'] - if value >= 0, retry down to value -1 or set value to -2 for non-recoverable errors - if value is None, op has succeeded - :param usernames: a list of usernames to call Google Fitness API with - :param bucket_name: save responses from Google Fitness API to a Google Cloud Storage bucket - :return: The results of getting from Google Fitness API and inserting to Cloud BigQuery - """ - retry = {} - threads = [] - - for username in usernames: - t = Thread(target=insert_daily_fitness_data_thread, args=(bucket_name, retry, username)) - threads.append(t) - t.start() - - for t in threads: - t.join() - - is_error = False - response.content_type = 'application/json' - for username, category in retry.iteritems(): - for cat, cat_result in category.iteritems(): - if 'error' in cat_result: - is_error = True - break - if is_error: - return HTTPResponse(retry, httplib.INTERNAL_SERVER_ERROR) - else: - return retry - - -def insert_daily_fitness_data_thread(bucket_name, retry, username): - error_reporting_client = error_reporting.Client() - http_context = error_reporting.HTTPContext(method='GET', url='/v1/insert_daily_fitness', - user_agent='cron job for user {}'.format(username)) - storage_client = storage.Client() - bucket = storage_client.get_bucket(bucket_name) - http_auth, timezone = get_google_http_auth_n_user_timezone(username) - # get today's local date 
- 1 day - yesterday_local = datetime.now(pytz.timezone(timezone)) - timedelta(days=1) - yesterday_local_str = yesterday_local.strftime(backend.DATE_FORMAT) - df = backend.UserDataFlow(username, http_auth, yesterday_local.year, - yesterday_local.month, - yesterday_local.day, backend.current_milli_time(), timezone) - retry[username] = {} - categories = {'heartrate', 'activities', 'steps'} - for category in categories: - retry[username][category] = {} - # countdown is the number of retries - retry[username][category]['countdown'] = 1 - gs_path_get = '{}/{}/{}.json'.format(username, yesterday_local_str, category) - gs_path_insert = '{}/{}/{}_inserted_count.json'.format(username, yesterday_local_str, category) - get_result = None - insert_result = None - - # start of the retry logic - while retry[username][category]['countdown'] >= 0: - try: - if category == 'heartrate': - # get and insert heart rate data - insert_result = df.get_and_post_heart_rate() - get_result = insert_result['heart_datasets'] - elif category == 'activities': - # get and insert activities data - get_result = df.get_activities() - insert_result = df.post_activities() - elif category == 'steps': - # get and insert step counts - get_result = df.get_steps() - insert_result = df.post_steps() - # set to None upon success of getting API data and inserting to BigQuery - retry[username][category]['countdown'] = None - except client.HttpAccessTokenRefreshError as err: - http_context.responseStatusCode = httplib.UNAUTHORIZED - user_token_err = '{} has invalid refresh token'.format(username) - error_reporting_client.report_exception(http_context=http_context, - user=user_token_err) - retry[username][category]['error'] = "{}: {}".format(user_token_err, err) - # can't recover; abandon retry - retry[username][category]['countdown'] = -2 - except googleapiclient.errors.HttpError as err: - http_context.responseStatusCode = err.resp.status - error_reporting_client.report_exception(http_context=http_context, - 
user='Google API HttpError for user {}'.format(username)) - retry[username][category]['error'] = str(err) - if err.resp.status in ( - httplib.BAD_REQUEST, httplib.UNAUTHORIZED, httplib.NOT_FOUND, httplib.FORBIDDEN): - # can't recover; abandon retry - retry[username][category]['countdown'] = -2 - except Exception as err: - # https://googleapis.github.io/google-cloud-python/latest/error-reporting/usage.html - error_reporting_client.report_exception(http_context=http_context, - user='get and insert {} data for {} failed'.format(category, - username)) - retry[username][category]['error'] = str(err) - - # if retry for user on category isn't None, recoverable failure happened, decrement the retry count - if retry[username][category]['countdown'] is not None: - retry[username][category]['countdown'] -= 1 - else: - # exiting while loop because None >= 0 is False - pass - - # per category, putting the get, insert results on Cloud Storage upon success - if retry[username][category]['countdown'] is None: - retry[username][category]['gs://'] = [] - blob_get_result = bucket.blob(gs_path_get) - blob_get_result.upload_from_string(json.dumps(get_result)) - retry[username][category]['gs://'].append("{}/{}".format(bucket_name, gs_path_get)) - blob_insert_result = bucket.blob(gs_path_insert) - blob_insert_result.upload_from_string(json.dumps(insert_result)) - retry[username][category]['gs://'].append("{}/{}".format(bucket_name, gs_path_insert)) - - retry[username][category].pop('countdown') - - port = int(os.environ.get('PORT', 8080)) prefix = os.environ.get('PREFIX', None) if prefix: @@ -662,7 +223,7 @@ def insert_daily_fitness_data_thread(bucket_name, retry, username): if __name__ == "__main__": try: try: - app.run(host='0.0.0.0', port=port, debug=True, server='gunicorn', workers=8, timeout=1200) + app.run(host='0.0.0.0', port=port, debug=True, server='gunicorn', workers=2, timeout=1200) except ImportError: app.run(host='0.0.0.0', port=port, debug=True) except Exception as e: