diff --git a/operators/mental_health_clinics/__init__.py b/operators/mental_health_clinics/__init__.py
index 04d8ca5..c859e82 100644
--- a/operators/mental_health_clinics/__init__.py
+++ b/operators/mental_health_clinics/__init__.py
@@ -8,18 +8,25 @@
 from operators.shil import ORGANIZATION
 from srm_tools.update_table import airtable_updater
 from srm_tools.processors import update_mapper
-
+from srm_tools.gov_data_proxy import collect_gov_rows
 
 FIELD_RENAME = {
-    'שם המרפאה': 'name',
-    'ישוב': 'city',
-    'כתובת': 'street_address',
-    'טלפון': 'phone_numbers',
-    'למבוטחי איזו קופה.+': 'hmo',
-    'מבוגרים / ילדים': 'age_group',
-    'סוגי התערבויות.+': 'interventions',
-    'מומחיות המרפאה.+': 'expertise',
-    'המתנה ממוצעת לאינטק.+': 'intake_wait',
+    'name': 'clinic_name',
+    'city': 'city',
+    'age_group': 'audience',
+    'intake_wait': 'intake_wait',
+    'phone_numbers': 'phone',
+    'expertise': 'specialization',
+    'interventions': 'intervention_type',
+    'street_address': 'street',
+    'hmo': 'HMO_code',
+}
+HMOS = {
+    '1': 'לאומית',
+    '2': 'מכבי',
+    '3': 'כללית',
+    '4': 'מאוחדת',
+    '5': 'כל הקופות',
 }
 MISSING_VALUES = [
     'אין מומחיות מיוחדת',
@@ -133,8 +140,6 @@ def description(row):
         ret += title + ': ' + ', '.join(set(snippet)) + '\n\n'
     return ret
 
-FILENAME = Path(__file__).resolve().with_name('mentalhealthclinics.xlsx')
-
 
 def clinic_hash(row):
     items = [
@@ -149,11 +154,19 @@ def clinic_hash(row):
 
 
 def operator(*_):
     # Prepare data
-    DF.Flow(
+    def ren(k, v):
+        return DF.add_field(k, 'string', default=lambda row: row['Data'].pop(v, None))
+    def del_data(row):
+        for k in ['follow_up_wait', 'group_therapy_wait', 'individual_therapy_wait', ]:
+            row['Data'].pop(k, None)
+    DF.Flow(
         # Load and clean
-        DF.load(str(FILENAME), name='clinics'),
+        collect_gov_rows('70c0b6fd-0c36-4b7a-a087-3157123403d9'),
+        DF.update_resource(-1, name='clinics'),
+        *[ren(k, v) for k, v in FIELD_RENAME.items()],
+        del_data,
+        DF.set_type('hmo', transform=HMOS.get),
         DF.select_fields(FIELD_RENAME.keys(), resources=-1),
-        DF.rename_fields(FIELD_RENAME, resources=-1),
         DF.update_schema(-1, missingValues=MISSING_VALUES),
         DF.validate(on_error=DF.schema_validator.clear),
diff --git a/operators/meser/__init__.py b/operators/meser/__init__.py
index 56fbaf9..665541e 100644
--- a/operators/meser/__init__.py
+++ b/operators/meser/__init__.py
@@ -138,21 +138,6 @@ def operator(*_):
         airtable_base=settings.AIRTABLE_ENTITIES_IMPORT_BASE
     )
 
-
-    DF.Flow(
-        DF.load('temp/meser/denormalized/datapackage.json'),
-        DF.rename_fields({
-            'service_id': 'id',
-            'service_name': 'name',
-            'service_description': 'description',
-        }, resources='meser'),
-        DF.add_field('data_sources', 'string', 'מידע על מסגרות רווחה התקבל ממשרד הרווחה והשירותים החברתיים', resources='meser'),
-        DF.add_field('branches', 'array', lambda r: [r['branch_id']], resources='meser'),
-        DF.select_fields(['id', 'name', 'description', 'data_sources', 'situations', 'responses', 'branches'], resources='meser'),
-        DF.add_field('data', 'object', lambda r: dict((k,v) for k,v in r.items() if k!='id'), resources='meser'),
-        DF.printer()
-    ).process()
-
     airtable_updater(
         settings.AIRTABLE_SERVICE_TABLE, 'meser',
         ['id', 'name', 'description', 'data_sources', 'situations', 'responses', 'branches'],
diff --git a/operators/revaha/__init__.py b/operators/revaha/__init__.py
index 5da5b58..4c5bf32 100644
--- a/operators/revaha/__init__.py
+++ b/operators/revaha/__init__.py
@@ -15,7 +15,7 @@
 from srm_tools.logger import logger
 from srm_tools.processors import ensure_fields, update_mapper
 from srm_tools.update_table import airtable_updater
-from srm_tools.scraping_utils import overcome_blocking
+from srm_tools.gov_data_proxy import collect_gov_rows
 
 
 def transform_phone_numbers(r):
@@ -74,7 +74,6 @@ def make_unique_id_from_values(row):
     },
 }
 
-session = requests.Session()
 
 SERVICES = [
     {
@@ -164,48 +163,8 @@ def make_unique_id_from_values(row):
 }
 
 
-def gov_data_proxy(template_id, skip):
-    data = {
-        'DynamicTemplateID': template_id,
-        'QueryFilters': {'skip': {'Query': skip}},
-        'From': skip,
-    }
-    timeout = 30
-    resp = overcome_blocking(
-        session,
-        lambda: session.post(
-            settings.GOV_DATA_PROXY,
-            json=data,
-            timeout=timeout,
-        )
-    )
-    response = resp.json()
-    total, results = response['TotalResults'], response['Results']
-
-    return total, results
-
-
 def get_revaha_data():
-    skip = 0
-    # seems to only ever return 10 results in a call
-    skip_by = 10
-    template_id = '23ede39d-968c-4e5c-8098-9c58b037a0c3'
-    total, results = gov_data_proxy(template_id, skip)
-
-    while len(results) < total:
-        skip += skip_by
-        for _ in range(3):
-            _, batch = gov_data_proxy(template_id, skip)
-            if len(batch) > 0:
-                results.extend(batch)
-                print(f'SKIPPED {skip}, GOT {len(results)}, TOTAL {total}')
-                break
-            time.sleep(10)
-        else:
-            break
-    print('FETCHED {} REVAHA RECORDS'.format(len(results)))
-    assert len(results) > 0
-    return results
+    return collect_gov_rows('23ede39d-968c-4e5c-8098-9c58b037a0c3')
 
 
 def revaha_organization_data_flow():
diff --git a/srm_tools/gov_data_proxy.py b/srm_tools/gov_data_proxy.py
new file mode 100644
index 0000000..e12d1db
--- /dev/null
+++ b/srm_tools/gov_data_proxy.py
@@ -0,0 +1,57 @@
+import requests
+import time
+
+from conf import settings
+
+from srm_tools.scraping_utils import overcome_blocking
+
+
+session = requests.Session()
+
+
+def gov_data_proxy(template_id, skip):
+    data = {
+        'DynamicTemplateID': template_id,
+        'QueryFilters': {'skip': {'Query': skip}},
+        'From': skip,
+    }
+    timeout = 30
+    resp = overcome_blocking(
+        session,
+        lambda: session.post(
+            settings.GOV_DATA_PROXY,
+            json=data,
+            timeout=timeout,
+        )
+    )
+    response = resp.json()
+    total, results = response['TotalResults'], response['Results']
+
+    return total, results
+
+
+def collect_gov_rows(template_id):
+    # Inner generator used as a dataflows step: pages through the proxy and yields every record.
+    def func(rows):
+        skip = 0
+        # seems to only ever return 10 results in a call
+        skip_by = 10
+        total, results = gov_data_proxy(template_id, skip)
+        fetched = len(results)
+        yield from results
+
+        while fetched < total:
+            skip += skip_by
+            for _ in range(3):
+                _, batch = gov_data_proxy(template_id, skip)
+                if len(batch) > 0:
+                    yield from batch
+                    fetched += len(batch)
+                    print(f'SKIPPED {skip}, GOT {fetched}, TOTAL {total}')
+                    break
+                time.sleep(10)
+            else:
+                break
+        print(f'FETCHED {fetched} FOR {template_id}')
+        assert fetched > 0
+    return func
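
Usage note (illustrative sketch, not part of the patch): both operators pass `collect_gov_rows(<DynamicTemplateID>)` directly as the source step of a dataflows Flow, as the mental_health_clinics hunk above shows. A minimal standalone pipeline along those lines, assuming `settings.GOV_DATA_PROXY` is configured and reusing the revaha template id from this patch, would look roughly like:

    import dataflows as DF

    from srm_tools.gov_data_proxy import collect_gov_rows

    # Revaha template id taken from operators/revaha above; any other
    # DynamicTemplateID supported by the proxy is expected to work the same way.
    DF.Flow(
        collect_gov_rows('23ede39d-968c-4e5c-8098-9c58b037a0c3'),
        DF.update_resource(-1, name='revaha'),
        DF.printer(),
    ).process()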