From 5ed6033d9636e5de7faea2a344dd85a088a5e3c4 Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Fri, 19 Jan 2024 09:50:14 -0800 Subject: [PATCH] Ignore NA properties, adjust httpx config, add place types. (#273) --- simple/stats/events_importer.py | 2 +- simple/util/dc_client.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/simple/stats/events_importer.py b/simple/stats/events_importer.py index c7219046..11fb97e7 100644 --- a/simple/stats/events_importer.py +++ b/simple/stats/events_importer.py @@ -187,7 +187,7 @@ def _write_event_triples(self) -> None: properties: dict[str, str] = {} for i, (k, v) in enumerate(row.items()): - if i < 2: + if i < 2 or pd.isna(v): continue properties[k] = v diff --git a/simple/util/dc_client.py b/simple/util/dc_client.py index 48ebb237..6c53b121 100644 --- a/simple/util/dc_client.py +++ b/simple/util/dc_client.py @@ -40,9 +40,14 @@ NGRAM_MIN_MATCH_FRACTION = 0.8 # Place types support by the resolve API. +# Reference: https://source.corp.google.com/piper///depot/google3/datacommons/import/otherids/dc_ke_recon.cc;l=123 _RESOLVE_PLACE_TYPES = set([ - "Place", "Continent", "Country", "State", "Province", "City", - "CensusZipCodeTabulationArea" + "Country", "State", "County", "City", "Village", "CensusCountyDivision", + "SchoolDistrict", "ElementarySchoolDistrict", "HighSchoolDistrict", + "UnifiedSchoolDistrict", "CensusZipCodeTabulationArea", "EurostatNUTS1", + "EurostatNUTS2", "EurostatNUTS3", "AdministrativeArea1", + "AdministrativeArea2", "AdministrativeArea3", "AdministrativeArea4", + "AdministrativeArea5", "Neighborhood", "AdministrativeArea", "Place" ]) _MAX_NODES = 10_000 @@ -102,7 +107,7 @@ async def resolve_place_entities_async( chunks = chunked(entities, _RESOLVE_BATCH_SIZE) resolved: dict[str, str] = {} - async with AsyncClient(limits=_HTTPX_LIMITS) as client: + async with AsyncClient(limits=_HTTPX_LIMITS, timeout=None) as client: futures: dict[str, str] = [ _resolve_place_entities_chunk(client, chunk, entity_type, property_name) for chunk in chunks @@ -263,12 +268,13 @@ async def post_async(client: AsyncClient, path: str, data={}) -> dict: if api_key: headers["x-api-key"] = api_key logging.debug("Request: %s", json.dumps(data, indent=1)) - resp = await client.post(url, json=data, headers=headers) + async with asyncio.Semaphore(_HTTPX_LIMITS.max_connections): + resp = await client.post(url, json=data, headers=headers) response = resp.json() logging.debug("Response: %s", json.dumps(response, indent=1)) if resp.status_code != 200: raise Exception( - f'{resp.status_code}: {resp.reason}\n{response["message"]}\nRequest: {path}\n{data}' + f'{resp.status_code}: {resp.reason_phrase}\n{response["message"]}\nRequest: {path}\n{data}' ) return response