Skip to content

Commit

Permalink
Merge pull request #134 from hms-dbmi/keller-mark/aggregation-post-api
Browse files Browse the repository at this point in the history
Aggregation by POST request to /tiles endpoint
  • Loading branch information
keller-mark authored Jun 18, 2020
2 parents a2c7003 + dec93a9 commit 8e38810
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ static/
hgs-static/

config.json
.envrc
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Future version
- Added support for bigBed files
- Update readme installation instructions and troubleshooting instructions for macOS 10.15
- Always consider proxy headers (X-Forwarded-Host, X-Forwarded-Proto) for redirect URL construction
- Added support for server-side aggregation of multivec tiles by sending a `POST` request to the `/tiles` endpoint, where the body contains a JSON object mapping tileset UIDs to objects with properties `agg_groups` (a 2D array where each subarray is a group of rows to aggregate) and `agg_func` (the name of an aggregation function).

v1.13.0

Expand Down
4 changes: 2 additions & 2 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ channels:
- conda-forge
- bioconda
- defaults

dependencies:
- python>=3.6
- pip
- pip:
- pybbi==0.2.0
- pybbi==0.2.2
- bumpversion==0.5.3
- CacheControl==0.12.4
- cooler==0.8.6
Expand All @@ -19,6 +18,7 @@ dependencies:
- djangorestframework==3.9.1
- h5py==2.6.0
- higlass-python==0.2.1
- jsonschema==3.2.0
- numba==0.46.0
- numpy==1.17.3
- pandas==0.23.4
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pybbi==0.2.0
pybbi==0.2.2
bumpversion==0.5.3
CacheControl==0.12.4
cooler==0.8.6
Expand All @@ -9,6 +9,7 @@ django==2.1.11
djangorestframework==3.9.1
h5py==2.6.0
higlass-python==0.2.1
jsonschema==3.2.0
numba==0.46.0
numpy==1.17.3
pandas==0.23.4
Expand Down
31 changes: 28 additions & 3 deletions tilesets/generate_tiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def extract_tileset_uid(tile_id):
def get_tileset_filetype(tileset):
return tileset.filetype

def generate_1d_tiles(filename, tile_ids, get_data_function):
def generate_1d_tiles(filename, tile_ids, get_data_function, tileset_options):
'''
Generate a set of tiles for the given tile_ids.
Expand All @@ -122,12 +122,25 @@ def generate_1d_tiles(filename, tile_ids, get_data_function):
to be retrieved
get_data_function: lambda
A function which retrieves the data for this tile
tileset_options: dict or None
An optional dict containing options, including aggregation options.
Returns
-------
tile_list: [(tile_id, tile_data),...]
A list of tile_id, tile_data tuples
'''

agg_func_map = {
"sum": lambda x: np.sum(x, axis=0),
"mean": lambda x: np.mean(x, axis=0),
"median": lambda x: np.median(x, axis=0),
"std": lambda x: np.std(x, axis=0),
"var": lambda x: np.var(x, axis=0),
"max": lambda x: np.amax(x, axis=0),
"min": lambda x: np.amin(x, axis=0),
}

generated_tiles = []

for tile_id in tile_ids:
Expand All @@ -136,6 +149,11 @@ def generate_1d_tiles(filename, tile_ids, get_data_function):

dense = get_data_function(filename, tile_position)

if tileset_options != None and "aggGroups" in tileset_options and "aggFunc" in tileset_options:
agg_func_name = tileset_options["aggFunc"]
agg_group_arr = [ x if type(x) == list else [x] for x in tileset_options["aggGroups"] ]
dense = np.array(list(map(agg_func_map[agg_func_name], [ dense[arr] for arr in agg_group_arr ])))

