Update for Airflow 2.6.3 (#167)
jdddog authored Aug 7, 2023
1 parent 2b240f5 commit 5c10154
Showing 15 changed files with 44 additions and 246 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/unit-tests.yml
@@ -29,12 +29,12 @@ jobs:
cd ..
git clone https://github.com/The-Academic-Observatory/observatory-platform.git
cd observatory-platform
- pip install -e observatory-api
- pip install -e observatory-platform
+ pip install -e observatory-api --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
+ pip install -e observatory-platform --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
cd ..
cd oaebu-workflows
- pip install -e .[tests]
+ pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
# Required for testing ONIX Telescope
- name: Set up JDK 11
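The added --constraint flags pin every dependency to the set that Apache Airflow 2.6.3 was released and tested with; the constraints file is selected by the Airflow version and by the Python version from the CI matrix. A minimal sketch of how that URL is parameterised and used, assuming a hypothetical helper name and a subprocess-based pip call (neither is part of this repository):

import subprocess
import sys

AIRFLOW_VERSION = "2.6.3"


def pip_install_with_airflow_constraints(package: str) -> None:
    """Install an editable package pinned against the official Airflow constraints file."""
    # e.g. "3.10"; the CI workflow substitutes ${{ matrix.python-version }} instead
    python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    constraints_url = (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{AIRFLOW_VERSION}/constraints-no-providers-{python_version}.txt"
    )
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "-e", package, "--constraint", constraints_url],
        check=True,
    )


# Mirrors the workflow steps above:
# pip_install_with_airflow_constraints("observatory-api")
# pip_install_with_airflow_constraints("observatory-platform")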
24 changes: 2 additions & 22 deletions oaebu_workflows/config.py
@@ -13,18 +13,16 @@
# limitations under the License.

# Author: James Diprose
import os

import json
import os
import re
from typing import Dict

from airflow.exceptions import AirflowException

from observatory.platform.config import module_file_path
from observatory.platform.workflows.elastic_import_workflow import ElasticImportConfig
from observatory.platform.utils.jinja2_utils import render_template
from observatory.platform.elastic.kibana import TimeField
from observatory.platform.elastic.elastic import KeepInfo, KeepOrder


def test_fixtures_folder(*subdirs) -> str:
@@ -98,21 +96,3 @@ def elastic_mappings_folder() -> str:
"""

return module_file_path("oaebu_workflows.database.mappings")


- ELASTIC_OAEBU_KIBANA_TIME_FIELDS = [
- TimeField("^oaebu-.*-unmatched-book-metrics$", "release_date"),
- TimeField("^oaebu-.*-book-product-list$", "time_field"),
- TimeField("^oaebu-.*$", "month"),
- ]
- ELASTIC_INDEX_KEEP_INFO = {
- "": KeepInfo(ordering=KeepOrder.newest, num=3),
- "oaebu": KeepInfo(ordering=KeepOrder.newest, num=3),
- }
-
- ELASTIC_IMPORT_CONFIG = ElasticImportConfig(
- elastic_mappings_path=elastic_mappings_folder(),
- elastic_mappings_func=load_elastic_mappings_oaebu,
- kibana_time_fields=ELASTIC_OAEBU_KIBANA_TIME_FIELDS,
- index_keep_info=ELASTIC_INDEX_KEEP_INFO,
- )
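With the Elastic import configuration removed, config.py is left with the path helpers built on module_file_path. A minimal sketch of how such a helper is typically written, assuming the fixtures sit in an oaebu_workflows.fixtures package; only the signature of test_fixtures_folder appears in the diff above, so the module path and function body here are assumptions:

import os

from observatory.platform.config import module_file_path


def test_fixtures_folder(*subdirs) -> str:
    """Return the path to the test fixtures folder, optionally joined with sub-directories."""
    base_path = module_file_path("oaebu_workflows.fixtures")  # assumed module path
    return os.path.join(base_path, *subdirs)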
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/google_analytics_telescope.py
@@ -89,7 +89,7 @@ def __init__(
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
catchup: bool = True,
start_date: pendulum.DateTime = pendulum.datetime(2018, 1, 1),
- schedule_interval: str = "@monthly",
+ schedule: str = "@monthly",
):
"""Construct a GoogleAnalyticsTelescope instance.
:param dag_id: The ID of the DAG
@@ -107,12 +107,12 @@ def __init__(
:param observatory_api_conn_id: Airflow connection ID for the Observatory API
:param catchup: Whether to catchup the DAG or not
:param start_date: The start date of the DAG
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
"""
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
catchup=catchup,
airflow_conns=[oaebu_service_account_conn_id, observatory_api_conn_id],
tags=["oaebu"],
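The schedule_interval to schedule rename, repeated in every telescope below, follows the Airflow 2.4+ DAG API, in which schedule supersedes the deprecated schedule_interval argument. A minimal sketch of the new-style call, using an illustrative DAG rather than code from this repository:

import pendulum
from airflow import DAG

with DAG(
    dag_id="example_monthly_dag",  # illustrative name
    start_date=pendulum.datetime(2018, 1, 1),
    schedule="@monthly",  # previously schedule_interval="@monthly"
    catchup=True,
):
    pass  # tasks would be defined here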
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/google_books_telescope.py
@@ -86,7 +86,7 @@ def __init__(
sftp_service_conn_id: str = "sftp_service",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
catchup: bool = False,
- schedule_interval: str = "@weekly",
+ schedule: str = "@weekly",
start_date: pendulum.DateTime = pendulum.datetime(2018, 1, 1),
):
"""Construct a GoogleBooksTelescope instance.
@@ -104,13 +104,13 @@ def __init__(
:param sftp_service_conn_id: Airflow connection ID for the SFTP service
:param observatory_api_conn_id: Airflow connection ID for the Observatory API
:param catchup: Whether to catchup the DAG or not
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
:param start_date: The start date of the DAG
"""
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
catchup=catchup,
airflow_conns=[sftp_service_conn_id, observatory_api_conn_id],
tags=["oaebu"],
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/irus_fulcrum_telescope.py
@@ -86,7 +86,7 @@ def __init__(
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
irus_oapen_api_conn_id: str = "irus_api",
catchup: bool = True,
- schedule_interval: str = "0 0 4 * *", # Run on the 4th of every month
+ schedule: str = "0 0 4 * *", # Run on the 4th of every month
start_date: pendulum.DateTime = pendulum.datetime(2022, 4, 1), # Earliest available data
):
"""The Fulcrum Telescope
@@ -102,7 +102,7 @@ def __init__(
:param observatory_api_conn_id: Airflow connection ID for the Observatory API
:param irus_oapen_api_conn_id: Airflow connection ID for OAPEN IRUS UK (counter 5)
:param catchup: Whether to catchup the DAG or not
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
:param start_date: The start date of the DAG
"""
if bq_table_description is None:
@@ -111,7 +111,7 @@ def __init__(
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
airflow_conns=[observatory_api_conn_id, irus_oapen_api_conn_id],
catchup=catchup,
tags=["oaebu"],
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/irus_oapen_telescope.py
@@ -118,7 +118,7 @@ def __init__(
irus_oapen_login_conn_id: str = "irus_login",
catchup: bool = True,
start_date: pendulum.DateTime = pendulum.datetime(2015, 6, 1),
- schedule_interval: str = "0 0 4 * *", # Run on the 4th of every month
+ schedule: str = "0 0 4 * *", # Run on the 4th of every month
max_active_runs: int = 5,
):
"""The OAPEN irus uk telescope.
@@ -139,7 +139,7 @@ def __init__(
:param irus_oapen_login_conn_id: The Airflow connection ID for IRUS API (login) - for counter 4
:param catchup: Whether to catchup the DAG or not
:param start_date: The start date of the DAG
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
:param max_active_runs: The maximum number of concurrent DAG instances
"""
if bq_table_description is None:
@@ -148,7 +148,7 @@ def __init__(
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
catchup=catchup,
airflow_conns=[
observatory_api_conn_id,
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/jstor_telescope.py
@@ -119,7 +119,7 @@ def __init__(
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
catchup: bool = False,
max_active_runs: int = 1,
- schedule_interval: str = "0 0 4 * *", # 4th day of every month
+ schedule: str = "0 0 4 * *", # 4th day of every month
start_date: pendulum.DateTime = pendulum.datetime(2016, 10, 1),
):
"""Construct a JstorTelescope instance.
@@ -138,13 +138,13 @@ def __init__(
:param observatory_api_conn_id: Airflow connection ID for the Observatory API
:param catchup: Whether to catchup the DAG or not
:param max_active_runs: The maximum number of DAG runs that can be run concurrently
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
:param start_date: The start date of the DAG
"""
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
catchup=catchup,
airflow_conns=[gmail_api_conn_id, observatory_api_conn_id],
max_active_runs=max_active_runs,
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/oapen_metadata_telescope.py
@@ -95,7 +95,7 @@ def __init__(
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
catchup=False,
start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14),
- schedule_interval: str = "@weekly",
+ schedule: str = "@weekly",
):
"""Construct a OapenMetadataTelescope instance.
:param dag_id: The ID of the DAG
@@ -109,12 +109,12 @@ def __init__(
:param observatory_api_conn_id: Airflow connection ID for the Observatory API
:param catchup: Whether to catchup the DAG or not
:param start_date: The start date of the DAG
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
"""
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
airflow_conns=[observatory_api_conn_id],
catchup=catchup,
tags=["oaebu"],
6 changes: 3 additions & 3 deletions oaebu_workflows/workflows/onix_telescope.py
@@ -83,7 +83,7 @@ def __init__(
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
sftp_service_conn_id: str = "sftp_service",
catchup: bool = False,
- schedule_interval: str = "@weekly",
+ schedule: str = "@weekly",
start_date: pendulum.DateTime = pendulum.datetime(2021, 3, 28),
):
"""Construct an OnixTelescope instance.
@@ -100,13 +100,13 @@ def __init__(
:param observatory_api_conn_id: Airflow connection ID for the Observatory API
:param sftp_service_conn_id: Airflow connection ID for the SFTP service
:param catchup: Whether to catchup the DAG or not
- :param schedule_interval: The schedule interval of the DAG
+ :param schedule: The schedule interval of the DAG
:param start_date: The start date of the DAG
"""
super().__init__(
dag_id,
start_date,
- schedule_interval,
+ schedule,
catchup=catchup,
airflow_conns=[observatory_api_conn_id, sftp_service_conn_id],
tags=["oaebu"],
8 changes: 4 additions & 4 deletions oaebu_workflows/workflows/onix_workflow.py
@@ -162,7 +162,7 @@ def __init__(
sensor_dag_ids: List[str] = None,
catchup: Optional[bool] = False,
start_date: Optional[pendulum.DateTime] = pendulum.datetime(2022, 8, 1),
- schedule_interval: Optional[str] = "@weekly",
+ schedule: Optional[str] = "@weekly",
):
"""
Initialises the workflow object.
@@ -208,7 +208,7 @@ def __init__(
:param sensor_dag_ids: Dag IDs for dependent tasks
:param catchup: Whether to catch up missed DAG runs.
:param start_date: Start date of the DAG.
- :param schedule_interval: Scheduled interval for running the DAG.
+ :param schedule: Scheduled interval for running the DAG.
"""

if not sensor_dag_ids:
@@ -258,7 +258,7 @@ def __init__(
self.sensor_dag_ids = sensor_dag_ids
self.catchup = catchup
self.start_date = start_date
- self.schedule_interval = schedule_interval
+ self.schedule = schedule

# Initialise the data partners
self.data_partners = []
@@ -280,7 +280,7 @@ def __init__(
super().__init__(
dag_id=self.dag_id,
start_date=start_date,
- schedule_interval=schedule_interval,
+ schedule=schedule,
catchup=catchup,
airflow_conns=[observatory_api_conn_id],
tags=["oaebu"],