Skip to content

Commit

Permalink
spark fair scheduling, asynchronous job in api (jobs/), multiple api…
Browse files Browse the repository at this point in the history
… end-points (...Spark for legacy, or algorithm/...)

create handler manager to have multiple endpoint for the same algorithm

implement a demo asynchronous mode in the restapi

remove pydataclasses dependency
  • Loading branch information
thomas loubrieu committed Sep 29, 2020
1 parent af2b234 commit 9fa5a80
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 80 deletions.
3 changes: 2 additions & 1 deletion analysis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ Python module that exposes NEXUS analytical capabilities via a HTTP webservice.
conda activate nexus-analysis
````

2. Install conda dependencies
2. Install conda dependencies and other Python dependencies

````
cd analysis
pip install asyncio # for asynchronous job management
conda install pyspark
conda install -c conda-forge --file conda-requirements.txt
#conda install numpy matplotlib mpld3 scipy netCDF4 basemap gdal pyproj=1.9.5.1 libnetcdf=4.3.3.1
Expand Down
1 change: 1 addition & 0 deletions analysis/conda-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ gdal==3.0.2
mock==2.0.0
singledispatch==3.4.0.3


14 changes: 11 additions & 3 deletions analysis/webservice/NexusHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

import logging
import types
from functools import partial

AVAILABLE_HANDLERS = []
AVAILABLE_LEGACY_HANDLERS = []
AVAILABLE_RESTAPI_HANDLERS = []
AVAILABLE_WPS_HANDLERS = []
AVAILABLE_INITIALIZERS = []


Expand All @@ -32,17 +35,22 @@ def nexus_initializer(clazz):
return clazz


