diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..35c194fa2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,25 @@ +**/.dockerignore +**/Dockerfile +**/*.Dockerfile + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/env +**/.env + +**/.tox +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log + +**/node_modules + +!.git/ diff --git a/.gitignore b/.gitignore index 28c6ef875..4e9830784 100644 --- a/.gitignore +++ b/.gitignore @@ -82,7 +82,7 @@ htmlcov/ .cache nosetests.xml coverage.xml -*,cover +*.cover # Translations *.mo diff --git a/.travis.yml b/.travis.yml index 95369e3c1..fd79936ab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,38 +15,6 @@ branches: - /^[0-9]+(\.[0-9]+){2}$/ -# define stages and their execution order -stages: - - name: test - - # release only in: - # - branch develop - # - branch release-#.# - # - tag #.#.# - # - never in forks or pull requests - - name: release - if: | - fork IS false AND \ - type != pull_request AND \ - ((branch = develop) OR \ - (branch =~ ^release\-[0-9]+\.[0-9]+$) OR \ - (tag =~ ^[0-9]+(\.[0-9]+){2}$)) - - # Trigger deployment on GCS - # only in: - # - branch develop - # - branch release-#.# - # - tag #.#.# - # - never in forks or pull requests - - name: deploy - if: | - fork IS false AND \ - type != pull_request AND \ - ((branch = develop) OR \ - (branch =~ ^release\-[0-9]+\.[0-9]+$) OR \ - (tag =~ ^[0-9]+(\.[0-9]+){2}$)) - - # use matrix to parallelize tests jobs: fast_finish: true @@ -67,13 +35,19 @@ jobs: stage: test env: 'TEST_MODE=integration' - - name: "Release" - stage: release - script: ./scripts/release.sh + - name: "Test deployment process" + stage: test + script: ./scripts/test_deployment.sh - - name: "Deployment" + - name: "Publish images in Docker Hub" stage: deploy - script: ./scripts/deploy.sh + script: ./scripts/release.sh + if: | + fork IS false AND \ + type != pull_request AND \ + ((branch = develop) OR \ + (branch =~ ^release\-[0-9]+\.[0-9]+$) OR \ + (tag =~ ^[0-9]+(\.[0-9]+){2}$)) install: true diff --git a/aether-client-library/.dockerignore b/aether-client-library/.dockerignore index aaca40b6a..21006af6f 100644 --- a/aether-client-library/.dockerignore +++ b/aether-client-library/.dockerignore @@ -1,19 +1,16 @@ .dockerignore Dockerfile -db.sqlite3 -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -env -pip-log.txt -pip-delete-this-directory.txt -.tox -.coverage -.coverage.* -.cache -coverage.xml -*,cover -*.log -.git + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log diff --git a/aether-kernel/.dockerignore b/aether-kernel/.dockerignore index aaca40b6a..21006af6f 100644 --- a/aether-kernel/.dockerignore +++ b/aether-kernel/.dockerignore @@ -1,19 +1,16 @@ .dockerignore Dockerfile -db.sqlite3 -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -env -pip-log.txt -pip-delete-this-directory.txt -.tox -.coverage -.coverage.* -.cache -coverage.xml -*,cover -*.log -.git + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log diff --git a/aether-kernel/Dockerfile b/aether-kernel/Dockerfile index 3042e83c6..30be24ddc 100644 --- a/aether-kernel/Dockerfile +++ b/aether-kernel/Dockerfile @@ -1,27 +1,31 @@ FROM python:3.7-slim-buster +LABEL description="Aether Kernel" \ + name="aether-kernel" \ + author="eHealth Africa" + ################################################################################ -## 
setup container +## set up container ################################################################################ COPY ./conf/docker/* /tmp/ RUN /tmp/setup.sh +WORKDIR /code +ENTRYPOINT ["/code/entrypoint.sh"] + ################################################################################ -## install app +## install dependencies ## copy files one by one and split commands to use docker cache ################################################################################ -WORKDIR /code - -COPY ./conf/pip /code/conf/pip +COPY --chown=aether:aether ./conf/pip /code/conf/pip RUN pip install -q --upgrade pip && \ pip install -q -r /code/conf/pip/requirements.txt - -COPY ./ /code +COPY --chown=aether:aether ./ /code ################################################################################ -## copy application version and create git revision +## create application version and revision files ################################################################################ ARG VERSION=0.0.0 @@ -30,11 +34,3 @@ ARG GIT_REVISION RUN mkdir -p /var/tmp && \ echo $VERSION > /var/tmp/VERSION && \ echo $GIT_REVISION > /var/tmp/REVISION - -################################################################################ -## last setup steps -################################################################################ - -RUN chown -R aether: /code - -ENTRYPOINT ["/code/entrypoint.sh"] diff --git a/aether-kernel/aether/kernel/api/serializers.py b/aether-kernel/aether/kernel/api/serializers.py index 23b9da628..d59772f07 100644 --- a/aether-kernel/aether/kernel/api/serializers.py +++ b/aether-kernel/aether/kernel/api/serializers.py @@ -256,6 +256,11 @@ class SchemaDecoratorSerializer(DynamicFieldsMixin, DynamicFieldsModelSerializer source='mappings', ) + schema_name = serializers.CharField( + read_only=True, + source='schema.name', + ) + schema_definition = serializers.JSONField( read_only=True, source='schema.definition', diff --git a/aether-kernel/aether/kernel/api/views.py b/aether-kernel/aether/kernel/api/views.py index 768d3dcf5..d17f8f56a 100644 --- a/aether-kernel/aether/kernel/api/views.py +++ b/aether-kernel/aether/kernel/api/views.py @@ -45,13 +45,7 @@ from .exporter import ExporterMixin from .mapping_validation import validate_mappings -from . import ( - filters, - models, - project_artefacts, - serializers, - utils -) +from . import filters, models, project_artefacts, serializers, utils class ProjectViewSet(MtViewSetMixin, FilteredMixin, viewsets.ModelViewSet): diff --git a/aether-kernel/aether/kernel/apps.py b/aether-kernel/aether/kernel/apps.py index 5894549f2..861356475 100644 --- a/aether-kernel/aether/kernel/apps.py +++ b/aether-kernel/aether/kernel/apps.py @@ -17,8 +17,19 @@ # under the License. 
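The `schema_name` field added to `SchemaDecoratorSerializer` above follows the same pattern as the existing `schema_definition` field: a read-only serializer field whose `source` walks the `schema` foreign key. A minimal sketch of that pattern, meant to be run inside a configured Django shell; the object names below are illustrative stand-ins, not the real kernel models:

```python
from types import SimpleNamespace
from rest_framework import serializers


class SchemaDecoratorSketch(serializers.Serializer):
    # read-only fields resolved through the related `schema` object
    name = serializers.CharField()
    schema_name = serializers.CharField(read_only=True, source='schema.name')
    schema_definition = serializers.JSONField(read_only=True, source='schema.definition')


schema = SimpleNamespace(name='Person', definition={'type': 'record', 'name': 'Person'})
decorator = SimpleNamespace(name='person-decorator', schema=schema)

# the nested attributes are flattened into the representation
print(SchemaDecoratorSketch(decorator).data)
```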
from django.apps import AppConfig +from django.db.models.signals import pre_migrate, post_migrate + +from aether.kernel.models import pre_migrate_signal, post_migrate_signal class Config(AppConfig): name = 'aether.kernel' verbose_name = 'Aether Kernel' + + def ready(self): + super(Config, self).ready() + + # https://docs.djangoproject.com/en/2.2/ref/signals/#management-signals + # enables the migration signals that will recreate the views + pre_migrate.connect(pre_migrate_signal, sender=self) + post_migrate.connect(post_migrate_signal, sender=self) diff --git a/aether-kernel/aether/kernel/models.py b/aether-kernel/aether/kernel/models.py new file mode 100644 index 000000000..0fcb4db1b --- /dev/null +++ b/aether-kernel/aether/kernel/models.py @@ -0,0 +1,111 @@ +# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org +# +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The purpose of this file is to tell django that there are models linked to +# this module then it runs the `pre_migrate` and `post_migrate` signals, +# otherwise they will be ignored. + +from django.db import connection +from django.conf import settings + + +_DROP_VIEWS = ''' + DROP VIEW IF EXISTS kernel_entity_vw CASCADE; + DROP VIEW IF EXISTS kernel_schema_vw CASCADE; + DROP VIEW IF EXISTS multitenancy_mtinstance_vw CASCADE; +''' + +_CREATE_MT_VIEW_YES = ''' + CREATE OR REPLACE VIEW multitenancy_mtinstance_vw AS + SELECT realm, instance_id + FROM multitenancy_mtinstance + ; +''' + +_CREATE_MT_VIEW_NOT = f''' + CREATE OR REPLACE VIEW multitenancy_mtinstance_vw AS + SELECT '{settings.NO_MULTITENANCY_REALM}' AS realm, id AS instance_id + FROM kernel_project + ; +''' + +_CREATE_SCHEMA_VIEW = ''' + CREATE OR REPLACE VIEW kernel_schema_vw AS + SELECT + GREATEST(sd.modified, s.modified) AS modified, + + sd.id AS schemadecorator_id, + sd.name AS schemadecorator_name, + + s.id AS schema_id, + s.name AS schema_name, + s.definition AS schema_definition, + s.revision AS schema_revision, + + mt.realm AS realm, + + (s.family = sd.project_id::text) AS is_identity + + FROM kernel_schemadecorator AS sd + INNER JOIN kernel_schema AS s + ON sd.schema_id = s.id + INNER JOIN multitenancy_mtinstance_vw AS mt + ON sd.project_id = mt.instance_id + ORDER BY 1 ASC + ; +''' + +_CREATE_ENTITY_VIEW = ''' + CREATE OR REPLACE VIEW kernel_entity_vw AS + SELECT + e.id, + e.revision, + e.payload, + e.modified, + e.status, + + s.schemadecorator_id, + s.schemadecorator_name, + s.schema_name, + s.schema_id, + s.schema_revision, + s.realm + + FROM kernel_entity AS e + INNER JOIN kernel_schema_vw AS s + ON e.schemadecorator_id = s.schemadecorator_id + ORDER BY e.modified ASC + ; +''' + + +def pre_migrate_signal(**kwargs): + with connection.cursor() as cursor: + cursor.execute(_DROP_VIEWS) + + +def post_migrate_signal(**kwargs): + if settings.MULTITENANCY: + create_sql = _CREATE_MT_VIEW_YES + else: + create_sql = _CREATE_MT_VIEW_NOT + 
create_sql += _CREATE_SCHEMA_VIEW + create_sql += _CREATE_ENTITY_VIEW + + with connection.cursor() as cursor: + cursor.execute(create_sql) diff --git a/aether-kernel/conf/pip/requirements.txt b/aether-kernel/conf/pip/requirements.txt index 99cd3cecb..bc8e335db 100644 --- a/aether-kernel/conf/pip/requirements.txt +++ b/aether-kernel/conf/pip/requirements.txt @@ -13,11 +13,11 @@ ################################################################################ aether.python==1.0.17 -aether.sdk==1.2.20 +aether.sdk==1.2.21 attrs==19.3.0 autopep8==1.5 -boto3==1.12.2 -botocore==1.15.2 +boto3==1.12.22 +botocore==1.15.22 cachetools==4.0.0 certifi==2019.11.28 cffi==1.14.0 @@ -25,21 +25,21 @@ chardet==3.0.4 configparser==4.0.2 coreapi==2.3.3 coreschema==0.0.4 -coverage==5.0.3 +coverage==5.0.4 cryptography==2.8 -decorator==4.4.1 -Django==2.2.10 +decorator==4.4.2 +Django==2.2.11 django-autofixture==0.12.1 django-cacheops==4.2 django-cleanup==4.0.0 django-cors-headers==3.2.1 django-debug-toolbar==2.2 django-filter==2.2.0 -django-minio-storage==0.3.5 +django-minio-storage==0.3.7 django-model-utils==4.0.0 django-prometheus==2.0.0 django-redis-sessions==0.6.1 -django-silk==4.0.0 +django-silk==4.0.1 django-storages==1.9.1 django-uwsgi==0.2.2 djangorestframework==3.11.0 @@ -53,27 +53,27 @@ flake8==3.7.9 flake8-quotes==2.1.1 funcy==1.14 google-api-core==1.16.0 -google-auth==1.11.2 +google-auth==1.11.3 google-cloud-core==1.3.0 google-cloud-storage==1.26.0 google-resumable-media==0.5.0 googleapis-common-protos==1.51.0 gprof2dot==2019.11.30 -idna==2.8 +idna==2.9 importlib-metadata==1.5.0 inflection==0.3.1 itypes==1.1.0 jdcal==1.4.1 Jinja2==2.11.1 -jmespath==0.9.4 -jsonpath-ng==1.4.3 +jmespath==0.9.5 +jsonpath-ng==1.5.1 jsonschema==3.2.0 lxml==4.5.0 MarkupSafe==1.1.1 mccabe==0.6.1 -minio==5.0.7 +minio==5.0.8 openpyxl==3.0.3 -packaging==20.1 +packaging==20.3 ply==3.11 prometheus-client==0.7.1 protobuf==3.11.3 @@ -81,9 +81,9 @@ psycopg2-binary==2.8.4 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycodestyle==2.5.0 -pycparser==2.19 +pycparser==2.20 pyflakes==2.1.1 -Pygments==2.5.2 +Pygments==2.6.1 pyOpenSSL==19.1.0 pyparsing==2.4.6 pyrsistent==0.15.7 @@ -91,17 +91,17 @@ python-dateutil==2.8.1 python-json-logger==0.1.11 pytz==2019.3 redis==3.4.1 -requests==2.22.0 +requests==2.23.0 rsa==4.0 ruamel.yaml==0.16.10 ruamel.yaml.clib==0.2.0 s3transfer==0.3.3 -sentry-sdk==0.14.1 +sentry-sdk==0.14.2 six==1.14.0 -spavro==1.1.22 -sqlparse==0.3.0 +spavro==1.1.23 +sqlparse==0.3.1 tblib==1.6.0 uritemplate==3.0.1 urllib3==1.25.8 uWSGI==2.0.18 -zipp==3.0.0 +zipp==3.1.0 diff --git a/aether-kernel/sql/create_readonly_user.py b/aether-kernel/sql/create_readonly_user.py deleted file mode 100755 index 5e1b2d467..000000000 --- a/aether-kernel/sql/create_readonly_user.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python - -# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org -# -# See the NOTICE file distributed with this work for additional information -# regarding copyright ownership. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
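With `multitenancy_mtinstance_vw`, `kernel_schema_vw` and `kernel_entity_vw` now recreated by the `post_migrate` signal above (instead of by the `create_readonly_user.py` / `query.sql` pair removed below), a quick sanity check after `./manage.py migrate` is to ask PostgreSQL whether the relations exist. A small sketch for a kernel Django shell; the helper name is illustrative:

```python
from django.db import connection


def view_exists(view_name):
    # to_regclass() returns NULL when the relation is unknown (PostgreSQL 9.4+)
    with connection.cursor() as cursor:
        cursor.execute('SELECT to_regclass(%s)', [view_name])
        return cursor.fetchone()[0] is not None


for name in ('multitenancy_mtinstance_vw', 'kernel_schema_vw', 'kernel_entity_vw'):
    print(name, view_exists(name))
```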
See the License for the -# specific language governing permissions and limitations -# under the License. - -''' -Create a readonly user in the kernel database. - -Background: aether producers can query the kernel database via psycopg2, -bypassing the safety checks django provides. They only need read permissions -and should therefore use a readonly user. - -This script is intended to run in the `setup_db` block of `entrypoint.sh`, with -access to all environment variables available in that context. -''' - -import os -import logging -import psycopg2 -import sys - -from psycopg2 import sql - -DEBUG = os.environ.get('DEBUG') -LEVEL = logging.DEBUG if DEBUG else logging.WARNING - -logging.basicConfig(level=LEVEL) -logger = logging.getLogger(__name__) - -# Create a readonly user with username "{role_id}" if none exists. -# Grant read permission for relevant tables. - - -def main(ro_user, ro_password): - dbname = os.environ['DB_NAME'] - host = os.environ['PGHOST'] - port = os.environ['PGPORT'] - root_user = os.environ['PGUSER'] - root_password = os.environ['PGPASSWORD'] - - logger.debug('db://{user}:{pwrd}@{host}:{port}/{dbname}'.format( - user=root_user, - pwrd=(len(root_password) * '*'), - host=host, - port=port, - dbname=dbname, - )) - postgres_credentials = { - 'dbname': dbname, - 'host': host, - 'port': port, - 'user': root_user, - 'password': root_password, - } - with open('/code/sql/query.sql', 'r') as fp: - CREATE_READONLY_USER = fp.read() - - with psycopg2.connect(**postgres_credentials) as conn: - cursor = conn.cursor() - query = sql.SQL(CREATE_READONLY_USER).format( - database=sql.Identifier(dbname), - role_id=sql.Identifier(ro_user), - role_literal=sql.Literal(ro_user), - password=sql.Literal(ro_password), - ) - cursor.execute(query) - - -if __name__ == '__main__': - try: - args = sys.argv[1:] - main(*args) - except Exception as e: - logger.error(str(e)) - sys.exit(1) diff --git a/aether-kernel/sql/query.sql b/aether-kernel/sql/query.sql deleted file mode 100644 index bc36b1dad..000000000 --- a/aether-kernel/sql/query.sql +++ /dev/null @@ -1,90 +0,0 @@ -DO $$ -BEGIN - IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = {role_literal}) - THEN - CREATE ROLE {role_id} - WITH LOGIN ENCRYPTED - PASSWORD {password} - INHERIT NOSUPERUSER NOCREATEDB NOCREATEROLE NOREPLICATION; - END IF; -END -$$ LANGUAGE plpgsql; - - -DO $$ -BEGIN - IF NOT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'multitenancy_mtinstance') - THEN - DROP VIEW IF EXISTS multitenancy_mtinstance CASCADE; - CREATE VIEW multitenancy_mtinstance AS - SELECT '-' AS realm, - p.id AS instance_id - FROM kernel_project; - END IF; -END -$$ LANGUAGE plpgsql; - - -DROP VIEW IF EXISTS kernel_entity_vw CASCADE; -DROP VIEW IF EXISTS kernel_schema_vw CASCADE; - -CREATE VIEW kernel_schema_vw AS - SELECT - GREATEST(sd.modified, s.modified) AS modified, - - sd.id AS schemadecorator_id, - sd.name AS schemadecorator_name, - - s.id AS schema_id, - s.name AS schema_name, - s.definition AS schema_definition, - s.revision AS schema_revision, - - mt.realm AS realm, - - (s.family = sd.project_id::text) AS is_identity - - FROM kernel_schemadecorator AS sd - INNER JOIN kernel_schema AS s - ON sd.schema_id = s.id - INNER JOIN multitenancy_mtinstance AS mt - ON sd.project_id = mt.instance_id - ORDER BY 1 ASC - ; - -CREATE VIEW kernel_entity_vw AS - SELECT - e.id, - e.revision, - e.payload, - e.modified, - e.status, - - s.schemadecorator_id, - s.schemadecorator_name, - s.schema_name, - s.schema_id, - s.schema_revision, - 
s.realm - - FROM kernel_entity AS e - INNER JOIN kernel_schema_vw AS s - ON e.schemadecorator_id = s.schemadecorator_id - ORDER BY e.modified ASC - ; - - -REVOKE ALL PRIVILEGES ON DATABASE {database} FROM {role_id} CASCADE; -GRANT CONNECT ON DATABASE {database} TO {role_id}; - -GRANT USAGE ON SCHEMA public TO {role_id}; - -GRANT SELECT ON kernel_entity TO {role_id}; -GRANT SELECT ON kernel_mapping TO {role_id}; -GRANT SELECT ON kernel_schemadecorator TO {role_id}; -GRANT SELECT ON kernel_schema TO {role_id}; -GRANT SELECT ON kernel_project TO {role_id}; -GRANT SELECT ON multitenancy_mtinstance TO {role_id}; - -GRANT SELECT ON kernel_schema_vw TO {role_id}; -GRANT SELECT ON kernel_entity_vw TO {role_id}; diff --git a/aether-odk-module/.dockerignore b/aether-odk-module/.dockerignore index aaca40b6a..21006af6f 100644 --- a/aether-odk-module/.dockerignore +++ b/aether-odk-module/.dockerignore @@ -1,19 +1,16 @@ .dockerignore Dockerfile -db.sqlite3 -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -env -pip-log.txt -pip-delete-this-directory.txt -.tox -.coverage -.coverage.* -.cache -coverage.xml -*,cover -*.log -.git + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log diff --git a/aether-odk-module/Dockerfile b/aether-odk-module/Dockerfile index 3042e83c6..520733665 100644 --- a/aether-odk-module/Dockerfile +++ b/aether-odk-module/Dockerfile @@ -1,27 +1,31 @@ FROM python:3.7-slim-buster +LABEL description="Aether ODK Module" \ + name="aether-odk" \ + author="eHealth Africa" + ################################################################################ -## setup container +## set up container ################################################################################ COPY ./conf/docker/* /tmp/ RUN /tmp/setup.sh +WORKDIR /code +ENTRYPOINT ["/code/entrypoint.sh"] + ################################################################################ -## install app +## install dependencies ## copy files one by one and split commands to use docker cache ################################################################################ -WORKDIR /code - -COPY ./conf/pip /code/conf/pip +COPY --chown=aether:aether ./conf/pip /code/conf/pip RUN pip install -q --upgrade pip && \ pip install -q -r /code/conf/pip/requirements.txt - -COPY ./ /code +COPY --chown=aether:aether ./ /code ################################################################################ -## copy application version and create git revision +## create application version and revision files ################################################################################ ARG VERSION=0.0.0 @@ -30,11 +34,3 @@ ARG GIT_REVISION RUN mkdir -p /var/tmp && \ echo $VERSION > /var/tmp/VERSION && \ echo $GIT_REVISION > /var/tmp/REVISION - -################################################################################ -## last setup steps -################################################################################ - -RUN chown -R aether: /code - -ENTRYPOINT ["/code/entrypoint.sh"] diff --git a/aether-odk-module/conf/pip/requirements.txt b/aether-odk-module/conf/pip/requirements.txt index d35c82965..0c5b27523 100644 --- a/aether-odk-module/conf/pip/requirements.txt +++ b/aether-odk-module/conf/pip/requirements.txt @@ -12,26 +12,26 @@ # ################################################################################ -aether.sdk==1.2.20 +aether.sdk==1.2.21 autopep8==1.5 -boto3==1.12.2 -botocore==1.15.2 +boto3==1.12.22 +botocore==1.15.22 cachetools==4.0.0 
certifi==2019.11.28 cffi==1.14.0 chardet==3.0.4 configparser==4.0.2 -coverage==5.0.3 +coverage==5.0.4 cryptography==2.8 -Django==2.2.10 +Django==2.2.11 django-cacheops==4.2 django-cleanup==4.0.0 django-cors-headers==3.2.1 django-debug-toolbar==2.2 -django-minio-storage==0.3.5 +django-minio-storage==0.3.7 django-prometheus==2.0.0 django-redis-sessions==0.6.1 -django-silk==4.0.0 +django-silk==4.0.1 django-storages==1.9.1 django-uwsgi==0.2.2 djangorestframework==3.11.0 @@ -43,42 +43,42 @@ flake8-quotes==2.1.1 FormEncode==1.3.1 funcy==1.14 google-api-core==1.16.0 -google-auth==1.11.2 +google-auth==1.11.3 google-cloud-core==1.3.0 google-cloud-storage==1.26.0 google-resumable-media==0.5.0 googleapis-common-protos==1.51.0 gprof2dot==2019.11.30 -idna==2.8 +idna==2.9 Jinja2==2.11.1 -jmespath==0.9.4 +jmespath==0.9.5 linecache2==1.0.0 lxml==4.5.0 MarkupSafe==1.1.1 mccabe==0.6.1 -minio==5.0.7 +minio==5.0.8 prometheus-client==0.7.1 protobuf==3.11.3 psycopg2-binary==2.8.4 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycodestyle==2.5.0 -pycparser==2.19 +pycparser==2.20 pyflakes==2.1.1 -Pygments==2.5.2 +Pygments==2.6.1 pyOpenSSL==19.1.0 python-dateutil==2.8.1 python-json-logger==0.1.11 pytz==2019.3 pyxform==0.15.1 redis==3.4.1 -requests==2.22.0 +requests==2.23.0 rsa==4.0 s3transfer==0.3.3 -sentry-sdk==0.14.1 +sentry-sdk==0.14.2 six==1.14.0 -spavro==1.1.22 -sqlparse==0.3.0 +spavro==1.1.23 +sqlparse==0.3.1 tblib==1.6.0 traceback2==1.4.0 unicodecsv==0.14.1 diff --git a/aether-producer/.dockerignore b/aether-producer/.dockerignore index aaca40b6a..21006af6f 100644 --- a/aether-producer/.dockerignore +++ b/aether-producer/.dockerignore @@ -1,19 +1,16 @@ .dockerignore Dockerfile -db.sqlite3 -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -env -pip-log.txt -pip-delete-this-directory.txt -.tox -.coverage -.coverage.* -.cache -coverage.xml -*,cover -*.log -.git + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log diff --git a/aether-producer/Dockerfile b/aether-producer/Dockerfile index 7aa1a2b9d..e525a2fc3 100644 --- a/aether-producer/Dockerfile +++ b/aether-producer/Dockerfile @@ -1,7 +1,11 @@ FROM python:3.7-slim-buster +LABEL description="Aether Kafka Producer" \ + name="aether-producer" \ + author="eHealth Africa" + ################################################################################ -## setup container +## set up container ################################################################################ RUN apt-get update -qq && \ @@ -10,23 +14,24 @@ RUN apt-get update -qq && \ --allow-downgrades \ --allow-remove-essential \ --allow-change-held-packages \ - install gcc + install gcc && \ + useradd -ms /bin/false aether + +WORKDIR /code +ENTRYPOINT ["/code/entrypoint.sh"] ################################################################################ ## install app ## copy files one by one and split commands to use docker cache ################################################################################ -WORKDIR /code - -COPY ./conf/pip /code/conf/pip +COPY --chown=aether:aether ./conf/pip /code/conf/pip RUN pip install -q --upgrade pip && \ pip install -q -r /code/conf/pip/requirements.txt - -COPY ./ /code +COPY --chown=aether:aether ./ /code ################################################################################ -## copy application version and create git revision +## create application version and revision files ################################################################################ ARG 
VERSION=0.0.0 @@ -35,13 +40,3 @@ ARG GIT_REVISION RUN mkdir -p /var/tmp && \ echo $VERSION > /var/tmp/VERSION && \ echo $GIT_REVISION > /var/tmp/REVISION - -################################################################################ -## last setup steps -################################################################################ - -# create user to run container (avoid root user) -RUN useradd -ms /bin/false aether -RUN chown -R aether: /code - -ENTRYPOINT ["/code/entrypoint.sh"] diff --git a/aether-producer/conf/pip/requirements.txt b/aether-producer/conf/pip/requirements.txt index 7b26d7ec2..6376ee7e9 100644 --- a/aether-producer/conf/pip/requirements.txt +++ b/aether-producer/conf/pip/requirements.txt @@ -16,7 +16,7 @@ attrs==19.3.0 certifi==2019.11.28 cffi==1.14.0 chardet==3.0.4 -Click==7.0 +click==7.1.1 confluent-kafka==1.3.0 cryptography==2.8 entrypoints==0.3 @@ -25,29 +25,29 @@ flake8-quotes==2.1.1 Flask==1.1.1 gevent==1.4.0 greenlet==0.4.15 -idna==2.8 +idna==2.9 importlib-metadata==1.5.0 itsdangerous==1.1.0 Jinja2==2.11.1 MarkupSafe==1.1.1 mccabe==0.6.1 more-itertools==8.2.0 -packaging==20.1 +packaging==20.3 pluggy==0.13.1 -psycogreen==1.0.1 +psycogreen==1.0.2 psycopg2-binary==2.8.4 py==1.8.1 pycodestyle==2.5.0 -pycparser==2.19 +pycparser==2.20 pyflakes==2.1.1 pyOpenSSL==19.1.0 pyparsing==2.4.6 -pytest==5.3.5 -requests==2.22.0 +pytest==5.4.1 +requests==2.23.0 six==1.14.0 -spavro==1.1.22 -SQLAlchemy==1.3.13 +spavro==1.1.23 +SQLAlchemy==1.3.15 urllib3==1.25.8 wcwidth==0.1.8 Werkzeug==1.0.0 -zipp==3.0.0 +zipp==3.1.0 diff --git a/aether-producer/producer/__init__.py b/aether-producer/producer/__init__.py index f070fdece..8c56d8285 100644 --- a/aether-producer/producer/__init__.py +++ b/aether-producer/producer/__init__.py @@ -16,70 +16,26 @@ # specific language governing permissions and limitations # under the License. 
-from gevent import monkey, sleep -# need to patch sockets to make requests async -monkey.patch_all() # noqa -import psycogreen.gevent -psycogreen.gevent.patch_psycopg() # noqa - -import ast -import concurrent -from datetime import datetime -import enum -from functools import wraps -import io -import json -import logging -import os import signal import socket -import spavro.schema -import sys -import traceback +from functools import wraps -from confluent_kafka import Producer -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import AdminClient from flask import Flask, Response, request, jsonify + import gevent from gevent.pool import Pool from gevent.pywsgi import WSGIServer -import psycopg2 -from psycopg2 import sql -from psycopg2.extras import DictCursor -from spavro.datafile import DataFileWriter, DataFileReader -from spavro.io import DatumWriter, DatumReader -from spavro.io import validate - -from producer import db -from producer.db import Offset -from producer.db import KERNEL_DB as POSTGRES -from producer.settings import Settings - - -class KafkaStatus(enum.Enum): - SUBMISSION_PENDING = 1 - SUBMISSION_FAILURE = 2 - SUBMISSION_SUCCESS = 3 - - -def apply_kafka_security_settings(settings, kafka_settings, mode='SASL_PLAINTEXT'): - kafka_settings['bootstrap.servers'] = settings.get('kafka_url') - if mode and mode.lower() == 'sasl_plaintext': - # Let Producer use Kafka SU to produce - kafka_settings['security.protocol'] = 'SASL_PLAINTEXT' - kafka_settings['sasl.mechanisms'] = 'SCRAM-SHA-256' - kafka_settings['sasl.username'] = \ - settings.get('KAFKA_SU_USER') - kafka_settings['sasl.password'] = \ - settings.get('KAFKA_SU_PW') - elif mode and mode.lower() == 'sasl_ssl': - kafka_settings['security.protocol'] = 'SASL_SSL' - kafka_settings['sasl.mechanisms'] = 'PLAIN' - kafka_settings['sasl.username'] = \ - settings.get('KAFKA_SU_USER') - kafka_settings['sasl.password'] = \ - settings.get('KAFKA_SU_PW') - return kafka_settings + +from producer.db import init as init_offset_db +from producer.settings import KAFKA_SETTINGS, SETTINGS, LOG_LEVEL, get_logger +from producer.topic import KafkaStatus, TopicStatus, TopicManager + +# How to access Kernel: API (default) | DB +if SETTINGS.get('kernel_access_type', 'api').lower() != 'db': + from producer.kernel_api import KernelAPIClient as KernelClient +else: + from producer.kernel_db import KernelDBClient as KernelClient class ProducerManager(object): @@ -89,42 +45,34 @@ class ProducerManager(object): # Spawns a TopicManager for each schema type in Kernel # TopicManager registers own eventloop greenlet (update_kafka) with ProducerManager - SCHEMAS_STR = 'SELECT * FROM kernel_schema_vw' - - def __init__(self, settings): - self.settings = settings + def __init__(self): # Start Signal Handlers self.killed = False signal.signal(signal.SIGTERM, self.kill) signal.signal(signal.SIGINT, self.kill) gevent.signal(signal.SIGTERM, self.kill) + # Turn on Flask Endpoints # Get Auth details from env - self.admin_name = settings.get('PRODUCER_ADMIN_USER') - self.admin_password = settings.get('PRODUCER_ADMIN_PW') + self.admin_name = SETTINGS.get_required('producer_admin_user') + self.admin_password = SETTINGS.get_required('producer_admin_pw') self.serve() self.add_endpoints() - # Initialize Offsetdb + + # Initialize Offset db, Kernel and Kafka clients self.init_db() + self.kernel_client = KernelClient() + self.kafka_admin_client = AdminClient(KAFKA_SETTINGS) + # Clear objects and start - self.kernel = None - self.kafka = 
KafkaStatus.SUBMISSION_PENDING - self.get_admin_client() + self.kafka_status = KafkaStatus.SUBMISSION_PENDING self.topic_managers = {} self.run() - def get_admin_client(self): - kafka_settings = apply_kafka_security_settings( - self.settings, - {}, - self.settings.get('kafka_security'), - ) - self.kafka_admin_client = AdminClient(kafka_settings) - def keep_alive_loop(self): # Keeps the server up in case all other threads join at the same time. while not self.killed: - sleep(1) + gevent.sleep(1) def run(self): self.threads = [] @@ -136,7 +84,7 @@ def run(self): def kill(self, *args, **kwargs): # Stops HTTP service and flips stop switch, which is read by greenlets - self.app.logger.warn('Shutting down gracefully') + self.logger.warn('Shutting down gracefully') self.http.stop() self.http.close() self.worker_pool.kill() @@ -144,25 +92,22 @@ def kill(self, *args, **kwargs): def safe_sleep(self, dur): # keeps shutdown time low by yielding during sleep and checking if killed. - for x in range(dur): + for x in range(int(dur)): if not self.killed: - sleep(1) + gevent.sleep(1) # Connectivity # see if kafka's port is available def kafka_available(self): - kafka_ip, kafka_port = self.settings['kafka_url'].split(':') + kafka_ip, kafka_port = SETTINGS.get_required('kafka_url').split(':') kafka_port = int(kafka_port) try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((kafka_ip, kafka_port)) - except (InterruptedError, - ConnectionRefusedError, - socket.gaierror) as rce: - self.logger.debug( - 'Could not connect to Kafka on url: %s:%s' % (kafka_ip, kafka_port)) - self.logger.debug('Connection problem: %s' % rce) + except (InterruptedError, ConnectionRefusedError, socket.gaierror) as rce: + self.logger.debug(f'Could not connect to Kafka on url: {kafka_ip}:{kafka_port}') + self.logger.debug(f'Connection problem: {rce}') return False return True @@ -175,6 +120,7 @@ def broker_info(self): res['brokers'].append('{} (controller)'.format(b)) else: res['brokers'].append('{}'.format(b)) + for t in iter(md.topics.values()): t_str = [] if t.error is not None: @@ -197,9 +143,9 @@ def broker_info(self): except Exception as err: return {'error': f'{err}'} - # Connect to sqlite + # Connect to offset def init_db(self): - db.init() + init_offset_db() self.logger.info('OffsetDB initialized') # Main Schema Loop @@ -211,60 +157,31 @@ def check_schemas(self): while not self.killed: schemas = [] try: - schemas = self.get_schemas() - self.kernel = datetime.now().isoformat() + schemas = self.kernel_client.get_schemas() except Exception as err: - self.kernel = None - self.logger.error(f'no database connection: {err}') - sleep(1) + self.logger.error(f'No Kernel connection: {err}') + gevent.sleep(1) continue + for schema in schemas: - _name = schema['schema_name'] + name = schema['schema_name'] realm = schema['realm'] - schema_name = f'{realm}.{_name}' + schema_name = f'{realm}.{name}' if schema_name not in self.topic_managers.keys(): - self.logger.info( - 'Topic connected: %s' % schema_name) - self.topic_managers[schema_name] = TopicManager( - self, schema, realm) + self.logger.info(f'Topic connected: {schema_name}') + self.topic_managers[schema_name] = TopicManager(self, schema, realm) else: topic_manager = self.topic_managers[schema_name] if topic_manager.schema_changed(schema): topic_manager.update_schema(schema) - self.logger.debug( - 'Schema %s updated' % schema_name) + self.logger.debug(f'Schema {schema_name} updated') else: - self.logger.debug( - 'Schema %s unchanged' % schema_name) + 
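`safe_sleep()` above is what keeps shutdown latency low: instead of one long `gevent.sleep()`, waits are sliced into one-second steps so the `killed` flag set by the signal handlers is noticed quickly. A stripped-down sketch of the same idea; the class and method names here are illustrative, not part of the producer code:

```python
import signal

import gevent


class GracefulWorker:

    def __init__(self):
        self.killed = False
        signal.signal(signal.SIGTERM, self.kill)
        signal.signal(signal.SIGINT, self.kill)

    def kill(self, *args, **kwargs):
        self.killed = True

    def safe_sleep(self, duration):
        # sleep in 1 second slices so a kill request interrupts long waits
        for _ in range(int(duration)):
            if self.killed:
                break
            gevent.sleep(1)
```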
self.logger.debug(f'Schema {schema_name} unchanged') + # Time between checks for schema change - self.safe_sleep(self.settings['sleep_time']) + self.safe_sleep(SETTINGS.get('sleep_time', 1)) self.logger.debug('No longer checking schemas') - def get_schemas(self): - name = 'schemas_query' - query = sql.SQL(ProducerManager.SCHEMAS_STR) - try: - # needs to be quick(ish) - promise = POSTGRES.request_connection(1, name) - conn = promise.get() - cursor = conn.cursor(cursor_factory=DictCursor) - cursor.execute(query) - for row in cursor: - yield {key: row[key] for key in row.keys()} - - except psycopg2.OperationalError as pgerr: - self.logger.critical( - 'Could not access db to get topic size: %s' % pgerr) - return -1 - finally: - try: - POSTGRES.release(name, conn) - except UnboundLocalError: - self.logger.error( - f'{name} could not release a' - ' connection it never received.' - ) - # Flask Functions def add_endpoints(self): @@ -277,39 +194,23 @@ def add_endpoints(self): self.register('rebuild', self.request_rebuild) def register(self, route_name, fn): - self.app.add_url_rule('/%s' % route_name, route_name, view_func=fn) + self.app.add_url_rule(f'/{route_name}', route_name, view_func=fn) def serve(self): self.app = Flask('AetherProducer') # pylint: disable=invalid-name - self.logger = self.app.logger - try: - handler = self.logger.handlers[0] - except IndexError: - handler = logging.StreamHandler() - self.logger.addHandler(handler) - handler.setFormatter(logging.Formatter( - '%(asctime)s [Producer] %(levelname)-8s %(message)s')) - log_level = logging.getLevelName(self.settings - .get('log_level', 'DEBUG')) - self.logger.setLevel(log_level) - # self.logger.setLevel(logging.INFO) - if log_level == 'DEBUG': + self.logger = get_logger('Producer', self.app.logger) + if LOG_LEVEL == 'DEBUG': self.app.debug = True - self.app.config['JSONIFY_PRETTYPRINT_REGULAR'] = self.settings \ + + self.app.config['JSONIFY_PRETTYPRINT_REGULAR'] = SETTINGS \ .get('flask_settings', {}) \ .get('pretty_json_status', False) - pool_size = self.settings \ - .get('flask_settings', {}) \ - .get('max_connections', 3) - server_ip = self.settings \ - .get('server_ip', '') - server_port = int(self.settings - .get('server_port', 9005)) + + server_ip = SETTINGS.get('server_ip', '') + server_port = int(SETTINGS.get('server_port', 5005)) + pool_size = SETTINGS.get('flask_settings', {}).get('max_connections', 3) self.worker_pool = Pool(pool_size) - self.http = WSGIServer( - (server_ip, server_port), - self.app.wsgi_app, spawn=self.worker_pool - ) + self.http = WSGIServer((server_ip, server_port), self.app.wsgi_app, spawn=self.worker_pool) self.http.start() # Basic Auth implementation @@ -339,12 +240,13 @@ def request_healthcheck(self): @requires_auth def request_status(self): status = { - 'kernel_connected': self.kernel is not None, # This is a real object + 'kernel_mode': self.kernel_client.mode(), + 'kernel_last_check': self.kernel_client.last_check, + 'kernel_last_check_error': self.kernel_client.last_check_error, 'kafka_container_accessible': self.kafka_available(), 'kafka_broker_information': self.broker_info(), - # This is just a status flag - 'kafka_submission_status': str(self.kafka), - 'topics': {k: v.get_status() for k, v in self.topic_managers.items()} + 'kafka_submission_status': str(self.kafka_status), # This is just a status flag + 'topics': {k: v.get_status() for k, v in self.topic_managers.items()}, } with self.app.app_context(): return jsonify(**status) @@ -353,6 +255,7 @@ def request_status(self): def 
request_topics(self): if not self.topic_managers: return Response({}) + status = {k: v.get_topic_size() for k, v in self.topic_managers.items()} with self.app.app_context(): return jsonify(**status) @@ -371,12 +274,14 @@ def request_resume(self): def request_rebuild(self): return self.handle_topic_command(request, TopicStatus.REBUILDING) + @requires_auth def handle_topic_command(self, request, status): topic = request.args.get('topic') if not topic: - return Response(f'A topic must be specified', 422) + return Response('A topic must be specified', 422) if not self.topic_managers.get(topic): return Response(f'Bad topic name {topic}', 422) + manager = self.topic_managers[topic] if status is TopicStatus.PAUSED: fn = manager.pause @@ -384,556 +289,16 @@ def handle_topic_command(self, request, status): fn = manager.resume if status is TopicStatus.REBUILDING: fn = manager.rebuild + try: res = fn() if not res: return Response(f'Operation failed on {topic}', 500) + return Response(f'Success for status {status} on {topic}', 200) except Exception as err: return Response(f'Operation failed on {topic} with: {err}', 500) -class TopicStatus(enum.Enum): - INITIALIZING = 0 # Started by not yet operational - PAUSED = 1 # Paused - LOCKED = 2 # Paused by system and non-resumable via API until sys unlock - REBUILDING = 3 # Topic is being rebuilt - NORMAL = 4 # Topic is operating normally - ERROR = 5 - - -class TopicManager(object): - - # Creates a long running job on TopicManager.update_kafka - - # Changes detection query - NEW_STR = ''' - SELECT id, modified - FROM kernel_entity_vw - WHERE modified > {modified} - AND schema_name = {schema_name} - AND realm = {realm} - LIMIT 1; - ''' - - # Count how many unique (controlled by kernel) messages should currently be in this topic - COUNT_STR = ''' - SELECT COUNT(id) - FROM kernel_entity_vw - WHERE schema_name = {schema_name} - AND realm = {realm}; - ''' - - # Changes pull query - QUERY_STR = ''' - SELECT * - FROM kernel_entity_vw - WHERE modified > {modified} - AND schema_name = {schema_name} - AND realm = {realm} - ORDER BY modified ASC - LIMIT {limit}; - ''' - - def __init__(self, server_handler, schema, realm): - self.context = server_handler - self.logger = self.context.logger - self.name = schema['schema_name'] - self.realm = realm - self.offset = '' - self.operating_status = TopicStatus.INITIALIZING - self.limit = self.context.settings.get('postgres_pull_limit', 100) - self.status = { - 'last_errors_set': {}, - 'last_changeset_status': {} - } - self.change_set = {} - self.successful_changes = [] - self.failed_changes = {} - self.wait_time = self.context.settings.get('sleep_time', 2) - self.window_size_sec = self.context.settings.get('window_size_sec', 3) - pg_requires = ['user', 'dbname', 'port', 'host', 'password'] - self.pg_creds = {key: self.context.settings.get( - 'postgres_%s' % key) for key in pg_requires} - self.kafka_failure_wait_time = self.context.settings.get( - 'kafka_failure_wait_time', 10) - try: - topic_base = self.context.settings \ - .get('topic_settings', {}) \ - .get('name_modifier', '%s') \ - % self.name - self.topic_name = f'{self.realm}.{topic_base}' - except Exception: # Bad Name - self.logger.critical(('invalid name_modifier using topic name for topic: %s.' - ' Update configuration for' - ' topic_settings.name_modifier') % self.name) - # This is a failure which could cause topics to collide. We'll kill the producer - # so the configuration can be updated. 
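The endpoint wiring above boils down to one `add_url_rule()` call per exposed command, with the command handlers (`pause`, `resume`, `rebuild`) looked up on the relevant `TopicManager`. A bare-bones sketch of the registration pattern outside the producer codebase, with a placeholder handler and the diff's default port; it omits the basic-auth decorator and the gevent WSGI server:

```python
from flask import Flask, jsonify

app = Flask('AetherProducer')


def register(route_name, fn):
    # one URL rule per command, endpoint name == route name
    app.add_url_rule(f'/{route_name}', route_name, view_func=fn)


def healthcheck():
    return jsonify(healthy=True)


register('healthcheck', healthcheck)

if __name__ == '__main__':
    app.run(port=5005)
```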
- sys.exit(1) - self.update_schema(schema) - self.get_producer() - # Spawn worker and give to pool. - self.logger.debug(f'Spawning kafka update thread: {self.topic_name}') - self.context.threads.append(gevent.spawn(self.update_kafka)) - self.logger.debug(f'Checking for existence of topic {self.topic_name}') - while not self.check_topic(): - if self.create_topic(): - break - self.logger.debug(f'Waiting 30 seconds to retry creation of T:{self.topic_name}') - self.context.safe_sleep(30) - self.operating_status = TopicStatus.NORMAL - - def check_topic(self): - metadata = self.producer.list_topics() - topics = [t for t in metadata.topics.keys()] - if self.topic_name in topics: - self.logger.debug(f'Topic {self.topic_name} already exists.') - return True - self.logger.debug(f'Topic {self.name} does not exist. current topics: {topics}') - return False - - def create_topic(self): - self.logger.debug(f'Trying to create topic {self.topic_name}') - kadmin = self.context.kafka_admin_client - topic_config = self.context.settings.get('kafka_settings', {}).get('default.topic.config') - partitions = int(self.context.settings.get('kafka_default_topic_partitions', 1)) - replicas = int(self.context.settings.get('kafka_default_topic_replicas', 1)) - topic = NewTopic( - self.topic_name, - num_partitions=partitions, - replication_factor=replicas, - config=topic_config - ) - fs = kadmin.create_topics([topic]) - # future must return before timeout - for f in concurrent.futures.as_completed(iter(fs.values()), timeout=60): - e = f.exception() - if not e: - self.logger.info(f'Created topic {self.name}') - return True - else: - self.logger.critical(f'Topic {self.name} could not be created: {e}') - return False - - def get_producer(self): - self.kafka_settings = apply_kafka_security_settings( - self.context.settings, - {}, - self.context.settings.get('kafka_security'), - ) - self.producer = Producer(**self.kafka_settings) - self.logger.debug(f'Producer for {self.name} started...') - - # API Calls to Control Topic - - def pause(self): - # Stops sending of data on this topic until resume is called or Producer restarts. - if self.operating_status is not TopicStatus.NORMAL: - self.logger.info( - f'Topic {self.name} could not pause, status: {self.operating_status}.') - return False - self.logger.info(f'Topic {self.name} is pausing.') - self.operating_status = TopicStatus.PAUSED - return True - - def resume(self): - # Resume sending data after pausing. - if self.operating_status is not TopicStatus.PAUSED: - self.logger.info( - f'Topic {self.name} could not resume, status: {self.operating_status}.') - return False - self.logger.info(f'Topic {self.name} is resuming.') - self.operating_status = TopicStatus.NORMAL - return True - - # Functions to rebuilt this topic - - def rebuild(self): - # API Call - self.logger.warn(f'Topic {self.name} is being REBUIT!') - # kick off rebuild process - self.context.threads.append(gevent.spawn(self.handle_rebuild)) - return True - - def handle_rebuild(self): - # greened background task to handle rebuilding of topic - self.operating_status = TopicStatus.REBUILDING - tag = f'REBUILDING {self.name}:' - self.logger.info(f'{tag} waiting' - + f' {self.wait_time *1.5}(sec) for inflight ops to resolve') - self.context.safe_sleep(int(self.wait_time * 1.5)) - self.logger.info(f'{tag} Deleting Topic') - self.producer = None - ok = self.delete_this_topic() - if not ok: - self.logger.critical(f'{tag} FAILED. 
Topic will not resume.') - self.operating_status = TopicStatus.LOCKED - return - self.logger.warn(f'{tag} Resetting Offset.') - self.set_offset('') - self.logger.info(f'{tag} Rebuilding Topic Producer') - self.producer = Producer(**self.kafka_settings) - self.logger.warn(f'{tag} Wipe Complete. /resume to complete operation.') - self.operating_status = TopicStatus.PAUSED - - def delete_this_topic(self): - kadmin = self.context.kafka_admin_client - fs = kadmin.delete_topics([self.name], operation_timeout=60) - future = fs.get(self.name) - for x in range(60): - if not future.done(): - if (x % 5 == 0): - self.logger.debug(f'REBUILDING {self.name}: Waiting for future to complete') - sleep(1) - else: - return True - return False - - # Postgres Facing Polls and handlers - - def updates_available(self): - - modified = '' if not self.offset else self.offset # "" evals to < all strings - query = sql.SQL(TopicManager.NEW_STR).format( - modified=sql.Literal(modified), - schema_name=sql.Literal(self.name), - realm=sql.Literal(self.realm) - ) - try: - promise = POSTGRES.request_connection(1, self.name) # Medium priority - conn = promise.get() - cursor = conn.cursor(cursor_factory=DictCursor) - cursor.execute(query) - return sum([1 for i in cursor]) > 0 - except psycopg2.OperationalError as pgerr: - self.logger.critical( - 'Could not access Database to look for updates: %s' % pgerr) - return False - finally: - try: - POSTGRES.release(self.name, conn) - except UnboundLocalError: - self.logger.error(f'{self.name} could not release a connection it never received.') - - def get_time_window_filter(self, query_time): - # You can't always trust that a set from kernel made up of time window - # start < _time < end is complete if nearlyequal(_time, now()). - # Sometimes rows belonging to that set would show up a couple mS after - # the window had closed and be dropped. Our solution was to truncate sets - # based on the insert time and now() to provide a buffer. - TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' - - def fn(row): - commited = datetime.strptime(row.get('modified')[:26], TIME_FORMAT) - lag_time = (query_time - commited).total_seconds() - if lag_time > self.window_size_sec: - return True - elif lag_time < -30.0: - # Sometimes fractional negatives show up. More than 30 seconds is an issue though. - self.logger.critical( - 'INVALID LAG INTERVAL: %s. Check time settings on server.' 
% lag_time) - _id = row.get('id') - self.logger.debug('WINDOW EXCLUDE: ID: %s, LAG: %s' % - (_id, lag_time)) - return False - return fn - - def get_db_updates(self): - modified = '' if not self.offset else self.offset # "" evals to < all strings - query = sql.SQL(TopicManager.QUERY_STR).format( - modified=sql.Literal(modified), - schema_name=sql.Literal(self.name), - limit=sql.Literal(self.limit), - realm=sql.Literal(self.realm) - ) - query_time = datetime.now() - - try: - promise = POSTGRES.request_connection(2, self.name) # Lowest priority - conn = promise.get() - cursor = conn.cursor(cursor_factory=DictCursor) - cursor.execute(query) - window_filter = self.get_time_window_filter(query_time) - return [{key: row[key] for key in row.keys()} for row in cursor if window_filter(row)] - except psycopg2.OperationalError as pgerr: - self.logger.critical( - 'Could not access Database to look for updates: %s' % pgerr) - return [] - finally: - try: - POSTGRES.release(self.name, conn) - except UnboundLocalError: - self.logger.error(f'{self.name} could not release a connection it never received.') - - def get_topic_size(self): - query = sql.SQL(TopicManager.COUNT_STR).format( - schema_name=sql.Literal(self.name), - realm=sql.Literal(self.realm) - ) - try: - promise = POSTGRES.request_connection(0, self.name) # needs to be quick - conn = promise.get() - cursor = conn.cursor(cursor_factory=DictCursor) - cursor.execute(query) - size = [{key: row[key] for key in row.keys()} for row in cursor][0] - self.logger.debug(f'''Reporting requested size for {self.name} of {size['count']}''') - return size - except psycopg2.OperationalError as pgerr: - self.logger.critical( - 'Could not access db to get topic size: %s' % pgerr) - return -1 - finally: - try: - POSTGRES.release(self.name, conn) - except UnboundLocalError: - self.logger.error(f'{self.name} could not release a connection it never received.') - - def update_schema(self, schema_obj): - self.schema_obj = self.parse_schema(schema_obj) - self.schema = spavro.schema.parse(json.dumps(self.schema_obj)) - - def parse_schema(self, schema_obj): - # We split this method from update_schema because schema_obj as it is can not - # be compared for differences. literal_eval fixes this. As such, this is used - # by the schema_changed() method. 
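The window filter in `get_time_window_filter()` above encodes a simple rule: only rows committed more than `window_size_sec` seconds before the query ran are trusted to belong to a complete set; anything newer is left for the next polling pass. A self-contained illustration of that rule with made-up timestamps:

```python
from datetime import datetime, timedelta

WINDOW_SIZE_SEC = 3  # illustrative; the producer reads this from its settings


def include_row(modified, query_time, window=WINDOW_SIZE_SEC):
    # keep only rows whose commit time is safely outside the window
    lag = (query_time - modified).total_seconds()
    return lag > window


now = datetime.now()
print(include_row(now - timedelta(seconds=5), now))  # True  -> published this pass
print(include_row(now - timedelta(seconds=1), now))  # False -> retried on the next pass
```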
- # schema_obj is a nested OrderedDict, which needs to be stringified - return ast.literal_eval(json.dumps(schema_obj['schema_definition'])) - - def schema_changed(self, schema_candidate): - # for use by ProducerManager.check_schemas() - return self.parse_schema(schema_candidate) != self.schema_obj - - def get_status(self): - # Updates inflight status and returns to Flask called - self.status['operating_status'] = str(self.operating_status) - self.status['inflight'] = [i for i in self.change_set.keys()] - return self.status - - # Callback function registered with Kafka Producer to acknowledge receipt - def kafka_callback(self, err=None, msg=None, _=None, **kwargs): - if err: - self.logger.error('ERROR %s', [err, msg, kwargs]) - with io.BytesIO() as obj: - obj.write(msg.value()) - reader = DataFileReader(obj, DatumReader()) - for message in reader: - _id = message.get('id') - if err: - self.logger.debug('NO-SAVE: %s in topic %s | err %s' % - (_id, self.topic_name, err.name())) - self.failed_changes[_id] = err - else: - self.logger.debug('SAVE: %s in topic %s' % - (_id, self.topic_name)) - self.successful_changes.append(_id) - - def update_kafka(self): - # Main update loop - # Monitors postgres for changes via TopicManager.updates_available - # Consumes updates to the Posgres DB via TopicManager.get_db_updates - # Sends new messages to Kafka - # Registers message callback (ok or fail) to TopicManager.kafka_callback - # Waits for all messages to be accepted or timeout in TopicManager.wait_for_kafka - self.logger.debug(f'Topic {self.name}: Initializing') - while self.operating_status is TopicStatus.INITIALIZING: - if self.context.killed: - return - self.logger.debug(f'Waiting for topic {self.name} to initialize...') - self.context.safe_sleep(self.wait_time) - pass - while not self.context.killed: - if self.operating_status is not TopicStatus.NORMAL: - self.logger.debug( - f'Topic {self.name} not updating, status: {self.operating_status}' - + f', waiting {self.wait_time}(sec)') - self.context.safe_sleep(self.wait_time) - continue - - if not self.context.kafka_available(): - self.logger.debug('Kafka Container not accessible, waiting.') - self.context.safe_sleep(self.wait_time) - continue - - self.offset = self.get_offset() - if not self.updates_available(): - self.logger.debug('No updates') - self.context.safe_sleep(self.wait_time) - continue - try: - self.logger.debug('Getting Changeset for %s' % self.name) - self.change_set = {} - new_rows = self.get_db_updates() - if not new_rows: - self.context.safe_sleep(self.wait_time) - continue - end_offset = new_rows[-1].get('modified') - except Exception as pge: - self.logger.error( - 'Could not get new records from kernel: %s' % pge) - self.context.safe_sleep(self.wait_time) - continue - - try: - with io.BytesIO() as bytes_writer: - writer = DataFileWriter( - bytes_writer, DatumWriter(), self.schema, codec='deflate') - - for row in new_rows: - _id = row['id'] - msg = row.get('payload') - modified = row.get('modified') - if validate(self.schema, msg): - # Message validates against current schema - self.logger.debug( - 'ENQUEUE MSG TOPIC: %s, ID: %s, MOD: %s' % ( - self.name, - _id, - modified - )) - self.change_set[_id] = row - writer.append(msg) - else: - # Message doesn't have the proper format for the current schema. - self.logger.critical( - 'SCHEMA_MISMATCH:NOT SAVED! 
TOPIC:%s, ID:%s' % (self.name, _id)) - - writer.flush() - raw_bytes = bytes_writer.getvalue() - - self.producer.poll(0) - self.producer.produce( - self.topic_name, - raw_bytes, - callback=self.kafka_callback - ) - self.producer.flush() - self.wait_for_kafka( - end_offset, failure_wait_time=self.kafka_failure_wait_time) - - except Exception as ke: - self.logger.error('error in Kafka save: %s' % ke) - self.logger.error(traceback.format_exc()) - self.context.safe_sleep(self.wait_time) - - def wait_for_kafka(self, end_offset, timeout=10, iters_per_sec=10, failure_wait_time=10): - # Waits for confirmation of message receipt from Kafka before moving to next changeset - # Logs errors and status to log and to web interface - - sleep_time = timeout / (timeout * iters_per_sec) - change_set_size = len(self.change_set) - errors = {} - for i in range(timeout * iters_per_sec): - - # whole changeset failed; systemic failure likely; sleep it off and try again - if len(self.failed_changes) >= change_set_size: - self.handle_kafka_errors( - change_set_size, all_failed=True, failure_wait_time=failure_wait_time) - self.clear_changeset() - self.logger.info( - 'Changeset not saved; likely broker outage, sleeping worker for %s' % self.name) - self.context.safe_sleep(failure_wait_time) - return # all failed; ignore changeset - - # All changes were saved - elif len(self.successful_changes) == change_set_size: - - self.logger.debug( - 'All changes saved ok in topic %s.' % self.name) - break - - # Remove successful and errored changes - for _id, err in self.failed_changes.items(): - try: - del self.change_set[_id] - except KeyError: - pass # could have been removed on previous iter - for _id in self.successful_changes: - try: - del self.change_set[_id] - except KeyError: - pass # could have been removed on previous iter - - # All changes registered - if len(self.change_set) == 0: - break - - sleep(sleep_time) - - # Timeout reached or all messages returned ( and not all failed ) - - self.status['last_changeset_status'] = { - 'changes': change_set_size, - 'failed': len(self.failed_changes), - 'succeeded': len(self.successful_changes), - 'timestamp': datetime.now().isoformat() - } - if errors: - self.handle_kafka_errors(change_set_size, all_failed=False) - self.clear_changeset() - # Once we're satisfied, we set the new offset past the processed messages - self.context.kafka = KafkaStatus.SUBMISSION_SUCCESS - self.set_offset(end_offset) - # Sleep so that elements passed in the current window become eligible - self.context.safe_sleep(self.window_size_sec) - - def handle_kafka_errors(self, change_set_size, all_failed=False, failure_wait_time=10): - # Errors in saving data to Kafka are handled and logged here - errors = {} - for _id, err in self.failed_changes.items(): - # accumulate error types - error_type = str(err.name()) - errors[error_type] = errors.get(error_type, 0) + 1 - - last_error_set = { - 'changes': change_set_size, - 'errors': errors, - 'outcome': 'RETRY', - 'timestamp': datetime.now().isoformat() - } - - if not all_failed: - # Collect Error types for reporting - for _id, err in self.failed_changes.items(): - self.logger.critical('PRODUCER_FAILURE: T: %s ID %s , ERR_MSG %s' % ( - self.name, _id, err.name())) - dropped_messages = change_set_size - len(self.successful_changes) - errors['NO_REPLY'] = dropped_messages - len(self.failed_changes) - last_error_set['failed'] = len(self.failed_changes), - last_error_set['succeeded'] = len(self.successful_changes), - last_error_set['outcome'] = 'MSGS_DROPPED : %s' % 
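The produce path above batches rows into a single Avro container file, hands the bytes to `confluent_kafka.Producer.produce()` with a delivery callback, and then inspects the callback results in `wait_for_kafka()`. The underlying produce/poll/flush plus callback contract, reduced to its minimum; the broker address and topic name below are placeholders:

```python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})


def on_delivery(err, msg):
    # invoked once per message while poll()/flush() serve the delivery queue
    if err is not None:
        print(f'delivery failed: {err}')
    else:
        print(f'delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')


producer.poll(0)  # serve callbacks for anything already in flight
producer.produce('demo.topic', b'payload-bytes', callback=on_delivery)
producer.flush()  # block until outstanding deliveries are acknowledged
```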
dropped_messages, - - self.status['last_errors_set'] = last_error_set - if all_failed: - self.context.kafka = KafkaStatus.SUBMISSION_FAILURE - return - - def clear_changeset(self): - self.failed_changes = {} - self.successful_changes = [] - self.change_set = {} - - def get_offset(self): - # Get current offset from Database - offset = Offset.get_offset(self.name) - if offset: - self.logger.debug('Got offset for %s | %s' % - (self.name, offset)) - return offset - else: - self.logger.debug( - 'Could not get offset for %s it is a new type' % (self.name)) - # No valid offset so return None; query will use empty string which is < any value - return None - - def set_offset(self, offset): - # Set a new offset in the database - new_offset = Offset.update(self.name, offset) - self.logger.debug('new offset for %s | %s' % - (self.name, new_offset)) - self.status['offset'] = new_offset - - def main(): - file_path = os.environ.get('PRODUCER_SETTINGS_FILE') - settings = Settings(file_path) - ProducerManager(settings) + ProducerManager() diff --git a/aether-producer/producer/db.py b/aether-producer/producer/db.py index a9c450a57..e1bf7405e 100644 --- a/aether-producer/producer/db.py +++ b/aether-producer/producer/db.py @@ -16,209 +16,34 @@ # specific language governing permissions and limitations # under the License. -from gevent import monkey, sleep -# need to patch sockets to make requests async -monkey.patch_all() # noqa -import psycogreen.gevent -psycogreen.gevent.patch_psycopg() # noqa - from datetime import datetime -import logging -import sys -import os import signal -import uuid +import sys import gevent +from gevent import monkey, sleep from gevent.event import AsyncResult from gevent.queue import PriorityQueue, Queue +# need to patch sockets to make requests async +monkey.patch_all() # noqa +import psycogreen.gevent +psycogreen.gevent.patch_psycopg() # noqa + import psycopg2 from psycopg2 import sql from psycopg2.extras import DictCursor -from sqlalchemy import Column, String -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import create_engine +from sqlalchemy import Column, String, create_engine from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import NullPool -from producer.settings import Settings - -FILE_PATH = os.path.dirname(os.path.realpath(__file__)) +from producer.settings import SETTINGS, get_logger Base = declarative_base() -logger = logging.getLogger('producer-db') -logger.propagate = False -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter( - '%(asctime)s [ProducerDB] %(levelname)-8s %(message)s')) -logger.addHandler(handler) -engine = None -Session = None - -file_path = os.environ.get('PRODUCER_SETTINGS_FILE') -SETTINGS = Settings(file_path) -log_level = logging.getLevelName(SETTINGS.get('log_level', 'DEBUG')) -logger.setLevel(log_level) - - -def init(): - global engine - - # Offset - - offset_db_host = SETTINGS['offset_db_host'] - offset_db_user = SETTINGS['offset_db_user'] - offset_db_port = SETTINGS['offset_db_port'] - offset_db_password = SETTINGS['offset_db_password'] - offset_db_name = SETTINGS['offset_db_name'] - - url = f'postgresql+psycopg2://{offset_db_user}:{offset_db_password}' + \ - f'@{offset_db_host}:{offset_db_port}/{offset_db_name}' - - engine = create_engine(url, poolclass=NullPool) - try: - start_session(engine) - Offset.create_pool() - return - except SQLAlchemyError: - pass - try: - 
logger.error('Attempting to Create Database.') - create_db(engine, url) - start_session(engine) - Offset.create_pool() - except SQLAlchemyError as sqe: - logger.error('Database creation failed: %s' % sqe) - sys.exit(1) - - -def start_session(engine): - try: - Base.metadata.create_all(engine) - - global Session - Session = sessionmaker(bind=engine) - - logger.info('Database initialized.') - except SQLAlchemyError as err: - logger.error('Database could not be initialized. | %s' % err) - raise err - - -def create_db(engine, url): - db_name = url.split('/')[-1] - root = url.replace(db_name, 'postgres') - temp_engine = create_engine(root) - conn = temp_engine.connect() - conn.execute('commit') - conn.execute('create database %s' % db_name) - conn.close() - - -def get_session(): - return Session() - - -def get_engine(): - return engine - - -def make_uuid(): - return str(uuid.uuid4()) - - -class Offset(Base): - - # SQLAlchemy is now only used for table creation. - # All gets and sets to offset are made via psycopg2 which - # allows us to use the priority connection pooling, making - # sure our sets happen ASAP while reads of offsets can be - # delayed until resources are free. - - __tablename__ = 'offset' - - # GET single offset value - GET_STR = ''' - SELECT - o.schema_name, - o.offset_value - FROM public.offset o - WHERE o.schema_name = {schema_name} - LIMIT 1; ''' - - # UPSERT Postgres call. - SET_STR = ''' - INSERT INTO public.offset - (schema_name, offset_value) - VALUES - ({schema_name}, {offset_value}) - ON CONFLICT (schema_name) DO UPDATE SET - (schema_name, offset_value) - = (EXCLUDED.schema_name, EXCLUDED.offset_value);''' - - schema_name = Column(String, primary_key=True) - offset_value = Column(String, nullable=False) - - @classmethod - def create_pool(cls): - # Creates connection pool to OffsetDB as part of initialization. 
- offset_settings = { - 'user': 'offset_db_user', - 'dbname': 'offset_db_name', - 'port': 'offset_db_port', - 'host': 'offset_db_host', - 'password': 'offset_db_password' - } - offset_creds = {k: SETTINGS.get(v) for k, v in offset_settings.items()} - global OFFSET_DB - offset_db_pool_size = SETTINGS.get('offset_db_pool_size', 6) - OFFSET_DB = PriorityDatabasePool(offset_creds, 'OffsetDB', offset_db_pool_size) - - @classmethod - def update(cls, name, offset_value): - # Update or Create if not existing - call = 'Offset-SET' - try: - promise = OFFSET_DB.request_connection(0, call) # Highest Priority - conn = promise.get() - cursor = conn.cursor() - query = sql.SQL(Offset.SET_STR).format( - schema_name=sql.Literal(name), - offset_value=sql.Literal(offset_value)) - cursor.execute(query) - conn.commit() - return offset_value - except Exception as err: - raise err - finally: - try: - OFFSET_DB.release(call, conn) - except UnboundLocalError: - logger.error(f'{call} could not release a connection it never received.') - - @classmethod - def get_offset(cls, name): - call = 'Offset-GET' - try: - promise = OFFSET_DB.request_connection(1, call) # Lower Priority than set - conn = promise.get() - cursor = conn.cursor(cursor_factory=DictCursor) - query = sql.SQL(Offset.GET_STR).format(schema_name=sql.Literal(name)) - cursor.execute(query) - res = [i for i in cursor] - if not res: - return None - return res[0][1] - except Exception as err: - raise err - finally: - try: - OFFSET_DB.release(call, conn) - except UnboundLocalError: - logger.error(f'{call} could not release a connection it never received.') +logger = get_logger('producer-db') class PriorityDatabasePool(object): @@ -234,8 +59,9 @@ def __init__(self, pg_creds, name, max_connections=1): self.name = name self.live_workers = 0 self.pg_creds = pg_creds - self.max_connections = max_connections + self.max_connections = int(max_connections) logger.debug(f'Initializing DB Pool: {self.name} with {max_connections} connections.') + self.workers = [] self.backlog = 0 self.max_backlog = 0 @@ -272,7 +98,7 @@ def _make_connection(self): def _kill(self, *args, **kwargs): logger.info(f'PriorityDatabasePool: {self.name} is shutting down.') - logger.info(f'PriorityDatabasePool: {self.name}' + + logger.info(f'PriorityDatabasePool: {self.name}' f' resolving remaning {len(self.job_queue)} jobs.') self.running = False @@ -286,8 +112,8 @@ def _dispatcher(self): logger.debug(f'{self.name} pulled 1 for {name}: still {len(self.connection_pool)}') sleep(0) # allow other coroutines to work while not self._test_connection(conn): - logger.error(f'Pooled connection is dead, getting new resource') - logger.error(f'Replacing dead pool member.') + logger.error('Pooled connection is dead, getting new resource.' + ' Replacing dead pool member.') conn = self._make_connection() sleep(0) logger.debug(f'Got job from {name} @priority {priority_level}') @@ -328,8 +154,159 @@ def _shutdown_pool(self): c += 1 -# KernelDB -pg_requires = ['user', 'dbname', 'port', 'host', 'password'] -pg_creds = {key: SETTINGS.get('postgres_%s' % key) for key in pg_requires} -kernel_db_pool_size = SETTINGS.get('kernel_db_pool_size', 6) -KERNEL_DB = PriorityDatabasePool(pg_creds, 'KernelDB', kernel_db_pool_size) # imported from here +class Offset(Base): + + # SQLAlchemy is now only used for table creation. 
+ # All gets and sets to offset are made via psycopg2 which + # allows us to use the priority connection pooling, making + # sure our sets happen ASAP while reads of offsets can be + # delayed until resources are free. + + __tablename__ = 'offset' + schema_name = Column(String, primary_key=True) + offset_value = Column(String, nullable=False) + + # private connection pool + __pool__ = None + + # GET single offset value + GET_STR = ''' + SELECT offset_value + FROM public.offset + WHERE schema_name = {schema_name} + LIMIT 1; + ''' + + # UPSERT Postgres call + SET_STR = ''' + INSERT INTO public.offset + (schema_name, offset_value) + VALUES + ({schema_name}, {offset_value}) + ON CONFLICT (schema_name) + DO UPDATE SET + (schema_name, offset_value) = (EXCLUDED.schema_name, EXCLUDED.offset_value); + ''' + + @classmethod + def create_pool(cls): + if not Offset.__pool__: + # Creates connection pool to OffsetDB as part of initialization. + offset_settings = { + 'user': 'offset_db_user', + 'dbname': 'offset_db_name', + 'port': 'offset_db_port', + 'host': 'offset_db_host', + 'password': 'offset_db_password' + } + offset_creds = {k: SETTINGS.get_required(v) for k, v in offset_settings.items()} + offset_db_pool_size = int(SETTINGS.get('offset_db_pool_size', 6)) + Offset.__pool__ = PriorityDatabasePool(offset_creds, 'OffsetDB', offset_db_pool_size) + return Offset.__pool__ + + @classmethod + def close_pool(cls): + if Offset.__pool__: + Offset.__pool__._kill() + Offset.__pool__ = None + + @classmethod + def update(cls, name, offset_value): + # Update or Create if not existing + call = 'Offset-SET' + try: + promise = Offset.__pool__.request_connection(0, call) # Highest Priority + conn = promise.get() + cursor = conn.cursor() + query = sql.SQL(Offset.SET_STR).format( + schema_name=sql.Literal(name), + offset_value=sql.Literal(offset_value), + ) + cursor.execute(query) + conn.commit() + return offset_value + + except Exception as err: + logger.error(err) + raise err + + finally: + try: + Offset.__pool__.release(call, conn) + except UnboundLocalError: + logger.error(f'{call} could not release a connection it never received.') + + @classmethod + def get_offset(cls, name): + call = 'Offset-GET' + try: + promise = Offset.__pool__.request_connection(1, call) # Lower Priority than set + conn = promise.get() + cursor = conn.cursor(cursor_factory=DictCursor) + query = sql.SQL(Offset.GET_STR).format(schema_name=sql.Literal(name)) + cursor.execute(query) + res = [i for i in cursor] + return res[0][0] if res else None + + except Exception as err: + logger.error(err) + raise err + + finally: + try: + Offset.__pool__.release(call, conn) + except UnboundLocalError: + logger.error(f'{call} could not release a connection it never received.') + + +def init(): + def _db_url(db_name): + return ( + f'postgresql+psycopg2://{offset_db_user}:{offset_db_password}' + f'@{offset_db_host}:{offset_db_port}/{db_name}' + ) + + def _start_session(engine): + try: + Base.metadata.create_all(engine) + sessionmaker(bind=engine) + logger.info('Database initialized.') + except SQLAlchemyError as err: + logger.error(f'Database could not be initialized | {err}') + raise err + + def _create_db(): + logger.info(f'Attempting to create database "{offset_db_name}".') + temp_engine = create_engine(_db_url('postgres')) + conn = temp_engine.connect() + conn.execute('commit') + conn.execute(f'CREATE DATABASE {offset_db_name};') + conn.close() + logger.info(f'Database "{offset_db_name}" created.') + + # Offset + offset_db_host = 
SETTINGS.get_required('offset_db_host') + offset_db_user = SETTINGS.get_required('offset_db_user') + offset_db_port = SETTINGS.get_required('offset_db_port') + offset_db_password = SETTINGS.get_required('offset_db_password') + offset_db_name = SETTINGS.get_required('offset_db_name') + + engine = create_engine(_db_url(offset_db_name), poolclass=NullPool) + try: + _start_session(engine) + Offset.create_pool() + return + except SQLAlchemyError as sqe: + logger.error(f'Start session failed (1st attempt): {sqe}') + pass + + # it was not possible to start session because the database does not exit + # create it and try again + try: + _create_db() + _start_session(engine) + Offset.create_pool() + except SQLAlchemyError as sqe: + logger.error(f'Start session failed (2nd attempt): {sqe}') + logger.exception(sqe) + sys.exit(1) diff --git a/aether-producer/producer/kernel.py b/aether-producer/producer/kernel.py new file mode 100644 index 000000000..e72621694 --- /dev/null +++ b/aether-producer/producer/kernel.py @@ -0,0 +1,74 @@ +# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org +# +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime + +from producer.settings import SETTINGS, get_logger + + +logger = get_logger('producer-kernel') + +_WINDOW_SIZE_SEC = int(SETTINGS.get('window_size_sec', 3)) +_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' + + +class KernelClient(object): + + def __init__(self): + # last time kernel was checked for new updates + self.last_check = None + self.last_check_error = None + self.limit = int(SETTINGS.get('fetch_size', 100)) + + def get_time_window_filter(self, query_time): + # You can't always trust that a set from kernel made up of time window + # start < _time < end is complete if nearly equal(_time, now()). + # Sometimes rows belonging to that set would show up a couple mS after + # the window had closed and be dropped. Our solution was to truncate sets + # based on the insert time and now() to provide a buffer. + + def fn(row): + commited = datetime.strptime(row.get('modified')[:26], _TIME_FORMAT) + lag_time = (query_time - commited).total_seconds() + if lag_time > _WINDOW_SIZE_SEC: + return True + + elif lag_time < -30.0: + # Sometimes fractional negatives show up. More than 30 seconds is an issue though. + logger.critical(f'INVALID LAG INTERVAL: {lag_time}. 
Check time settings on server.') + + _id = row.get('id') + logger.debug(f'WINDOW EXCLUDE: ID: {_id}, LAG: {lag_time}') + return False + + return fn + + def mode(self): + raise NotImplementedError + + def get_schemas(self): + raise NotImplementedError + + def check_updates(self, modified, schema_name, realm): + raise NotImplementedError + + def count_updates(self, schema_name, realm): + raise NotImplementedError + + def get_updates(self, modified, schema_name, realm): + raise NotImplementedError diff --git a/aether-producer/producer/kernel_api.py b/aether-producer/producer/kernel_api.py new file mode 100644 index 000000000..7ce457ae9 --- /dev/null +++ b/aether-producer/producer/kernel_api.py @@ -0,0 +1,159 @@ +# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org +# +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import requests + +from datetime import datetime +from gevent import sleep + +from producer.settings import SETTINGS +from producer.kernel import KernelClient, logger + + +_REQUEST_ERROR_RETRIES = int(SETTINGS.get('request_error_retries', 3)) + +# Aether kernel +_KERNEL_TOKEN = SETTINGS.get_required('aether_kernel_token') +_KERNEL_URL = SETTINGS.get_required('aether_kernel_url') + +# Multitenancy +_DEFAULT_REALM = SETTINGS.get('default_realm', 'eha') +_REALM_COOKIE = SETTINGS.get('realm_cookie', 'eha-realm') +_REALMS_PATH = SETTINGS.get('aether_kernel_realms_path', '/admin/~realms') + +# Aether Kernel REST API urls +_REALMS_URL = f'{_KERNEL_URL}{_REALMS_PATH}' +_SCHEMAS_URL = ( + f'{_KERNEL_URL}/' + 'schemadecorators.json?' + '&page_size={page_size}' + '&fields=id,schema_name,schema_definition' +) +_ENTITIES_URL = ( + f'{_KERNEL_URL}/' + 'entities.json?' 
+ '&page_size={page_size}' + '&fields=id,modified,payload' + '&ordering=modified' + '&modified__gt={modified}' + '&schema={schema}' +) + + +class KernelAPIClient(KernelClient): + + def mode(self): + return 'api' + + def get_schemas(self): + self.last_check = datetime.now().isoformat() + + try: + # get list of realms + realms = self._fetch(url=_REALMS_URL)['realms'] + for realm in realms: + # get list of schema decorators + _next_url = _SCHEMAS_URL.format(page_size=self.limit) + while _next_url: + response = self._fetch(url=_next_url, realm=realm) + _next_url = response['next'] + + for entry in response['results']: + yield {'realm': realm, **entry} + + except Exception: + self.last_check_error = 'Could not access kernel API to get topics' + logger.critical(self.last_check_error) + return [] + + def check_updates(self, modified, schema_name, realm): + url = _ENTITIES_URL.format( + page_size=1, + schema=schema_name, + modified=modified, + ) + try: + response = self._fetch(url=url, realm=realm) + return response['count'] > 1 + except Exception: + logger.critical('Could not access kernel API to look for updates') + return False + + def count_updates(self, schema_name, realm): + url = _ENTITIES_URL.format( + page_size=1, + schema=schema_name, + modified='', + ) + try: + _count = self._fetch(url=url, realm=realm)['count'] + logger.debug(f'Reporting requested size for {schema_name} of {_count}') + return {'count': _count} + except Exception: + logger.critical('Could not access kernel API to look for updates') + return -1 + + def get_updates(self, modified, schema_name, realm): + url = _ENTITIES_URL.format( + page_size=self.limit, + schema=schema_name, + modified=modified, + ) + + try: + query_time = datetime.now() + window_filter = self.get_time_window_filter(query_time) + + response = self._fetch(url=url, realm=realm) + return [ + entry + for entry in response['results'] + if window_filter(entry) + ] + + except Exception: + logger.critical('Could not access kernel API to look for updates') + return [] + + def _fetch(self, url, realm=None): + ''' + Executes the request call at least X times (``REQUEST_ERROR_RETRIES``) + trying to avoid unexpected connection errors (not request expected ones). + + Like: + # ConnectionResetError: [Errno 104] Connection reset by peer + # http.client.RemoteDisconnected: Remote end closed connection without response + ''' + + headers = { + 'Authorization': f'Token {_KERNEL_TOKEN}', + _REALM_COOKIE: realm if realm else _DEFAULT_REALM + } + + count = 0 + while True: + count += 1 + try: + response = requests.get(url, headers=headers) + return response.json() + except Exception as e: + if count >= _REQUEST_ERROR_RETRIES: + logger.error(f'Error while fetching data from {url}') + logger.error(e) + raise e + sleep(count) # sleep longer in each iteration diff --git a/aether-producer/producer/kernel_db.py b/aether-producer/producer/kernel_db.py new file mode 100644 index 000000000..e9269cd0f --- /dev/null +++ b/aether-producer/producer/kernel_db.py @@ -0,0 +1,158 @@ +# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org +# +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime + +from gevent import monkey +# need to patch sockets to make requests async +monkey.patch_all() # noqa +import psycogreen.gevent +psycogreen.gevent.patch_psycopg() # noqa + +import psycopg2 +from psycopg2 import sql +from psycopg2.extras import DictCursor + +from producer.db import PriorityDatabasePool +from producer.settings import SETTINGS +from producer.kernel import KernelClient, logger + + +_SCHEMAS_SQL = 'SELECT * FROM kernel_schema_vw' + +_CHECK_UPDATES_SQL = ''' + SELECT id + FROM kernel_entity_vw + WHERE modified > {modified} + AND schema_name = {schema_name} + AND realm = {realm} + LIMIT 1; +''' + +_COUNT_UPDATES_SQL = ''' + SELECT COUNT(id) + FROM kernel_entity_vw + WHERE schema_name = {schema_name} + AND realm = {realm}; +''' + +_GET_UPDATES_SQL = ''' + SELECT * + FROM kernel_entity_vw + WHERE modified > {modified} + AND schema_name = {schema_name} + AND realm = {realm} + ORDER BY modified ASC + LIMIT {limit}; +''' + + +class KernelDBClient(KernelClient): + + def __init__(self): + super(KernelDBClient, self).__init__() + + pg_requires = ['user', 'dbname', 'port', 'host', 'password'] + pg_creds = {key: SETTINGS.get_required(f'postgres_{key}') for key in pg_requires} + kernel_db_pool_size = int(SETTINGS.get('kernel_db_pool_size', 6)) + self.pool = PriorityDatabasePool(pg_creds, 'KernelDBClient', kernel_db_pool_size) + + def mode(self): + return 'db' + + def get_schemas(self): + self.last_check = datetime.now().isoformat() + name = 'schemas_query' + query = sql.SQL(_SCHEMAS_SQL) + cursor = self._exec_sql(name, 1, query) + if cursor: + self.last_check_error = None + for row in cursor: + yield {key: row[key] for key in row.keys()} + else: + self.last_check_error = 'Could not access kernel database to get topics' + logger.critical('Could not access kernel database to get topics') + return [] + + def check_updates(self, modified, schema_name, realm): + query = sql.SQL(_CHECK_UPDATES_SQL).format( + modified=sql.Literal(modified), + schema_name=sql.Literal(schema_name), + realm=sql.Literal(realm), + ) + cursor = self._exec_sql(schema_name, 1, query) + if cursor: + return sum([1 for i in cursor]) > 0 + else: + logger.critical('Could not access kernel database to look for updates') + return False + + def count_updates(self, schema_name, realm): + query = sql.SQL(_COUNT_UPDATES_SQL).format( + schema_name=sql.Literal(schema_name), + realm=sql.Literal(realm), + ) + cursor = self._exec_sql(schema_name, 0, query) + if cursor: + _count = cursor.fetchone()[0] + logger.debug(f'Reporting requested size for {schema_name} of {_count}') + return {'count': _count} + else: + logger.critical('Could not access kernel database to look for updates') + return -1 + + def get_updates(self, modified, schema_name, realm): + query = sql.SQL(_GET_UPDATES_SQL).format( + modified=sql.Literal(modified), + schema_name=sql.Literal(schema_name), + realm=sql.Literal(realm), + limit=sql.Literal(self.limit), + ) + + query_time = datetime.now() + cursor = self._exec_sql(schema_name, 2, query) + if cursor: + window_filter = self.get_time_window_filter(query_time) + return [ + {key: 
row[key] for key in row.keys()} + for row in cursor + if window_filter(row) + ] + else: + logger.critical('Could not access kernel database to look for updates') + return [] + + def _exec_sql(self, name, priority, query): + try: + promise = self.pool.request_connection(priority, name) + conn = promise.get() + cursor = conn.cursor(cursor_factory=DictCursor) + cursor.execute(query) + + return cursor + + except psycopg2.OperationalError as pgerr: + logger.critical(f'Error while accessing database: {pgerr}') + logger.exception(pgerr) + return None + + finally: + try: + self.pool.release(name, conn) + except UnboundLocalError: + logger.error(f'{name} could not release a connection it never received.') diff --git a/aether-producer/producer/settings.json b/aether-producer/producer/settings.json index 5e24ab374..a357dd9d9 100644 --- a/aether-producer/producer/settings.json +++ b/aether-producer/producer/settings.json @@ -1,35 +1,26 @@ { - "start_delay": 5, - "sleep_time": 10, - "log_level": "ERROR", - "window_size_sec": 2, - "postgres_pull_limit": 250, - "postgres_host": "db", - "postgres_port": 5432, - "postgres_dbname": "aether", - "postgres_user": "readonlyuser", - "postgres_password": "", - "offset_db_pool_size": 6, - "kernel_db_pool_size": 6, - "kernel_url": "http://kernel:8100", - "kernel_admin_username": "admin", - "kernel_admin_password": "", - "kafka_failure_wait_time": 10, - "kafka_url": "kafka:29092", + "fetch_size": 100, + "flask_settings": { + "max_connections": 3, + "pretty_json_status": true + }, "kafka_default_topic_partitions": 1, "kafka_default_topic_replicas": 3, + "kafka_failure_wait_time": 10, "kafka_settings": { "default.topic.config": { "retention.ms": -1 } }, - "server_port": 5005, + "kernel_access_type": "api", + "log_level": "ERROR", + "offset_db_pool_size": 6, "server_ip": "", - "flask_settings": { - "max_connections": 3, - "pretty_json_status": true - }, + "server_port": 5005, + "sleep_time": 10, + "start_delay": 5, "topic_settings": { "name_modifier": "%s" - } -} \ No newline at end of file + }, + "window_size_sec": 2 +} diff --git a/aether-producer/producer/settings.py b/aether-producer/producer/settings.py index bc166a4fe..2d6c3da3c 100644 --- a/aether-producer/producer/settings.py +++ b/aether-producer/producer/settings.py @@ -17,11 +17,13 @@ # under the License. 
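Editor's note — a worked example of the time-window exclusion that get_time_window_filter applies above may help: rows whose modified timestamp lags the query time by less than window_size_sec are held back and picked up again on a later pass, so a changeset is never closed against rows that are still landing. The timestamps and the window value below are illustrative only.

    from datetime import datetime

    _WINDOW_SIZE_SEC = 3                      # illustrative; the producer reads this from settings
    _TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'

    query_time = datetime(2020, 3, 1, 12, 0, 10)
    rows = [
        {'id': 'old-row', 'modified': '2020-03-01T12:00:05.000000'},   # lag 5.0s -> included
        {'id': 'new-row', 'modified': '2020-03-01T12:00:09.500000'},   # lag 0.5s -> excluded, retried later
    ]

    def window_filter(row):
        committed = datetime.strptime(row['modified'][:26], _TIME_FORMAT)
        return (query_time - committed).total_seconds() > _WINDOW_SIZE_SEC

    kept = [r for r in rows if window_filter(r)]   # -> only 'old-row'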
import json +import logging import os class Settings(dict): # A container for our settings + def __init__(self, file_path=None): self.load(file_path) @@ -31,10 +33,13 @@ def get(self, key, default=None): except KeyError: return default + def get_required(self, key): + return self.__getitem__(key) + def __getitem__(self, key): result = os.environ.get(key.upper()) if result is None: - result = super().__getitem__(key) + result = super().__getitem__(key.lower()) return result @@ -42,4 +47,55 @@ def load(self, path): with open(path) as f: obj = json.load(f) for k in obj: - self[k] = obj.get(k) + self[k.lower()] = obj.get(k) + + +def get_logger(name, logger=None): + if not logger: + logger = logging.getLogger(name) + logger.propagate = False + + try: + handler = logger.handlers[0] + except IndexError: + handler = logging.StreamHandler() + logger.addHandler(handler) + + handler.setFormatter(logging.Formatter(f'%(asctime)s [{name}] %(levelname)-8s %(message)s')) + + logger.setLevel(LOG_LEVEL) + + return logger + + +def _get_kafka_settings(): + kafka_settings = {} + kafka_settings['bootstrap.servers'] = SETTINGS.get_required('kafka_url') + + mode = SETTINGS.get('kafka_security', '') + if mode.lower() == 'sasl_plaintext': + # Let Producer use Kafka SU to produce + kafka_settings['security.protocol'] = 'SASL_PLAINTEXT' + kafka_settings['sasl.mechanisms'] = 'SCRAM-SHA-256' + kafka_settings['sasl.username'] = SETTINGS.get_required('kafka_su_user') + kafka_settings['sasl.password'] = SETTINGS.get_required('kafka_su_pw') + + elif mode.lower() == 'sasl_ssl': + kafka_settings['security.protocol'] = 'SASL_SSL' + kafka_settings['sasl.mechanisms'] = 'PLAIN' + kafka_settings['sasl.username'] = SETTINGS.get_required('kafka_su_user') + kafka_settings['sasl.password'] = SETTINGS.get_required('kafka_su_pw') + + return kafka_settings + + +################################################## +# Get settings from config file + +here = os.path.dirname(os.path.realpath(__file__)) +_default_file_path = os.path.join(here, 'settings.json') + +_file_path = os.environ.get('PRODUCER_SETTINGS_FILE', _default_file_path) +SETTINGS = Settings(_file_path) +KAFKA_SETTINGS = _get_kafka_settings() +LOG_LEVEL = logging.getLevelName(SETTINGS.get('log_level', 'INFO')) diff --git a/aether-producer/producer/topic.py b/aether-producer/producer/topic.py new file mode 100644 index 000000000..cd930c9c3 --- /dev/null +++ b/aether-producer/producer/topic.py @@ -0,0 +1,448 @@ +# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org +# +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
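Editor's note — because every lookup in the Settings class above checks the environment first (upper-cased key) and only then falls back to the lower-cased JSON key, a short usage sketch of that precedence may help. The file path and values are illustrative; note that values coming from the environment are always strings, which is why callers elsewhere in this diff wrap them in int()/float().

    import os
    from producer.settings import Settings

    os.environ['SERVER_PORT'] = '9999'                     # the environment wins...
    settings = Settings('/code/producer/settings.json')    # ...over the value in this file

    settings.get('server_port')               # -> '9999' (a string, hence int(...) at the call sites)
    settings.get('missing_key', 'fallback')   # -> 'fallback'
    settings.get_required('offset_db_host')   # -> raises KeyError if neither env var nor file defines it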
+ +import ast +import concurrent +import enum +import gevent +import io +import json +import sys +import traceback +from datetime import datetime + +from confluent_kafka import Producer +from confluent_kafka.admin import NewTopic + +import spavro.schema +from spavro.datafile import DataFileWriter, DataFileReader +from spavro.io import DatumWriter, DatumReader +from spavro.io import validate + +from producer.db import Offset +from producer.settings import SETTINGS, KAFKA_SETTINGS, get_logger + +logger = get_logger('producer-topic') + + +class KafkaStatus(enum.Enum): + SUBMISSION_PENDING = 1 + SUBMISSION_FAILURE = 2 + SUBMISSION_SUCCESS = 3 + + +class TopicStatus(enum.Enum): + INITIALIZING = 0 # Started by not yet operational + PAUSED = 1 # Paused + LOCKED = 2 # Paused by system and non-resumable via API until sys unlock + REBUILDING = 3 # Topic is being rebuilt + NORMAL = 4 # Topic is operating normally + ERROR = 5 + + +class TopicManager(object): + + # Creates a long running job on TopicManager.update_kafka + + def __init__(self, context, schema, realm): + self.context = context + self.name = schema['schema_name'] + self.realm = realm + self.offset = '' + self.operating_status = TopicStatus.INITIALIZING + self.status = { + 'last_errors_set': {}, + 'last_changeset_status': {} + } + self.change_set = {} + self.successful_changes = [] + self.failed_changes = {} + self.sleep_time = int(SETTINGS.get('sleep_time', 2)) + self.window_size_sec = int(SETTINGS.get('window_size_sec', 3)) + + self.kafka_failure_wait_time = float(SETTINGS.get('kafka_failure_wait_time', 10)) + + try: + topic_base = SETTINGS.get('topic_settings', {}).get('name_modifier', '%s') % self.name + self.topic_name = f'{self.realm}.{topic_base}' + except Exception: # Bad Name + logger.critical((f'invalid name_modifier using topic name for topic: {self.name}.' + ' Update configuration for topic_settings.name_modifier')) + # This is a failure which could cause topics to collide. We'll kill the producer + # so the configuration can be updated. + sys.exit(1) + + self.update_schema(schema) + self.get_producer() + # Spawn worker and give to pool. + logger.debug(f'Spawning kafka update thread: {self.topic_name}') + self.context.threads.append(gevent.spawn(self.update_kafka)) + logger.debug(f'Checking for existence of topic {self.topic_name}') + while not self.check_topic(): + if self.create_topic(): + break + logger.debug(f'Waiting 30 seconds to retry creation of topic {self.topic_name}') + self.context.safe_sleep(30) + self.operating_status = TopicStatus.NORMAL + + def check_topic(self): + topics = [t for t in self.producer.list_topics().topics.keys()] + if self.topic_name in topics: + logger.debug(f'Topic {self.topic_name} already exists.') + return True + + logger.debug(f'Topic {self.name} does not exist. 
current topics: {topics}') + return False + + def create_topic(self): + logger.debug(f'Trying to create topic {self.topic_name}') + + kadmin = self.context.kafka_admin_client + topic_config = SETTINGS.get('kafka_settings', {}).get('default.topic.config') + partitions = int(SETTINGS.get('kafka_default_topic_partitions', 1)) + replicas = int(SETTINGS.get('kafka_default_topic_replicas', 1)) + topic = NewTopic( + self.topic_name, + num_partitions=partitions, + replication_factor=replicas, + config=topic_config, + ) + fs = kadmin.create_topics([topic]) + # future must return before timeout + for f in concurrent.futures.as_completed(iter(fs.values()), timeout=60): + e = f.exception() + if not e: + logger.info(f'Created topic {self.name}') + return True + else: + logger.critical(f'Topic {self.name} could not be created: {e}') + return False + + def get_producer(self): + self.producer = Producer(**KAFKA_SETTINGS) + logger.debug(f'Producer for {self.name} started...') + + # API Calls to Control Topic + + def pause(self): + # Stops sending of data on this topic until resume is called or Producer restarts. + if self.operating_status is not TopicStatus.NORMAL: + logger.info(f'Topic {self.name} could not pause, status: {self.operating_status}.') + return False + + logger.info(f'Topic {self.name} is pausing.') + self.operating_status = TopicStatus.PAUSED + return True + + def resume(self): + # Resume sending data after pausing. + if self.operating_status is not TopicStatus.PAUSED: + logger.info(f'Topic {self.name} could not resume, status: {self.operating_status}.') + return False + + logger.info(f'Topic {self.name} is resuming.') + self.operating_status = TopicStatus.NORMAL + return True + + # Functions to rebuilt this topic + + def rebuild(self): + # API Call + logger.warn(f'Topic {self.name} is being REBUIT!') + # kick off rebuild process + self.context.threads.append(gevent.spawn(self.handle_rebuild)) + return True + + def handle_rebuild(self): + # greened background task to handle rebuilding of topic + self.operating_status = TopicStatus.REBUILDING + tag = f'REBUILDING {self.name}:' + logger.info(f'{tag} waiting {self.sleep_time *1.5}(sec) for inflight ops to resolve') + self.context.safe_sleep(self.sleep_time * 1.5) + logger.info(f'{tag} Deleting Topic') + self.producer = None + + if not self.delete_this_topic(): + logger.critical(f'{tag} FAILED. Topic will not resume.') + self.operating_status = TopicStatus.LOCKED + return + + logger.warn(f'{tag} Resetting Offset.') + self.set_offset('') + logger.info(f'{tag} Rebuilding Topic Producer') + self.producer = Producer(**KAFKA_SETTINGS) + logger.warn(f'{tag} Wipe Complete. 
/resume to complete operation.') + self.operating_status = TopicStatus.PAUSED + + def delete_this_topic(self): + kadmin = self.context.kafka_admin_client + fs = kadmin.delete_topics([self.name], operation_timeout=60) + future = fs.get(self.name) + for x in range(60): + if not future.done(): + if (x % 5 == 0): + logger.debug(f'REBUILDING {self.name}: Waiting for future to complete') + gevent.sleep(1) + else: + return True + return False + + def updates_available(self): + return self.context.kernel_client.check_updates(self.offset, self.name, self.realm) + + def get_db_updates(self): + return self.context.kernel_client.get_updates(self.offset, self.name, self.realm) + + def get_topic_size(self): + return self.context.kernel_client.count_updates(self.name, self.realm) + + def update_schema(self, schema_obj): + self.schema_obj = self.parse_schema(schema_obj) + self.schema = spavro.schema.parse(json.dumps(self.schema_obj)) + + def parse_schema(self, schema_obj): + # We split this method from update_schema because schema_obj as it is can not + # be compared for differences. literal_eval fixes this. As such, this is used + # by the schema_changed() method. + # schema_obj is a nested OrderedDict, which needs to be stringified + return ast.literal_eval(json.dumps(schema_obj['schema_definition'])) + + def schema_changed(self, schema_candidate): + # for use by ProducerManager.check_schemas() + return self.parse_schema(schema_candidate) != self.schema_obj + + def get_status(self): + # Updates inflight status and returns to Flask called + self.status['operating_status'] = str(self.operating_status) + self.status['inflight'] = [i for i in self.change_set.keys()] + return self.status + + # Callback function registered with Kafka Producer to acknowledge receipt + def kafka_callback(self, err=None, msg=None, _=None, **kwargs): + if err: + logger.error(f'ERROR [{err}, {msg}, {kwargs}]') + + with io.BytesIO() as obj: + obj.write(msg.value()) + reader = DataFileReader(obj, DatumReader()) + for message in reader: + _id = message.get('id') + if err: + logger.debug(f'NO-SAVE: {_id} in topic {self.topic_name} | err {err.name()}') + self.failed_changes[_id] = err + else: + logger.debug(f'SAVE: {_id} in topic {self.topic_name}') + self.successful_changes.append(_id) + + def update_kafka(self): + # Main update loop + # Monitors postgres for changes via TopicManager.updates_available + # Consumes updates to the Posgres DB via TopicManager.get_db_updates + # Sends new messages to Kafka + # Registers message callback (ok or fail) to TopicManager.kafka_callback + # Waits for all messages to be accepted or timeout in TopicManager.wait_for_kafka + logger.debug(f'Topic {self.name}: Initializing') + + while self.operating_status is TopicStatus.INITIALIZING: + if self.context.killed: + return + logger.debug(f'Waiting for topic {self.name} to initialize...') + self.context.safe_sleep(self.sleep_time) + pass + + while not self.context.killed: + if self.operating_status is not TopicStatus.NORMAL: + logger.debug( + f'Topic {self.name} not updating, status: {self.operating_status}' + f', waiting {self.sleep_time}(sec)') + self.context.safe_sleep(self.sleep_time) + continue + + if not self.context.kafka_available(): + logger.debug('Kafka Container not accessible, waiting.') + self.context.safe_sleep(self.sleep_time) + continue + + self.offset = self.get_offset() or '' + if not self.updates_available(): + logger.debug('No updates') + self.context.safe_sleep(self.sleep_time) + continue + + try: + logger.debug(f'Getting Changeset for 
{self.name}') + self.change_set = {} + new_rows = self.get_db_updates() + if not new_rows: + self.context.safe_sleep(self.sleep_time) + continue + + end_offset = new_rows[-1].get('modified') + except Exception as pge: + logger.error(f'Could not get new records from kernel: {pge}') + self.context.safe_sleep(self.sleep_time) + continue + + try: + with io.BytesIO() as bytes_writer: + writer = DataFileWriter( + bytes_writer, DatumWriter(), self.schema, codec='deflate') + + for row in new_rows: + _id = row['id'] + msg = row.get('payload') + modified = row.get('modified') + if validate(self.schema, msg): + # Message validates against current schema + logger.debug( + f'ENQUEUE MSG TOPIC: {self.name}, ID: {_id}, MOD: {modified}') + self.change_set[_id] = row + writer.append(msg) + else: + # Message doesn't have the proper format for the current schema. + logger.critical( + f'SCHEMA_MISMATCH: NOT SAVED! TOPIC: {self.name}, ID: {_id}') + + writer.flush() + raw_bytes = bytes_writer.getvalue() + + self.producer.poll(0) + self.producer.produce( + self.topic_name, + raw_bytes, + callback=self.kafka_callback + ) + self.producer.flush() + self.wait_for_kafka(end_offset, failure_wait_time=self.kafka_failure_wait_time) + + except Exception as ke: + logger.error(f'error in Kafka save: {ke}') + logger.error(traceback.format_exc()) + self.context.safe_sleep(self.sleep_time) + + def wait_for_kafka(self, end_offset, timeout=10, iters_per_sec=10, failure_wait_time=10): + # Waits for confirmation of message receipt from Kafka before moving to next changeset + # Logs errors and status to log and to web interface + + sleep_time = timeout / (timeout * iters_per_sec) + change_set_size = len(self.change_set) + errors = {} + for i in range(timeout * iters_per_sec): + # whole changeset failed; systemic failure likely; sleep it off and try again + if len(self.failed_changes) >= change_set_size: + self.handle_kafka_errors(change_set_size, True, failure_wait_time) + self.clear_changeset() + logger.info( + f'Changeset not saved; likely broker outage, sleeping worker for {self.name}') + self.context.safe_sleep(failure_wait_time) + return # all failed; ignore changeset + + # All changes were saved + elif len(self.successful_changes) == change_set_size: + logger.debug(f'All changes saved ok in topic {self.name}.') + break + + # Remove successful and errored changes + for k in self.failed_changes: + try: + del self.change_set[k] + except KeyError: + pass # could have been removed on previous iter + + for k in self.successful_changes: + try: + del self.change_set[k] + except KeyError: + pass # could have been removed on previous iter + + # All changes registered + if len(self.change_set) == 0: + break + + gevent.sleep(sleep_time) + + # Timeout reached or all messages returned ( and not all failed ) + + self.status['last_changeset_status'] = { + 'changes': change_set_size, + 'failed': len(self.failed_changes), + 'succeeded': len(self.successful_changes), + 'timestamp': datetime.now().isoformat(), + } + if errors: + self.handle_kafka_errors(change_set_size, all_failed=False) + self.clear_changeset() + # Once we're satisfied, we set the new offset past the processed messages + self.context.kafka_status = KafkaStatus.SUBMISSION_SUCCESS + self.set_offset(end_offset) + # Sleep so that elements passed in the current window become eligible + self.context.safe_sleep(self.window_size_sec) + + def handle_kafka_errors(self, change_set_size, all_failed=False, failure_wait_time=10): + # Errors in saving data to Kafka are handled and logged 
here + errors = {} + for _id, err in self.failed_changes.items(): + # accumulate error types + error_type = str(err.name()) + errors[error_type] = errors.get(error_type, 0) + 1 + + last_error_set = { + 'changes': change_set_size, + 'errors': errors, + 'outcome': 'RETRY', + 'timestamp': datetime.now().isoformat(), + } + + if not all_failed: + # Collect Error types for reporting + for _id, err in self.failed_changes.items(): + logger.critical(f'PRODUCER_FAILURE: T: {self.name} ID {_id}, ERR_MSG {err.name()}') + + dropped_messages = change_set_size - len(self.successful_changes) + errors['NO_REPLY'] = dropped_messages - len(self.failed_changes) + + last_error_set['failed'] = len(self.failed_changes) + last_error_set['succeeded'] = len(self.successful_changes) + last_error_set['outcome'] = f'MSGS_DROPPED : {dropped_messages}' + + self.status['last_errors_set'] = last_error_set + if all_failed: + self.context.kafka_status = KafkaStatus.SUBMISSION_FAILURE + return + + def clear_changeset(self): + self.failed_changes = {} + self.successful_changes = [] + self.change_set = {} + + def get_offset(self): + # Get current offset from Database + offset = Offset.get_offset(self.name) + if offset: + logger.debug(f'Got offset for {self.name} | {offset}') + return offset + else: + logger.debug(f'Could not get offset for {self.name}, it is a new type') + # No valid offset so return None; query will use empty string which is < any value + return None + + def set_offset(self, offset): + # Set a new offset in the database + new_offset = Offset.update(self.name, offset) + logger.debug(f'Set new offset for {self.name} | {new_offset}') + self.status['offset'] = new_offset diff --git a/aether-producer/setup.py b/aether-producer/setup.py index 5b937acf9..9253098fb 100644 --- a/aether-producer/setup.py +++ b/aether-producer/setup.py @@ -36,11 +36,4 @@ def read(f): author_email='shawn.sarwar@ehealthafrica.org', license='Apache2 License', keywords=['aet', 'aether', 'kafka', 'producer'], - - setup_requires=['pytest'], - tests_require=[ - 'pytest', - 'requests[security]', - 'sqlalchemy', - ], ) diff --git a/aether-producer/tests/__init__.py b/aether-producer/tests/__init__.py index 9f19dcc67..5a8109728 100644 --- a/aether-producer/tests/__init__.py +++ b/aether-producer/tests/__init__.py @@ -18,60 +18,46 @@ # specific language governing permissions and limitations # under the License. 
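Editor's note — the update loop in topic.py above packs each changeset into a single Avro container file, and the delivery callback reads the same bytes back to mark individual ids as saved or failed. A standalone sketch of that spavro round-trip, using a made-up one-field record schema (real schemas come from kernel's schema_definition):

    import io
    import json

    import spavro.schema
    from spavro.datafile import DataFileReader, DataFileWriter
    from spavro.io import DatumReader, DatumWriter, validate

    schema = spavro.schema.parse(json.dumps({
        'type': 'record',
        'name': 'Example',
        'fields': [{'name': 'id', 'type': 'string'}],
    }))

    rows = [{'id': 'abc-123'}, {'id': 'def-456'}]

    with io.BytesIO() as bytes_writer:
        writer = DataFileWriter(bytes_writer, DatumWriter(), schema, codec='deflate')
        for row in rows:
            if validate(schema, row):        # same per-message check update_kafka() performs
                writer.append(row)
        writer.flush()
        raw_bytes = bytes_writer.getvalue()  # this blob is what producer.produce() ships

    with io.BytesIO(raw_bytes) as obj:       # kafka_callback() does this to recover the ids
        ids = [msg['id'] for msg in DataFileReader(obj, DatumReader())]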
-import logging -import pytest - -from .timeout import timeout as Timeout # noqa -from producer import * # noqa -Offset = db.Offset - -log = logging.getLogger() - -USER = os.environ['PRODUCER_ADMIN_USER'] -PW = os.environ['PRODUCER_ADMIN_PW'] +from producer import ProducerManager +from producer.settings import SETTINGS, get_logger class MockAdminInterface(object): + def list_topics(self, *args, **kwargs): return {} -class MockProducerManager(ProducerManager): - - def __init__(self, settings): - self.admin_name = USER - self.admin_password = PW - self.settings = settings - self.killed = False - self.kernel = None - self.kafka = False - self.kafka_admin_client = MockAdminInterface() - self.logger = log - self.topic_managers = {} +class MockKernelClient(object): + def __init__(self): + self.last_check = None + self.last_check_error = None -class ObjectWithKernel(object): + def mode(self): + return 'dummy' - def __init__(self, initial_kernel_value=None): - self.kernel = initial_kernel_value - self.logger = log + def get_schemas(self): + return [] + def check_updates(self, modified, schema_name, realm): + return False -@pytest.mark.integration -@pytest.fixture(scope='session') -def ProducerManagerSettings(): - return Settings('/code/tests/conf/producer.json') + def count_updates(self, schema_name, realm): + return 0 + def get_updates(self, modified, schema_name, realm): + return [] -@pytest.mark.integration -@pytest.fixture(scope='session') -def OffsetDB(ProducerManagerSettings): - man = MockProducerManager(ProducerManagerSettings) - man.init_db() - return Offset +class MockProducerManager(ProducerManager): -@pytest.mark.integration -@pytest.fixture(scope='function') -def OffsetQueue(): - return db.OFFSET_DB + def __init__(self): + self.admin_name = SETTINGS.get_required('producer_admin_user') + self.admin_password = SETTINGS.get_required('producer_admin_pw') + self.killed = False + self.kernel_client = MockKernelClient() + self.kafka_status = False + self.kafka_admin_client = MockAdminInterface() + self.logger = get_logger('tests') + self.topic_managers = {} diff --git a/aether-producer/tests/conf/producer.json b/aether-producer/tests/conf/producer.json index c67e8d866..500748418 100644 --- a/aether-producer/tests/conf/producer.json +++ b/aether-producer/tests/conf/producer.json @@ -1,35 +1,26 @@ { - "start_delay": 5, - "sleep_time": 10, - "log_level": "ERROR", - "window_size_sec": 2, - "postgres_pull_limit": 250, - "postgres_host": "db-test", - "postgres_port": 5432, - "postgres_dbname": "kernel-test", - "postgres_user": "readonlyuser", - "postgres_password": "", - "offset_db_pool_size": 1, - "kernel_db_pool_size": 1, - "kernel_url": "http://kernel-test:9100", - "kernel_admin_username": "admin", - "kernel_admin_password": "", - "kafka_failure_wait_time": 4, - "kafka_url": "kafka-test:29092", + "default_realm": "eha", + "flask_settings": { + "max_connections": 3, + "pretty_json_status": true + }, "kafka_default_topic_partitions": 1, "kafka_default_topic_replicas": 1, + "kafka_failure_wait_time": 4, "kafka_settings": { "default.topic.config": { "retention.ms": -1 } }, - "server_port": 9005, + "log_level": "ERROR", + "offset_db_pool_size": 1, + "realm_cookie": "eha-realm", "server_ip": "", - "flask_settings": { - "max_connections": 3, - "pretty_json_status": true - }, + "server_port": 9005, + "sleep_time": 10, + "start_delay": 5, "topic_settings": { "name_modifier": "%s" - } -} \ No newline at end of file + }, + "window_size_sec": 2 +} diff --git a/aether-producer/tests/test_integration.py 
b/aether-producer/tests/test_integration.py index 864fe5b76..c3ecbcd8a 100644 --- a/aether-producer/tests/test_integration.py +++ b/aether-producer/tests/test_integration.py @@ -18,91 +18,120 @@ # specific language governing permissions and limitations # under the License. - import requests import uuid +from gevent import sleep + +from producer.settings import SETTINGS +from producer.db import Offset -from . import * +from .timeout import timeout as Timeout +from . import MockProducerManager -@pytest.mark.integration -def test_manager_http_endpoint_service(ProducerManagerSettings): - man = MockProducerManager(ProducerManagerSettings) +def test_manager_http_endpoint_service(): + man = MockProducerManager() try: - auth = requests.auth.HTTPBasicAuth(USER, PW) + auth = requests.auth.HTTPBasicAuth(man.admin_name, man.admin_password) man.serve() man.add_endpoints() sleep(1) - url = 'http://localhost:%s' % man.settings.get('server_port') + + url = 'http://localhost:%s' % SETTINGS.get('server_port', 9005) r = requests.head(f'{url}/healthcheck') assert(r.status_code == 200) + protected_endpoints = ['status', 'topics'] for e in protected_endpoints: r = requests.head(f'{url}/{e}') assert(r.status_code == 401) + for e in protected_endpoints: r = requests.head(f'{url}/{e}', auth=auth) assert(r.status_code == 200) + finally: man.http.stop() man.http.close() man.worker_pool.kill() -@pytest.mark.integration -def test_initialize_database_get_set(OffsetDB): - assert(OffsetDB.get_offset('some_missing') is None) - value = str(uuid.uuid4()) - OffsetDB.update('fake_entry', value) - assert(OffsetDB.get_offset('fake_entry') == value) - - -@pytest.mark.integration -def test_offset_pooling(OffsetQueue, OffsetDB): - osn, osv = 'pool_offset_test', '10001' - OffsetDB.update(osn, osv) - assert(osv == OffsetDB.get_offset(osn)) - assert(OffsetQueue.max_connections is len(OffsetQueue.connection_pool)) - conns = [] - while not OffsetQueue.connection_pool.empty(): - promise = OffsetQueue.request_connection(0, 'test') - conn = promise.get() - assert(OffsetQueue._test_connection(conn) is True) - conns.append(conn) +def test_initialize_database_get_set(): try: - with Timeout(1): - OffsetDB.get_offset(osn) - except TimeoutError: - pass - else: - assert(False), 'Operation should have timed out.' - for conn in conns: - OffsetQueue.release('test', conn) + MockProducerManager().init_db() + + assert(Offset.get_offset('some_missing') is None) + value = str(uuid.uuid4()) + Offset.update('fake_entry', value) + assert(Offset.get_offset('fake_entry') == value) + finally: + Offset.close_pool() -@pytest.mark.integration -def test_offset_prioritization(OffsetQueue, OffsetDB): - # Grab all connections - conns = [] - while not OffsetQueue.connection_pool.empty(): - promise = OffsetQueue.request_connection(0, 'test') - conn = promise.get() - assert(OffsetQueue._test_connection(conn) is True) - conns.append(conn) - low_prio = OffsetQueue.request_connection(1, 'low') - high_prio = OffsetQueue.request_connection(0, 'high') - # free a connection - conn = conns.pop() - OffsetQueue.release('test', conn) +def test_offset_pooling(): try: - with Timeout(1): - conn = low_prio.get() - except TimeoutError: - pass - else: - assert(False), 'Operation should have timed out.' 
- with Timeout(1): - conn = high_prio.get() - assert(OffsetQueue._test_connection(conn) is True) - for conn in conns: + MockProducerManager().init_db() + OffsetQueue = Offset.create_pool() + + osn, osv = 'pool_offset_test', '10001' + Offset.update(osn, osv) + assert(osv == Offset.get_offset(osn)) + assert(OffsetQueue.max_connections == len(OffsetQueue.connection_pool)) + + conns = [] + while not OffsetQueue.connection_pool.empty(): + promise = OffsetQueue.request_connection(0, 'test') + conn = promise.get() + assert(OffsetQueue._test_connection(conn)) + conns.append(conn) + + try: + with Timeout(1): + Offset.get_offset(osn) + except TimeoutError: + assert(True), 'Operation has timed out.' + else: + assert(False), 'Operation should have timed out.' + finally: + for conn in conns: + OffsetQueue.release('test', conn) + finally: + Offset.close_pool() + + +def test_offset_prioritization(): + try: + MockProducerManager().init_db() + OffsetQueue = Offset.create_pool() + + # Grab all connections + conns = [] + while not OffsetQueue.connection_pool.empty(): + promise = OffsetQueue.request_connection(0, 'test') + conn = promise.get() + assert(OffsetQueue._test_connection(conn)) + conns.append(conn) + + low_prio = OffsetQueue.request_connection(1, 'low') + high_prio = OffsetQueue.request_connection(0, 'high') + + # free a connection + conn = conns.pop() OffsetQueue.release('test', conn) + + try: + with Timeout(1): + conn = low_prio.get() + except TimeoutError: + assert(True), 'Operation has timed out.' + else: + assert(False), 'Operation should have timed out.' + + with Timeout(1): + conn = high_prio.get() + assert(OffsetQueue._test_connection(conn)) + + for conn in conns: + OffsetQueue.release('test', conn) + finally: + Offset.close_pool() diff --git a/aether-ui/.dockerignore b/aether-ui/.dockerignore index aaca40b6a..02f0e3c96 100644 --- a/aether-ui/.dockerignore +++ b/aether-ui/.dockerignore @@ -1,19 +1,18 @@ .dockerignore Dockerfile -db.sqlite3 -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -env -pip-log.txt -pip-delete-this-directory.txt -.tox -.coverage -.coverage.* -.cache -coverage.xml -*,cover -*.log -.git + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log + +**/node_modules diff --git a/aether-ui/Dockerfile b/aether-ui/Dockerfile index 3042e83c6..d52b9eb04 100644 --- a/aether-ui/Dockerfile +++ b/aether-ui/Dockerfile @@ -1,27 +1,31 @@ FROM python:3.7-slim-buster +LABEL description="Aether Kernel UI" \ + name="aether-ui" \ + author="eHealth Africa" + ################################################################################ -## setup container +## set up container ################################################################################ COPY ./conf/docker/* /tmp/ RUN /tmp/setup.sh +WORKDIR /code +ENTRYPOINT ["/code/entrypoint.sh"] + ################################################################################ -## install app +## install dependencies ## copy files one by one and split commands to use docker cache ################################################################################ -WORKDIR /code - -COPY ./conf/pip /code/conf/pip +COPY --chown=aether:aether ./conf/pip /code/conf/pip RUN pip install -q --upgrade pip && \ pip install -q -r /code/conf/pip/requirements.txt - -COPY ./ /code +COPY --chown=aether:aether ./ /code ################################################################################ -## copy application version and create git revision +## create application 
version and revision files ################################################################################ ARG VERSION=0.0.0 @@ -30,11 +34,3 @@ ARG GIT_REVISION RUN mkdir -p /var/tmp && \ echo $VERSION > /var/tmp/VERSION && \ echo $GIT_REVISION > /var/tmp/REVISION - -################################################################################ -## last setup steps -################################################################################ - -RUN chown -R aether: /code - -ENTRYPOINT ["/code/entrypoint.sh"] diff --git a/aether-ui/aether/ui/assets/.dockerignore b/aether-ui/aether/ui/assets/.dockerignore index a201be011..0e16925a3 100644 --- a/aether-ui/aether/ui/assets/.dockerignore +++ b/aether-ui/aether/ui/assets/.dockerignore @@ -1,2 +1,4 @@ -.* -!.env +.dockerignore +Dockerfile + +node_modules diff --git a/aether-ui/aether/ui/assets/Dockerfile b/aether-ui/aether/ui/assets/Dockerfile index ba3f7771a..57205bb99 100644 --- a/aether-ui/aether/ui/assets/Dockerfile +++ b/aether-ui/aether/ui/assets/Dockerfile @@ -16,6 +16,4 @@ ENV PATH /code/node_modules/.bin:$PATH WORKDIR /code/assets COPY ./ /code/assets -RUN apt-get autoremove && apt-get clean - ENTRYPOINT ["/code/assets/conf/entrypoint.sh"] diff --git a/aether-ui/aether/ui/assets/package.json b/aether-ui/aether/ui/assets/package.json index 7db4c1ae4..e327953e4 100644 --- a/aether-ui/aether/ui/assets/package.json +++ b/aether-ui/aether/ui/assets/package.json @@ -26,10 +26,10 @@ "jquery": "~3.4.0", "moment": "~2.24.0", "popper.js": "~1.16.0", - "react": "~16.12.0", + "react": "~16.13.0", "react-clipboard.js": "~2.0.16", - "react-dom": "~16.12.0", - "react-intl": "~3.12.0", + "react-dom": "~16.13.0", + "react-intl": "~4.1.0", "react-outside-click-handler": "~1.3.0", "react-redux": "~7.2.0", "react-router-dom": "~5.1.0", @@ -44,7 +44,7 @@ "@babel/plugin-proposal-class-properties": "~7.8.0", "@babel/preset-env": "~7.8.0", "@babel/preset-react": "~7.8.0", - "@hot-loader/react-dom": "~16.11.0", + "@hot-loader/react-dom": "~16.13.0", "babel-loader": "~8.0.0", "css-loader": "~3.4.0", "enzyme": "~3.11.0", @@ -61,7 +61,7 @@ "sass-loader": "~8.0.0", "standard": "~14.3.0", "style-loader": "~1.1.0", - "webpack": "~4.41.0", + "webpack": "~4.42.0", "webpack-bundle-tracker": "~0.4.3", "webpack-cli": "~3.3.0", "webpack-dev-middleware": "~3.7.0", diff --git a/aether-ui/conf/pip/requirements.txt b/aether-ui/conf/pip/requirements.txt index c3bbbde1d..bca99d4ab 100644 --- a/aether-ui/conf/pip/requirements.txt +++ b/aether-ui/conf/pip/requirements.txt @@ -12,30 +12,30 @@ # ################################################################################ -aether.sdk==1.2.20 +aether.sdk==1.2.21 autopep8==1.5 -boto3==1.12.2 -botocore==1.15.2 +boto3==1.12.22 +botocore==1.15.22 cachetools==4.0.0 certifi==2019.11.28 cffi==1.14.0 chardet==3.0.4 configparser==4.0.2 -coverage==5.0.3 +coverage==5.0.4 cryptography==2.8 -Django==2.2.10 +Django==2.2.11 django-cacheops==4.2 django-cleanup==4.0.0 django-cors-headers==3.2.1 django-debug-toolbar==2.2 -django-minio-storage==0.3.5 +django-minio-storage==0.3.7 django-model-utils==4.0.0 django-prometheus==2.0.0 django-redis-sessions==0.6.1 -django-silk==4.0.0 +django-silk==4.0.1 django-storages==1.9.1 django-uwsgi==0.2.2 -django-webpack-loader==0.6.0 +django-webpack-loader==0.7.0 djangorestframework==3.11.0 docutils==0.15.2 drf-dynamic-fields==0.3.1 @@ -44,38 +44,38 @@ flake8==3.7.9 flake8-quotes==2.1.1 funcy==1.14 google-api-core==1.16.0 -google-auth==1.11.2 +google-auth==1.11.3 google-cloud-core==1.3.0 
google-cloud-storage==1.26.0 google-resumable-media==0.5.0 googleapis-common-protos==1.51.0 gprof2dot==2019.11.30 -idna==2.8 +idna==2.9 Jinja2==2.11.1 -jmespath==0.9.4 +jmespath==0.9.5 MarkupSafe==1.1.1 mccabe==0.6.1 -minio==5.0.7 +minio==5.0.8 prometheus-client==0.7.1 protobuf==3.11.3 psycopg2-binary==2.8.4 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycodestyle==2.5.0 -pycparser==2.19 +pycparser==2.20 pyflakes==2.1.1 -Pygments==2.5.2 +Pygments==2.6.1 pyOpenSSL==19.1.0 python-dateutil==2.8.1 python-json-logger==0.1.11 pytz==2019.3 redis==3.4.1 -requests==2.22.0 +requests==2.23.0 rsa==4.0 s3transfer==0.3.3 -sentry-sdk==0.14.1 +sentry-sdk==0.14.2 six==1.14.0 -sqlparse==0.3.0 +sqlparse==0.3.1 tblib==1.6.0 urllib3==1.25.8 uWSGI==2.0.18 diff --git a/dev.json.enc b/dev.json.enc deleted file mode 100644 index c40d397ee..000000000 Binary files a/dev.json.enc and /dev/null differ diff --git a/docker-compose-base.yml b/docker-compose-base.yml index 0d44d50d2..76bfb8002 100644 --- a/docker-compose-base.yml +++ b/docker-compose-base.yml @@ -68,7 +68,7 @@ services: command: minio server --quiet --address minio:9090 /data keycloak-base: - image: jboss/keycloak:7.0.1 + image: jboss/keycloak:9.0.0 environment: DB_VENDOR: POSTGRES DB_ADDR: db @@ -299,29 +299,26 @@ services: environment: PYTHONUNBUFFERED: 1 - # default settings file - PRODUCER_SETTINGS_FILE: /code/producer/settings.json - PRODUCER_ADMIN_USER: ${PRODUCER_ADMIN_USER} PRODUCER_ADMIN_PW: ${PRODUCER_ADMIN_PW} # These variables will override the ones indicated in the settings file - KERNEL_URL: http://${NETWORK_DOMAIN}/kernel - KERNEL_USERNAME: ${KERNEL_ADMIN_USERNAME} - KERNEL_PASSWORD: ${KERNEL_ADMIN_PASSWORD} + AETHER_KERNEL_TOKEN: ${KERNEL_ADMIN_TOKEN} + AETHER_KERNEL_URL: http://kernel:8100 + DEFAULT_REALM: ${DEFAULT_REALM} + REALM_COOKIE: ${REALM_COOKIE} POSTGRES_HOST: db POSTGRES_DBNAME: aether - POSTGRES_USER: ${KERNEL_READONLY_DB_USERNAME} - POSTGRES_PASSWORD: ${KERNEL_READONLY_DB_PASSWORD} + POSTGRES_USER: postgres + POSTGRES_PORT: 5432 + POSTGRES_PASSWORD: ${KERNEL_DB_PASSWORD} OFFSET_DB_HOST: db OFFSET_DB_USER: postgres OFFSET_DB_PORT: 5432 - OFFSET_DB_PASSWORD: ${KERNEL_DB_PASSWORD} + OFFSET_DB_PASSWORD: ${PRODUCER_DB_PASSWORD} OFFSET_DB_NAME: producer_offset_db - - SERVER_PORT: 5005 volumes: - ./aether-producer:/code command: start diff --git a/docker-compose-test.yml b/docker-compose-test.yml index 88b6d0fd4..f86de85b6 100644 --- a/docker-compose-test.yml +++ b/docker-compose-test.yml @@ -153,26 +153,17 @@ services: file: ./docker-compose-base.yml service: producer-base environment: + PRODUCER_SETTINGS_FILE: /code/tests/conf/producer.json + # These variables will override the ones indicated in the settings file + AETHER_KERNEL_URL: http://kernel-test:9100 KAFKA_URL: kafka-test:29092 - KERNEL_URL: http://kernel-test:9100 - LOG_LEVEL: DEBUG - + KERNEL_ACCESS_TYPE: ${KERNEL_ACCESS_TYPE:-api} OFFSET_DB_HOST: db-test OFFSET_DB_NAME: producer_offset_db_test - OFFSET_DB_PASSWORD: ${KERNEL_DB_PASSWORD} - OFFSET_DB_PORT: 5432 - OFFSET_DB_USER: postgres - POSTGRES_HOST: db-test POSTGRES_DBNAME: ${TEST_KERNEL_DB_NAME:-test-kernel} - PRODUCER_ADMIN_PW: ${PRODUCER_ADMIN_PW} - PRODUCER_ADMIN_USER: ${PRODUCER_ADMIN_USER} - PRODUCER_SETTINGS_FILE: /code/tests/conf/producer.json - - SERVER_PORT: 9005 - # --------------------------------- # Aether Integration Tests @@ -182,6 +173,8 @@ services: image: aether-integration-test build: ./test-aether-integration-module environment: + KAFKA_URL: kafka-test:29092 + KERNEL_URL: http://kernel-test:9100 
KERNEL_USERNAME: ${CLIENT_USERNAME} KERNEL_PASSWORD: ${CLIENT_PASSWORD} @@ -191,6 +184,7 @@ services: PRODUCER_URL: http://producer-test:9005 PRODUCER_ADMIN_PW: ${PRODUCER_ADMIN_PW} PRODUCER_ADMIN_USER: ${PRODUCER_ADMIN_USER} + PRODUCER_MODE: ${KERNEL_ACCESS_TYPE:-api} volumes: - ./test-aether-integration-module:/code command: test diff --git a/docker-compose.yml b/docker-compose.yml index 749eccc7a..99b2aef82 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -114,11 +114,6 @@ services: extends: file: ./docker-compose-base.yml service: ui-assets-base - volumes: - # static folder - - ./.persistent_data/static/ui:/var/www/static - # backup folder - - ./.persistent_data/backups/ui:/backups networks: - internal @@ -126,6 +121,11 @@ services: extends: file: ./docker-compose-base.yml service: ui-base + volumes: + # static folder + - ./.persistent_data/static/ui:/var/www/static + # backup folder + - ./.persistent_data/backups/ui:/backups depends_on: db: condition: service_healthy diff --git a/prod.json.enc b/prod.json.enc deleted file mode 100644 index 051ac73dc..000000000 Binary files a/prod.json.enc and /dev/null differ diff --git a/scripts/_lib.sh b/scripts/_lib.sh index 217c531bd..86dcdcddb 100644 --- a/scripts/_lib.sh +++ b/scripts/_lib.sh @@ -88,16 +88,6 @@ function pip_freeze_container { $DC run --rm --no-deps $container pip_freeze } -# kernel readonly user (used by Aether Producer) -# Usage: create_readonly_user -function create_readonly_user { - docker-compose up -d db - docker-compose run --rm --no-deps kernel setup - docker-compose run --rm --no-deps kernel eval \ - python3 /code/sql/create_readonly_user.py "$1" "$2" - docker-compose kill -} - # Start database container and wait till is up and responding function start_db { _wait_for "db" "docker-compose run --rm --no-deps kernel eval pg_isready -q" @@ -107,7 +97,7 @@ function start_db { # Usage: start_container function start_container { local container=$1 - local is_ready="docker-compose run --rm --no-deps kernel manage check_url -u $2" + local is_ready="docker-compose run --rm --no-deps kernel eval wget -q --spider $2" _wait_for "$container" "$is_ready" } @@ -124,8 +114,10 @@ function _wait_for { >&2 echo "Waiting for $container... 
$retries" ((retries++)) - if [[ $retries -gt 30 ]]; then + if [[ $retries -gt 10 ]]; then echo_message "It was not possible to start $container" + docker-compose logs $container + echo_message "" exit 1 fi diff --git a/scripts/build_all_containers.sh b/scripts/build_all_containers.sh index 9d8b248c5..3bb1ebb27 100755 --- a/scripts/build_all_containers.sh +++ b/scripts/build_all_containers.sh @@ -35,7 +35,3 @@ for container in "${containers[@]}" do build_container $container done - -create_readonly_user \ - "$KERNEL_READONLY_DB_USERNAME" \ - "$KERNEL_READONLY_DB_PASSWORD" diff --git a/scripts/build_container.sh b/scripts/build_container.sh index fd00575ff..1bc95e751 100755 --- a/scripts/build_container.sh +++ b/scripts/build_container.sh @@ -37,10 +37,4 @@ fi build_container $1 -if [[ $1 == "kernel" ]]; then - create_readonly_user \ - "$KERNEL_READONLY_DB_USERNAME" \ - "$KERNEL_READONLY_DB_PASSWORD" -fi - ./scripts/clean_all.sh diff --git a/scripts/build_docker_credentials.sh b/scripts/build_docker_credentials.sh index 406a9dc4d..5f865286c 100755 --- a/scripts/build_docker_credentials.sh +++ b/scripts/build_docker_credentials.sh @@ -109,8 +109,6 @@ KERNEL_ADMIN_USERNAME=admin KERNEL_ADMIN_PASSWORD=$(gen_random_string) KERNEL_ADMIN_TOKEN=$(gen_random_string) KERNEL_DJANGO_SECRET_KEY=$(gen_random_string) -KERNEL_READONLY_DB_USERNAME=readonlyuser -KERNEL_READONLY_DB_PASSWORD=$(gen_random_string) KERNEL_DB_PASSWORD=$(gen_random_string) # ------------------------------------------------------------------ @@ -141,6 +139,7 @@ UI_DB_PASSWORD=$(gen_random_string) # ================================================================== PRODUCER_ADMIN_USER=admin PRODUCER_ADMIN_PW=$(gen_random_string) +PRODUCER_DB_PASSWORD=$(gen_random_string) # ------------------------------------------------------------------ diff --git a/scripts/deploy.sh b/scripts/deploy.sh deleted file mode 100755 index 18000089a..000000000 --- a/scripts/deploy.sh +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env bash -# -# Copyright (C) 2020 by eHealth Africa : http://www.eHealthAfrica.org -# -# See the NOTICE file distributed with this work for additional information -# regarding copyright ownership. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-#
-
-set -Eo pipefail
-
-LINE=`printf -v row "%${COLUMNS:-$(tput cols)}s"; echo ${row// /#}`
-
-DEPLOY_APPS=( kernel odk ui producer )
-
-export GOOGLE_APPLICATION_CREDENTIALS="gcs_key.json"
-
-if [[ ${TRAVIS_TAG} =~ ^[0-9]+(\.[0-9]+){2}$ ]]; then
-
-    echo "${LINE}"
-    echo "Skipping production deployment (temporary)"
-    echo "${LINE}"
-    exit 0
-
-    DOCKER_VERSION=${TRAVIS_TAG}
-    GCR_VERSION=${TRAVIS_TAG}
-    GCS_PROJECT="eha-data"
-    GCR_PROJECT="production-228613"
-    export RELEASE_BUCKET="aether-releases"
-
-    openssl aes-256-cbc \
-        -K $encrypted_17d8de6bf835_key \
-        -iv $encrypted_17d8de6bf835_iv \
-        -in prod.json.enc \
-        -out gcs_key.json \
-        -d
-
-elif [[ ${TRAVIS_BRANCH} =~ ^release\-[0-9]+\.[0-9]+$ ]]; then
-
-    echo "${LINE}"
-    echo "Skipping release candidates deployment (temporary)"
-    echo "${LINE}"
-    exit 0
-
-    FILE_VERSION=`cat VERSION`
-    DOCKER_VERSION="${FILE_VERSION}-rc"
-
-    GCR_VERSION="${DOCKER_VERSION}-${TRAVIS_COMMIT}"
-
-    # deploy release candidates in ???
-    GCS_PROJECT="alpha"
-    GCR_PROJECT="development-223016"
-    export RELEASE_BUCKET="aether-releases-dev"
-
-    openssl aes-256-cbc \
-        -K $encrypted_17d8de6bf835_key \
-        -iv $encrypted_17d8de6bf835_iv \
-        -in dev.json.enc \
-        -out gcs_key.json \
-        -d
-
-else
-
-    DOCKER_VERSION="alpha"
-    GCR_VERSION=${TRAVIS_COMMIT}
-    GCS_PROJECT="alpha"
-    GCR_PROJECT="development-223016"
-    export RELEASE_BUCKET="aether-releases-dev"
-
-    openssl aes-256-cbc \
-        -K $encrypted_17d8de6bf835_key \
-        -iv $encrypted_17d8de6bf835_iv \
-        -in dev.json.enc \
-        -out gcs_key.json \
-        -d
-fi
-
-echo "${LINE}"
-echo "Docker images: ${DEPLOY_APPS[@]}"
-echo "Docker images tag: $DOCKER_VERSION"
-echo "Deployment version: $GCR_VERSION"
-echo "Repository project: $GCR_PROJECT"
-echo "Storage project: $GCS_PROJECT"
-echo "${LINE}"
-
-
-# ===========================================================
-# install dependencies and create GC credentials files
-pip install -q google-cloud-storage push-app-version
-
-
-# ===========================================================
-# pull images from public docker hub
-
-DOCKER_IMAGE_REPO="ehealthafrica"
-
-for APP in "${DEPLOY_APPS[@]}"; do
-    AETHER_APP="aether-${APP}"
-    SRC_IMG="${DOCKER_IMAGE_REPO}/${AETHER_APP}:${DOCKER_VERSION}"
-
-    echo "Pulling Docker image ${SRC_IMG}"
-    docker pull "${SRC_IMG}"
-    echo "${LINE}"
-done
-
-
-# ===========================================================
-# push images to deployment repository
-
-GCR_REPO_URL="https://eu.gcr.io"
-GCR_IMAGE_REPO="eu.gcr.io/${GCR_PROJECT}"
-
-# https://cloud.google.com/container-registry/docs/advanced-authentication#json_key_file
-cat gcs_key.json | docker login -u _json_key --password-stdin $GCR_REPO_URL
-
-for APP in "${DEPLOY_APPS[@]}"; do
-    AETHER_APP="aether-${APP}"
-
-    SRC_IMG="${DOCKER_IMAGE_REPO}/${AETHER_APP}:${DOCKER_VERSION}"
-    DEST_IMG="${GCR_IMAGE_REPO}/${AETHER_APP}:${GCR_VERSION}"
-
-    echo "Pushing GCR image ${DEST_IMG}"
-    docker tag "$SRC_IMG" "$DEST_IMG"
-    docker push "${DEST_IMG}"
-    echo "${LINE}"
-done
-
-docker logout ${GCR_REPO_URL} || true
-
-
-# ===========================================================
-# notify to Google Cloud Storage the new images
-
-push-app-version \
-    --version $GCR_VERSION \
-    --project $GCS_PROJECT
diff --git a/scripts/deployment/kernel.Dockerfile b/scripts/deployment/kernel.Dockerfile
new file mode 100644
index 000000000..2db08d15b
--- /dev/null
+++ b/scripts/deployment/kernel.Dockerfile
@@ -0,0 +1,38 @@
+################################################################################
+## using alpine image to build version and revision files
+################################################################################
+
+FROM alpine AS app_resource
+
+WORKDIR /tmp
+COPY ./.git /tmp/.git
+COPY ./scripts/deployment/setup_revision.sh /tmp/setup_revision.sh
+RUN /tmp/setup_revision.sh
+
+
+################################################################################
+## using python image to build app
+################################################################################
+
+FROM python:3.7-slim-buster
+
+LABEL description="Aether Kernel" \
+      name="aether-kernel" \
+      author="eHealth Africa"
+
+## set up container
+WORKDIR /code
+ENTRYPOINT ["/code/entrypoint.sh"]
+
+COPY ./aether-kernel/conf/docker/* /tmp/
+RUN /tmp/setup.sh
+
+## copy source code
+COPY --chown=aether:aether ./aether-kernel/ /code
+
+## install dependencies
+RUN pip install -q --upgrade pip && \
+    pip install -q -r /code/conf/pip/requirements.txt
+
+## copy application version and revision
+COPY --from=app_resource --chown=aether:aether /tmp/resources/. /var/tmp/
diff --git a/scripts/deployment/odk.Dockerfile b/scripts/deployment/odk.Dockerfile
new file mode 100644
index 000000000..5c67f698b
--- /dev/null
+++ b/scripts/deployment/odk.Dockerfile
@@ -0,0 +1,38 @@
+################################################################################
+## using alpine image to build version and revision files
+################################################################################
+
+FROM alpine AS app_resource
+
+WORKDIR /tmp
+COPY ./.git /tmp/.git
+COPY ./scripts/deployment/setup_revision.sh /tmp/setup_revision.sh
+RUN /tmp/setup_revision.sh
+
+
+################################################################################
+## using python image to build app
+################################################################################
+
+FROM python:3.7-slim-buster
+
+LABEL description="Aether ODK Module" \
+      name="aether-odk" \
+      author="eHealth Africa"
+
+## set up container
+WORKDIR /code
+ENTRYPOINT ["/code/entrypoint.sh"]
+
+COPY ./aether-odk-module/conf/docker/* /tmp/
+RUN /tmp/setup.sh
+
+## copy source code
+COPY --chown=aether:aether ./aether-odk-module/ /code
+
+## install dependencies
+RUN pip install -q --upgrade pip && \
+    pip install -q -r /code/conf/pip/requirements.txt
+
+## copy application version and revision
+COPY --from=app_resource --chown=aether:aether /tmp/resources/. /var/tmp/
diff --git a/scripts/deployment/producer.Dockerfile b/scripts/deployment/producer.Dockerfile
new file mode 100644
index 000000000..ebddab5b8
--- /dev/null
+++ b/scripts/deployment/producer.Dockerfile
@@ -0,0 +1,44 @@
+################################################################################
+## using alpine image to build version and revision files
+################################################################################
+
+FROM alpine AS app_resource
+
+WORKDIR /tmp
+COPY ./.git /tmp/.git
+COPY ./scripts/deployment/setup_revision.sh /tmp/setup_revision.sh
+RUN /tmp/setup_revision.sh
+
+
+################################################################################
+## using python image to build app
+################################################################################
+
+FROM python:3.7-slim-buster
+
+LABEL description="Aether Kafka Producer" \
+      name="aether-producer" \
+      author="eHealth Africa"
+
+## set up container
+WORKDIR /code
+ENTRYPOINT ["/code/entrypoint.sh"]
+
+RUN apt-get update -qq && \
+    apt-get -qq \
+        --yes \
+        --allow-downgrades \
+        --allow-remove-essential \
+        --allow-change-held-packages \
+        install gcc && \
+    useradd -ms /bin/false aether
+
+## copy source code
+COPY --chown=aether:aether ./aether-producer/ /code
+
+## install dependencies
+RUN pip install -q --upgrade pip && \
+    pip install -q -r /code/conf/pip/requirements.txt
+
+## copy application version and revision
+COPY --from=app_resource --chown=aether:aether /tmp/resources/. /var/tmp/
diff --git a/scripts/deployment/setup_revision.sh b/scripts/deployment/setup_revision.sh
new file mode 100755
index 000000000..26b2ea735
--- /dev/null
+++ b/scripts/deployment/setup_revision.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env sh
+#
+# Copyright (C) 2020 by eHealth Africa : http://www.eHealthAfrica.org
+#
+# See the NOTICE file distributed with this work for additional information
+# regarding copyright ownership.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# create resources directory
+mkdir -p ./resources/
+
+# install git (missing in alpine image)
+apk add --no-cache git
+
+# get branch and commit from git
+
+# create REVISION resources
+GIT_COMMIT=`git rev-parse HEAD`
+echo $GIT_COMMIT > ./resources/REVISION
+
+# create VERSION resource (from branch name)
+GIT_BRANCH=`git rev-parse --abbrev-ref HEAD`
+# replace "release-" prefix with "v" (release-1.2 => v1.2)
+GIT_BRANCH=$(echo "$GIT_BRANCH" | sed "s/release-/v/")
+if [ $GIT_BRANCH = "develop" ]; then
+    echo "alpha" > ./resources/VERSION
+else
+    echo $GIT_BRANCH > ./resources/VERSION
+fi
diff --git a/scripts/deployment/ui.Dockerfile b/scripts/deployment/ui.Dockerfile
new file mode 100644
index 000000000..324d50cb2
--- /dev/null
+++ b/scripts/deployment/ui.Dockerfile
@@ -0,0 +1,58 @@
+################################################################################
+## using alpine image to build version and revision files
+################################################################################
+
+FROM alpine AS app_resource
+
+WORKDIR /tmp
+COPY ./.git /tmp/.git
+COPY ./scripts/deployment/setup_revision.sh /tmp/setup_revision.sh
+RUN /tmp/setup_revision.sh
+
+
+################################################################################
+## using node image to build react app
+################################################################################
+
+FROM node:lts-slim AS app_node
+
+## set up container
+WORKDIR /assets/
+## copy application version and git revision
+COPY --from=app_resource /tmp/resources/. /var/tmp/
+## copy source code
+COPY ./aether-ui/aether/ui/assets/ /assets/
+## build react app
+RUN npm install -q && npm run build
+
+
+################################################################################
+## using python image to build app
+################################################################################
+
+FROM python:3.7-slim-buster AS app
+
+LABEL description="Aether Kernel UI" \
+      name="aether-ui" \
+      author="eHealth Africa"
+
+## set up container
+WORKDIR /code
+ENTRYPOINT ["/code/entrypoint.sh"]
+
+COPY ./aether-ui/conf/docker/* /tmp/
+RUN /tmp/setup.sh
+
+## copy source code
+COPY --chown=aether:aether ./aether-ui/ /code
+
+## install dependencies
+RUN pip install -q --upgrade pip && \
+    pip install -q -r /code/conf/pip/requirements.txt
+
+## copy react app
+RUN rm -Rf /code/aether/ui/assets/
+COPY --from=app_node --chown=aether:aether /assets/bundles/. /code/aether/ui/assets/bundles
+
+## copy application version and revision
+COPY --from=app_resource --chown=aether:aether /tmp/resources/.
/var/tmp/ diff --git a/scripts/setup_keycloak.sh b/scripts/setup_keycloak.sh index d489c6715..3743b3af7 100755 --- a/scripts/setup_keycloak.sh +++ b/scripts/setup_keycloak.sh @@ -46,8 +46,8 @@ $PSQL <<- EOSQL UPDATE pg_database SET datallowconn = 'false' WHERE datname = '${KC_DB}'; SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '${KC_DB}'; - DROP DATABASE ${KC_DB}; - DROP USER ${KC_USER}; + DROP DATABASE IF EXISTS ${KC_DB}; + DROP USER IF EXISTS ${KC_USER}; CREATE USER ${KC_USER} PASSWORD '${KEYCLOAK_DB_PASSWORD}'; CREATE DATABASE ${KC_DB} OWNER ${KC_USER}; diff --git a/scripts/test_container.sh b/scripts/test_container.sh index c602c002f..14cad63ed 100755 --- a/scripts/test_container.sh +++ b/scripts/test_container.sh @@ -122,20 +122,11 @@ if [[ $1 != "kernel" ]]; then -r=$CLIENT_REALM fi - # Producer and Integration need readonlyuser to be present - if [[ $1 = "producer" || $1 == "integration" ]]; then - echo_message "Creating readonlyuser on Kernel DB" - $DC_KERNEL_RUN eval \ - python3 /code/sql/create_readonly_user.py \ - "$KERNEL_READONLY_DB_USERNAME" \ - "$KERNEL_READONLY_DB_PASSWORD" - - if [[ $1 = "integration" ]]; then - build_container producer - echo_message "Starting producer" - $DC_TEST up -d producer-test - echo_message "producer ready!" - fi + if [[ $1 = "integration" ]]; then + build_container producer + echo_message "Starting producer" + $DC_TEST up -d producer-test + echo_message "producer ready!" fi fi diff --git a/aether-producer/tests/test_unit.py b/scripts/test_deployment.sh old mode 100644 new mode 100755 similarity index 58% rename from aether-producer/tests/test_unit.py rename to scripts/test_deployment.sh index 5cc3ccf94..1f552bd86 --- a/aether-producer/tests/test_unit.py +++ b/scripts/test_deployment.sh @@ -1,11 +1,11 @@ -#!/usr/bin/env python - -# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org +#!/usr/bin/env bash +# +# Copyright (C) 2020 by eHealth Africa : http://www.eHealthAfrica.org # # See the NOTICE file distributed with this work for additional information # regarding copyright ownership. # -# Licensed under the Apache License, Version 2.0 (the 'License'); +# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with # the License. You may obtain a copy of the License at # @@ -17,5 +17,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# + +set -Eeuo pipefail + +DEPLOY_APPS=( kernel producer odk ui ) +IMAGE_PREFIX="test-deployment-aether" -# Unit Tests were outdated, returning in true MT version +for APP in "${DEPLOY_APPS[@]}"; do + docker build \ + --force-rm \ + --tag ${IMAGE_PREFIX}-${APP} \ + --file ./scripts/deployment/${APP}.Dockerfile \ + . 
+done diff --git a/scripts/test_travis.sh b/scripts/test_travis.sh index 20638a474..61436a88f 100755 --- a/scripts/test_travis.sh +++ b/scripts/test_travis.sh @@ -22,8 +22,6 @@ set -Eeuo pipefail source ./scripts/_lib.sh -export BUILD_OPTIONS="--no-cache --force-rm --pull" - create_credentials create_docker_assets @@ -45,6 +43,13 @@ case "$1" in ./scripts/test_container.sh producer build_client + + # check producer access to kernel via REST API + export KERNEL_ACCESS_TYPE=api + ./scripts/test_container.sh integration + + # check producer access to kernel via database + export KERNEL_ACCESS_TYPE=db ./scripts/test_container.sh integration ;; diff --git a/test-aether-integration-module/.dockerignore b/test-aether-integration-module/.dockerignore index aaca40b6a..2724ff134 100644 --- a/test-aether-integration-module/.dockerignore +++ b/test-aether-integration-module/.dockerignore @@ -1,19 +1,18 @@ .dockerignore Dockerfile -db.sqlite3 -__pycache__ -*.pyc -*.pyo -*.pyd -.Python -env -pip-log.txt -pip-delete-this-directory.txt -.tox -.coverage -.coverage.* -.cache -coverage.xml -*,cover -*.log -.git + +**/__pycache__ +**/*.pyc +**/*.pyo +**/*.pyd +**/.Python + +**/.coverage +**/.coverage.* +**/.cache +**/coverage.xml +**/*.cover + +**/*.log + +**/db.sqlite3 diff --git a/test-aether-integration-module/Dockerfile b/test-aether-integration-module/Dockerfile index 9da3c93d6..757a7e5dc 100644 --- a/test-aether-integration-module/Dockerfile +++ b/test-aether-integration-module/Dockerfile @@ -1,7 +1,11 @@ FROM python:3.7-slim-buster +LABEL description="Aether Integration Tests" \ + name="aether-integration-test" \ + author="eHealth Africa" + ################################################################################ -## setup container +## set up container ################################################################################ RUN apt-get update -qq && \ @@ -10,27 +14,24 @@ RUN apt-get update -qq && \ --allow-downgrades \ --allow-remove-essential \ --allow-change-held-packages \ - install gcc + install gcc && \ + useradd -ms /bin/false aether -################################################################################ -## install app -## copy files one by one and split commands to use docker cache -################################################################################ +WORKDIR /code +ENTRYPOINT ["/code/entrypoint.sh"] ################################################################################ -## install app +## install dependencies +## copy files one by one and split commands to use docker cache ################################################################################ -WORKDIR /code - -COPY ./conf/pip /code/conf/pip +COPY --chown=aether:aether ./conf/pip /code/conf/pip RUN pip install -q --upgrade pip && \ pip install -q -f /code/conf/pip/dependencies -r /code/conf/pip/requirements.txt - -COPY ./ /code +COPY --chown=aether:aether ./ /code ################################################################################ -## copy application version and create git revision +## create application version and revision files ################################################################################ ARG VERSION=0.0.0 @@ -39,13 +40,3 @@ ARG GIT_REVISION RUN mkdir -p /var/tmp && \ echo $VERSION > /var/tmp/VERSION && \ echo $GIT_REVISION > /var/tmp/REVISION - -################################################################################ -## last setup steps -################################################################################ - -# create user to run 
container (avoid root user) -RUN useradd -ms /bin/false aether -RUN chown -R aether: /code - -ENTRYPOINT ["/code/entrypoint.sh"] diff --git a/test-aether-integration-module/test/__init__.py b/test-aether-integration-module/test/__init__.py index 8fefb43f9..c653c2f61 100644 --- a/test-aether-integration-module/test/__init__.py +++ b/test-aether-integration-module/test/__init__.py @@ -44,12 +44,16 @@ SEED_TYPE = 'CurrentStock' # realm is only required if Kernel is MultiTenant REALM = os.environ.get('KERNEL_REALM', '-') + KAFKA_SEED_TYPE = f'{REALM}.{SEED_TYPE}' +KAFKA_URL = os.environ['KAFKA_URL'] PRODUCER_CREDS = [ os.environ['PRODUCER_ADMIN_USER'], - os.environ['PRODUCER_ADMIN_PW'] + os.environ['PRODUCER_ADMIN_PW'], ] +PRODUCER_URL = os.environ['PRODUCER_URL'] +PRODUCER_MODE = os.environ['PRODUCER_MODE'] @pytest.fixture(scope='function') @@ -76,14 +80,16 @@ def wait_for_producer_status(): status = producer_request('status') if not status: raise ValueError('No status response from producer') + kafka = status.get('kafka_container_accessible') if not kafka: raise ValueError('Kafka not connected yet') + person = status.get('topics', {}).get(KAFKA_SEED_TYPE, {}) ok_count = person.get('last_changeset_status', {}).get('succeeded') if ok_count: sleep(5) - return ok_count + return status else: raise ValueError('Last changeset status has no successes. Not producing') except Exception as err: @@ -99,8 +105,7 @@ def entities(client, schemadecorators): # noqa: F811 for sd in schemadecorators: name = sd['name'] sd_id = sd.id - entities[name] = [i for i in client.entities.paginated( - 'list', schemadecorator=sd_id)] + entities[name] = [i for i in client.entities.paginated('list', schemadecorator=sd_id)] return entities @@ -110,10 +115,7 @@ def generate_entities(client, mappingset): # noqa: F811 entities = [] for i in range(FORMS_TO_SUBMIT): Submission = client.get_model('Submission') - submission = Submission( - payload=next(payloads), - mappingset=mappingset.id - ) + submission = Submission(payload=next(payloads), mappingset=mappingset.id) instance = client.submissions.create(data=submission) for entity in client.entities.paginated('list', submission=instance.id): entities.append(entity) @@ -122,7 +124,7 @@ def generate_entities(client, mappingset): # noqa: F811 @pytest.fixture(scope='function') def read_people(): - consumer = get_consumer(KAFKA_SEED_TYPE) + consumer = get_consumer(KAFKA_URL, KAFKA_SEED_TYPE) messages = read(consumer, start='FIRST', verbose=False, timeout_ms=500) consumer.close() # leaving consumers open can slow down zookeeper, try to stay tidy return messages @@ -133,11 +135,8 @@ def read_people(): def producer_request(endpoint, expect_json=True): auth = requests.auth.HTTPBasicAuth(*PRODUCER_CREDS) - url = '{base}/{endpoint}'.format( - base=os.environ['PRODUCER_URL'], - endpoint=endpoint) try: - res = requests.get(url, auth=auth) + res = requests.get(f'{PRODUCER_URL}/{endpoint}', auth=auth) if expect_json: return res.json() else: diff --git a/test-aether-integration-module/test/consumer.py b/test-aether-integration-module/test/consumer.py index ea69fe724..5af316ab7 100644 --- a/test-aether-integration-module/test/consumer.py +++ b/test-aether-integration-module/test/consumer.py @@ -17,80 +17,66 @@ # under the License. 
import json -import sys -from time import sleep as Sleep +from time import sleep from aet.consumer import KafkaConsumer from kafka.consumer.fetcher import NoOffsetForPartitionError -def pprint(obj): - print(json.dumps(obj, indent=2)) - - -def get_consumer(topic=None, strategy='latest'): +def get_consumer(kafka_url, topic=None, strategy='latest'): consumer = KafkaConsumer( aether_emit_flag_required=False, group_id='demo-reader', - bootstrap_servers=['kafka-test:29092'], - auto_offset_reset=strategy + bootstrap_servers=[kafka_url], + auto_offset_reset=strategy, ) if topic: consumer.subscribe(topic) return consumer -def connect_kafka(): - CONN_RETRY = 3 - CONN_RETRY_WAIT_TIME = 10 - for x in range(CONN_RETRY): - try: - consumer = get_consumer() - topics = consumer.topics() - consumer.close() - print('Connected to Kafka...') - return [topic for topic in topics] - except Exception as ke: - print('Could not connect to Kafka: %s' % (ke)) - Sleep(CONN_RETRY_WAIT_TIME) - print('Failed to connect to Kafka after %s retries' % CONN_RETRY) - sys.exit(1) # Kill consumer with error - - -def read_poll_result(new_records, verbose=False): - flattened = [] - for parition_key, packages in new_records.items(): - for package in packages: - messages = package.get('messages') - for msg in messages: - flattened.append(msg) - if verbose: - pprint(msg) - return flattened - - def read(consumer, start='LATEST', verbose=False, timeout_ms=5000, max_records=200): messages = [] if start not in ['FIRST', 'LATEST']: - raise ValueError('%s it not a valid argument for "start="' % start) + raise ValueError(f'{start} it not a valid argument for "start="') if start == 'FIRST': consumer.seek_to_beginning() + blank = 0 while True: try: poll_result = consumer.poll_and_deserialize( timeout_ms=timeout_ms, - max_records=max_records) + max_records=max_records, + ) except NoOffsetForPartitionError as nofpe: print(nofpe) break + if not poll_result: blank += 1 if blank > 3: break - Sleep(1) + sleep(1) - new_messages = read_poll_result(poll_result, verbose) + new_messages = _read_poll_result(poll_result, verbose) messages.extend(new_messages) - print('Read %s messages' % (len(messages))) + + print(f'Read {len(messages)} messages') return messages + + +def _read_poll_result(new_records, verbose=False): + flattened = [] + for parition_key, packages in new_records.items(): + for package in packages: + messages = package.get('messages') + for msg in messages: + flattened.append(msg) + if verbose: + _pprint(msg) + return flattened + + +def _pprint(obj): + print(json.dumps(obj, indent=2)) diff --git a/test-aether-integration-module/test/test_integration.py b/test-aether-integration-module/test/test_integration.py index e09e80309..ae1c9d21b 100644 --- a/test-aether-integration-module/test/test_integration.py +++ b/test-aether-integration-module/test/test_integration.py @@ -43,11 +43,12 @@ def test_3_check_updated_count(entities): def test_4_check_producer_status(wait_for_producer_status): assert(wait_for_producer_status is not None) + assert(wait_for_producer_status['kernel_mode'] == PRODUCER_MODE) def test_5_check_producer_topics(producer_topics): assert(KAFKA_SEED_TYPE in producer_topics.keys()) - assert(int(producer_topics[KAFKA_SEED_TYPE]['count']) is SEED_ENTITIES) + assert(int(producer_topics[KAFKA_SEED_TYPE]['count']) == SEED_ENTITIES) def test_6_check_stream_entities(read_people, entities): @@ -57,6 +58,7 @@ def test_6_check_stream_entities(read_people, entities): for _id in kernel_messages: if _id not in kafka_messages: 
             failed.append(_id)
+    assert(len(failed) == 0)
     assert(len(kernel_messages) == len(kafka_messages))
     assert(producer_topic_count(KAFKA_SEED_TYPE) == len(kafka_messages))
@@ -65,17 +67,21 @@ def test_7_control_topic():
     producer_control_topic(KAFKA_SEED_TYPE, 'pause')
     sleep(.5)
+
     op = topic_status(KAFKA_SEED_TYPE)['operating_status']
     assert(op == 'TopicStatus.PAUSED')
     producer_control_topic(KAFKA_SEED_TYPE, 'resume')
     sleep(.5)
+
     op = topic_status(KAFKA_SEED_TYPE)['operating_status']
     assert(op == 'TopicStatus.NORMAL')
     producer_control_topic(KAFKA_SEED_TYPE, 'rebuild')
     sleep(.5)
+
     for x in range(120):
         op = topic_status(KAFKA_SEED_TYPE)['operating_status']
         if op != 'TopicStatus.REBUILDING':
            return
        sleep(1)
+
    assert(False), 'Topic Deletion Timed out.'
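
As a quick local check of the new deployment images, the following is a minimal sketch of what scripts/test_deployment.sh automates in CI. It assumes Docker is available and the commands are run from the repository root; the local-aether-kernel tag is only an illustrative name, not part of the patch.

# build one of the multi-stage images defined under scripts/deployment/
docker build \
    --force-rm \
    --tag local-aether-kernel \
    --file ./scripts/deployment/kernel.Dockerfile \
    .

# the alpine "app_resource" stage runs setup_revision.sh, and the resulting
# VERSION and REVISION files end up in /var/tmp/ of the final image
docker run --rm --entrypoint cat local-aether-kernel /var/tmp/VERSION /var/tmp/REVISION

The same pattern applies to the odk, producer and ui images, which scripts/test_deployment.sh builds in a loop over DEPLOY_APPS.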