if len(dense):
max_dense = max(dense.reshape(-1,))
min_dense = min(dense.reshape(-1,))
Expand Down Expand Up @@ -477,18 +495,24 @@ def generate_tiles(tileset_tile_ids):
Parameters
----------
tileset_tile_ids: tuple
A four-tuple containing the following parameters.
tileset: tilesets.models.Tileset object
The tileset that the tile ids should be retrieved from
tile_ids: [str,...]
A list of tile_ids (e.g. xyx.0.0.1) identifying the tiles
to be retrieved
raw: str or False
The value of the GET request parameter `raw`.
tileset_options: dict or None
An optional dict containing tileset options, including aggregation options.
Returns
-------
tile_list: [(tile_id, tile_data),...]
A list of tile_id, tile_data tuples
'''
tileset, tile_ids, raw = tileset_tile_ids
tileset, tile_ids, raw, tileset_options = tileset_tile_ids

if tileset.filetype == 'hitile':
return generate_hitile_tiles(tileset, tile_ids)
Expand All @@ -512,7 +536,8 @@ def generate_tiles(tileset_tile_ids):
return generate_1d_tiles(
tileset.datafile.path,
tile_ids,
ctmu.get_single_tile)
ctmu.get_single_tile,
tileset_options)
elif tileset.filetype == 'imtiles':
return hgim.get_tiles(tileset.datafile.path, tile_ids, raw)
elif tileset.filetype == 'bam':
Expand Down
39 changes: 39 additions & 0 deletions tilesets/json_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# JSON Schema (draft-07) describing the body of a POST request to the
# /tiles endpoint: a list of objects, each naming a tileset UID, the tile
# IDs to fetch from it, and an optional per-tileset "options" object
# (currently only multivec row-aggregation options).
tiles_post_schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "definitions": {
        # Options controlling server-side aggregation of multivec rows.
        "multivecRowAggregationOptions": {
            "type": "object",
            "required": ["aggGroups", "aggFunc"],
            "additionalProperties": False,
            "properties": {
                # Each item is either a single row index or a list of row
                # indices; each item defines one group of rows to be
                # collapsed into a single output row.
                "aggGroups": {
                    "type": "array",
                    "items": {
                        "oneOf": [
                            { "type": "integer" },
                            { "type": "array", "items": { "type": "integer" }}
                        ]
                    }
                },
                # Name of the aggregation function; the allowed values
                # mirror the agg_func_map keys in generate_tiles.py.
                "aggFunc": {
                    "type": "string",
                    "enum": ["sum", "mean", "median", "std", "var", "min", "max"]
                }
            }
        }
    },
    # Top level: one entry per tileset being requested.
    "type": "array",
    "items": {
        "type": "object",
        "required": ["tilesetUid", "tileIds"],
        "properties": {
            "tilesetUid": { "type": "string" },
            # Tile ID suffixes (e.g. "0.0"); the server prepends the
            # tileset UID to form full tile IDs.
            "tileIds": { "type": "array", "items": { "type": "string" }},
            # Optional; validated against one of the option schemas above.
            "options": {
                "oneOf": [
                    { "$ref": "#/definitions/multivecRowAggregationOptions" }
                ]
            }
        }
    }
}
55 changes: 55 additions & 0 deletions tilesets/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,43 @@ def test_get_tile(self):
r = base64.decodestring(content['a.11.0']['dense'].encode('utf-8'))
q = np.frombuffer(r, dtype=np.float16)

assert q.shape[0] == 512

def test_get_tiles_via_post_with_aggregation(self):
    """POST to /api/v1/tiles/ with multivec row-aggregation options.

    The request body maps a tileset UID to tile IDs plus an ``options``
    object containing ``aggGroups`` (groups of rows to combine) and
    ``aggFunc`` (the name of the aggregation function). The response
    should contain the aggregated dense tile data.
    """
    self.user1 = dcam.User.objects.create_user(
        username='user1', password='pass'
    )
    # Use a context manager so the fixture file handle is closed even if
    # tileset creation raises (the original left it open).
    with open('data/chr21.KL.bed.multires.mv5', 'rb') as upload_file:
        tm.Tileset.objects.create(
            datafile=dcfu.SimpleUploadedFile(
                upload_file.name, upload_file.read()
            ),
            filetype='multivec',
            datatype='multivec',
            coordSystem="hg38",
            owner=self.user1,
            uuid='chr21_KL'
        )

    body = [
        {
            "tilesetUid": "chr21_KL",
            "tileIds": ["0.0", "0.1"],
            "options": {
                # Three groups of input rows -> three output rows.
                "aggGroups": [[0, 1], [2, 3, 4], [5, 6]],
                "aggFunc": "sum"
            }
        }
    ]

    ret = self.client.post(
        '/api/v1/tiles/', json.dumps(body),
        content_type="application/json"
    )
    assert ret.status_code == 200
    content = json.loads(ret.content.decode('utf-8'))
    # base64.decodestring was deprecated since Python 3.1 and removed in
    # Python 3.9; base64.decodebytes is the supported equivalent.
    r = base64.decodebytes(content['chr21_KL.0.0']['dense'].encode('utf-8'))
    q = np.frombuffer(r, dtype=np.float16)

    # Presumably 3 aggregated rows x 256 bins per tile = 768 values
    # (TODO confirm tile width against the fixture's tileset info).
    assert q.shape[0] == 768


class ChromosomeSizes(dt.TestCase):
def test_list_chromsizes(self):
Expand Down Expand Up @@ -968,6 +1005,24 @@ def test_get_tiles(self):
except OSError:
pass

def test_get_tiles_via_post(self):
    """Fetching tiles through a POST request (no aggregation options)
    returns the expected tile payload for a bigBed tileset.
    """
    client = dt.Client()
    client.login(username='user1', password='pass')

    request_body = [{"tilesetUid": "bb", "tileIds": ["14.12"]}]

    response = client.post(
        '/api/v1/tiles/',
        json.dumps(request_body),
        content_type="application/json",
    )
    assert response.status_code == 200

    tiles = json.loads(response.content.decode('utf-8'))
    assert len(tiles['bb.14.12']) == 200


class CoolerTest(dt.TestCase):
def setUp(self):
Expand Down
69 changes: 62 additions & 7 deletions tilesets/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import tilesets.chromsizes as tcs
import tilesets.generate_tiles as tgt
import tilesets.json_schemas as tjs

import clodius.tiles.bam as ctb
import clodius.tiles.cooler as hgco
Expand Down Expand Up @@ -51,6 +52,9 @@

import slugid
import urllib
import hashlib
from jsonschema import validate as json_validate
from jsonschema.exceptions import ValidationError as JsonValidationError

try:
import cPickle as pickle
Expand Down Expand Up @@ -376,7 +380,7 @@ def add_transform_type(tile_id):
return new_tile_id


@api_view(['GET'])
@api_view(['GET', 'POST'])
def tiles(request):
'''Retrieve a set of tiles
Expand All @@ -393,8 +397,51 @@ def tiles(request):
(tile_id, tile_data) items.
'''
# create a set so that we don't fetch the same tile multiple times
tileids_to_fetch = set(request.GET.getlist("d"))
tileids_to_fetch = set()
tileset_to_options = dict()

TILE_LIMIT = 1000

if request.method == 'POST':
# This is a POST request, so try to parse the request body as JSON.
try:
body = json.loads(request.body.decode('utf-8'))
except:
return JsonResponse({
'error': 'Unable to parse request body as JSON.'
}, status=rfs.HTTP_400_BAD_REQUEST)

# Validate against the JSON schema.
try:
json_validate(instance=body, schema=tjs.tiles_post_schema)
except JsonValidationError as e:
return JsonResponse({
'error': f"Invalid request body: {e.message}.",
}, status=rfs.HTTP_400_BAD_REQUEST)

# Iterate over tilesets to obtain the associated tile IDs and options.
for tileset_info in body:
tileset_uid = tileset_info["tilesetUid"]
# Prepend the tileset UID to each tile ID suffix.
tile_ids = [ f"{tileset_uid}.{tile_id}" for tile_id in tileset_info["tileIds"] ]
tileids_to_fetch.update(tile_ids)

tileset_options = tileset_info.get("options", None)
# The "options" property is optional.
if type(tileset_options) == dict:
tileset_to_options[tileset_uid] = tileset_options
# Hash the options object so that the tile can be cached.
tileset_to_options[tileset_uid]["options_hash"] = hashlib.md5(json.dumps(tileset_options).encode('utf-8')).hexdigest()

elif request.method == 'GET':
# create a set so that we don't fetch the same tile multiple times
tileids_to_fetch = set(request.GET.getlist("d"))

if len(tileids_to_fetch) > TILE_LIMIT:
return JsonResponse({
'error': "Too many tiles were requested.",
}, status=rfs.HTTP_400_BAD_REQUEST)

# with ProcessPoolExecutor() as executor:
# res = executor.map(parallelize, hargs)
'''
Expand Down Expand Up @@ -435,7 +482,11 @@ def tiles(request):
# see if the tile is cached
tile_value = None
try:
tile_value = rdb.get(tile_id)
if tileset_uuid in tileset_to_options:
tileset_options = tileset_to_options[tileset_uuid]
tile_value = rdb.get(tile_id + tileset_options["options_hash"])
else:
tile_value = rdb.get(tile_id)
except Exception as ex:
# there was an error accessing the cache server
# log the error and carry forward fetching the tile
Expand All @@ -454,7 +505,7 @@ def tiles(request):

# fetch the tiles
tilesets = [tilesets[tu] for tu in tileids_by_tileset]
accessible_tilesets = [(t, tileids_by_tileset[t.uuid], raw) for t in tilesets if ((not t.private) or request.user == t.owner)]
accessible_tilesets = [(t, tileids_by_tileset[t.uuid], raw, tileset_to_options.get(t.uuid, None)) for t in tilesets if ((not t.private) or request.user == t.owner)]

#pool = mp.Pool(6)

Expand All @@ -477,9 +528,13 @@ def tiles(request):
tiles_to_return = {}

for (tile_id, tile_value) in generated_tiles:

tileset_uuid = tgt.extract_tileset_uid(tile_id)
try:
rdb.set(tile_id, pickle.dumps(tile_value))
if tileset_uuid in tileset_to_options:
tileset_options = tileset_to_options[tileset_uuid]
rdb.set(tile_id + tileset_options["options_hash"], pickle.dumps(tile_value))
else:
rdb.set(tile_id, pickle.dumps(tile_value))
except Exception as ex:
# error caching a tile
# log the error and carry forward, this isn't critical
Expand Down

0 comments on commit 8e38810

Please sign in to comment.