diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..f76e83b --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,44 @@ +name: Publish to PyPI +on: + release: + types: [created] +jobs: + build: + name: Build distribution + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Install pypa/build + run: >- + python3 -m + pip install + build + --user + - name: Build a binary wheel and a source tarball + run: python3 -m build + - name: Store the distribution packages + uses: actions/upload-artifact@v4 + with: + name: python-package-distributions + path: dist/ + publish-to-pypi: + name: Publish to PyPI + needs: + - build + runs-on: ubuntu-latest + environment: + name: pypi + permissions: + id-token: write + steps: + - name: Download all the dists + uses: actions/download-artifact@v4 + with: + name: python-package-distributions + path: dist/ + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..d63b70c --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,31 @@ +# github action for running tests +# use pytest +name: Run tests +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: 3.12 + - name: Install dependencies + run: | + pip3 install -r requirements.txt + - name: Test with pytest + run: | + pip3 install pytest-cov + make test + - name: Upload Coverage to Codecov + uses: codecov/codecov-action@v4 + with: + fail_ci_if_error: true + files: ./coverage.xml + token: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..238b268 --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +test: + pytest --cov=dagster_rudderstack dagster_rudderstack_tests --cov-report=xml \ No newline at end of file diff --git a/README.md b/README.md index 3b0ca42..dff9a48 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,54 @@ -# rudder-dagster -A Dagster library for triggering reverse ETL syncs in RudderStack +## dagster_rudderstack + +A Dagster library for triggering Reverse ETL syncs in RudderStack. + +### Installation + +Use pip to install the library. + +```bash +pip install dagster_rudderstack +``` + +### Configuration +Setup RudderStack resource with your [workspace access token](https://www.rudderstack.com/docs/dashboard-guides/personal-access-token/). + +```python +# resources.py +from dagster_rudderstack.resources.rudderstack import RudderStackRETLResource + +rudderstack_retl_resource = RudderStackRETLResource( + access_token="access_token") +``` +RudderStackRETLResource exposes other configurable parameters as well. Mostly default values for them would be recommended. +* request_max_retries: The maximum number of times requests to the RudderStack API should be retried before failng. +* request_retry_delay: Time (in seconds) to wait between each request retry. +* request_timeout: Time (in seconds) after which the requests to RudderStack are declared timed out. +* poll_interval: Time (in seconds) for polling status of triggered job. +* poll_timeout: Time (in seconds) after which the polling for a triggered job is declared timed out. + +### Ops and Jobs + +Define ops and jobs with schedule. Provide the [connection id](https://www.rudderstack.com/docs/sources/reverse-etl/airflow-provider/#where-can-i-find-the-connection-id-for-my-reverse-etl-connection) for the sync job +```python +# jobs.py +from dagster import job, ScheduleDefinition, ScheduleDefinition +from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig +from .resources import rudderstack_retl_resource + +@job( + resource_defs={ + "retl_resource": rudderstack_retl_resource + } +) +def rs_retl_sync_job(): + rudderstack_sync_op() + +rudderstack_sync_schedule = ScheduleDefinition( + job=rs_retl_sync_job, + cron_schedule="* * * * *", # Runs every minute + run_config={"ops": {"rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="connection_id")}}, + default_status=DefaultScheduleStatus.RUNNING +) +``` + diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..e69de29 diff --git a/dagster_rudderstack/__init__.py b/dagster_rudderstack/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dagster_rudderstack/ops/__init__.py b/dagster_rudderstack/ops/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dagster_rudderstack/ops/retl.py b/dagster_rudderstack/ops/retl.py new file mode 100644 index 0000000..73e5d03 --- /dev/null +++ b/dagster_rudderstack/ops/retl.py @@ -0,0 +1,26 @@ +from dagster import Config, op, OpExecutionContext, In, Nothing, Out +from pydantic import Field + +from dagster_rudderstack.types import RudderStackRetlOutput +from ..resources.rudderstack import RudderStackRETLResource + + +class RudderStackRETLOpConfig(Config): + connection_id: str = Field( + json_schema_extra={"is_required": True}, + description="The connectionId for an rETL sync.", + ) + + +@op( + ins={"start_after": In(Nothing)}, + out=Out(RudderStackRetlOutput, description="The output of the sync run."), +) +def rudderstack_sync_op( + context: OpExecutionContext, + config: RudderStackRETLOpConfig, + retl_resource: RudderStackRETLResource, +): + context.log.info("config_param: " + config.connection_id) + output: RudderStackRetlOutput = retl_resource.start_and_poll(config.connection_id) + return output diff --git a/dagster_rudderstack/resources/__init__.py b/dagster_rudderstack/resources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dagster_rudderstack/resources/rudderstack.py b/dagster_rudderstack/resources/rudderstack.py new file mode 100644 index 0000000..ce8df8d --- /dev/null +++ b/dagster_rudderstack/resources/rudderstack.py @@ -0,0 +1,210 @@ +import datetime +import logging +import requests +import time +from abc import abstractmethod +from dagster import ConfigurableResource, Failure, get_dagster_logger +from dagster._utils.cached_method import cached_method +from importlib.metadata import PackageNotFoundError, version +from pydantic import Field +from typing import Any, Dict, Mapping, Optional +from urllib.parse import urljoin + +from dagster_rudderstack.types import RudderStackRetlOutput + + +class RETLSyncStatus: + RUNNING = "running" + SUCCEEDED = "succeeded" + FAILED = "failed" + + +class RETLSyncType: + INCREMENTAL = "incremental" + FULL = "full" + + +DEFAULT_POLL_INTERVAL_SECONDS = 10 +DEFAULT_REQUEST_MAX_RETRIES = 3 +DEFAULT_RETRY_DELAY = 1 +DEFAULT_REQUEST_TIMEOUT = 30 +DEFAULT_RUDDERSTACK_API_ENDPOINT = "https://api.rudderstack.com" + + +class BaseRudderStackResource(ConfigurableResource): + request_max_retries: int = Field( + default=DEFAULT_REQUEST_MAX_RETRIES, + description=( + "The maximum number of times requests to the RudderStack API should be retried before failng." + ), + ) + request_retry_delay: float = Field( + default=DEFAULT_RETRY_DELAY, + description="Time (in seconds) to wait between each request retry.", + ) + request_timeout: int = Field( + default=DEFAULT_REQUEST_TIMEOUT, + description="Time (in seconds) after which the requests to RudderStack are declared timed out.", + ) + poll_interval: float = Field( + default=DEFAULT_POLL_INTERVAL_SECONDS, + description="Time (in seconds) for polling status of triggered job.", + ) + poll_timeout: float = Field( + default=None, + description="Time (in seconds) after which the polling for a triggered job is declared timed out.", + ) + + @property + @abstractmethod + def api_base_url(self) -> str: + raise NotImplementedError() + + @property + @abstractmethod + def request_headers(self) -> Dict[str, any]: + raise NotImplementedError() + + @property + @cached_method + def _log(self) -> logging.Logger: + return get_dagster_logger() + + def make_request( + self, + endpoint: str, + method: str = "POST", + params: Optional[Dict[str, Any]] = None, + data: Optional[Mapping[str, object]] = None, + ): + """Prepares and makes request to RudderStack API endpoint. + + Args: + method (str): The http method to be used for this request (e.g. "GET", "POST"). + endpoint (str): The RudderStack API endpoint to send request to. + params (Optional(dict)): Query parameters to pass to the API endpoint + + Returns: + Dict[str, Any]: Parsed json data from the response for this request. + """ + url = urljoin(self.api_base_url, endpoint) + headers = self.request_headers + num_retries = 0 + + while True: + try: + request_args: Dict[str, Any] = dict( + method=method, + url=url, + headers=headers, + timeout=self.request_timeout, + ) + if data: + request_args["json"] = data + response = requests.request(**request_args) + response.raise_for_status() + return response.json() + except requests.RequestException as e: + self._log.error(f"Request to url: {url} failed: {e}") + if num_retries == self.request_max_retries: + break + num_retries += 1 + time.sleep(self.request_retry_delay) + + raise Failure("Exceeded max number of retries.") + + +class RudderStackRETLResource(BaseRudderStackResource): + access_token: str = Field( + json_schema_extra={"is_required": True}, description="Access Token" + ) + rs_cloud_url: str = Field( + default=DEFAULT_RUDDERSTACK_API_ENDPOINT, description="RudderStack cloud URL" + ) + + @property + def api_base_url(self) -> str: + return self.rs_cloud_url + + @property + def request_headers(self) -> Dict[str, any]: + try: + __version__ = version("dagster_rudderstack") + except PackageNotFoundError: + __version__ = "UnknownVersion" + return { + "authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + "User-Agent": f"RudderDagster/{__version__}", + } + + def start_sync( + self, conn_id: str, sync_type: str = RETLSyncType.INCREMENTAL + ) -> str: + """Triggers a sync and returns runId if successful, else raises Failure. + + Args: + conn_id (str): + sync_type (str): + + Returns: + sync_id of the sync started. + """ + self._log.info(f"Triggering sync for retl connection id: {conn_id}") + return self.make_request( + endpoint=f"/v2/retl-connections/{conn_id}/start", + data={"syncType": sync_type}, + )["syncId"] + + def poll_sync(self, conn_id: str, sync_id: str) -> Dict[str, Any]: + """Polls for completion of a sync. If poll_timeout is set, raises Failure after timeout. + + Args: + conn_id (str): connetionId for an RETL sync. + sync_type (str): (optional) full or incremental. Default is incremental. + Returns: + Dict[str, Any]: Parsed json output from syncs endpoint. + """ + status_endpoint = f"/v2/retl-connections/{conn_id}/syncs/{sync_id}" + poll_start = datetime.datetime.now() + while True: + resp = self.make_request(endpoint=status_endpoint, method="GET") + sync_status = resp["status"] + self._log.info( + f"Polled status for syncId: {sync_id} for retl connection: {conn_id}, status: {sync_status}" + ) + if sync_status == RETLSyncStatus.SUCCEEDED: + self._log.info( + f"Sync finished for retl connection: {conn_id}, syncId: {sync_id}" + ) + return resp + elif sync_status == RETLSyncStatus.FAILED: + error_msg = resp.get("error", None) + raise Failure( + f"Sync for retl connection: {conn_id}, syncId: {sync_id} failed with error: {error_msg}" + ) + if ( + self.poll_timeout + and datetime.datetime.now() + > poll_start + datetime.timedelta(seconds=self.poll_timeout) + ): + raise Failure( + f"Polling for syncId: {sync_id} for retl connection: {conn_id} timed out." + ) + time.sleep(self.poll_interval) + + def start_and_poll( + self, conn_id: str, sync_type: str = RETLSyncType.INCREMENTAL + ) -> RudderStackRetlOutput: + """Triggers a sync and keeps polling till it completes. + + Args: + conn_id (str): connetionId for an RETL sync. + sync_type (str): (optional) full or incremental. Default is incremental. + Returns: + RudderStackRetlOutput: Details of the sync run. + """ + self._log.info(f"Trigger sync for connectionId: {conn_id} and wait for finish") + sync_id = self.start_sync(conn_id, sync_type) + sync_run_details = self.poll_sync(conn_id, sync_id) + return RudderStackRetlOutput(sync_run_details) diff --git a/dagster_rudderstack/types.py b/dagster_rudderstack/types.py new file mode 100644 index 0000000..f415319 --- /dev/null +++ b/dagster_rudderstack/types.py @@ -0,0 +1,35 @@ +from typing import Any, Dict, NamedTuple + + +class RudderStackRetlOutput( + NamedTuple( + "_RudderStackRetlOutput", + [ + ("sync_run_details", Dict[str, Any]), + ], + ) +): + """Contains details of a sync run. + + Attributes: + sync_run_details (Dict[str, Any]): Details of the sync run. + { + "id": "string", + "status": "running", + "startedAt": "2024-08-01T06:14:33.744Z", + "finishedAt": "2024-08-01T06:14:33.744Z", + "error": "string", + "metrics": { + "succeeded": { + "total": 0 + }, + "failed": { + "total": 0 + }, + "changed": { + "total": 0 + }, + "total": 0 + } + } + """ diff --git a/dagster_rudderstack_tests/__init__.py b/dagster_rudderstack_tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/dagster_rudderstack_tests/__init__.py @@ -0,0 +1 @@ + diff --git a/dagster_rudderstack_tests/test_ops.py b/dagster_rudderstack_tests/test_ops.py new file mode 100644 index 0000000..1cc515e --- /dev/null +++ b/dagster_rudderstack_tests/test_ops.py @@ -0,0 +1,144 @@ +import pytest +import responses +from unittest.mock import MagicMock +from dagster import Failure, build_op_context, ResourceDefinition, op +from dagster import job, RunConfig, Definitions +from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig +from dagster_rudderstack.resources.rudderstack import ( + RudderStackRETLResource, + RETLSyncStatus, +) +from dagster_rudderstack.types import RudderStackRetlOutput + + +@pytest.fixture +def mock_config(): + return RudderStackRETLOpConfig(connection_id="test_connection_id") + + +def test_rudderstack_sync_op(mock_config): + context = build_op_context() + retl_resource = MagicMock(spec=RudderStackRETLResource) + retl_resource.start_and_poll.return_value = RudderStackRetlOutput( + sync_run_details={"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED} + ) + ret_resource_def = ResourceDefinition.hardcoded_resource(retl_resource) + result = rudderstack_sync_op(context, mock_config, ret_resource_def) + retl_resource.start_and_poll.assert_called_with("test_connection_id") + assert result == RudderStackRetlOutput( + {"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED} + ) + + +def test_rudderstack_sync_job(mock_config): + @op + def dummy_op(): + pass + + @job + def rs_retl_test_sync_start_and_poll_job(): + rudderstack_sync_op(start_after=dummy_op()) + + defs = Definitions( + jobs=[rs_retl_test_sync_start_and_poll_job], + resources={ + "retl_resource": RudderStackRETLResource( + access_token="test_access_token", + rs_cloud_url="https://testapi.rudderstack.com", + poll_interval=0.1, + ) + }, + ) + + api_prefix = ( + "https://testapi.rudderstack.com/v2/retl-connections/test_connection_id/" + ) + with responses.RequestsMock() as rsps: + rsps.add( + rsps.POST, + api_prefix + "start", + json={"syncId": "test_sync_run_id"}, + ) + rsps.add( + rsps.GET, + api_prefix + "syncs/test_sync_run_id", + json={"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ) + rsps.add( + rsps.GET, + api_prefix + "syncs/test_sync_run_id", + json={"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED}, + ) + job_result = defs.get_job_def( + "rs_retl_test_sync_start_and_poll_job" + ).execute_in_process( + run_config=RunConfig( + ops={ + "rudderstack_sync_op": mock_config, + } + ) + ) + assert job_result.output_for_node( + "rudderstack_sync_op" + ) == RudderStackRetlOutput( + sync_run_details={ + "id": "test_sync_run_id", + "status": RETLSyncStatus.SUCCEEDED, + } + ) + + +def test_rudderstack_sync_job_timeout(mock_config): + @job + def rs_retl_test_sync_start_and_poll_job(): + rudderstack_sync_op() + + defs = Definitions( + jobs=[rs_retl_test_sync_start_and_poll_job], + resources={ + "retl_resource": RudderStackRETLResource( + access_token="test_access_token", + rs_cloud_url="https://testapi.rudderstack.com", + poll_interval=0.1, + poll_timeout=0.2, + ) + }, + ) + + api_prefix = ( + "https://testapi.rudderstack.com/v2/retl-connections/test_connection_id/" + ) + + with responses.RequestsMock() as rsps: + rsps.add( + rsps.POST, + api_prefix + "start", + json={"syncId": "test_sync_run_id"}, + ) + rsps.add( + rsps.GET, + api_prefix + "syncs/test_sync_run_id", + json={"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ) + rsps.add( + rsps.GET, + api_prefix + "syncs/test_sync_run_id", + json={"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ) + rsps.add( + rsps.GET, + api_prefix + "syncs/test_sync_run_id", + json={"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ) + with pytest.raises(Failure): + defs.get_job_def("rs_retl_test_sync_start_and_poll_job").execute_in_process( + run_config=RunConfig( + ops={ + "rudderstack_sync_op": mock_config, + } + ) + ) + + +if __name__ == "__main__": + pytest.main() diff --git a/dagster_rudderstack_tests/test_resources.py b/dagster_rudderstack_tests/test_resources.py new file mode 100644 index 0000000..41d8f8c --- /dev/null +++ b/dagster_rudderstack_tests/test_resources.py @@ -0,0 +1,144 @@ +import pytest +import requests +from unittest.mock import MagicMock, patch +from dagster_rudderstack.resources.rudderstack import ( + RudderStackRETLResource, + RETLSyncStatus, +) +from dagster import Failure + +from dagster_rudderstack.types import RudderStackRetlOutput + + +@pytest.fixture +def mock_retl_resource(): + return RudderStackRETLResource( + access_token="test_access_token", + rs_cloud_url="https://testapi.rudderstack.com", + poll_interval=0.1, + ) + + +def test_api_base_url(mock_retl_resource): + assert mock_retl_resource.api_base_url == "https://testapi.rudderstack.com" + + +def test_request_headers(mock_retl_resource): + headers = mock_retl_resource.request_headers + assert headers["authorization"] == "Bearer test_access_token" + assert headers["Content-Type"] == "application/json" + + +@patch("requests.request") +def test_make_request(mock_request, mock_retl_resource): + mock_request.return_value = MagicMock( + status_code=200, json=lambda: {"key": "value"} + ) + + response = mock_retl_resource.make_request(endpoint="/test-endpoint", method="GET") + + assert response == {"key": "value"} + mock_request.assert_called_once() + + +@patch("requests.request") +def test_make_request_failure(mock_request, mock_retl_resource): + mock_request.side_effect = requests.RequestException("Request failed") + + with pytest.raises(Failure, match="Exceeded max number of retries"): + mock_retl_resource.make_request(endpoint="/test-endpoint", method="GET") + + +@patch("requests.request") +def test_start_sync(mock_request, mock_retl_resource): + mock_request.return_value = MagicMock( + status_code=200, json=lambda: {"syncId": "test_sync_id"} + ) + + sync_id = mock_retl_resource.start_sync(conn_id="test_conn_id") + + assert sync_id == "test_sync_id" + mock_request.assert_called_once() + + +@patch("requests.request") +def test_poll_sync(mock_request, mock_retl_resource): + mock_request.side_effect = [ + MagicMock( + status_code=200, + json=lambda: {"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ), + MagicMock( + status_code=200, + json=lambda: {"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED}, + ), + ] + + result = mock_retl_resource.poll_sync( + conn_id="test_conn_id", sync_id="test_sync_run_id" + ) + + assert mock_request.call_count == 2 + assert result == {"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED} + + +@patch("requests.request") +def test_poll_sync_timeout(mock_request): + mock_retl_resource_with_timeout = RudderStackRETLResource( + access_token="test_access_token", + rs_cloud_url="https://testapi.rudderstack.com", + poll_interval=0.1, + poll_timeout=0.3, + ) + mock_request.return_value = MagicMock( + status_code=200, + json=lambda: {"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ) + + with pytest.raises(Failure): + mock_retl_resource_with_timeout.poll_sync( + conn_id="test_conn_id", sync_id="test_sync_run_id" + ) + + +@patch("requests.request") +def test_poll_sync_failure(mock_request, mock_retl_resource): + mock_request.side_effect = [ + MagicMock(status_code=200, json=lambda: {"status": RETLSyncStatus.RUNNING}), + MagicMock( + status_code=200, + json=lambda: {"status": RETLSyncStatus.FAILED, "error": "Test error"}, + ), + ] + + with pytest.raises( + Failure, + match="Sync for retl connection: test_conn_id, syncId: test_sync_id failed with error: Test error", + ): + mock_retl_resource.poll_sync(conn_id="test_conn_id", sync_id="test_sync_id") + + +@patch("requests.request") +def test_start_and_poll(mock_request, mock_retl_resource): + mock_request.side_effect = [ + MagicMock(status_code=200, json=lambda: {"syncId": "test_sync_run_id"}), + MagicMock( + status_code=200, + json=lambda: {"id": "test_sync_run_id", "status": RETLSyncStatus.RUNNING}, + ), + MagicMock( + status_code=200, + json=lambda: {"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED}, + ), + ] + + result = mock_retl_resource.start_and_poll(conn_id="test_conn_id") + + assert mock_request.call_count == 3 + assert result == RudderStackRetlOutput( + {"id": "test_sync_run_id", "status": RETLSyncStatus.SUCCEEDED} + ) + + +if __name__ == "__main__": + pytest.main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..14d8b1d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools >= 61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "dagster_rudderstack" +version = "1.0.0" +readme = "README.md" +license = {file = "LICENSE"} +description = "A Dagster library for triggering Reverse ETL syncs in RudderStack." +dependencies = [ + "dagster", + "pydantic", + "pytest", + "requests", + "responses", + "setuptools" +] +requires-python = ">= 3.8" + +[tool.setuptools.packages.find] +exclude = ["dagster_rudderstack_tests"] + +[tool.dagster] +module_name = "dagster_rudderstack" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f449136 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +dagster==1.6.14 +pydantic==2.8.2 +pytest==7.3.1 +requests==2.32.3 +responses==0.25.3 +setuptools==67.6.1