Skip to content

Commit

Permalink
DEV-132 add deferred response reader
Browse files Browse the repository at this point in the history
This allows us to build all of the requests and pass them to the
library for aggregation, but defer the actual network request
until the library is ready to read the file. By deferring the
requests, we don't need to pre-download all of the files before
aggregating them.
  • Loading branch information
philtom-ctds committed Mar 31, 2020
1 parent 5c21680 commit 31cb4e3
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 56 deletions.
75 changes: 75 additions & 0 deletions gdc_maf_tool/defer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import hashlib
import io
from typing import Callable, Optional

import requests

ResponseProvider = Callable[[], requests.Response]


class DeferredRequestReader(io.BufferedIOBase):
    """Defer a request until the caller is ready to read the response.

    Nothing is requested at construction time. The first call to ``read``
    (or the first access to ``response``) invokes the provider, checks the
    HTTP status, optionally verifies the md5 digest of the body, and keeps
    the response so that every later read is served from it.

    Attributes:
        provider: A function that returns a response object.
        md5sum: An optional md5 digest in hex format.
    """

    def __init__(self, provider: "ResponseProvider", md5sum: Optional[str] = None):
        super().__init__()
        self._provider = provider
        self._md5sum = md5sum

        self._response = None
        self._content_position = 0
        self._content_length = 0

    def _realize(self):
        """Realize the response.

        Calls the provider at most once per successful realization.

        Raises:
            ValueError: If an md5 digest was supplied and the body does not
                match it.
            Exception: Whatever ``raise_for_status`` raises for a failed
                request (``requests.HTTPError`` for a real response).
        """
        # Compare against None explicitly: a response object defines its own
        # truthiness, and a falsy response must not trigger a second request
        # (which would also reset the read position).
        if self._response is not None:
            return

        response = self._provider()
        response.raise_for_status()
        content = response.content
        self._validate_checksum(content)

        self._content_position = 0
        self._content_length = len(content)
        # Stored last so that a failed status or checksum leaves the reader
        # unrealized.
        self._response = response

    def _validate_checksum(self, content):
        """Raise ValueError if ``content`` does not hash to the expected md5.

        Does nothing when no md5 digest was supplied.
        """
        if not self._md5sum:
            return

        md5 = hashlib.md5(content).hexdigest()
        if self._md5sum != md5:
            raise ValueError(f"Failed checksum. Expected {self._md5sum}. Got {md5}.")

    @property
    def response(self) -> "requests.Response":
        """The realized response; triggers the request on first access."""
        self._realize()
        return self._response

    def readable(self):
        """Report that this stream supports reading."""
        return True

    def read(self, size=-1):
        """Read from the response.

        Args:
            size: Maximum number of bytes to return. ``None`` or any negative
                value reads everything that remains.

        Returns:
            The next bytes of the response body, or ``b""`` when the body is
            exhausted or ``size`` is 0.

        Raises:
            ValueError: If the body fails the md5 check on first read.
            Exception: Whatever ``raise_for_status`` raises for a failed
                request (``requests.HTTPError`` for a real response).
        """
        self._realize()

        start = self._content_position
        if size is None or size < 0:
            end = self._content_length
        else:
            # Clamp so the position never runs past the end of the body.
            end = min(start + size, self._content_length)

        # Covers both end-of-content and size == 0.
        if end <= start:
            return b""

        self._content_position = end
        return self._response.content[start:end]
53 changes: 12 additions & 41 deletions gdc_maf_tool/gdc_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
select_primary_aliquots,
)

from gdc_maf_tool import log
from gdc_maf_tool import log, defer
from gdc_maf_tool.log import logger


Expand Down Expand Up @@ -125,53 +125,24 @@ def _parse_hit(hit: Dict) -> Dict:
}


def download_maf(uuid: str, md5sum: str, token: str = None, retry_amount: int = 3):
def download_maf(
uuid: str, md5sum: str, token: str = None
) -> defer.DeferredRequestReader:
"""
Downloads each MAF file and returns the resulting bytes of response content.
Verify that the MD5 matches the maf metadata.
"""
headers = {}
if token:
headers = {"X-Auth-Token": token}

for _ in range(retry_amount):
# progress bar?
logger.info("Downloading File: %s ", uuid)
resp = requests.get(
"https://api.gdc.cancer.gov/data/{}".format(uuid), headers=headers,
)
if resp.status_code == 200:
break
if resp.status_code == 403:
logger.warning("You do not have access to %s", uuid)
continue
logger.info("Retrying Download...")

else:
log.fatal("Maximum retries exceeded")

