Skip to content

Commit

Permalink
feat: dagster lib for retl syncs (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
RanjeetMishra authored Aug 13, 2024
1 parent e94091e commit 6664b07
Show file tree
Hide file tree
Showing 16 changed files with 722 additions and 2 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Publish to PyPI
on:
release:
types: [created]
jobs:
build:
name: Build distribution
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install pypa/build
run: >-
python3 -m
pip install
build
--user
- name: Build a binary wheel and a source tarball
run: python3 -m build
- name: Store the distribution packages
uses: actions/upload-artifact@v4
with:
name: python-package-distributions
path: dist/
publish-to-pypi:
name: Publish to PyPI
needs:
- build
runs-on: ubuntu-latest
environment:
name: pypi
permissions:
id-token: write
steps:
- name: Download all the dists
uses: actions/download-artifact@v4
with:
name: python-package-distributions
path: dist/
- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
31 changes: 31 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# github action for running tests
# use pytest
name: Run tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: 3.12
- name: Install dependencies
run: |
pip3 install -r requirements.txt
- name: Test with pytest
run: |
pip3 install pytest-cov
make test
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
files: ./coverage.xml
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
test:
pytest --cov=dagster_rudderstack dagster_rudderstack_tests --cov-report=xml
56 changes: 54 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,54 @@
# rudder-dagster
A Dagster library for triggering reverse ETL syncs in RudderStack
## dagster_rudderstack

A Dagster library for triggering Reverse ETL syncs in RudderStack.

### Installation

Use pip to install the library.

```bash
pip install dagster_rudderstack
```

### Configuration
Setup RudderStack resource with your [workspace access token](https://www.rudderstack.com/docs/dashboard-guides/personal-access-token/).

```python
# resources.py
from dagster_rudderstack.resources.rudderstack import RudderStackRETLResource

rudderstack_retl_resource = RudderStackRETLResource(
access_token="access_token")
```
RudderStackRETLResource exposes other configurable parameters as well. Mostly default values for them would be recommended.
* request_max_retries: The maximum number of times requests to the RudderStack API should be retried before failng.
* request_retry_delay: Time (in seconds) to wait between each request retry.
* request_timeout: Time (in seconds) after which the requests to RudderStack are declared timed out.
* poll_interval: Time (in seconds) for polling status of triggered job.
* poll_timeout: Time (in seconds) after which the polling for a triggered job is declared timed out.

### Ops and Jobs

Define ops and jobs with schedule. Provide the [connection id](https://www.rudderstack.com/docs/sources/reverse-etl/airflow-provider/#where-can-i-find-the-connection-id-for-my-reverse-etl-connection) for the sync job
```python
# jobs.py
from dagster import job, ScheduleDefinition, ScheduleDefinition
from dagster_rudderstack.ops.retl import rudderstack_sync_op, RudderStackRETLOpConfig
from .resources import rudderstack_retl_resource

@job(
resource_defs={
"retl_resource": rudderstack_retl_resource
}
)
def rs_retl_sync_job():
rudderstack_sync_op()

rudderstack_sync_schedule = ScheduleDefinition(
job=rs_retl_sync_job,
cron_schedule="* * * * *", # Runs every minute
run_config={"ops": {"rudderstack_sync_op": RudderStackRETLOpConfig(connection_id="connection_id")}},
default_status=DefaultScheduleStatus.RUNNING
)
```

Empty file added codecov.yml
Empty file.
Empty file added dagster_rudderstack/__init__.py
Empty file.
Empty file.
26 changes: 26 additions & 0 deletions dagster_rudderstack/ops/retl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from dagster import Config, op, OpExecutionContext, In, Nothing, Out
from pydantic import Field

from dagster_rudderstack.types import RudderStackRetlOutput
from ..resources.rudderstack import RudderStackRETLResource


class RudderStackRETLOpConfig(Config):
connection_id: str = Field(
json_schema_extra={"is_required": True},
description="The connectionId for an rETL sync.",
)


@op(
ins={"start_after": In(Nothing)},
out=Out(RudderStackRetlOutput, description="The output of the sync run."),
)
def rudderstack_sync_op(
context: OpExecutionContext,
config: RudderStackRETLOpConfig,
retl_resource: RudderStackRETLResource,
):
context.log.info("config_param: " + config.connection_id)
output: RudderStackRetlOutput = retl_resource.start_and_poll(config.connection_id)
return output
Empty file.
210 changes: 210 additions & 0 deletions dagster_rudderstack/resources/rudderstack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import datetime
import logging
import requests
import time
from abc import abstractmethod
from dagster import ConfigurableResource, Failure, get_dagster_logger
from dagster._utils.cached_method import cached_method
from importlib.metadata import PackageNotFoundError, version
from pydantic import Field
from typing import Any, Dict, Mapping, Optional
from urllib.parse import urljoin

from dagster_rudderstack.types import RudderStackRetlOutput


class RETLSyncStatus:
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"


class RETLSyncType:
INCREMENTAL = "incremental"
FULL = "full"


DEFAULT_POLL_INTERVAL_SECONDS = 10
DEFAULT_REQUEST_MAX_RETRIES = 3
DEFAULT_RETRY_DELAY = 1
DEFAULT_REQUEST_TIMEOUT = 30
DEFAULT_RUDDERSTACK_API_ENDPOINT = "https://api.rudderstack.com"


class BaseRudderStackResource(ConfigurableResource):
request_max_retries: int = Field(
default=DEFAULT_REQUEST_MAX_RETRIES,
description=(
"The maximum number of times requests to the RudderStack API should be retried before failng."
),
)
request_retry_delay: float = Field(
default=DEFAULT_RETRY_DELAY,
description="Time (in seconds) to wait between each request retry.",
)
request_timeout: int = Field(
default=DEFAULT_REQUEST_TIMEOUT,
description="Time (in seconds) after which the requests to RudderStack are declared timed out.",
)
poll_interval: float = Field(
default=DEFAULT_POLL_INTERVAL_SECONDS,
description="Time (in seconds) for polling status of triggered job.",
)
poll_timeout: float = Field(
default=None,
description="Time (in seconds) after which the polling for a triggered job is declared timed out.",
)

@property
@abstractmethod
def api_base_url(self) -> str:
raise NotImplementedError()

@property
@abstractmethod
def request_headers(self) -> Dict[str, any]:
raise NotImplementedError()

@property
@cached_method
def _log(self) -> logging.Logger:
return get_dagster_logger()

def make_request(
self,
endpoint: str,
method: str = "POST",
params: Optional[Dict[str, Any]] = None,
data: Optional[Mapping[str, object]] = None,
):
"""Prepares and makes request to RudderStack API endpoint.
Args:
method (str): The http method to be used for this request (e.g. "GET", "POST").
endpoint (str): The RudderStack API endpoint to send request to.
params (Optional(dict)): Query parameters to pass to the API endpoint
Returns:
Dict[str, Any]: Parsed json data from the response for this request.
"""
url = urljoin(self.api_base_url, endpoint)
headers = self.request_headers
num_retries = 0

while True:
try:
request_args: Dict[str, Any] = dict(
method=method,
url=url,
headers=headers,
timeout=self.request_timeout,
)
if data:
request_args["json"] = data
response = requests.request(**request_args)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
self._log.error(f"Request to url: {url} failed: {e}")
if num_retries == self.request_max_retries:
break
num_retries += 1
time.sleep(self.request_retry_delay)

raise Failure("Exceeded max number of retries.")


class RudderStackRETLResource(BaseRudderStackResource):
access_token: str = Field(
json_schema_extra={"is_required": True}, description="Access Token"
)
rs_cloud_url: str = Field(
default=DEFAULT_RUDDERSTACK_API_ENDPOINT, description="RudderStack cloud URL"
)

@property
def api_base_url(self) -> str:
return self.rs_cloud_url

@property
def request_headers(self) -> Dict[str, any]:
try:
__version__ = version("dagster_rudderstack")
except PackageNotFoundError:
__version__ = "UnknownVersion"
return {
"authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"User-Agent": f"RudderDagster/{__version__}",
}

def start_sync(
self, conn_id: str, sync_type: str = RETLSyncType.INCREMENTAL
) -> str:
"""Triggers a sync and returns runId if successful, else raises Failure.
Args:
conn_id (str):
sync_type (str):
Returns:
sync_id of the sync started.
"""
self._log.info(f"Triggering sync for retl connection id: {conn_id}")
return self.make_request(
endpoint=f"/v2/retl-connections/{conn_id}/start",
data={"syncType": sync_type},
)["syncId"]

def poll_sync(self, conn_id: str, sync_id: str) -> Dict[str, Any]:
"""Polls for completion of a sync. If poll_timeout is set, raises Failure after timeout.
Args:
conn_id (str): connetionId for an RETL sync.
sync_type (str): (optional) full or incremental. Default is incremental.
Returns:
Dict[str, Any]: Parsed json output from syncs endpoint.
"""
status_endpoint = f"/v2/retl-connections/{conn_id}/syncs/{sync_id}"
poll_start = datetime.datetime.now()
while True:
resp = self.make_request(endpoint=status_endpoint, method="GET")
sync_status = resp["status"]
self._log.info(
f"Polled status for syncId: {sync_id} for retl connection: {conn_id}, status: {sync_status}"
)
if sync_status == RETLSyncStatus.SUCCEEDED:
self._log.info(
f"Sync finished for retl connection: {conn_id}, syncId: {sync_id}"
)
return resp
elif sync_status == RETLSyncStatus.FAILED:
error_msg = resp.get("error", None)
raise Failure(
f"Sync for retl connection: {conn_id}, syncId: {sync_id} failed with error: {error_msg}"
)
if (
self.poll_timeout
and datetime.datetime.now()
> poll_start + datetime.timedelta(seconds=self.poll_timeout)
):
raise Failure(
f"Polling for syncId: {sync_id} for retl connection: {conn_id} timed out."
)
time.sleep(self.poll_interval)

def start_and_poll(
self, conn_id: str, sync_type: str = RETLSyncType.INCREMENTAL
) -> RudderStackRetlOutput:
"""Triggers a sync and keeps polling till it completes.
Args:
conn_id (str): connetionId for an RETL sync.
sync_type (str): (optional) full or incremental. Default is incremental.
Returns:
RudderStackRetlOutput: Details of the sync run.
"""
self._log.info(f"Trigger sync for connectionId: {conn_id} and wait for finish")
sync_id = self.start_sync(conn_id, sync_type)
sync_run_details = self.poll_sync(conn_id, sync_id)
return RudderStackRetlOutput(sync_run_details)
Loading

0 comments on commit 6664b07

Please sign in to comment.