def nexus_handler(clazz):
def nexus_handler(clazz, handler_list=AVAILABLE_LEGACY_HANDLERS):
    """Class decorator registering an algorithm handler class.

    Validates *clazz* and appends it to *handler_list* (the shared
    module-level legacy registry by default).  Invalid handlers are
    logged and skipped; the class itself is always returned unchanged
    so decoration never alters the decorated type.
    """
    log = logging.getLogger(__name__)
    try:
        clazz.validate()
        log.info("Adding algorithm module '%s' with path '%s' (%s)" % (clazz.name, clazz.path, clazz))
        handler_list.append(clazz)
    except Exception as ex:
        # str(ex) is portable; `ex.message` was removed in Python 3 and is
        # absent on many exception types, and `log.warn` is deprecated.
        log.warning("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, str(ex)), exc_info=True)
    return clazz


# Decorator variants of nexus_handler that register classes into the
# REST-API and WPS handler registries respectively (same validation
# logic, different destination list).
nexus_restapi_handler = partial(nexus_handler, handler_list=AVAILABLE_RESTAPI_HANDLERS)
nexus_wps_handler = partial(nexus_handler, handler_list=AVAILABLE_WPS_HANDLERS)



DEFAULT_PARAMETERS_SPEC = {
"ds": {
"name": "Dataset",
Expand Down
4 changes: 2 additions & 2 deletions analysis/webservice/algorithms/Capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json

from webservice.NexusHandler import nexus_handler, AVAILABLE_HANDLERS
from webservice.NexusHandler import nexus_handler, AVAILABLE_LEGACY_HANDLERS
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.webmodel import NexusResults

Expand All @@ -32,7 +32,7 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler):
def calc(self, computeOptions, **args):
capabilities = []

for capability in AVAILABLE_HANDLERS:
for capability in AVAILABLE_LEGACY_HANDLERS:
capabilityDef = {
"name": capability.name,
"path": capability.path,
Expand Down
2 changes: 1 addition & 1 deletion analysis/webservice/algorithms/NexusCalcHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def validate(cls):
if "params" not in cls.__dict__:
raise Exception("Property 'params' has not been defined")

def __init__(self, tile_service_factory, skipCassandra=False, skipSolr=False):
def __init__(self, tile_service_factory):
# self.algorithm_config = algorithm_config
# self._skipCassandra = skipCassandra
# self._skipSolr = skipSolr
Expand Down
37 changes: 15 additions & 22 deletions analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
from datetime import datetime
from functools import partial

import uuid
import numpy as np
import shapely.geometry
from pytz import timezone

from webservice.NexusHandler import nexus_handler
from webservice.NexusHandler import nexus_handler, nexus_restapi_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException

Expand All @@ -29,6 +30,7 @@


@nexus_handler
@nexus_restapi_handler
class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
# __singleton_lock = threading.Lock()
# __singleton_instance = None
Expand Down Expand Up @@ -67,19 +69,6 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
}
singleton = True

# @classmethod
# def instance(cls, algorithm_config=None, sc=None):
# with cls.__singleton_lock:
# if not cls.__singleton_instance:
# try:
# singleton_instance = cls()
# singleton_instance.set_config(algorithm_config)
# singleton_instance.set_spark_context(sc)
# cls.__singleton_instance = singleton_instance
# except AttributeError:
# pass
# return cls.__singleton_instance

def parse_arguments(self, request):
# Parse input arguments
self.log.debug("Parsing arguments")
Expand Down Expand Up @@ -118,7 +107,8 @@ def parse_arguments(self, request):

return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, nparts_requested

def calc(self, compute_options, **args):
def calc(self, compute_options,
**args):
"""
:param compute_options: StatsComputeOptions
Expand All @@ -130,6 +120,7 @@ def calc(self, compute_options, **args):
metrics_record = self._create_metrics_record()

ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)

self._setQueryParams(ds,
(float(bbox.bounds[1]),
float(bbox.bounds[3]),
Expand All @@ -147,13 +138,13 @@ def calc(self, compute_options, **args):
print('Found {} tiles'.format(len(nexus_tiles)))

daysinrange = self._get_tile_service().find_days_in_range_asc(bbox.bounds[1],
bbox.bounds[3],
bbox.bounds[0],
bbox.bounds[2],
ds,
start_time,
end_time,
metrics_callback=metrics_record.record_metrics)
bbox.bounds[3],
bbox.bounds[0],
bbox.bounds[2],
ds,
start_time,
end_time,
metrics_callback=metrics_record.record_metrics)
ndays = len(daysinrange)
if ndays == 0:
raise NoDataException(reason="No data found for selected timeframe")
Expand Down Expand Up @@ -262,6 +253,8 @@ def calc(self, compute_options, **args):
maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
endTime=end_time)



@staticmethod
def _map(tile_service_factory, metrics_callback, tile_in_spark):
tile_bounds = tile_in_spark[0]
Expand Down
3 changes: 2 additions & 1 deletion analysis/webservice/algorithms_spark/TimeSeriesSpark.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from pytz import timezone
from scipy import stats
from webservice import Filtering as filtering
from webservice.NexusHandler import nexus_handler
from webservice.NexusHandler import nexus_handler, nexus_restapi_handler, nexus_wps_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException

Expand All @@ -43,6 +43,7 @@


@nexus_handler
@nexus_restapi_handler
class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
name = "Time Series Spark"
path = "/timeSeriesSpark"
Expand Down
3 changes: 1 addition & 2 deletions analysis/webservice/algorithms_spark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import DailyDifferenceAverageSpark
import HofMoellerSpark
import MaximaMinimaSpark
import NexusCalcSparkHandler
import TimeAvgMapSpark
import TimeSeriesSpark
import VarianceSpark

import NexusCalcSparkHandler

log = logging.getLogger(__name__)

Expand Down
8 changes: 8 additions & 0 deletions analysis/webservice/config/spark_pools.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0"?>
<allocations>
<pool name="default">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
</allocations>
1 change: 1 addition & 0 deletions analysis/webservice/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .job import Job
12 changes: 12 additions & 0 deletions analysis/webservice/jobs/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from datetime import datetime

class Job():
    """Container tracking one asynchronous request and its pending result."""

    def __init__(self):
        # Timestamps: creation is recorded immediately; completion is
        # filled in by the owner once the result future resolves.
        self.time_created = datetime.now()
        self.time_done = None
        # NexusRequestObject that produced this job.
        self.request = None
        # tornado.gen.Future holding the eventual result.
        self.result_future = None




Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging
import json
import uuid
from datetime import datetime, timedelta
import tornado.web
import tornado.ioloop
from webservice.nexus_tornado.request.renderers import NexusRendererFactory


class NexusAsyncJobHandler(tornado.web.RequestHandler):
    """Serve the status and results of asynchronous jobs.

    GET /<job_id> renders the finished result, replies 202 while the job
    is still running, and 404 for unknown ids.  A periodic cleaner evicts
    jobs whose result has been available longer than ``obsolete_after``.
    """

    # Shared {job_id: Job} pool, populated by the request dispatcher.
    _job_pool = {}
    __logger = logging.getLogger('nexus')

    # A finished job is evicted once it has been done for this long.
    obsolete_after = timedelta(hours=12)
    # Interval between cleaner runs.
    clean_obsolete_every = timedelta(minutes=15)

    @classmethod
    def get_job_pool(cls):
        """Return the shared job pool dictionary."""
        return cls._job_pool

    @classmethod
    def start_jobs_cleaner(cls):
        """Start the periodic eviction of obsolete completed jobs."""

        def clean():
            # Snapshot the items first: deleting entries while iterating
            # the dict directly raises RuntimeError on Python 3 (and
            # breaks iteritems on Python 2).
            for key, job in list(cls._job_pool.items()):
                # Unfinished jobs have time_done == None; never evict
                # them (and None cannot be subtracted from a datetime).
                if job.time_done is not None \
                        and datetime.now() - job.time_done > cls.obsolete_after:
                    cls.__logger.info("clean job {}".format(key))
                    del cls._job_pool[key]
            # Re-arm the callback so cleaning actually recurs every
            # clean_obsolete_every, not just once after the first delay.
            tornado.ioloop.IOLoop.current().call_later(
                cls.clean_obsolete_every.total_seconds(), clean)

        tornado.ioloop.IOLoop.current().call_later(
            cls.clean_obsolete_every.total_seconds(), clean)

    def get(self, job_id):
        """Render the job result if done; otherwise reply 202 or 404."""
        self.__logger.info("get job among {}".format(self._job_pool))
        if job_id in self._job_pool:
            job = self._job_pool[job_id]
            if job.result_future.done():
                renderer = NexusRendererFactory.get_renderer(job.request)
                renderer.render(self, job.result_future.result())
            else:
                self._non_completed_job_callback(job_id)
        else:
            self._non_existing_job_callback(job_id)

    def _non_existing_job_callback(self, job_id, code=404):
        # Unknown id: either never submitted or already cleaned up.
        message = "Job {} does not exist".format(job_id)
        self._error_callback(message, code)

    def _non_completed_job_callback(self, job_id, code=202):
        # 202 Accepted: the job exists but its future is not resolved yet.
        message = "Job {} is being processed".format(job_id)
        self._error_callback(message, code)

    def _error_callback(self, message, code):
        """Finish the request with a JSON error body, *code* status and
        no-cache headers."""
        self.__logger.info(message, exc_info=True)

        self.set_header("Content-Type", "application/json")
        self.set_header("Cache-Control", "no-cache, no-store, must-revalidate")
        self.set_header("Pragma", "no-cache")
        self.set_header("Expires", 0)
        self.set_status(code)

        response = {
            "error": message,
            "code": code
        }

        self.write(json.dumps(response, indent=5))
        self.finish()

    def data_received(self, chunk):
        # Required override of RequestHandler; streaming uploads unused.
        pass

    @classmethod
    def get_short_job_id(cls):
        """Return a short (6-char uuid4 prefix) id not already in the pool."""
        while True:
            job_id = str(uuid.uuid4())[:6]
            if job_id not in cls._job_pool:
                return job_id


Loading

0 comments on commit 9fa5a80

Please sign in to comment.