diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f3b6411 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +**/.git diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9d2d70c --- /dev/null +++ b/.gitignore @@ -0,0 +1,153 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# static files generated from Django application using `collectstatic` +media +static + +# Env files +.DominoEnv/ +.vscode/ +data/ +.Domino/ +.XLS2CSV/ +modules/TempNB/all_US_studies_by_keyword.json +modules/TempNB/drug_vocab.csv.zip +modules/TempNB/vocab.csv diff --git a/README.md b/README.md index 4c77259..1690aa0 100644 --- a/README.md +++ b/README.md @@ -2,24 +2,34 @@ ## Scaling COVID public behavior change and anti-misinformation -* [Private repository](https://github.com/graphistry/ProjectDomino-internal) - * [Community Slack channel: #COVID](https://thedataridealongs.slack.com/) via an [open invite link](https://join.slack.com/t/thedataridealongs/shared_invite/zt-d06nq64h-P1_3sENXG4Gg0MjWh1jPEw) -* **Graph4good contributors:** We're excited to work with you! Check out the subprojects below we are actively seeking help on, and ping on Slack for which you're curious about tackling. We can then share our current thoughts and tips for getting started. 
Most will be useful as pure Python [Google Collab notebook](https://colab.research.google.com) proveouts and local Neo4j Docker + Python proveouts: You can move quickly, and we can more easily integrate into our automation pipelines. +* [Meetings: Google calendar](https://calendar.google.com/calendar?cid=Z3JhcGhpc3RyeS5jb21fdTQ3bmQ3YTdiZzB0aTJtaW9kYTJybGx2cTBAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbQ) + +* Project Trackers: [Core](https://github.com/TheDataRideAlongs/ProjectDomino/projects/1), [Interventions](https://github.com/TheDataRideAlongs/ProjectDomino/projects/2), and [Issues](https://github.com/TheDataRideAlongs/ProjectDomino/issues) + +* Infra - ask for account access in Slack + * [Private repository](https://github.com/graphistry/ProjectDomino-internal) + * Observability - [Grafana](http://13.68.225.97:10007/d/n__-I5jZk/neo4j-4-dashboard?orgId=1) + * DB - Neo4j + * Orchestration - Prefect + * Key sharing - Keybase team + * CI - GitHub Actions + +* **Graph4good contributors:** We're excited to work with you! Check out the subprojects below that we are actively seeking help on, look at some of the GitHub Issues, and ping us on Slack about the ones you're curious to tackle, as well as ideas not listed here. We can then share our current thoughts and tips for getting started. Most will be useful as pure Python [Google Colab notebook](https://colab.research.google.com) proveouts and local Neo4j Docker + Python proveouts: you can move quickly, and we can more easily integrate them into our automation pipelines. **One of the most important steps in stopping the COVID-19 pandemic is influencing mass behavior change for citizens to take appropriate, swift action on mitigating infection and human-to-human contact.** Government officials at all levels have advocated misinformed practices such as dining out or participating in outdoor gatherings that have contributed to amplifying the curve rather than flattening it. At the time of writing, poor crisis and emergency risk communication has led to over 14,000 US citizens testing positive (2-20X more are likely untested) and over 200 deaths. The need to influence appropriate behavior and mitigation actions is extreme: the US has shot up from untouched to become the 6th most infected nation. 
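As a concrete illustration of the "local Neo4j Docker + Python proveout" path above, here is a minimal sketch. It assumes the bolt port mapping (10001) and the default `neo4j/neo123` credentials from `infra/neo4j/docker/docker-compose.yml`; adjust both for your own deployment.

```python
# Minimal proveout sketch (illustrative, not part of the pipeline): query the local Neo4j from Python.
from neo4j import GraphDatabase

# Host port 10001 maps to bolt (7687) and neo4j/neo123 is the compose default; change as needed.
driver = GraphDatabase.driver("bolt://localhost:10001", auth=("neo4j", "neo123"), encrypted=False)
with driver.session() as session:
    count = session.run("MATCH (n:Tweet) RETURN count(n) AS c").single()["c"]
    print("Tweet nodes currently in the graph:", count)
driver.close()
```

The same connection pattern can be reused from a Colab or local notebook to prototype queries before they are folded into the `modules/` pipeline classes.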
**Project Domino accelerates research on developing capabilities for information hygiene at the mass scale necessary for the current national disaster and for future ones.** We develop and enable the use of 3 key data capabilities for modern social discourse: -* Identifying at-risk behavior groups and viable local behavior change influencers * Detecting misinformation campaigns +* Identifying at-risk behavior groups and viable local behavior change influencers * Automating high-precision interventions ## The interventions We are working with ethics groups to identify safe interventions along the following lines: -* **Targeting of specific underserved issues**: Primary COVID public health issues such as unsafe social behavior, unsafe medicine, unsafe science, dangerous government policy influence, and adjacent issues such as fake charities, phishing, malware, and hate groups +* **Targeting of specific underserved issues**: Primary COVID public health issues such as unsafe social behavior, unsafe medicine, unsafe science, dangerous government policy influence, and adjacent issues such as fake charities, phishing, malware, and hate group propaganda * **Help top social platforms harden themselves**: Trust and safety teams at top social networks need to be able to warn users about misinformation, de-trend it, and potentially take it down before it has served its purpose. The status quo is handling incidents months after the fact. We will provide real-time alert feeds and scoring APIs to help take action during the critical minutes before misinformation gains significant reach. @@ -36,9 +46,10 @@ We are working with ethics groups to identify safe interventions along the follo * Twitter firehose monitor * Data integration pipeline for sources of known scams, fraud, lies, bots, propaganda, extremism, and other misinformation sources * Misinformation knowledge graph connecting accounts, posts, reports, and models -* Automated GPU / graph / machine learning pipeline +* Automated GPU / graph / machine learning pipeline: general classification (bot, community, ...) and targeted (clinical disinformation, ...) * Automated alerting & reporting pipeline * Interactive visual analytics environment for data scientists and analysts: GPU, graph, notebooks, ML, ... +* Intervention bots ## How to help @@ -46,11 +57,12 @@ We are actively seeking several forms of support: * **Volunteers**: Most immediate priority is on data engineering and advisors on marketing/public health * **Data engineers: Orchestration (Airflow, Prefect.io, Nifi, ...), streaming (Kafka, ...), graph (Neo4j, cuGraph), GPU (RAPIDS), ML (NLP libs), and databases** - * Analysts: OSINT, threat intel, campaign tracking, ... 
- * Data scientists: especially around graph, misinformation, neural networks, NLP, with backgrounds such as security, fraud, misinformation, marketing + * **Analysts: OSINT, threat intel, campaign tracking, ...** + * **Data scientists: especially around graph, misinformation, neural networks, NLP, with backgrounds such as security, fraud, misinformation, marketing** * Developers & designers: intelligence integrations, website for search & reports, automations, intervention bots, API * Marketing: Strategy & implementation - * Public health and communications: Especially around intervention design + * Public health and communications: Especially around safe and effective intervention design + * Legal: Risk & legal analysis for various interventions * **APIs and Data**: * Feeds & enriching APIs: Lists and intel on URLs, domains, keywords, emails, topics, blockchain accounts, social media accounts & content, clinical trials, esp. if tunable on topic diff --git a/infra/metrics/docker-compose.yml b/infra/metrics/docker-compose.yml new file mode 100644 index 0000000..1bfdb6b --- /dev/null +++ b/infra/metrics/docker-compose.yml @@ -0,0 +1,44 @@ +version: '3.3' + +networks: + lan: + external: + name: i-already-created-this + +services: + prometheus: + image: prom/prometheus + network_mode: 'bridge' + ports: + - 10005:9090 + volumes: + - /datadrive/prometheus:/etc/prometheus + - /datadrive/prometheus/storage:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + restart: always + + cadvisor: + image: google/cadvisor:latest + volumes: + - /:/rootfs:ro + - /var/run:/var/run:rw + - /sys:/sys:ro + - /var/lib/docker/:/var/lib/docker:ro + ports: + - 10006:8080 + network_mode: 'bridge' + restart: always + + grafana: + image: grafana/grafana + depends_on: + - prometheus + network_mode: 'bridge' + ports: + - 10007:3000 + volumes: + - /datadrive/grafana/storage:/var/lib/grafana + - /datadrive/grafana/provisioning/:/etc/grafana/provisioning/ + restart: always diff --git a/infra/metrics/prometheus/prometheus.yml b/infra/metrics/prometheus/prometheus.yml new file mode 100644 index 0000000..f0a5b91 --- /dev/null +++ b/infra/metrics/prometheus/prometheus.yml @@ -0,0 +1,30 @@ +global: + scrape_interval: 15s + scrape_timeout: 10s + evaluation_interval: 15s + alerting: + alertmanagers: + - static_configs: + - targets: [] + scheme: http + timeout: 10s + api_version: v1 + scrape_configs: + - job_name: prometheus + honor_timestamps: true + scrape_interval: 15s + scrape_timeout: 10s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + - localhost:9090 + - job_name: neo4j-prometheus + honor_timestamps: true + scrape_interval: 10s + scrape_timeout: 10s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + - 172.17.0.3:2004 \ No newline at end of file diff --git a/infra/neo4j/docker/docker-compose.yml b/infra/neo4j/docker/docker-compose.yml index f730ef6..677a052 100644 --- a/infra/neo4j/docker/docker-compose.yml +++ b/infra/neo4j/docker/docker-compose.yml @@ -1,17 +1,22 @@ +version: '3.3' + services: neo4j: image: neo4j:4.0.2-enterprise - network_mode: "bridge" + container_name: neo4j-server + network_mode: 'bridge' ports: - - "10001:7687" - - "10002:7473" - - "10003:7474" + - '10001:7687' + - '10002:7473' + - '10003:7474' + - '2004:2004' restart: unless-stopped volumes: - /datadrive/neo4j/plugins:/plugins - /datadrive/neo4j/data:/data - /datadrive/neo4j/import:/import - /datadrive/neo4j/logs:/logs + - 
/datadrive/neo4j/conf:/conf environment: - NEO4JLABS_PLUGINS=["apoc"] - NEO4J_AUTH=neo4j/neo123 @@ -20,6 +25,44 @@ services: - NEO4J_apoc_export_file_enabled=true - NEO4J_dbms_backup_enabled=true - NEO4J_dbms_transaction_timeout=60s + - NEO4j_apoc_trigger_enabled=true logging: options: - tag: "ImageName:{{.ImageName}}/Name:{{.Name}}/ID:{{.ID}}/ImageFullID:{{.ImageFullID}}" + tag: 'ImageName:{{.ImageName}}/Name:{{.Name}}/ID:{{.ID}}/ImageFullID:{{.ImageFullID}}' + prometheus: + image: prom/prometheus + network_mode: 'bridge' + ports: + - 10005:9090 + volumes: + - /datadrive/prometheus:/etc/prometheus + - /datadrive/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + - /datadrive/prometheus/storage:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + restart: always + + cadvisor: + image: google/cadvisor:latest + volumes: + - /:/rootfs:ro + - /var/run:/var/run:rw + - /sys:/sys:ro + - /var/lib/docker/:/var/lib/docker:ro + ports: + - 10006:8080 + network_mode: 'bridge' + restart: always + + grafana: + image: grafana/grafana + depends_on: + - prometheus + network_mode: 'bridge' + ports: + - 10007:3000 + volumes: + - /datadrive/grafana/storage:/var/lib/grafana + - /datadrive/grafana/provisioning/:/etc/grafana/provisioning/ + restart: always diff --git a/infra/neo4j/scripts/neo4j-indexes.cypher b/infra/neo4j/scripts/neo4j-indexes.cypher index a9605a6..f06f9eb 100644 --- a/infra/neo4j/scripts/neo4j-indexes.cypher +++ b/infra/neo4j/scripts/neo4j-indexes.cypher @@ -9,4 +9,23 @@ ON (n:Url) ASSERT n.full_url IS UNIQUE CREATE INDEX tweet_by_type FOR (n:Tweet) -ON (n.tweet_type) \ No newline at end of file +ON (n.tweet_type) + +CREATE INDEX tweet_by_hydrated +FOR (n:Tweet) +ON (n.hydrated) + +CREATE INDEX tweet_by_text +FOR (n:Tweet) +ON (n.text) + +CREATE INDEX tweet_by_created_at +FOR (n:Tweet) +ON (n.created_at) + +CREATE INDEX tweet_by_record_created_at +FOR (n:Tweet) +ON (n.record_created_at) + +CALL db.index.fulltext.createNodeIndex("tweet_by_text_fulltext",["Tweet"],["text"]) + diff --git a/infra/pipelines/docker/Dockerfile b/infra/pipelines/docker/Dockerfile new file mode 100644 index 0000000..3d70dc6 --- /dev/null +++ b/infra/pipelines/docker/Dockerfile @@ -0,0 +1,10 @@ +FROM prefecthq/prefect + +RUN pip install supervisor arrow graphistry pyarrow simplejson twarc neo4j +RUN prefect agent install local > supervisord.conf + +COPY . . +RUN prefect backend server + +RUN date > /dev/null +CMD ["./infra/pipelines/docker/entrypoint.sh"] diff --git a/infra/pipelines/docker/entrypoint.sh b/infra/pipelines/docker/entrypoint.sh new file mode 100755 index 0000000..1480ec6 --- /dev/null +++ b/infra/pipelines/docker/entrypoint.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Run prefect agent using supervisord +# supervisord + +# Register pipelines +# git fetch && git checkout wzy/dockerizePipelines + +python -m pipelines.Pipeline + +# Keep the container running +prefect agent start diff --git a/infra/prefect/docker/docker-compose.yml b/infra/prefect/docker/docker-compose.yml new file mode 100644 index 0000000..b9317f9 --- /dev/null +++ b/infra/prefect/docker/docker-compose.yml @@ -0,0 +1,92 @@ +# Copied from https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/docker-compose.yml +# +# Note: The server port cannot be overriden (issue: https://github.com/PrefectHQ/prefect/issues/2237); +# When support is added APOLLO_HOST_PORT should be set to the server port number. 
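# Illustrative usage note (an assumption, not from the upstream Prefect compose file): the
# ${POSTGRES_*} and ${DB_CONNECTION_URL} variables referenced below are typically supplied via an
# .env file beside this compose file or inline, e.g.:
#   POSTGRES_USER=prefect POSTGRES_PASSWORD=changeme POSTGRES_DB=prefect_server \
#   DB_CONNECTION_URL=postgresql://prefect:changeme@postgres:5432/prefect_server \
#   docker-compose up -d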
+ +version: '3.3' + +services: + postgres: + image: 'postgres:11' + ports: + - '${POSTGRES_HOST_PORT:-5432}:5432' + environment: + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} + networks: + - prefect-server + restart: 'always' + command: + - 'postgres' + # explicitly set max connections + - '-c' + - 'max_connections=150' + hasura: + image: 'hasura/graphql-engine:v1.1.0' + ports: + - '${HASURA_HOST_PORT:-3000}:3000' + command: 'graphql-engine serve' + environment: + HASURA_GRAPHQL_DATABASE_URL: ${DB_CONNECTION_URL} + HASURA_GRAPHQL_ENABLE_CONSOLE: 'true' + HASURA_GRAPHQL_SERVER_PORT: '3000' + HASURA_GRAPHQL_QUERY_PLAN_CACHE_SIZE: 100 + networks: + - prefect-server + restart: 'always' + depends_on: + - postgres + graphql: + image: 'prefecthq/server:${PREFECT_SERVER_TAG:-latest}' + ports: + - '${GRAPHQL_HOST_PORT:-4201}:4201' + command: bash -c "${PREFECT_SERVER_DB_CMD} && python src/prefect_server/services/graphql/server.py" + environment: + PREFECT_SERVER_DB_CMD: ${PREFECT_SERVER_DB_CMD:-"echo 'DATABASE MIGRATIONS SKIPPED'"} + PREFECT_SERVER__DATABASE__CONNECTION_URL: ${DB_CONNECTION_URL} + PREFECT_SERVER__HASURA__ADMIN_SECRET: ${PREFECT_SERVER__HASURA__ADMIN_SECRET:-hasura-secret-admin-secret} + PREFECT_SERVER__HASURA__HOST: hasura + networks: + - prefect-server + restart: 'always' + depends_on: + - hasura + scheduler: + image: 'prefecthq/server:${PREFECT_SERVER_TAG:-latest}' + command: 'python src/prefect_server/services/scheduler/scheduler.py' + environment: + PREFECT_SERVER__HASURA__ADMIN_SECRET: ${PREFECT_SERVER__HASURA__ADMIN_SECRET:-hasura-secret-admin-secret} + PREFECT_SERVER__HASURA__HOST: hasura + networks: + - prefect-server + restart: 'always' + depends_on: + - graphql + apollo: + image: 'prefecthq/apollo:${PREFECT_SERVER_TAG:-latest}' + ports: + - '${APOLLO_HOST_PORT:-4200}:4200' + command: 'npm run serve' + environment: + HASURA_API_URL: ${HASURA_API_URL:-http://hasura:3000/v1alpha1/graphql} + PREFECT_API_URL: ${PREFECT_API_URL:-http://graphql:4201/graphql/} + PREFECT_API_HEALTH_URL: ${PREFECT_API_HEALTH_URL:-http://graphql:4201/health} + networks: + - prefect-server + restart: 'always' + depends_on: + - graphql + ui: + image: 'prefecthq/ui:${PREFECT_SERVER_TAG:-latest}' + ports: + - '${UI_HOST_PORT:-8080}:8080' + command: '/intercept.sh' + networks: + - prefect-server + restart: 'always' + depends_on: + - apollo + +networks: + prefect-server: diff --git a/modules/DfHelper.py b/modules/DfHelper.py index f2895e7..a2040b3 100644 --- a/modules/DfHelper.py +++ b/modules/DfHelper.py @@ -3,16 +3,21 @@ from datetime import datetime import time -class DfHelper: - def __init__(self, debug=False): - self.debug=debug - +import logging +logger = logging.getLogger('DfHelper') + + +class DfHelper: + def __init__(self): + pass + def __clean_timeline_tweets(self, pdf): #response_gdf = gdf[ (gdf['in_reply_to_status_id'] > 0) | (gdf['quoted_status_id'] > 0) ].drop_duplicates(['id']) - pdf = pdf.rename(columns={'id': 'status_id', 'id_str': 'status_id_str'}) + pdf = pdf.rename(columns={'id': 'status_id', + 'id_str': 'status_id_str'}) #pdf = pdf.reset_index(drop=True) return pdf - + def normalize_parquet_dataframe(self, df): pdf = df\ .pipe(self.__clean_timeline_tweets)\ @@ -26,15 +31,20 @@ def normalize_parquet_dataframe(self, df): return pdf def __clean_datetimes(self, pdf): - if self.debug: print('cleaning datetimes...') - - pdf = pdf.assign(created_at=pd.to_datetime(pdf['created_at'])) - pdf = 
pdf.assign(created_date=pdf['created_at'].apply(lambda dt: dt.timestamp())) - if self.debug: print(' ...cleaned') + logger.debug('cleaning datetimes...') + try: + pdf = pdf.assign(created_at=pd.to_datetime(pdf['created_at'])) + pdf = pdf.assign(created_date=pdf['created_at'].apply( + lambda dt: dt.timestamp())) + except Exception as e: + logger.error('Error __clean_datetimes', e) + logger.error(pdf) + raise e + logger.debug(' ...cleaned') return pdf - #some reason always False - #this seems to match full_text[:2] == 'RT' + # some reason always False + # this seems to match full_text[:2] == 'RT' def __clean_retweeted(self, pdf): return pdf.assign(retweeted=pdf['retweeted_status'] != 'None') @@ -48,72 +58,120 @@ def __update_to_type(self, row): return 'original' def __tag_status_type(self, pdf): - ##only materialize required fields.. - if self.debug: print('tagging status...') + # only materialize required fields.. + logger.debug('tagging status...') pdf2 = pdf\ .assign(status_type=pdf[['is_quote_status', 'retweeted', 'in_reply_to_status_id']].apply(self.__update_to_type, axis=1)) - if self.debug: print(' ...tagged') + logger.debug(' ...tagged') return pdf2 def __flatten_status_col(self, pdf, col, status_type, prefix): - if self.debug: print('flattening %s...' % col) - if self.debug: print(' ', pdf.columns) - #retweet_status -> hash -> lookup json for hash -> pull out id/created_at/user_id - pdf_hashed = pdf.assign(hashed=pdf[col].apply(hash)) - retweets = pdf_hashed[ pdf_hashed['status_type'] == status_type ][['hashed', col]]\ - .drop_duplicates('hashed').reset_index(drop=True) - #print('sample', retweets[col].head(10), retweets[col].apply(type)) - retweets_flattened = pd.io.json.json_normalize( - retweets[col].replace("(").replace(")")\ + debug_retweets_flattened = None + debug_retweets = None + try: + logger.debug('flattening %s...', col) + logger.debug(' %s x %s: %s', len(pdf), + len(pdf.columns), pdf.columns) + if len(pdf) == 0: + logger.debug( + 'Warning: did not add mt case col output addition - pdf') + return pdf + # retweet_status -> hash -> lookup json for hash -> pull out id/created_at/user_id + pdf_hashed = pdf.assign(hashed=pdf[col].apply(hash)) + retweets = pdf_hashed[pdf_hashed['status_type'] == status_type][['hashed', col]]\ + .drop_duplicates('hashed').reset_index(drop=True) + if len(retweets) == 0: + logger.debug( + 'Warning: did not add mt case col output addition - retweets') + return pdf + #print('sample', retweets[col].head(10), retweets[col].apply(type)) + retweets_flattened = pd.io.json.json_normalize( + retweets[col].replace("(").replace(")") .apply(self.__try_load)) - if self.debug: print(' ... fixing dates') - retweets_flattened = retweets_flattened.assign( - created_at = pd.to_datetime(retweets_flattened['created_at']).apply(lambda dt: dt.timestamp), - user_id = retweets_flattened['user.id']) - if self.debug: print(' ... fixing dates') - retweets = retweets[['hashed']]\ - .assign(**{ - prefix + c: retweets_flattened[c] - for c in retweets_flattened if c in ['id', 'created_at', 'user_id'] - }) - if self.debug: print(' ... remerging') - pdf_with_flat_retweets = pdf_hashed.merge(retweets, on='hashed', how='left').drop(columns='hashed') - if self.debug: print(' ...flattened', pdf_with_flat_retweets.shape) - return pdf_with_flat_retweets + if len(retweets_flattened.columns) == 0: + logger.debug('No tweets of type %s, early exit', status_type) + return pdf + debug_retweets_flattened = retweets_flattened + debug_retweets = retweets + logger.debug(' ... 
fixing dates') + logger.debug('avail cols of %s x %s: %s', len(retweets_flattened), len( + retweets_flattened.columns), retweets_flattened.columns) + logger.debug(retweets_flattened) + if 'created_at' in retweets_flattened: + retweets_flattened = retweets_flattened.assign( + created_at=pd.to_datetime(retweets_flattened['created_at']).apply(lambda dt: dt.timestamp)) + if 'user.id' in retweets_flattened: + retweets_flattened = retweets_flattened.assign( + user_id=retweets_flattened['user.id']) + logger.debug(' ... fixing dates') + retweets = retweets[['hashed']]\ + .assign(**{ + prefix + c: retweets_flattened[c] + for c in retweets_flattened if c in ['id', 'created_at', 'user_id'] + }) + logger.debug(' ... remerging') + pdf_with_flat_retweets = pdf_hashed.merge( + retweets, on='hashed', how='left').drop(columns='hashed') + logger.debug(' ...flattened: %s', pdf_with_flat_retweets.shape) + return pdf_with_flat_retweets + except Exception as e: + logger.error(('Exception __flatten_status_col', e)) + logger.error(('params', col, status_type, prefix)) + # print(pdf[:10]) + logger.error('cols debug_retweets %s x %s : %s', + len(debug_retweets), len(debug_retweets.columns), debug_retweets.columns) + logger.error('--------') + logger.error(debug_retweets[:3]) + logger.error('--------') + logger.error('cols debug_retweets_flattened %s x %s : %s', + len(debug_retweets_flattened), len(debug_retweets_flattened.columns), debug_retweets_flattened.columns) + logger.error('--------') + logger.error(debug_retweets_flattened[:3]) + logger.error('--------') + logger.error(debug_retweets_flattened['created_at']) + logger.error('--------') + raise e def __flatten_retweets(self, pdf): - if self.debug: print('flattening retweets...') - pdf2 = self.__flatten_status_col(pdf, 'retweeted_status', 'retweet', 'retweet_') - if self.debug: print(' ...flattened', pdf2.shape) + logger.debug('flattening retweets...') + pdf2 = self.__flatten_status_col( + pdf, 'retweeted_status', 'retweet', 'retweet_') + logger.debug(' ...flattened', pdf2.shape) return pdf2 def __flatten_quotes(self, pdf): - if self.debug: print('flattening quotes...') - pdf2 = self.__flatten_status_col(pdf, 'quoted_status', 'retweet_quote', 'quote_') - if self.debug: print(' ...flattened', pdf2.shape) + logger.debug('flattening quotes...') + pdf2 = self.__flatten_status_col( + pdf, 'quoted_status', 'retweet_quote', 'quote_') + logger.debug(' ...flattened', pdf2.shape) return pdf2 def __flatten_users(self, pdf): - if self.debug: print('flattening users') - pdf_user_cols = pd.io.json.json_normalize(pdf['user'].replace("(").replace(")").apply(ast.literal_eval)) + logger.debug('flattening users') + pdf_user_cols = pd.io.json.json_normalize( + pdf['user'].replace("(").replace(")").apply(ast.literal_eval)) pdf2 = pdf.assign(**{ - 'user_' + c: pdf_user_cols[c] + 'user_' + c: pdf_user_cols[c] for c in pdf_user_cols if c in [ - 'id', 'screen_name', 'created_at', 'followers_count', 'friends_count', 'favourites_count', + 'id', 'screen_name', 'created_at', 'followers_count', 'friends_count', 'favourites_count', 'utc_offset', 'time_zone', 'verified', 'statuses_count', 'profile_image_url', 'location', 'name', 'description' ]}) - if self.debug: print(' ... fixing dates') - pdf2 = pdf2.assign(user_created_at=pd.to_datetime(pdf2['user_created_at']).apply(lambda dt: dt.timestamp())) - if self.debug: print(' ...flattened') + logger.debug(' ... 
fixing dates') + pdf2 = pdf2.assign(user_created_at=pd.to_datetime( + pdf2['user_created_at']).apply(lambda dt: dt.timestamp())) + logger.debug(' ...flattened') return pdf2 + def __flatten_entities(self, pdf): - if self.debug: print('flattening urls') - pdf_entities = pd.io.json.json_normalize(pdf['entities'].replace("(").replace(")").apply(ast.literal_eval)) - pdf['urls']=pdf_entities['urls'] - pdf['hashtags']=pdf_entities['hashtags'] - pdf['user_mentions']=pdf_entities['user_mentions'] + logger.debug('flattening urls') + pdf_entities = pd.io.json.json_normalize( + pdf['entities'].replace("(").replace(")").apply(ast.literal_eval)) + pdf['urls'] = pdf_entities['urls'] + pdf['hashtags'] = pdf_entities['hashtags'] + pdf['user_mentions'] = pdf_entities['user_mentions'] return pdf + def __try_load(self, s): try: out = ast.literal_eval(s) @@ -123,5 +181,5 @@ def __try_load(self, s): } except: if s != 0.0: - print('bad s',s) - return {} \ No newline at end of file + logger.debug('bad s', s) + return {} diff --git a/modules/DrugSynonymDataToNeo4j.py b/modules/DrugSynonymDataToNeo4j.py new file mode 100644 index 0000000..d47e75e --- /dev/null +++ b/modules/DrugSynonymDataToNeo4j.py @@ -0,0 +1,135 @@ +from neo4j import GraphDatabase +from typing import Optional +from pandas import DataFrame +from numpy import isnan +import logging +logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) +logger = logging.getLogger('ds-neo4j') + +def dict_to_property_str(properties:Optional[dict] = None) -> str: + + def property_type_checker(property_value): + if isinstance(property_value,int) or isinstance(property_value,float): + pass + elif isinstance(property_value,str): + property_value = '''"''' + property_value.replace('"',r"\"") + '''"''' + elif not property_value: + property_value = "" + return property_value + + resp:str = "" + if properties: + resp = "{" + for key in properties.keys(): + resp += """{key}:{value},""".format(key=key,value=property_type_checker(properties[key])) + resp = resp[:-1] + "}" + return resp + +def cypher_template_filler(cypher_template:str,data:dict) -> str: + return cypher_template.format(**data).replace("\n","") + +class DrugSynonymDataToNeo4j(object): + + def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="letmein", encrypted=False): + self._driver = GraphDatabase.driver(uri, auth=(user, password), encrypted=encrypted) + + def close(self): + self._driver.close() + + def upload_studies(self,studies:DataFrame): + node_merging_func = self._merge_node + with self._driver.session() as session: + logger.info("> Importing Studies Job is Started") + count_node = 0 + prev_count_node = 0 + + for study in studies.T.to_dict().values(): + node_type = "Study" + properties:dict = study + session.write_transaction(node_merging_func, node_type, properties) + count_node += 1 + if count_node > prev_count_node + 100: + prev_count_node = count_node + logger.info("> {} nodes already imported".format(count_node)) + + logger.info("> Importing Studies Job is >> Done << with {} nodes imported".format(count_node)) + + + def upload_drugs_and_synonyms(self,drug_vocab): + node_merging_func = self._merge_node + edge_merging_func = self._merge_edge + with self._driver.session() as session: + logger.info("> Importing Drugs and Synonyms Job is Started") + count_node = 0 + count_edge = 0 + prev_count_node = 0 + prev_count_edge = 0 + + for key in drug_vocab.keys(): + node_type = "Drug" + properties:dict = { + "name":key + } + + drug_id = 
session.write_transaction(node_merging_func, node_type, properties) + count_node += 1 + if isinstance(drug_vocab[key],list): + for synonym in drug_vocab[key]: + node_type = "Synonym" + properties:dict = { + "name":synonym + } + synonym_id = session.write_transaction(node_merging_func, node_type, properties) + count_node += 1 + + edge_type = "KNOWN_AS" + session.write_transaction(edge_merging_func, drug_id, synonym_id, edge_type) + count_edge += 1 + + if count_node > prev_count_node + 1000 or count_edge > prev_count_edge + 1000: + prev_count_node = count_node + prev_count_edge = count_edge + logger.info("> {} nodes and {} edges already imported".format(count_node,count_edge)) + + logger.info("> Importing Drugs and Synonyms Job is >> Done << with {} nodes and {} edges imported".format(count_node,count_edge)) + + @staticmethod + def _merge_node(tx, node_type, properties:Optional[dict] = None): + + + + data:dict = { + "node_type":node_type, + "properties":dict_to_property_str(properties) + } + base_cypher = """ + MERGE (n:{node_type} {properties}) + RETURN id(n) + """ + + result = tx.run(cypher_template_filler(base_cypher,data)) + return result.single()[0] + + @staticmethod + def _merge_edge(tx, from_id, to_id, edge_type, properties:Optional[dict] = None, direction = ">"): + if not direction in [">",""]: + raise ValueError + + data:dict = { + "from_id":int(from_id), + "to_id":int(to_id), + "edge_type":edge_type, + "direction":direction, + "properties":dict_to_property_str(properties) + } + base_cypher = """ + MATCH (from) + WHERE ID(from) = {from_id} + MATCH (to) + WHERE ID(to) = {to_id} + MERGE (from)-[r:{edge_type} {properties}]-{direction}(to) + RETURN id(r) + """ + result = tx.run(cypher_template_filler(base_cypher,data)) + return result.single()[0] + diff --git a/modules/FirehoseJob.py b/modules/FirehoseJob.py index 1ba8f11..a2e4dd7 100644 --- a/modules/FirehoseJob.py +++ b/modules/FirehoseJob.py @@ -1,7 +1,7 @@ ### from collections import deque, defaultdict -import cudf, datetime, gc, os, string, sys, time, uuid +import datetime, gc, os, string, sys, time, uuid import numpy as np import pandas as pd import pyarrow as pa @@ -13,6 +13,14 @@ from .TwarcPool import TwarcPool from .Neo4jDataAccess import Neo4jDataAccess +import logging +logger = logging.getLogger('fh') + +try: + import cudf +except: + logger.debug('Warning: no cudf') + ############################# ################### @@ -43,7 +51,7 @@ ('in_reply_to_user_id_str', string_dtype, None), ('is_quote_status', np.bool, None), ('lang', string_dtype, None), - ('place', string_dtype, None), + ('place', string_dtype, None), ('possibly_sensitive', np.bool_, False), ('quoted_status', string_dtype, 0.0), ('quoted_status_id', id_type, 0), @@ -96,14 +104,18 @@ # }))}) ### When dtype -> arrow ambiguious, override -KNOWN_FIELDS = [ - #[0, 'contributors', ??], +KNOWN_FIELDS = [ + [0, 'contributors', pa.string()], [1, 'coordinates', pa.string()], [2, 'created_at', pa.string()], - [3, 'display_text_range', pa.list_(pa.int64())], + + #[3, 'display_text_range', pa.list_(pa.int64())], + [3, 'display_text_range', pa.string()], + [4, 'entities', pa.string()], [5, 'extended_entities', pa.string()], #extended_entities_t ], [7, 'favorited', pa.bool_()], + [8, 'favorite_count', pa.int64()], [9, 'full_text', pa.string()], [10, 'geo', pa.string()], [11, 'id', pa.int64() ], @@ -115,56 +127,62 @@ [17, 'in_reply_to_user_id_str', pa.string() ], [18, 'is_quote_status', pa.bool_() ], [19, 'lang', pa.string() ], - # [20, 'place', pa.string()], - # [21, 
'possibly_sensitive', pa.bool_()], - #[22, 'quoted_status', pa.string()], - #[23, 'quoted_status_id', pa.int64()], - #[24, 'quoted_status_id_str', pa.string()], - #[25, 'quoted_status_permalink', pa.string()], - # [26, 'retweet_count', pa.int64()], - # [27, 'retweeted', pa.bool_()], - #[28, 'retweeted_status', pa.string()], - # [29, 'scopes', pa.string()], # pa.struct({'followers': pa.bool_()})], - #[30, 'source', pa.string()], - # [31, 'truncated', pa.bool_()], - # [32, 'user', pa.string()], + [20, 'place', pa.string()], + [21, 'possibly_sensitive', pa.bool_()], + [22, 'quoted_status', pa.string()], + [23, 'quoted_status_id', pa.int64()], + [24, 'quoted_status_id_str', pa.string()], + [25, 'quoted_status_permalink', pa.string()], + [26, 'retweet_count', pa.int64()], + [27, 'retweeted', pa.bool_()], + [28, 'retweeted_status', pa.string()], + [29, 'scopes', pa.string()], + [30, 'source', pa.string()], + [31, 'truncated', pa.bool_()], + [32, 'user', pa.string()], + #[33, 'withheld_in_countries', pa.list_(pa.string())], + [33, 'withheld_in_countries', pa.string()], + + #[34, 'followers', pa.struct({'followers': pa.bool_()})] + [34, 'followers', pa.string()] ] ############################# class FirehoseJob: - + ################### MACHINE_IDS = (375, 382, 361, 372, 364, 381, 376, 365, 363, 362, 350, 325, 335, 333, 342, 326, 327, 336, 347, 332) SNOWFLAKE_EPOCH = 1288834974657 - + EXPECTED_COLS = EXPECTED_COLS KNOWN_FIELDS = KNOWN_FIELDS DROP_COLS = DROP_COLS - - - def __init__(self, creds = [], neo4j_creds = None, TWEETS_PER_PROCESS=100, TWEETS_PER_ROWGROUP=5000, save_to_neo=False, PARQUET_SAMPLE_RATE_TIME_S=None): + + + def __init__(self, creds = [], neo4j_creds = None, TWEETS_PER_PROCESS=100, TWEETS_PER_ROWGROUP=5000, save_to_neo=False, PARQUET_SAMPLE_RATE_TIME_S=None, debug=False, BATCH_LEN=100, writers = {'snappy': None}): self.queue = deque() - self.writers = { - 'snappy': None - #,'vanilla': None - } + self.writers = writers self.last_write_epoch = '' self.current_table = None - self.schema = None + self.schema = pa.schema([ + (name, t) + for (i, name, t) in KNOWN_FIELDS + ]) self.timer = Timer() - - self.twarc_pool = TwarcPool([ + self.debug = debug + + self.twarc_pool = TwarcPool([ Twarc(o['consumer_key'], o['consumer_secret'], o['access_token'], o['access_token_secret']) - for o in creds + for o in creds ]) - self.save_to_neo=save_to_neo + self.save_to_neo = save_to_neo self.TWEETS_PER_PROCESS = TWEETS_PER_PROCESS #100 self.TWEETS_PER_ROWGROUP = TWEETS_PER_ROWGROUP #100 1KB x 1000 = 1MB uncompressed parquet - self.PARQUET_SAMPLE_RATE_TIME_S = PARQUET_SAMPLE_RATE_TIME_S + self.PARQUET_SAMPLE_RATE_TIME_S = PARQUET_SAMPLE_RATE_TIME_S self.last_df = None self.last_arr = None self.last_write_arr = None @@ -172,32 +190,40 @@ def __init__(self, creds = [], neo4j_creds = None, TWEETS_PER_PROCESS=100, TWEET self.neo4j_creds = neo4j_creds + self.BATCH_LEN = BATCH_LEN + self.needs_to_flush = False self.__file_names = [] - + def __del__(self): - print('__del__') + + logger.debug('__del__') self.destroy() + def destroy(self, job_name='generic_job'): - print('flush before destroying..') + logger.debug('flush before destroying..') self.flush(job_name) - print('destroy', self.writers.keys()) + + logger.debug('destroy', self.writers.keys()) + for k in self.writers.keys(): if not (self.writers[k] is None): - print('Closing parquet writer %s' % k) + logger.debug('Closing parquet writer %s' % k) writer = self.writers[k] writer.close() - print('... sleep 1s...') + logger.debug('... 
sleep 1s...') time.sleep(1) self.writers[k] = None - print('... sleep 1s...') + logger.debug('... sleep 1s...') time.sleep(1) - print('... Safely closed %s' % k) + logger.debug('... Safely closed %s' % k) else: - print('Nothing to close for writer %s' % k) + + logger.debug('Nothing to close for writer %s' % k) + ################### def get_creation_time(self, id): @@ -208,25 +234,25 @@ def machine_id(self, id): def sequence_id(self, id): return id & 0b111111111111 - - + + ################### - + valid_file_name_chars = frozenset("-_%s%s" % (string.ascii_letters, string.digits)) - + def clean_file_name(self, filename): return ''.join(c for c in filename if c in FirehoseJob.valid_file_name_chars) #clean series before reaches arrow - def clean_series(self, series): + def clean_series(self, series): try: identity = lambda x: x - + series_to_json_string = (lambda series: series.apply(lambda x: json.dumps(x, ignore_nan=True))) - + ##objects: put here to skip str coercion coercions = { - 'display_text_range': identity, + 'display_text_range': series_to_json_string, 'contributors': identity, 'created_at': lambda series: series.values.astype('unicode'), 'possibly_sensitive': (lambda series: series.fillna(False)), @@ -235,8 +261,8 @@ def clean_series(self, series): 'in_reply_to_status_id': (lambda series: series.fillna(0).astype('int64')), 'in_reply_to_user_id': (lambda series: series.fillna(0).astype('int64')), 'scopes': series_to_json_string, - 'followers': identity, #(lambda series: pd.Series([str(x) for x in series.tolist()]).values.astype('unicode')), - 'withheld_in_countries': identity, + 'followers': series_to_json_string, + 'withheld_in_countries': series_to_json_string } if series.name in coercions.keys(): return coercions[series.name](series) @@ -245,9 +271,9 @@ def clean_series(self, series): else: return series except Exception as exn: - print('coerce exn on col', series.name, series.dtype) - print('first', series[:1]) - print(exn) + logger.error('coerce exn on col', series.name, series.dtype) + logger.error('first', series[:1]) + logger.error(exn) return series #clean df before reaches arrow @@ -255,42 +281,42 @@ def clean_df(self, raw_df): self.timer.tic('clean', 1000) try: new_cols = { - c: pd.Series([c_default] * len(raw_df), dtype=c_dtype) - for (c, c_dtype, c_default) in FirehoseJob.EXPECTED_COLS + c: pd.Series([c_default] * len(raw_df), dtype=c_dtype) + for (c, c_dtype, c_default) in FirehoseJob.EXPECTED_COLS if not c in raw_df } all_cols_df = raw_df.assign(**new_cols) sorted_df = all_cols_df.reindex(sorted(all_cols_df.columns), axis=1) return pd.DataFrame({c: self.clean_series(sorted_df[c]) for c in sorted_df.columns}) except Exception as exn: - print('failed clean') - print(exn) + logger.error('failed clean') + logger.error(exn) raise exn finally: self.timer.toc('clean') - - + + def folder_last(self): return self.__folder_last - + def files(self): return self.__file_names.copy() - + #TODO ////<24hour_utc>_.parquet (don't clobber..) 
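    # Descriptive note (added comment, derived from pq_writer below): each flush for a job lands in
    # firehose_data/<cleaned job_name>/<YYYY_MM_DD_HH>_b<run>.<codec>2.parquet, e.g. (illustrative names)
    #   firehose_data/my_job/2020_03_28_17_b1.snappy2.parquet
    #   firehose_data/my_job/2020_03_28_17_b2.snappy2.parquet  # next batch within the same hour
    # <run> is bumped until an unused prefix is found, so earlier batches are never clobbered.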
def pq_writer(self, table, job_name='generic_job'): try: self.timer.tic('write', 1000) - + job_name = self.clean_file_name(job_name) - - folder = "firehose_data/%s" % job_name - print('make folder if not exists: %s' % folder) + + folder = "firehose_data/%s" % job_name + logger.debug('make folder if not exists: %s' % folder) os.makedirs(folder, exist_ok=True) self.__folder_last = folder - + vanilla_file_suffix = 'vanilla2.parquet' - snappy_file_suffix = 'snappy2.parquet' + snappy_file_suffix = 'snappy2.parquet' time_prefix = datetime.datetime.now().strftime("%Y_%m_%d_%H") run = 0 file_prefix = "" @@ -300,64 +326,70 @@ def pq_writer(self, table, job_name='generic_job'): run = run + 1 file_prefix = "%s/%s_b%s." % ( folder, time_prefix, run ) if run > 1: - print('Starting new batch for existing hour') + logger.debug('Starting new batch for existing hour') vanilla_file_name = file_prefix + vanilla_file_suffix snappy_file_name = file_prefix + snappy_file_suffix - + ######################################################### - - + + ######################################################### if ('vanilla' in self.writers) and ( (self.writers['vanilla'] is None) or self.last_write_epoch != file_prefix ): - print('Creating vanilla writer', vanilla_file_name) + logger.debug('Creating vanilla writer: %s', vanilla_file_name) try: #first write #os.remove(vanilla_file_name) 1 except Exception as exn: - print('Could not rm vanilla parquet', exn) + logger.debug(('Could not rm vanilla parquet', exn)) self.writers['vanilla'] = pq.ParquetWriter( - vanilla_file_name, + vanilla_file_name, schema=table.schema, compression='NONE') self.__file_names.append(vanilla_file_name) - + if ('snappy' in self.writers) and ( (self.writers['snappy'] is None) or self.last_write_epoch != file_prefix ): - print('Creating snappy writer', snappy_file_name) + logger.debug('Creating snappy writer: %s', snappy_file_name) try: #os.remove(snappy_file_name) 1 except Exception as exn: - print('Could not rm snappy parquet', exn) + logger.error(('Could not rm snappy parquet', exn)) self.writers['snappy'] = pq.ParquetWriter( - snappy_file_name, + snappy_file_name, schema=table.schema, compression={ field.name.encode(): 'SNAPPY' for field in table.schema }) self.__file_names.append(snappy_file_name) - + self.last_write_epoch = file_prefix ###################################################### - + for name in self.writers.keys(): try: - print('Writing %s (%s x %s)' % ( + logger.debug('Writing %s (%s x %s)' % ( name, table.num_rows, table.num_columns)) self.timer.tic('writing_%s' % name, 20, 1) writer = self.writers[name] writer.write_table(table) + logger.debug('========') + logger.debug(table.schema) + logger.debug('--------') + logger.debug(table.to_pandas()[:10]) + logger.debug('--------') self.timer.toc('writing_%s' % name, table.num_rows) ######### - print('######## TRANSACTING') + logger.debug('######## TRANSACTING') self.last_write_arr = table self.last_writes_arr.append(table) ######### except Exception as exn: - print('... failed to write to parquet') - print(exn) - print('######### ALL WRITTEN #######') + logger.error('... 
failed to write to parquet') + logger.error(exn) + raise exn + logger.debug('######### ALL WRITTEN #######') finally: self.timer.toc('write') @@ -366,7 +398,7 @@ def flush(self, job_name="generic_job"): try: if self.current_table is None or self.current_table.num_rows == 0: return - print('writing to parquet then clearing current_table..') + logger.debug('writing to parquet then clearing current_table..') deferred_pq_exn = None try: self.pq_writer(self.current_table, job_name) @@ -374,19 +406,19 @@ def flush(self, job_name="generic_job"): deferred_pq_exn = e try: if self.save_to_neo: - print('Writing to Neo4j') - Neo4jDataAccess(True, self.neo4j_creds).save_parquet_df_to_graph(self.current_table.to_pandas(), job_name) + logger.debug('Writing to Neo4j') + Neo4jDataAccess(self.debug, self.neo4j_creds).save_parquet_df_to_graph(self.current_table.to_pandas(), job_name) else: - print('Skipping Neo4j write') + logger.debug('Skipping Neo4j write') except Exception as e: - print('Neo4j write exn', e) + logger.error('Neo4j write exn', e) raise e if not (deferred_pq_exn is None): raise deferred_pq_exn finally: - print('flush clearing self.current_table') + logger.debug('flush clearing self.current_table') self.current_table = None - + def tweets_to_df(self, tweets): try: self.timer.tic('to_pandas', 1000) @@ -395,12 +427,12 @@ def tweets_to_df(self, tweets): self.last_df = df return df except Exception as exn: - print('Failed tweets->pandas') - print(exn) + logger.error('Failed tweets->pandas') + logger.error(exn) raise exn finally: self.timer.toc('to_pandas') - + def df_with_schema_to_arrow(self, df, schema): try: self.timer.tic('df_with_schema_to_arrow', 1000) @@ -412,147 +444,109 @@ def df_with_schema_to_arrow(self, df, schema): # raise Exception('ok') table = pa.Table.from_pandas(df, schema) if len(df.columns) != len(schema): - print('=========================') - print('DATA LOSS WARNING: df has cols not in schema, dropping') #reverse is an exn + logger.debug('=========================') + logger.debug('DATA LOSS WARNING: df has cols not in schema, dropping') #reverse is an exn for col_name in df.columns: hits = [field for field in schema if field.name==col_name] if len(hits) == 0: - print('-------') - print('arrow schema missing col %s ' % col_name) - print('df dtype',df[col_name].dtype) - print(df[col_name].dropna()) - print('-------') + logger.debug('-------') + logger.debug('arrow schema missing col %s ' % col_name) + logger.debug('df dtype',df[col_name].dtype) + logger.debug(df[col_name].dropna()) + logger.debug('-------') except Exception as exn: - print('============================') - print('failed nth arrow from_pandas') - print('-------') - print(exn) - print('-------') + logger.error('============================') + logger.error('failed nth arrow from_pandas') + logger.error('-------') + logger.error(exn) + logger.error('-------') try: - print('followers', df['followers'].dropna()) - print('--------') - print('coordinates', df['coordinates'].dropna()) - print('--------') - print('dtypes', df.dtypes) - print('--------') - print(df.sample(min(5, len(df)))) - print('--------') - print('arrow') - print([schema[k] for k in range(0, len(schema))]) - print('~~~~~~~~') + logger.error(('followers', df['followers'].dropna())) + logger.error('--------') + logger.error(('coordinates', df['coordinates'].dropna())) + logger.error('--------') + logger.error('dtypes: %s', df.dtypes) + logger.error('--------') + logger.error(df.sample(min(5, len(df)))) + logger.error('--------') + 
logger.error('arrow') + logger.error([schema[k] for k in range(0, len(schema))]) + logger.error('~~~~~~~~') if not (self.current_table is None): try: - print(self.current_table.to_pandas()[:3]) - print('----') - print([self.current_table.schema[k] for k in range(0, self.current_table.num_columns)]) + logger.error(self.current_table.to_pandas()[:3]) + logger.error('----') + logger.error([self.current_table.schema[k] for k in range(0, self.current_table.num_columns)]) except Exception as exn2: - print('cannot to_pandas print..', exn2) + logger.error(('cannot to_pandas print..', exn2)) except: 1 - print('-------') + logger.error('-------') err_file_name = 'fail_' + str(uuid.uuid1()) - print('Log failed batch and try to continue! %s' % err_file_name) + logger.error('Log failed batch and try to continue! %s' % err_file_name) df.to_csv('./' + err_file_name) raise exn for i in range(len(schema)): if not (schema[i].equals(table.schema[i])): - print('EXN: Schema mismatch on col # %s', i) - print(schema[i]) - print('-----') - print(table.schema[i]) - print('-----') + logger.error('EXN: Schema mismatch on col # %s', i) + logger.error(schema[i]) + logger.error('-----') + logger.error(table.schema[i]) + logger.error('-----') raise Exception('mismatch on col # ' % i) return table finally: self.timer.toc('df_with_schema_to_arrow') - - + + def concat_tables(self, table_old, table_new): try: self.timer.tic('concat_tables', 1000) return pa.concat_tables([table_old, table_new]) #promote.. except Exception as exn: - print('=========================') - print('Error combining arrow tables, likely new table mismatches old') - print('------- cmp') + logger.error('=========================') + logger.error('Error combining arrow tables, likely new table mismatches old') + logger.error('------- cmp') for i in range(0, table_old.num_columns): if i >= table_new.num_columns: - print('new table does not have enough columns to handle %i' % i) + logger.error('new table does not have enough columns to handle %i' % i) elif table_old.schema[i].name != table_new.schema[i].name: - print('ith col name mismatch', i, - 'old', (table_old.schema[i]), 'vs new', table_new.schema[i]) - print('------- exn') - print(exn) - print('-------') + logger.error(('ith col name mismatch', i, + 'old', (table_old.schema[i]), 'vs new', table_new.schema[i])) + logger.error('------- exn') + logger.error(exn) + logger.error('-------') raise exn finally: self.timer.toc('concat_tables') - - - def df_to_arrow(self, df): - - print('first process..') - table = pa.Table.from_pandas(df) - - print('patching lossy dtypes...') - schema = table.schema - for [i, name, field_t] in FirehoseJob.KNOWN_FIELDS: - if schema[i].name != name: - raise Exception('Mismatched index %s: %s -> %s' % ( - i, schema[i].name, name - )) - schema = schema.set(i, pa.field(name, field_t)) - - print('re-coercing..') - table_2 = pa.Table.from_pandas(df, schema) - #if not table_2.schema.equals(schema): - for i in range(0, len(schema)): - if not (schema[i].equals(table_2.schema[i])): - print('=======================') - print('EXN: schema mismatch %s' % i) - print(schema[i]) - print('-----') - print(table_2.schema[i]) - print('-----') - raise Exception('schema mismatch %s' % i) - print('Success!') - print('=========') - print('First tweets arrow schema') - for i in range(0, len(table_2.schema)): - print(i, table_2.schema[i]) - print('////////////') - return table_2 - + + def process_tweets_notify_hydrating(self): if not (self.current_table is None): self.timer.toc('tweet', 
self.current_table.num_rows) self.timer.tic('tweet', 40, 40) - + self.timer.tic('hydrate', 40, 40) - + # Call process_tweets_notify_hydrating() before def process_tweets(self, tweets, job_name='generic_job'): - + self.timer.toc('hydrate') - + self.timer.tic('overall_compute', 40, 40) raw_df = self.tweets_to_df(tweets) df = self.clean_df(raw_df) table = None - if self.schema is None: - table = self.df_to_arrow(df) - self.schema = table.schema - else: - try: - table = self.df_with_schema_to_arrow(df, self.schema) - except: - print('conversion failed, skipping batch...') - self.timer.toc('overall_compute') - return - + try: + table = self.df_with_schema_to_arrow(df, self.schema) + except Exception as e: + #logger.error('conversion failed, skipping batch...') + self.timer.toc('overall_compute') + raise e + self.last_arr = table if self.current_table is None: @@ -561,135 +555,149 @@ def process_tweets(self, tweets, job_name='generic_job'): self.current_table = self.concat_tables(self.current_table, table) out = self.current_table #or just table (without intermediate concats since last flush?) - + if not (self.current_table is None) \ - and ((self.current_table.num_rows > self.TWEETS_PER_ROWGROUP) or self.needs_to_flush): + and ((self.current_table.num_rows > self.TWEETS_PER_ROWGROUP) or self.needs_to_flush) \ + and self.current_table.num_rows > 0: self.flush(job_name) self.needs_to_flush = False else: 1 - #print('skipping, has table ? %s, num rows %s' % ( - # not (self.current_table is None), + #print('skipping, has table ? %s, num rows %s' % ( + # not (self.current_table is None), # 0 if self.current_table is None else self.current_table.num_rows)) self.timer.toc('overall_compute') - + return out - + def process_tweets_generator(self, tweets_generator, job_name='generic_job'): - + def flusher(tweets_batch): try: self.needs_to_flush = True return self.process_tweets(tweets_batch, job_name) - except: - print('failed processing batch, continuing...') - + except Exception as e: + #logger.debug('failed processing batch, continuing...') + raise e + tweets_batch = [] last_flush_time_s = time.time() - + try: for tweet in tweets_generator: tweets_batch.append(tweet) - + if len(tweets_batch) > self.TWEETS_PER_PROCESS: self.needs_to_flush = True elif not (self.PARQUET_SAMPLE_RATE_TIME_S is None) \ - and time.time() - last_flush_time_s >= self.PARQUET_SAMPLE_RATE_TIME_S: + and time.time() - last_flush_time_s >= self.PARQUET_SAMPLE_RATE_TIME_S: self.needs_to_flush = True last_flush_time_s = time.time() - + if self.needs_to_flush: try: yield flusher(tweets_batch) - except: - print('Write fail, continuing..') + except Exception as e: + #logger.debug('Write fail, continuing..') + raise e finally: tweets_batch = [] - print('===== PROCESSED ALL GENERATOR TASKS, FINISHING ====') + logger.debug('===== PROCESSED ALL GENERATOR TASKS, FINISHING ====') yield flusher(tweets_batch) - print('/// FLUSHED, DONE') + logger.debug('/// FLUSHED, DONE') except KeyboardInterrupt as e: - print('========== FLUSH IF SLEEP INTERRUPTED') + logger.debug('========== FLUSH IF SLEEP INTERRUPTED') self.destroy() - del fh gc.collect() - print('explicit GC...') - print('Safely exited!') - - + logger.debug('explicit GC...') + logger.debug('Safely exited!') + + ################################################################################ def process_ids(self, ids_to_process, job_name=None): self.process_tweets_notify_hydrating() - + if job_name is None: job_name = "process_ids_%s" % (ids_to_process[0] if len(ids_to_process) > 0 else 
"none") - hydration_statuses_df = Neo4jDataAccess(True, self.neo4j_creds)\ - .get_tweet_hydrated_status_by_id(pd.DataFrame({'id': ids_to_process})) - missing_ids = hydration_statuses_df[ hydration_statuses_df['hydrated'] != 'FULL' ]['id'].tolist() - - print('Skipping cached %s, fetching %s, of requested %s' % ( - len(ids_to_process) - len(missing_ids), - len(missing_ids), - len(ids_to_process))) - - tweets = ( tweet for tweet in self.twarc_pool.next_twarc().hydrate(missing_ids) ) - - for arr in self.process_tweets_generator(tweets, job_name): - yield arr - + for i in range(0, len(ids_to_process), self.BATCH_LEN): + ids_to_process_batch = ids_to_process[i : (i + self.BATCH_LEN)] + + logger.info('Starting batch offset %s ( + %s) of %s', i, self.BATCH_LEN, len(ids_to_process)) + + hydration_statuses_df = Neo4jDataAccess(self.debug, self.neo4j_creds)\ + .get_tweet_hydrated_status_by_id(pd.DataFrame({'id': ids_to_process_batch})) + missing_ids = hydration_statuses_df[ hydration_statuses_df['hydrated'] != 'FULL' ]['id'].tolist() + + logger.debug('Skipping cached %s, fetching %s, of requested %s' % ( + len(ids_to_process_batch) - len(missing_ids), + len(missing_ids), + len(ids_to_process_batch))) + + tweets = ( tweet for tweet in self.twarc_pool.next_twarc().hydrate(missing_ids) ) + + for arr in self.process_tweets_generator(tweets, job_name): + yield arr + def process_id_file(self, path, job_name=None): - - pdf = cudf.read_csv(path, header=None).to_pandas() - lst = pdf['0'].to_list() + + pdf = None + lst = None + try: + pdf = cudf.read_csv(path, header=None).to_pandas() + lst = pdf['0'].to_list() + except: + pdf = pd.read_csv(path, header=None) + lst = pdf[0].to_list() + if job_name is None: job_name = "id_file_%s" % path - print('loaded %s ids, hydrating..' % len(lst)) - + logger.debug('loaded %s ids, hydrating..' 
% len(lst)) + for arr in self.process_ids(lst, job_name): yield arr - - - def search(self,input="", job_name=None): - + + + def search(self,input="", job_name=None): + self.process_tweets_notify_hydrating() - + if job_name is None: job_name = "search_%s" % input[:20] - + tweets = (tweet for tweet in self.twarc_pool.next_twarc().search(input)) self.process_tweets_generator(tweets, job_name) - - + + def search_stream_by_keyword(self,input="", job_name=None): self.process_tweets_notify_hydrating() - + if job_name is None: job_name = "search_stream_by_keyword_%s" % input[:20] tweets = [tweet for tweet in self.twarc_pool.next_twarc().filter(track=input)] - + self.process_tweets(tweets, job_name) def search_by_location(self,input="", job_name=None): - + self.process_tweets_notify_hydrating() if job_name is None: job_name = "search_by_location_%s" % input[:20] tweets = [tweet for tweet in self.twarc_pool.next_twarc().filter(locations=input)] - + self.process_tweets(tweets, job_name) - - # + + # def user_timeline(self,input=[""], job_name=None, **kwargs): if not (type(input) == list): input = [ input ] @@ -700,28 +708,28 @@ def user_timeline(self,input=[""], job_name=None, **kwargs): job_name = "user_timeline_%s_%s" % ( len(input), '_'.join(input) ) for user in input: - print('starting user %s' % user) + logger.debug('starting user %s' % user) tweet_count = 0 - for tweet in self.twarc_pool.next_twarc().timeline(screen_name=user, **kwargs): - #print('got user', user, 'tweet', str(tweet)[:50]) + for tweet in self.twarc_pool.next_twarc().timeline(screen_name=user, **kwargs): + #logger.debug('got user', user, 'tweet', str(tweet)[:50]) self.process_tweets([tweet], job_name) tweet_count = tweet_count + 1 - print(' ... %s tweets' % tweet_count) - + logger.debug(' ... 
%s tweets' % tweet_count) + self.destroy() except KeyboardInterrupt as e: - print('Flushing..') + logger.debug('Flushing..') self.destroy(job_name) - print('Explicit GC') + logger.debug('Explicit GC') gc.collect() - print('Safely exited!') + logger.debug('Safely exited!') def ingest_range(self, begin, end, job_name=None): # This method is where the magic happens if job_name is None: job_name = "ingest_range_%s_to_%s" % (begin, end) - + for epoch in range(begin, end): # Move through each millisecond time_component = (epoch - FirehoseJob.SNOWFLAKE_EPOCH) << 22 for machine_id in FirehoseJob.MACHINE_IDS: # Iterate over machine ids @@ -733,4 +741,3 @@ def ingest_range(self, begin, end, job_name=None): # This method is where the m for i in range(0, self.TWEETS_PER_PROCESS): ids_to_process.append(self.queue.popleft()) self.process_ids(ids_to_process, job_name) - diff --git a/modules/IngestDrugSynonyms.py b/modules/IngestDrugSynonyms.py new file mode 100644 index 0000000..24f75e2 --- /dev/null +++ b/modules/IngestDrugSynonyms.py @@ -0,0 +1,177 @@ +import requests +import pandas as pd +import json +import sys +from pathlib import Path +import xlrd +import csv +import os +import tempfile +import numpy as np +from typing import Optional +from functools import partial +import logging +logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) +logger = logging.getLogger('ds') + + +class IngestDrugSynonyms(): + + def __init__(self,configPath:Path=Path("config.json")): + self.check_config(configPath) + self.url_international:str = os.environ["URL_INT"] if isinstance(os.environ["URL_INT"],str) else None + self.url_USA:str = os.environ["URL_USA"] if isinstance(os.environ["URL_USA"],str) else None + self.url_drugbank:str = os.environ["URL_DRUGBANK"] if isinstance(os.environ["URL_DRUGBANK"],str) else None + self.query_keywords:[] = os.environ["QUERY_KEYWORDS"].split(",") if isinstance(os.environ["QUERY_KEYWORDS"],str) else None + + @staticmethod + def check_config(configPath:Path=Path("config.json")): + def load_config(configPath:Path): + with open(configPath) as f: + config = json.load(f) + for key in config: + os.environ[key] = config[key] + rel_path:Path = Path(os.path.realpath(__file__)).parents[0] / configPath + if rel_path.exists: + load_config(rel_path) + elif configPath.exists: + load_config(configPath) + else: + logger.warning("Could not load config file from: {}".format(configPath)) + + @staticmethod + def api(query,from_study,to_study,url): + url = url.format(query,from_study,to_study) + response = requests.request("GET", url) + return response.json() + + def api_wrapper(self,query,from_study): + return self.api(query,from_study,from_study+99,self.url_USA) + + def getAllStudiesByQuery(self,query:str) -> list: + studies:list = [] + from_study = 1 + temp = self.api_wrapper(query,from_study) + nstudies = temp['FullStudiesResponse']['NStudiesFound'] + logger.info("> {} studies found by '{}' keyword".format(nstudies,query)) + if nstudies > 0: + studies = temp['FullStudiesResponse']['FullStudies'] + for study_index in range(from_study+100,nstudies,100): + temp = self.api_wrapper(query,study_index) + studies.extend(temp['FullStudiesResponse']['FullStudies']) + + return studies + + @staticmethod + def xls_handler(r): + df = pd.DataFrame() + with tempfile.NamedTemporaryFile("wb") as xls_file: + xls_file.write(r.content) + + try: + book = xlrd.open_workbook(xls_file.name,encoding_override="utf-8") + except: + book = xlrd.open_workbook(xls_file.name,encoding_override="cp1251") + + sh = 
book.sheet_by_index(0) + with tempfile.NamedTemporaryFile("w") as csv_file: + wr = csv.writer(csv_file, quoting=csv.QUOTE_ALL) + + for rownum in range(sh.nrows): + wr.writerow(sh.row_values(rownum)) + df = pd.read_csv(csv_file.name) + csv_file.close() + + xls_file.close() + return df + + @staticmethod + def csvzip_handler(r): + df = pd.DataFrame() + with tempfile.NamedTemporaryFile("wb",suffix='.csv.zip') as file: + file.write(r.content) + df = pd.read_csv(file.name) + file.close() + return df + + @staticmethod + def urlToDF(url:str,respHandler) -> pd.DataFrame: + r = requests.get(url, allow_redirects=True) + return respHandler(r) + + @staticmethod + def _convert_US_studies(US_studies:dict) -> pd.DataFrame: + list_of_US_studies:list = [] + for key in US_studies.keys(): + for study in US_studies[key]: + temp_dict:dict = {} + + temp_dict["trial_id"] = study["Study"]["ProtocolSection"]["IdentificationModule"]["NCTId"] + temp_dict["study_url"] = "https://clinicaltrials.gov/show/" + temp_dict["trial_id"] + + try: + temp_dict["intervention"] = study["Study"]["ProtocolSection"]["ArmsInterventionsModule"]["ArmGroupList"]["ArmGroup"][0]["ArmGroupInterventionList"]["ArmGroupInterventionName"][0] + except: + temp_dict["intervention"] = "" + try: + temp_dict["study_type"] = study["Study"]["ProtocolSection"]["DesignModule"]["StudyType"] + except: + temp_dict["study_type"] = "" + try: + temp_dict["target_size"] = study["Study"]["ProtocolSection"]["DesignModule"]["EnrollmentInfo"]["EnrollmentCount"] + except: + temp_dict["target_size"] = "" + try: + if "OfficialTitle" in study["Study"]["ProtocolSection"]["IdentificationModule"].keys(): + temp_dict["public_title"] = study["Study"]["ProtocolSection"]["IdentificationModule"]["OfficialTitle"] + else: + temp_dict["public_title"] = study["Study"]["ProtocolSection"]["IdentificationModule"]["BriefTitle"] + except: + temp_dict["public_title"] = "" + list_of_US_studies.append(temp_dict) + US_studies_df:pd.DataFrame = pd.DataFrame(list_of_US_studies) + return US_studies_df + + def _scrapeData(self): + self.internationalstudies = self.urlToDF(self.url_international,self.xls_handler) + self.drug_vocab_df = self.urlToDF(self.url_drugbank,self.csvzip_handler) + self.all_US_studies_by_keyword:dict = {} + for key in self.query_keywords: + self.all_US_studies_by_keyword[key] = self.getAllStudiesByQuery(key) + + def _filterData(self): + self.drug_vocab_reduced = self.drug_vocab_df[['Common name', 'Synonyms']] + self.internationalstudies_reduced = self.internationalstudies[['TrialID', 'Intervention','Study type','web address','Target size', "Public title"]] + self.internationalstudies_reduced.columns = [col.replace(" ","_").lower() for col in self.internationalstudies_reduced.columns] + cols_to_replace:dict = { + "trialid":"trial_id", + "web_address":"study_url" + } + self.internationalstudies_reduced.columns = [cols_to_replace.get(n, n) for n in self.internationalstudies_reduced.columns] + + self.drug_vocab:dict = {} + for index, row in self.drug_vocab_reduced.iterrows(): + self.drug_vocab[row['Common name']] = row["Synonyms"].split("|") if isinstance(row["Synonyms"],str) else row["Synonyms"] + + self.US_studies_df = self._convert_US_studies(self.all_US_studies_by_keyword) + + self.all_studies_df = pd.concat([self.US_studies_df,self.internationalstudies_reduced]) + self.all_studies_df.drop_duplicates(subset="trial_id",inplace=True) + self.all_studies_df.reset_index(drop=True, inplace=True) + self.all_studies_df.fillna("",inplace=True) + logger.info("> {} distinct studies 
found".format(len(self.all_studies_df))) + + def save_data_to_fiile(self): + """Saving data option for debug purposes""" + logger.warning("Only Use it for debug purposes!!!") + self.internationalstudies.to_csv("internationalstudies.csv") + self.drug_vocab_df.to_csv("drug_vocab.csv") + with open('all_US_studies_by_keyword.json', 'w', encoding='utf-8') as f: + json.dump(self.all_US_studies_by_keyword, f, ensure_ascii=False, indent=4) + + def auto_get_and_clean_data(self): + self._scrapeData() + self._filterData() + + def create_drug_study_link(self): + pass \ No newline at end of file diff --git a/modules/IngestDrugSynonymsWF.py b/modules/IngestDrugSynonymsWF.py new file mode 100644 index 0000000..2aa3664 --- /dev/null +++ b/modules/IngestDrugSynonymsWF.py @@ -0,0 +1,10 @@ + +from IngestDrugSynonyms import IngestDrugSynonyms +from DrugSynonymDataToNeo4j import DrugSynonymDataToNeo4j + +drugSynonym = IngestDrugSynonyms() +drugSynonym.auto_get_and_clean_data() + +neo4jBridge = DrugSynonymDataToNeo4j() +neo4jBridge.upload_drugs_and_synonyms(drugSynonym.drug_vocab) +neo4jBridge.upload_studies(drugSynonym.all_studies_df) \ No newline at end of file diff --git a/modules/Neo4jDataAccess.py b/modules/Neo4jDataAccess.py index fc2d5e1..f83281f 100644 --- a/modules/Neo4jDataAccess.py +++ b/modules/Neo4jDataAccess.py @@ -1,45 +1,68 @@ -import ast, json, time +import ast +import json +import time +import re +import enum from datetime import datetime import pandas as pd -from py2neo import Graph +from neo4j import GraphDatabase, basic_auth from urllib.parse import urlparse +import logging from .DfHelper import DfHelper +logger = logging.getLogger('Neo4jDataAccess') + + class Neo4jDataAccess: - BATCH_SIZE=2000 - - def __init__(self, debug=False, neo4j_creds = None): + class NodeLabel(enum.Enum): + Tweet = 'Tweet' + Url = 'Url' + Account = 'Account' + + class RelationshipLabel(enum.Enum): + TWEETED = 'TWEETED' + MENTIONED = 'MENTIONED' + QUOTED = 'QUOTED' + REPLIED = 'REPLIED' + RETWEETED = 'RETWEETED' + INCLUDES = 'INCLUDES' + + def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"): self.creds = neo4j_creds - self.debug=debug + self.debug = debug + self.timeout = timeout + self.batch_size = batch_size self.tweetsandaccounts = """ UNWIND $tweets AS t - //Add the Tweet + //Add the Tweet MERGE (tweet:Tweet {id:t.tweet_id}) - ON CREATE SET + ON CREATE SET tweet.text = t.text, tweet.created_at = t.tweet_created_at, tweet.favorite_count = t.favorite_count, tweet.retweet_count = t.retweet_count, tweet.record_created_at = timestamp(), + tweet.job_name = t.job_name, tweet.job_id = t.job_id, tweet.hashtags = t.hashtags, tweet.hydrated = 'FULL', tweet.type = t.tweet_type - ON MATCH SET + ON MATCH SET tweet.text = t.text, tweet.favorite_count = t.favorite_count, tweet.retweet_count = t.retweet_count, tweet.record_updated_at = timestamp(), + tweet.job_name = t.job_name, tweet.job_id = t.job_id, tweet.hashtags = t.hashtags, tweet.hydrated = 'FULL', tweet.type = t.tweet_type - + //Add Account - MERGE (user:Account {id:t.user_id}) - ON CREATE SET + MERGE (user:Account {id:t.user_id}) + ON CREATE SET user.id = t.user_id, user.name = t.name, user.screen_name = t.user_screen_name, @@ -49,8 +72,9 @@ def __init__(self, debug=False, neo4j_creds = None): user.user_profile_image_url = t.user_profile_image_url, user.created_at = t.user_created_at, user.record_created_at = timestamp(), - user.job_name = t.job_id - ON MATCH SET + user.job_name = t.job_name, + user.job_id = t.job_id + ON MATCH SET 
user.name = t.user_name, user.screen_name = t.user_screen_name, user.followers_count = t.user_followers_count, @@ -59,246 +83,344 @@ def __init__(self, debug=False, neo4j_creds = None): user.location = t.user_location, user.created_at = t.user_created_at, user.record_updated_at = timestamp(), - user.job_name = t.job_id + user.job_name = t.job_name, + user.job_id = t.job_id //Add Reply to tweets if needed - FOREACH(ignoreMe IN CASE WHEN t.tweet_type='REPLY' THEN [1] ELSE [] END | - MERGE (retweet:Tweet {id:t.reply_tweet_id}) + FOREACH(ignoreMe IN CASE WHEN t.tweet_type='REPLY' THEN [1] ELSE [] END | + MERGE (retweet:Tweet {id:t.reply_tweet_id}) ON CREATE SET retweet.id=t.reply_tweet_id, retweet.record_created_at = timestamp(), + retweet.job_name = t.job_name, retweet.job_id = t.job_id, retweet.hydrated = 'PARTIAL' ) - + //Add QUOTE_RETWEET to tweets if needed - FOREACH(ignoreMe IN CASE WHEN t.tweet_type='QUOTE_RETWEET' THEN [1] ELSE [] END | - MERGE (quoteTweet:Tweet {id:t.quoted_status_id}) + FOREACH(ignoreMe IN CASE WHEN t.tweet_type='QUOTE_RETWEET' THEN [1] ELSE [] END | + MERGE (quoteTweet:Tweet {id:t.quoted_status_id}) ON CREATE SET quoteTweet.id=t.quoted_status_id, quoteTweet.record_created_at = timestamp(), + quoteTweet.job_name = t.job_name, quoteTweet.job_id = t.job_id, quoteTweet.hydrated = 'PARTIAL' ) - + //Add RETWEET to tweets if needed - FOREACH(ignoreMe IN CASE WHEN t.tweet_type='RETWEET' THEN [1] ELSE [] END | - MERGE (retweet:Tweet {id:t.retweet_id}) + FOREACH(ignoreMe IN CASE WHEN t.tweet_type='RETWEET' THEN [1] ELSE [] END | + MERGE (retweet:Tweet {id:t.retweet_id}) ON CREATE SET retweet.id=t.retweet_id, retweet.record_created_at = timestamp(), + retweet.job_name = t.job_name, retweet.job_id = t.job_id, retweet.hydrated = 'PARTIAL' ) """ - + self.tweeted_rel = """UNWIND $tweets AS t MATCH (user:Account {id:t.user_id}) - MATCH (tweet:Tweet {id:t.tweet_id}) - OPTIONAL MATCH (replied:Tweet {id:t.reply_tweet_id}) - OPTIONAL MATCH (quoteTweet:Tweet {id:t.quoted_status_id}) - OPTIONAL MATCH (retweet:Tweet {id:t.retweet_id}) + MATCH (tweet:Tweet {id:t.tweet_id}) + OPTIONAL MATCH (replied:Tweet {id:t.reply_tweet_id}) + OPTIONAL MATCH (quoteTweet:Tweet {id:t.quoted_status_id}) + OPTIONAL MATCH (retweet:Tweet {id:t.retweet_id}) WITH user, tweet, replied, quoteTweet, retweet - + MERGE (user)-[r:TWEETED]->(tweet) - - FOREACH(ignoreMe IN CASE WHEN tweet.type='REPLY' AND replied.id>0 THEN [1] ELSE [] END | + + FOREACH(ignoreMe IN CASE WHEN tweet.type='REPLY' AND replied.id>0 THEN [1] ELSE [] END | MERGE (tweet)-[:REPLYED]->(replied) ) - - FOREACH(ignoreMe IN CASE WHEN tweet.type='QUOTE_RETWEET' AND quoteTweet.id>0 THEN [1] ELSE [] END | + + FOREACH(ignoreMe IN CASE WHEN tweet.type='QUOTE_RETWEET' AND quoteTweet.id>0 THEN [1] ELSE [] END | MERGE (tweet)-[:QUOTED]->(quoteTweet) ) - - FOREACH(ignoreMe IN CASE WHEN tweet.type='RETWEET' AND retweet.id>0 THEN [1] ELSE [] END | + + FOREACH(ignoreMe IN CASE WHEN tweet.type='RETWEET' AND retweet.id>0 THEN [1] ELSE [] END | MERGE (tweet)-[:RETWEETED]->(retweet) ) - + """ - - self.mentions = """UNWIND $mentions AS t + + self.mentions = """UNWIND $mentions AS t MATCH (tweet:Tweet {id:t.tweet_id}) - MERGE (user:Account {id:t.user_id}) - ON CREATE SET + MERGE (user:Account {id:t.user_id}) + ON CREATE SET user.id = t.user_id, user.mentioned_name = t.name, user.mentioned_screen_name = t.user_screen_name, user.record_created_at = timestamp(), - user.job_name = t.job_id - WITH user, tweet - MERGE (tweet)-[:MENTIONED]->(user) + user.job_name = 
t.job_name, + user.job_id = t.job_id + WITH user, tweet + MERGE (tweet)-[:MENTIONED]->(user) """ - - self.urls = """UNWIND $urls AS t + + self.urls = """UNWIND $urls AS t MATCH (tweet:Tweet {id:t.tweet_id}) - MERGE (url:Url {full_url:t.url}) - ON CREATE SET + MERGE (url:Url {full_url:t.url}) + ON CREATE SET url.full_url = t.url, - url.job_name = t.job_id, + url.job_name = t.job_name, + url.job_id = t.job_id, url.record_created_at = timestamp(), - url.schema=t.scheme, - url.netloc=t.netloc, - url.path=t.path, - url.params=t.params, - url.query=t.query, - url.fragment=t.fragment, - url.username=t.username, - url.password=t.password, - url.hostname=t.hostname, + url.schema=t.scheme, + url.netloc=t.netloc, + url.path=t.path, + url.params=t.params, + url.query=t.query, + url.fragment=t.fragment, + url.username=t.username, + url.password=t.password, + url.hostname=t.hostname, url.port=t.port - WITH url, tweet - MERGE (tweet)-[:INCLUDES]->(url) + WITH url, tweet + MERGE (tweet)-[:INCLUDES]->(url) """ - - self.fetch_tweet_status = """UNWIND $ids AS i + + self.fetch_tweet_status = """UNWIND $ids AS i MATCH (tweet:Tweet {id:i.id}) RETURN tweet.id, tweet.hydrated """ - + self.fetch_tweet = """UNWIND $ids AS i + MATCH (tweet:Tweet {id:i.id}) + RETURN tweet + """ + def __get_neo4j_graph(self, role_type): creds = None + logging.debug('role_type: %s', role_type) if not (self.creds is None): creds = self.creds else: with open('neo4jcreds.json') as json_file: creds = json.load(json_file) - res = list(filter(lambda c: c["type"]==role_type, creds)) - if len(res): + res = list(filter(lambda c: c["type"] == role_type, creds)) + if len(res): + logging.debug("creds %s", res) creds = res[0]["creds"] - self.graph = Graph(host=creds['host'], port=creds['port'], user=creds['user'], password=creds['password']) - else: + uri = f'bolt://{creds["host"]}:{creds["port"]}' + self.graph = GraphDatabase.driver( + uri, auth=basic_auth(creds['user'], creds['password']), encrypted=False) + else: self.graph = None return self.graph - - def save_parquet_df_to_graph(self, df, job_name): - pdf = DfHelper(self.debug).normalize_parquet_dataframe(df) - if self.debug: print('Saving to Neo4j') + + def get_from_neo(self, cypher, limit=1000): + graph = self.__get_neo4j_graph('reader') + # If the limit isn't set in the traversal then add it + if not re.search('LIMIT', cypher, re.IGNORECASE): + cypher = cypher + " LIMIT " + str(limit) + with graph.session() as session: + result = session.run(cypher, timeout=self.timeout) + df = pd.DataFrame([dict(record) for record in result]) + return df.head(limit) + + def get_tweet_by_id(self, df, cols=[]): + if 'id' in df: + graph = self.__get_neo4j_graph('reader') + ids = [] + for index, row in df.iterrows(): + ids.append({'id': int(row['id'])}) + with graph.session() as session: + result = session.run( + self.fetch_tweet, ids=ids, timeout=self.timeout) + res = pd.DataFrame([dict(record) for record in result]) + logging.debug('Response info: %s rows, %s columns: %s' % + (len(res), len(res.columns), res.columns)) + pdf = pd.DataFrame() + for r in res.iterrows(): + props = {} + for k in r[1]['tweet'].keys(): + if cols: + if k in cols: + props.update({k: r[1]['tweet'][k]}) + else: + props.update({k: r[1]['tweet'][k]}) + pdf = pdf.append(props, ignore_index=True) + return pdf + else: + raise TypeError( + 'Parameter df must be a DataFrame with a column named "id" ') + + def save_enrichment_df_to_graph(self, label: NodeLabel, df: pd.DataFrame, job_name: str, job_id=None): + if not isinstance(label, 
self.NodeLabel): + raise TypeError('The label parameter is not of type NodeType') + + if not isinstance(df, pd.DataFrame): + raise TypeError( + 'The df parameter is not of type Pandas.DataFrame') + + idColName = 'full_url' if label == self.NodeLabel.Url else 'id' + statement = 'UNWIND $rows AS t' + statement += ' MERGE (n:' + label.value + \ + ' {' + idColName + ':t.' + idColName + '}) ' + \ + ' SET ' + + for column in df: + if not column == idColName: + statement += f' n.{column} = t.{column} ' + + graph = self.__get_neo4j_graph('writer') + with graph.session() as session: + result = session.run( + statement, rows=df.to_dict(orient='records'), timeout=self.timeout) + + def save_parquet_df_to_graph(self, df, job_name, job_id=None): + pdf = DfHelper().normalize_parquet_dataframe(df) + logging.info('Saving to Neo4j') self.__save_df_to_graph(pdf, job_name) # Get the status of a DataFrame of Tweets by id. Returns a dataframe with the hydrated status - def get_tweet_hydrated_status_by_id(self, df, job_name='generic_job'): + def get_tweet_hydrated_status_by_id(self, df): if 'id' in df: - graph = self.__get_neo4j_graph('reader') - ids=[] + graph = self.__get_neo4j_graph('reader') + ids = [] for index, row in df.iterrows(): ids.append({'id': int(row['id'])}) - res = graph.run(self.fetch_tweet_status, ids = ids).to_data_frame() - - print('Response info: %s rows, %s columns: %s' % (len(res), len(res.columns), res.columns)) + with graph.session() as session: + result = session.run(self.fetch_tweet_status, ids=ids) + res = pd.DataFrame([dict(record) for record in result]) + logging.debug('Response info: %s rows, %s columns: %s' % + (len(res), len(res.columns), res.columns)) if len(res) == 0: return df[['id']].assign(hydrated=None) else: - res=res.rename(columns={'tweet.id': 'id', 'tweet.hydrated': 'hydrated'}) - res = df[['id']].merge(res, how='left', on='id') #ensures hydrated=None if Neo4j does not answer for id + res = res.rename( + columns={'tweet.id': 'id', 'tweet.hydrated': 'hydrated'}) + # ensures hydrated=None if Neo4j does not answer for id + res = df[['id']].merge(res, how='left', on='id') return res else: - raise Exception('Parameter df must be a DataFrame with a column named "id" ') - + logging.debug('df columns %s', df.columns) + raise Exception( + 'Parameter df must be a DataFrame with a column named "id" ') + # This saves the User and Tweet data right now - def __save_df_to_graph(self, df, job_name): - graph = self.__get_neo4j_graph('writer') - global_tic=time.perf_counter() + def __save_df_to_graph(self, df, job_name, job_id=None): + graph = self.__get_neo4j_graph('writer') + global_tic = time.perf_counter() params = [] mention_params = [] url_params = [] - tic=time.perf_counter() + tic = time.perf_counter() + logging.debug('df columns %s', df.columns) for index, row in df.iterrows(): - #determine the type of tweet - tweet_type='TWEET' - if row["in_reply_to_status_id"] is not None and row["in_reply_to_status_id"] >0: - tweet_type="REPLY" - elif row["quoted_status_id"] is not None and row["quoted_status_id"] >0: - tweet_type="QUOTE_RETWEET" - elif row["retweet_id"] is not None and row["retweet_id"] >0: - tweet_type="RETWEET" - params.append({'tweet_id': row['status_id'], - 'text': row['full_text'], - 'tweet_created_at': row['created_at'].to_pydatetime(), - 'favorite_count': row['favorite_count'], - 'retweet_count': row['retweet_count'], - 'tweet_type': tweet_type, - 'job_id': job_name, - 'hashtags': self.__normalize_hashtags(row['hashtags']), - 'user_id': row['user_id'], - 'user_name': 
row['user_name'], - 'user_location': row['user_location'], - 'user_screen_name': row['user_screen_name'], - 'user_followers_count': row['user_followers_count'], - 'user_friends_count': row['user_friends_count'], - 'user_created_at': pd.Timestamp(row['user_created_at'], unit='s').to_pydatetime(), - 'user_profile_image_url': row['user_profile_image_url'], - 'reply_tweet_id': row['in_reply_to_status_id'], - 'quoted_status_id': row['quoted_status_id'], - 'retweet_id': row['retweet_id'], - }) - - #if there are urls then populate the url_params + # determine the type of tweet + tweet_type = 'TWEET' + if row["in_reply_to_status_id"] is not None and row["in_reply_to_status_id"] > 0: + tweet_type = "REPLY" + elif "quoted_status_id" in row and row["quoted_status_id"] is not None and row["quoted_status_id"] > 0: + tweet_type = "QUOTE_RETWEET" + elif "retweet_id" in row and row["retweet_id"] is not None and row["retweet_id"] > 0: + tweet_type = "RETWEET" + try: + params.append({'tweet_id': row['status_id'], + 'text': row['full_text'], + 'tweet_created_at': row['created_at'].to_pydatetime(), + 'favorite_count': row['favorite_count'], + 'retweet_count': row['retweet_count'], + 'tweet_type': tweet_type, + 'job_id': job_id, + 'job_name': job_name, + 'hashtags': self.__normalize_hashtags(row['hashtags']), + 'user_id': row['user_id'], + 'user_name': row['user_name'], + 'user_location': row['user_location'], + 'user_screen_name': row['user_screen_name'], + 'user_followers_count': row['user_followers_count'], + 'user_friends_count': row['user_friends_count'], + 'user_created_at': pd.Timestamp(row['user_created_at'], unit='s').to_pydatetime(), + 'user_profile_image_url': row['user_profile_image_url'], + 'reply_tweet_id': row['in_reply_to_status_id'], + 'quoted_status_id': row['quoted_status_id'], + 'retweet_id': row['retweet_id'] if 'retweet_id' in row else None, + }) + except Exception as e: + logging.error('params.append exn', e) + logging.error('row', row) + raise e + + # if there are urls then populate the url_params if row['urls']: - url_params = self.__parse_urls(row, url_params, job_name) - #if there are user_mentions then populate the mentions_params + url_params = self.__parse_urls( + row, url_params, job_name, job_id) + # if there are user_mentions then populate the mentions_params if row['user_mentions']: for m in row['user_mentions']: mention_params.append({ - 'tweet_id': row['status_id'], - 'user_id': m['id'], - 'user_name': m['name'], - 'user_screen_name': m['screen_name'], - 'job_id': job_name, + 'tweet_id': row['status_id'], + 'user_id': m['id'], + 'user_name': m['name'], + 'user_screen_name': m['screen_name'], + 'job_id': job_id, + 'job_name': job_name, }) - if index % self.BATCH_SIZE == 0 and index>0: + if index % self.batch_size == 0 and index > 0: self.__write_to_neo(params, url_params, mention_params) - toc=time.perf_counter() - if self.debug: print(f"Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds") + toc = time.perf_counter() + logging.info( + f'Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds') params = [] mention_params = [] url_params = [] - tic=time.perf_counter() - + tic = time.perf_counter() + self.__write_to_neo(params, url_params, mention_params) - toc=time.perf_counter() - print(f"Neo4j Import Complete in {toc - global_tic:0.4f} seconds") - + toc = time.perf_counter() + logging.info( + f"Neo4j Import Complete in {toc - global_tic:0.4f} seconds") + def __write_to_neo(self, params, url_params, mention_params): - try: - tx = self.graph.begin(autocommit=False) - 
tx.run(self.tweetsandaccounts, tweets = params) - tx.run(self.tweeted_rel, tweets = params) - tx.run(self.mentions, mentions = mention_params) - tx.run(self.urls, urls = url_params) - tx.commit() + try: + with self.graph.session() as session: + session.run(self.tweetsandaccounts, + tweets=params, timeout=self.timeout) + session.run(self.tweeted_rel, tweets=params, + timeout=self.timeout) + session.run(self.mentions, mentions=mention_params, + timeout=self.timeout) + session.run(self.urls, urls=url_params, timeout=self.timeout) except Exception as inst: - print(type(inst)) # the exception instance - print(inst.args) # arguments stored in .args - print(inst) # __str__ allows args to be printed directly, - + logging.error('Neo4j Transaction error') + logging.error(type(inst)) # the exception instance + logging.error(inst.args) # arguments stored in .args + # __str__ allows args to be printed directly, + logging.error(inst) + raise inst + def __normalize_hashtags(self, value): if value: - hashtags=[] + hashtags = [] for h in value: hashtags.append(h['text']) return ','.join(hashtags) else: return None - - def __parse_urls(self, row, url_params, job_name): + + def __parse_urls(self, row, url_params, job_name, job_id=None): for u in row['urls']: - try: + try: parsed = urlparse(u['expanded_url']) url_params.append({ - 'tweet_id': row['status_id'], - 'url': u['expanded_url'], - 'job_id': job_name, - 'schema': parsed.scheme, - 'netloc': parsed.netloc, - 'path': parsed.path, - 'params': parsed.params, - 'query': parsed.query, - 'fragment': parsed.fragment, - 'username': parsed.username, - 'password': parsed.password, - 'hostname': parsed.hostname, - 'port': parsed.port, + 'tweet_id': row['status_id'], + 'url': u['expanded_url'], + 'job_id': job_id, + 'job_name': job_name, + 'schema': parsed.scheme, + 'netloc': parsed.netloc, + 'path': parsed.path, + 'params': parsed.params, + 'query': parsed.query, + 'fragment': parsed.fragment, + 'username': parsed.username, + 'password': parsed.password, + 'hostname': parsed.hostname, + 'port': parsed.port, }) except Exception as inst: - print(type(inst)) # the exception instance - print(inst.args) # arguments stored in .args - print(inst) # __str__ allows args to be printed directly, + logging.error(type(inst)) # the exception instance + logging.error(inst.args) # arguments stored in .args + # __str__ allows args to be printed directly, + logging.error(inst) return url_params diff --git a/modules/Timer.py b/modules/Timer.py index 720e424..199b9a3 100644 --- a/modules/Timer.py +++ b/modules/Timer.py @@ -1,5 +1,8 @@ import time +import logging +logger = logging.getLogger('Timer') + class Timer: def __init__(self): self.counters = {} @@ -42,11 +45,11 @@ def maybe_emit(self, name, show_val_per_second): counter['rolling_val_sum'] = counter['rolling_val_sum'] + sum_val if counter['print_freq'] > 0 and (k % counter['print_freq'] == 0): if sum_s > 0: - print('%s : %s / s (%s total)' % (name, sum_val / sum_s, counter['rolling_val_sum'])) + logger.info('%s : %s / s (%s total)' % (name, sum_val / sum_s, counter['rolling_val_sum'])) else: if counter['print_freq'] > 0 and (k % counter['print_freq'] == 0): lastN = counter['lastN'] sum_s = 0.0 + sum([lastN[i % n ] for i in range(max(k - n,0), k)]) - print('%s : %ss' % (name, sum_s / min(n, k + 1))) + logger.info('%s : %ss' % (name, sum_s / min(n, k + 1))) diff --git a/modules/config.json b/modules/config.json new file mode 100644 index 0000000..b5e16e9 --- /dev/null +++ b/modules/config.json @@ -0,0 +1,6 @@ +{ + "URL_USA": 
"https://www.clinicaltrials.gov/api/query/full_studies?expr={}&min_rnk={}&max_rnk={}&fmt=json", + "URL_INT": "https://www.who.int/docs/default-source/coronaviruse/covid-19-trials.xls", + "URL_DRUGBANK": "https://www.drugbank.ca/releases/5-1-5/downloads/all-drugbank-vocabulary", + "QUERY_KEYWORDS": "covid-19,SARS-CoV-2,coronavirus" +} \ No newline at end of file diff --git a/modules/tests/__init__.py b/modules/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/tests/requirements.txt b/modules/tests/requirements.txt new file mode 100644 index 0000000..55b033e --- /dev/null +++ b/modules/tests/requirements.txt @@ -0,0 +1 @@ +pytest \ No newline at end of file diff --git a/modules/tests/test_Neo4jDataAccess.py b/modules/tests/test_Neo4jDataAccess.py new file mode 100644 index 0000000..aba8a83 --- /dev/null +++ b/modules/tests/test_Neo4jDataAccess.py @@ -0,0 +1,159 @@ +from modules.Neo4jDataAccess import Neo4jDataAccess +from neo4j import GraphDatabase, basic_auth +import pandas as pd +import pytest +import os +import sys +from pathlib import Path + + +class TestNeo4jDataAccess: + @classmethod + def setup_class(cls): + cls.creds = [ + { + "type": "writer", + "creds": { + "host": "localhost", + "port": "7687", + "user": "neo4j", + "password": "neo4j123" + } + }, + { + "type": "reader", + "creds": { + "host": "localhost", + "port": "7687", + "user": "neo4j", + "password": "neo4j123" + } + } + ] + data = [{'tweet_id': 1, 'text': 'Tweet 1', 'hydrated': 'FULL'}, + {'tweet_id': 2, 'text': 'Tweet 2', 'hydrated': 'FULL'}, + {'tweet_id': 3, 'text': 'Tweet 3'}, + {'tweet_id': 4, 'text': 'Tweet 4', 'hydrated': 'PARTIAL'}, + {'tweet_id': 5, 'text': 'Tweet 5', 'hydrated': 'PARTIAL'}, + ] + + traversal = '''UNWIND $tweets AS t + MERGE (tweet:Tweet {id:t.tweet_id}) + ON CREATE SET + tweet.text = t.text, + tweet.hydrated = t.hydrated + ''' + res = list(filter(lambda c: c["type"] == 'writer', cls.creds)) + creds = res[0]["creds"] + uri = f'bolt://{creds["host"]}:{creds["port"]}' + graph = GraphDatabase.driver( + uri, auth=basic_auth(creds['user'], creds['password']), encrypted=False) + try: + with graph.session() as session: + session.run(traversal, tweets=data) + cls.ids = pd.DataFrame({'id': [1, 2, 3, 4, 5]}) + except Exception as err: + print(err) + + def test_get_tweet_hydrated_status_by_id(self): + df = Neo4jDataAccess( + neo4j_creds=self.creds).get_tweet_hydrated_status_by_id(self.ids) + + assert len(df) == 5 + assert df[df['id'] == 1]['hydrated'][0] == 'FULL' + assert df[df['id'] == 2]['hydrated'][1] == 'FULL' + assert df[df['id'] == 3]['hydrated'][2] == None + assert df[df['id'] == 4]['hydrated'][3] == 'PARTIAL' + assert df[df['id'] == 5]['hydrated'][4] == 'PARTIAL' + + def test_save_parquet_to_graph(self): + filename = os.path.join(os.path.dirname(__file__), + 'data/2020_03_22_02_b1.snappy2.parquet') + tdf = pd.read_parquet(filename, engine='pyarrow') + Neo4jDataAccess( + neo4j_creds=self.creds).save_parquet_df_to_graph(tdf, 'dave') + assert True + + # Test get_tweet_by_id + def test_get_tweet_by_id(self): + df = pd.DataFrame([{'id': 1}]) + df = Neo4jDataAccess( + neo4j_creds=self.creds).get_tweet_by_id(df) + assert len(df) == 1 + assert df.at[0, 'id'] == 1 + assert df.at[0, 'text'] == 'Tweet 1' + assert df.at[0, 'hydrated'] == 'FULL' + + def test_get_tweet_by_id_with_cols(self): + df = pd.DataFrame([{"id": 1}]) + df = Neo4jDataAccess( + neo4j_creds=self.creds).get_tweet_by_id(df, cols=['id', 'text']) + assert len(df) == 1 + assert len(df.columns) == 2 + assert df.at[0, 
'id'] == 1 + assert df.at[0, 'text'] == 'Tweet 1' + + def test_get_tweet_by_id_wrong_parameter(self): + with pytest.raises(TypeError) as excinfo: + df = Neo4jDataAccess( + neo4j_creds=self.creds).get_tweet_by_id('test') + assert "df" in str(excinfo.value) + + def test_get_from_neo(self): + df = Neo4jDataAccess(neo4j_creds=self.creds).get_from_neo( + 'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text LIMIT 5') + assert len(df) == 5 + assert len(df.columns) == 2 + + def test_get_from_neo_with_limit(self): + df = Neo4jDataAccess(neo4j_creds=self.creds).get_from_neo( + 'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text LIMIT 5', limit=1) + assert len(df) == 1 + assert len(df.columns) == 2 + + def test_get_from_neo_with_limit_only(self): + df = Neo4jDataAccess(neo4j_creds=self.creds).get_from_neo( + 'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text', limit=1) + assert len(df) == 1 + assert len(df.columns) == 2 + + def test_save_enrichment_df_to_graph_wrong_parameter_types(self): + with pytest.raises(TypeError) as excinfo: + res = Neo4jDataAccess(neo4j_creds=self.creds).save_enrichment_df_to_graph( + 'test', pd.DataFrame(), 'test') + assert "label parameter" in str(excinfo.value) + with pytest.raises(TypeError) as excinfo: + res = Neo4jDataAccess(neo4j_creds=self.creds).save_enrichment_df_to_graph( + Neo4jDataAccess.NodeLabel.Tweet, [], 'test') + assert "Pandas.DataFrame" in str(excinfo.value) + + def test_save_enrichment_df_to_graph(self): + df = pd.DataFrame([{'id': 111, 'text': 'Tweet 123'}, + {'id': 222, 'text': 'Tweet 234'} + ]) + + res = Neo4jDataAccess(neo4j_creds=self.creds).save_enrichment_df_to_graph( + Neo4jDataAccess.NodeLabel.Tweet, df, 'test') + + df = Neo4jDataAccess(neo4j_creds=self.creds).get_tweet_by_id( + df['id'].to_frame()) + assert len(df) == 2 + assert df.at[0, 'text'] == 'Tweet 123' + assert df.at[1, 'text'] == 'Tweet 234' + + def test_save_enrichment_df_to_graph_new_nodes(self): + df = pd.DataFrame([{'id': 555, 'text': 'Tweet 123'}, + {'id': 666, 'text': 'Tweet 234'} + ]) + Neo4jDataAccess(neo4j_creds=self.creds).save_enrichment_df_to_graph( + Neo4jDataAccess.NodeLabel.Tweet, df, 'test') + + df = Neo4jDataAccess(neo4j_creds=self.creds).get_tweet_by_id( + df['id'].to_frame()) + assert len(df) == 2 + assert df.at[0, 'text'] == 'Tweet 123' + assert df.at[1, 'text'] == 'Tweet 234' + + +def setup_cleanup(self): + print("I want to perform some cleanup action!") diff --git a/pipelines/Makefile b/pipelines/Makefile new file mode 100644 index 0000000..a8d5db1 --- /dev/null +++ b/pipelines/Makefile @@ -0,0 +1,28 @@ +prefect-up : + export APOLLO_HOST_PORT=4200 \ + GRAPHQL_HOST_PORT=4201 \ + HASURA_HOST_PORT=3000 \ + POSTGRES_HOST_PORT=5432 \ + UI_HOST_PORT=8080 \ + POSTGRES_USER=prefect \ + POSTGRES_PASSWORD=test-password \ + POSTGRES_DB=prefect_server && \ + export DB_CONNECTION_URL=postgresql://$$POSTGRES_USER:$$POSTGRES_PASSWORD@postgres:$$POSTGRES_HOST_PORT/$$POSTGRES_DB \ + HASURA_API_URL=http://hasura:$$HASURA_HOST_PORT/v1alpha1/graphql \ + HASURA_WS_URL=ws://hasura:$$HASURA_HOST_PORT/v1alpha1/graphql \ + PREFECT_API_HEALTH_URL=http://graphql:$$GRAPHQL_HOST_PORT/health \ + PREFECT_API_URL=http://graphql:$$GRAPHQL_HOST_PORT/graphql/ \ + PREFECT_SERVER_DB_CMD='prefect-server database upgrade -y' && \ + docker-compose -f ../infra/prefect/docker/docker-compose.yml up -d + +agent : + cd .. && \ + prefect backend server && \ + python -m pipelines.Pipeline && \ + prefect agent start + +agent-docker : + cd .. 
&& \ + docker run -e PREFECT__SERVER__HOST=http://host.docker.internal \ + -e PREFECT__SERVER__PORT=4200 \ + $$(docker build -f infra/pipelines/docker/Dockerfile -q .) diff --git a/pipelines/Pipeline.py b/pipelines/Pipeline.py new file mode 100644 index 0000000..77d02ce --- /dev/null +++ b/pipelines/Pipeline.py @@ -0,0 +1,201 @@ +from prefect import Flow, Client, task +from prefect.tasks.shell import ShellTask +import arrow, ast, graphistry, json, os, pprint +import pandas as pd +import numpy as np +from pathlib import Path +from modules.FirehoseJob import FirehoseJob +from datetime import timedelta, datetime +from prefect.schedules import IntervalSchedule +import prefect +from prefect.engine.signals import ENDRUN +from prefect.engine.state import Skipped + +@task(log_stdout=True, skip_on_upstream_skip=True) +def load_creds(): + with open('twittercreds.json') as json_file: + creds = json.load(json_file) + print([{k: "zzz" for k in creds[0].keys()}]) + return creds + +@task(log_stdout=True, skip_on_upstream_skip=True) +def load_path(): + data_dirs = ['COVID-19-TweetIDs/2020-01', 'COVID-19-TweetIDs/2020-02', 'COVID-19-TweetIDs/2020-03'] + + timestamp = None + if 'backfill_timestamp' in prefect.context: + timestamp = arrow.get(prefect.context['backfill_timestamp']) + else: + timestamp = prefect.context['scheduled_start_time'] + print('TIMESTAMP = ', timestamp) + suffix = timestamp.strftime('%Y-%m-%d-%H') + + for data_dir in data_dirs: + if os.path.isdir(data_dir): + for path in Path(data_dir).iterdir(): + if path.name.endswith('{}.txt'.format(suffix)): + print(path) + return str(path) + else: + print('WARNING: not a dir', data_dir) + # TODO: (wzy) Figure out how to cancel this gracefully + raise ENDRUN(state=Skipped()) + +@task(log_stdout=True, skip_on_upstream_skip=True) +def clean_timeline_tweets(pdf): + return pdf.rename(columns={'id': 'status_id', 'id_str': 'status_id_str'}) + +@task(log_stdout=True, skip_on_upstream_skip=True) +def clean_datetimes(pdf): + print('cleaning datetimes...') + pdf = pdf.assign(created_at=pd.to_datetime(pdf['created_at'])) + pdf = pdf.assign(created_date=pdf['created_at'].apply(lambda dt: dt.timestamp())) + print(' ...cleaned') + return pdf + +#some reason always False +#this seems to match full_text[:2] == 'RT' +@task(log_stdout=True, skip_on_upstream_skip=True) +def clean_retweeted(pdf): + return pdf.assign(retweeted=pdf['retweeted_status'] != 'None') + +def update_to_type(row): + if row['is_quote_status']: + return 'retweet_quote' + if row['retweeted']: + return 'retweet' + if row['in_reply_to_status_id'] is not None and row['in_reply_to_status_id'] > 0: + return 'reply' + return 'original' + +@task(log_stdout=True, skip_on_upstream_skip=True) +def tag_status_type(pdf): + ##only materialize required fields.. + print('tagging status...') + pdf2 = pdf\ + .assign(status_type=pdf[['is_quote_status', 'retweeted', 'in_reply_to_status_id']].apply(update_to_type, axis=1)) + print(' ...tagged') + return pdf2 + +def try_load(s): + try: + out = ast.literal_eval(s) + return { + k if type(k) == str else str(k): out[k] + for k in out.keys() + } + except: + if s != 0.0: + print('bad s', s) + return {} + +def flatten_status_col(pdf, col, status_type, prefix): + print('flattening %s...' 
% col) + print(' ', pdf.columns) + #retweet_status -> hash -> lookup json for hash -> pull out id/created_at/user_id + pdf_hashed = pdf.assign(hashed=pdf[col].apply(hash)) + retweets = pdf_hashed[ pdf_hashed['status_type'] == status_type ][['hashed', col]]\ + .drop_duplicates('hashed').reset_index(drop=True) + retweets_flattened = pd.io.json.json_normalize( + retweets[col].replace("(").replace(")")\ + .apply(try_load)) + print(' ... fixing dates') + created_at_datetime = pd.to_datetime(retweets_flattened['created_at']) + created_at = np.full_like(created_at_datetime, np.nan, dtype=np.float64) + created_at[created_at_datetime.notnull()] = created_at_datetime[created_at_datetime.notnull()].apply(lambda dt: dt.timestamp()) + retweets_flattened = retweets_flattened.assign( + created_at = created_at, + user_id = retweets_flattened['user.id']) + retweets = retweets[['hashed']]\ + .assign(**{ + prefix + c: retweets_flattened[c] + for c in retweets_flattened if c in ['id', 'created_at', 'user_id'] + }) + print(' ... remerging') + pdf_with_flat_retweets = pdf_hashed.merge(retweets, on='hashed', how='left').drop(columns='hashed') + print(' ...flattened', pdf_with_flat_retweets.shape) + return pdf_with_flat_retweets + +@task(log_stdout=True, skip_on_upstream_skip=True) +def flatten_retweets(pdf): + print('flattening retweets...') + pdf2 = flatten_status_col(pdf, 'retweeted_status', 'retweet', 'retweet_') + print(' ...flattened', pdf2.shape) + return pdf2 + +@task(log_stdout=True, skip_on_upstream_skip=True) +def flatten_quotes(pdf): + print('flattening quotes...') + pdf2 = flatten_status_col(pdf, 'quoted_status', 'retweet_quote', 'quote_') + print(' ...flattened', pdf2.shape) + return pdf2 + +@task(log_stdout=True, skip_on_upstream_skip=True) +def flatten_users(pdf): + print('flattening users') + pdf_user_cols = pd.io.json.json_normalize(pdf['user'].replace("(").replace(")").apply(ast.literal_eval)) + pdf2 = pdf.assign(**{ + 'user_' + c: pdf_user_cols[c] + for c in pdf_user_cols if c in [ + 'id', 'screen_name', 'created_at', 'followers_count', 'friends_count', 'favourites_count', + 'utc_offset', 'time_zone', 'verified', 'statuses_count', 'profile_image_url', + 'name', 'description' + ]}) + print(' ... 
fixing dates') + pdf2 = pdf2.assign(user_created_at=pd.to_datetime(pdf2['user_created_at']).apply(lambda dt: dt.timestamp())) + print(' ...flattened') + return pdf2 + +@task(log_stdout=True, skip_on_upstream_skip=True) +def load_tweets(creds, path): + print(path) + fh = FirehoseJob(creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=prefect.context.get('save_to_neo', False)) + cnt = 0 + data = [] + for arr in fh.process_id_file(path, job_name="500m_COVID-REHYDRATE"): + data.append(arr.to_pandas()) + print('{}/{}'.format(len(data), len(arr))) + cnt += len(arr) + print('TOTAL: ' + str(cnt)) + data = pd.concat(data, ignore_index=True, sort=False) + if len(data) == 0: + raise ENDRUN(state=Skipped()) + return data + +@task(log_stdout=True, skip_on_upstream_skip=True) +def sample(tweets): + print('responses shape', tweets.shape) + print(tweets.columns) + print(tweets.sample(5)) + +schedule = IntervalSchedule( + # start_date=datetime(2020, 1, 20), + # interval=timedelta(hours=1), + start_date=datetime.now() + timedelta(seconds=1), + interval=timedelta(hours=1), +) + +# with Flow("Rehydration Pipeline", schedule=schedule) as flow: +with Flow("Rehydration Pipeline") as flow: + creds = load_creds() + path_list = load_path() + tweets = load_tweets(creds, path_list) + tweets = clean_timeline_tweets(tweets) + tweets = clean_datetimes(tweets) + tweets = clean_retweeted(tweets) + tweets = tag_status_type(tweets) + tweets = flatten_retweets(tweets) + tweets = flatten_quotes(tweets) + tweets = flatten_users(tweets) + + sample(tweets) + +LOCAL_MODE = False + +if LOCAL_MODE: + with prefect.context( + backfill_timestamp=datetime(2020, 2, 6, 9), + ): + flow.run() +else: + flow.register(project_name="rehydrate") diff --git a/pipelines/README.md b/pipelines/README.md new file mode 100644 index 0000000..391b17b --- /dev/null +++ b/pipelines/README.md @@ -0,0 +1,16 @@ +# Pipelines + +This folder contains the data ingestion pipelines. It uses [Prefect](http://prefect.io) as the orchestrator. + +To run the pipelines locally, we need to start the Prefect UI + server container by running `make prefect-up`. Wait for the message (it should take a few seconds) + +``` +Server ready at http://0.0.0.0:4200/ 🚀 +``` + +You can then access the Prefect UI at http://localhost:8080. + +To deploy the pipelines to Prefect, we need to run the Prefect agent (which registers flows to the Prefect server and polls for flow runs). We can either use (in a separate CLI): + +1. `make agent` (preferred for development): this script runs the Prefect agent/pipelines locally. +2. `make agent-docker` (sanity check before merging changes): this script packs a Prefect agent and this repository into a docker container, and runs them inside the container. It takes a while to build the container, but it is also what we use in production, so before merging changes into master it's a great sanity check.
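
As a quick orientation for reviewers, here is a minimal sketch of how the `Neo4jDataAccess` enrichment and read helpers added in `modules/Neo4jDataAccess.py` might be exercised against a local Neo4j instance. It is illustrative only: the localhost credentials, the `example_job` name, and the `misinfo_score` column are placeholders, not anything defined in this patch.

```python
import pandas as pd
from modules.Neo4jDataAccess import Neo4jDataAccess

# Placeholder credentials in the same shape the class (and its tests) expect;
# real values normally come from neo4jcreds.json.
creds = [
    {"type": "writer", "creds": {"host": "localhost", "port": "7687",
                                 "user": "neo4j", "password": "<password>"}},
    {"type": "reader", "creds": {"host": "localhost", "port": "7687",
                                 "user": "neo4j", "password": "<password>"}},
]

dao = Neo4jDataAccess(neo4j_creds=creds)

# Upsert one enrichment property onto Tweet nodes, keyed on the `id` column
# (misinfo_score is a hypothetical column used only for illustration).
scores = pd.DataFrame([{"id": 111, "misinfo_score": 0.87},
                       {"id": 222, "misinfo_score": 0.12}])
dao.save_enrichment_df_to_graph(Neo4jDataAccess.NodeLabel.Tweet, scores, "example_job")

# Read selected properties back for the same tweets.
tweets = dao.get_tweet_by_id(scores[["id"]], cols=["id", "misinfo_score"])

# Ad-hoc Cypher reads; a LIMIT is appended automatically when none is supplied.
sample = dao.get_from_neo("MATCH (n:Tweet) RETURN n.id, n.hydrated", limit=10)
print(tweets.head(), sample.head(), sep="\n")
```

Note that the sketch passes a single non-key column per `save_enrichment_df_to_graph` call, mirroring how the new tests in `modules/tests/test_Neo4jDataAccess.py` use it.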