if not check_md5sum(resp.content, md5sum): log.fatal("md5sum not matching expected value for {}".format(uuid))
else:
return resp.content

return ""
def provider() -> requests.Response:
headers = {}
if token:
headers = {"X-Auth-Token": token}

logger.info("Downloading File: %s ", uuid)
return requests.get(f"https://api.gdc.cancer.gov/data/{uuid}", headers=headers,)

def chunk_iterator(iterator: Any, size: int = 4096) -> Iterator:
    """Yield consecutive slices of ``iterator``, each at most ``size`` long.

    ``iterator`` must support ``len()`` and slicing; the final slice may be
    shorter than ``size``.
    """
    yield from (
        iterator[begin : begin + size] for begin in range(0, len(iterator), size)
    )


def check_md5sum(contents: bytes, expected_md5: str, chunk_size: int = 4096) -> bool:
    """Report whether the md5 digest of ``contents`` equals ``expected_md5``.

    The bytes are fed to the hash in pieces of at most ``chunk_size`` and the
    resulting hex digest is compared with the expected value.
    """
    digest = hashlib.md5()
    pieces = chunk_iterator(contents, size=chunk_size)
    for piece in pieces:
        digest.update(piece)

    actual_md5 = digest.hexdigest()
    return expected_md5 == actual_md5
return defer.DeferredRequestReader(provider, md5sum)


def only_one_project_id(hit_map: Dict) -> None:
Expand Down Expand Up @@ -236,7 +207,7 @@ def collect_mafs(
)
mafs.append(
AliquotLevelMaf(
file=io.BytesIO(maf_file_contents),
file=maf_file_contents,
tumor_aliquot_submitter_id=sample_id,
)
)
Expand Down
69 changes: 69 additions & 0 deletions tests/test_defer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest
import requests

from gdc_maf_tool import defer


def test_deferredrequestreader__read():
    """Iterating the reader yields the response body one line at a time."""

    class StubResponse(requests.Response):
        @property
        def content(self):
            return b"one\ntwo\nthree"

        def raise_for_status(self):
            return None

    # The stub class doubles as the provider: calling it returns a new stub.
    reader = defer.DeferredRequestReader(StubResponse)
    assert list(reader) == [b"one\n", b"two\n", b"three"]


def test_deferredrequestreader__failed_request():
    """A failing status check surfaces as HTTPError when the reader is read."""

    class StubResponse(requests.Response):
        def raise_for_status(self):
            raise requests.HTTPError()

    # The stub class doubles as the provider: calling it returns a new stub.
    with pytest.raises(requests.HTTPError):
        failing_reader = defer.DeferredRequestReader(StubResponse)
        failing_reader.read()


def test_deferredrequestreader__md5_match():
    """A body whose md5 equals the expected digest is returned unchanged."""

    class StubResponse(requests.Response):
        @property
        def content(self):
            return b"md5_match\n"

        def raise_for_status(self):
            return None

    expected_digest = "d8ab26d704d5d89a5356609ec42c2691"
    # The stub class doubles as the provider: calling it returns a new stub.
    reader = defer.DeferredRequestReader(StubResponse, expected_digest)
    body = reader.read()
    assert body == b"md5_match\n"


def test_deferredrequestreader__md5_mismatch():
    """A body whose md5 differs from the expected digest raises ValueError."""

    class StubResponse(requests.Response):
        @property
        def content(self):
            return b"md5_mismatch\n"

        def raise_for_status(self):
            return None

    wrong_digest = "d8ab26d704d5d89a5356609ec42c2691"
    # The stub class doubles as the provider: calling it returns a new stub.
    with pytest.raises(ValueError):
        mismatched_reader = defer.DeferredRequestReader(StubResponse, wrong_digest)
        mismatched_reader.read()
16 changes: 1 addition & 15 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,4 @@
import pytest

from gdc_maf_tool import gdc_api_client, cli


@pytest.mark.parametrize(
    "given,expected,chunk_size",
    [
        ("abcdefg", ["a", "b", "c", "d", "e", "f", "g"], 1),
        ("abcdefg", ["ab", "cd", "ef", "g"], 2),
        ("abcdefg", ["abc", "def", "g"], 3),
    ],
)
def test_chunk_iterator(given, expected, chunk_size):
    """chunk_iterator splits its input into pieces of at most chunk_size."""
    chunks = [*gdc_api_client.chunk_iterator(given, chunk_size)]
    assert chunks == expected
from gdc_maf_tool import cli


def test_ids_from_manifest(fake_manifest):
Expand Down

0 comments on commit 31cb4e3

Please sign in to comment.