Skip to content

Commit

Permalink
Ignore NA properties, adjust httpx config, add place types. (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyurva authored Jan 19, 2024
1 parent 347309f commit 5ed6033
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
2 changes: 1 addition & 1 deletion simple/stats/events_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def _write_event_triples(self) -> None:
properties: dict[str, str] = {}

for i, (k, v) in enumerate(row.items()):
if i < 2:
if i < 2 or pd.isna(v):
continue
properties[k] = v

Expand Down
16 changes: 11 additions & 5 deletions simple/util/dc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@
NGRAM_MIN_MATCH_FRACTION = 0.8

# Place types supported by the resolve API.
# Reference: https://source.corp.google.com/piper///depot/google3/datacommons/import/otherids/dc_ke_recon.cc;l=123
_RESOLVE_PLACE_TYPES = set([
"Place", "Continent", "Country", "State", "Province", "City",
"CensusZipCodeTabulationArea"
"Country", "State", "County", "City", "Village", "CensusCountyDivision",
"SchoolDistrict", "ElementarySchoolDistrict", "HighSchoolDistrict",
"UnifiedSchoolDistrict", "CensusZipCodeTabulationArea", "EurostatNUTS1",
"EurostatNUTS2", "EurostatNUTS3", "AdministrativeArea1",
"AdministrativeArea2", "AdministrativeArea3", "AdministrativeArea4",
"AdministrativeArea5", "Neighborhood", "AdministrativeArea", "Place"
])

_MAX_NODES = 10_000
Expand Down Expand Up @@ -102,7 +107,7 @@ async def resolve_place_entities_async(
chunks = chunked(entities, _RESOLVE_BATCH_SIZE)

resolved: dict[str, str] = {}
async with AsyncClient(limits=_HTTPX_LIMITS) as client:
async with AsyncClient(limits=_HTTPX_LIMITS, timeout=None) as client:
futures: dict[str, str] = [
_resolve_place_entities_chunk(client, chunk, entity_type, property_name)
for chunk in chunks
Expand Down Expand Up @@ -263,12 +268,13 @@ async def post_async(client: AsyncClient, path: str, data={}) -> dict:
if api_key:
headers["x-api-key"] = api_key
logging.debug("Request: %s", json.dumps(data, indent=1))
resp = await client.post(url, json=data, headers=headers)
async with asyncio.Semaphore(_HTTPX_LIMITS.max_connections):
resp = await client.post(url, json=data, headers=headers)
response = resp.json()
logging.debug("Response: %s", json.dumps(response, indent=1))
if resp.status_code != 200:
raise Exception(
f'{resp.status_code}: {resp.reason}\n{response["message"]}\nRequest: {path}\n{data}'
f'{resp.status_code}: {resp.reason_phrase}\n{response["message"]}\nRequest: {path}\n{data}'
)
return response

Expand Down

0 comments on commit 5ed6033

Please sign in to comment.