feat: Add a Docker container for taskworkers (#110)
When testing in sandboxes, it's helpful to have a worker image that can be scaled up and down easily and configured with different failure modes. Add a new Docker image for a simple worker that can be set to fail randomly a configurable percentage of the time.

The failure can be an actual task failure, or the worker can simply never return a status for a task, causing it to time out.
evanh authored Jan 8, 2025
1 parent c25eb92 commit 8ff9bde
Showing 6 changed files with 137 additions and 1 deletion.
35 changes: 35 additions & 0 deletions .github/workflows/image.yml
@@ -37,3 +37,38 @@ jobs:
            --tag ghcr.io/getsentry/taskbroker:${{ github.sha }} \
            "${args[@]}" \
            .
  build-taskworker:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - run: docker login --username '${{ github.actor }}' --password-stdin ghcr.io <<< "$GHCR_TOKEN"
        env:
          GHCR_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        if: github.event_name != 'pull_request'

      - run: docker buildx create --driver docker-container --use

      - name: build
        run: |
          set -euxo pipefail
          if [ ${{ github.event_name }} = 'push' ]; then
            args=(
              --tag ghcr.io/getsentry/taskworker:latest
              --tag ghcr.io/getsentry/taskworker:amd64-latest
              --push
            )
          else
            args=()
          fi
          docker buildx build \
            --pull \
            --platform linux/amd64 \
            --cache-from ghcr.io/getsentry/taskworker:latest \
            --cache-to type=inline \
            --tag ghcr.io/getsentry/taskworker:${{ github.sha }} \
            "${args[@]}" \
            ./python
13 changes: 13 additions & 0 deletions cloudbuild.yaml
@@ -13,3 +13,16 @@ steps:
        "./Dockerfile",
      ]
    timeout: 1200s

  - name: "gcr.io/kaniko-project/executor:v1.16.0"
    id: runtime-worker-image
    waitFor: ["-"]
    args:
      [
        "--cache=true",
        "--use-new-run",
        "--destination=us-central1-docker.pkg.dev/$PROJECT_ID/taskworker/image:$COMMIT_SHA",
        "--context=dir://./python",
        "--dockerfile=./python/Dockerfile",
      ]
    timeout: 1200s
15 changes: 15 additions & 0 deletions python/Dockerfile
@@ -0,0 +1,15 @@
# Use an official Python runtime as a parent image
FROM python:3.11-slim

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app
COPY requirements-dev.txt /app/requirements-dev.txt

# Install the dependencies listed in requirements-dev.txt
RUN pip install --no-cache-dir -r requirements-dev.txt

# Run the integration test worker runner when the container launches
CMD ["python", "integration_tests/runner.py"]
Empty file.
30 changes: 30 additions & 0 deletions python/integration_tests/runner.py
@@ -0,0 +1,30 @@
import os
from worker import ConfigurableTaskWorker, TaskWorkerClient


def main() -> None:
print("Starting worker")
grpc_host = os.getenv("SENTRY_TASKWORKER_GRPC_HOST", "127.0.0.1")
grpc_port = os.getenv("SENTRY_TASKWORKER_GRPC_PORT", "50051")
namespace = os.getenv("SENTRY_TASKWORKER_NAMESPACE", "test")
failure_rate = float(os.getenv("SENTRY_TASKWORKER_FAILURE_RATE", "0.0"))
timeout_rate = float(os.getenv("SENTRY_TASKWORKER_TIMEOUT_RATE", "0.0"))

if not (0 <= failure_rate <= 1) or not (0 <= timeout_rate <= 1):
raise ValueError("Failure rate and timeout rate must be between 0 and 1")

worker = ConfigurableTaskWorker(
client=TaskWorkerClient(f"{grpc_host}:{grpc_port}"),
namespace=namespace,
failure_rate=failure_rate,
timeout_rate=timeout_rate,
)

while True:
task = worker.fetch_task()
if task:
worker.process_task(task)


if __name__ == "__main__":
main()
45 changes: 44 additions & 1 deletion python/integration_tests/worker.py
@@ -1,7 +1,9 @@
import grpc
import time
import random
import logging

-from sentry_protos.sentry.v1.taskworker_pb2 import TaskActivation, FetchNextTask, GetTaskRequest, SetTaskStatusRequest, TaskActivationStatus, TASK_ACTIVATION_STATUS_COMPLETE
+from sentry_protos.sentry.v1.taskworker_pb2 import TaskActivation, FetchNextTask, GetTaskRequest, SetTaskStatusRequest, TaskActivationStatus, TASK_ACTIVATION_STATUS_COMPLETE, TASK_ACTIVATION_STATUS_FAILURE
from sentry_protos.sentry.v1.taskworker_pb2_grpc import ConsumerServiceStub


@@ -87,3 +89,44 @@ def process_task(self, activation: TaskActivation) -> TaskActivation | None:
            status=TASK_ACTIVATION_STATUS_COMPLETE,
            fetch_next_task=FetchNextTask(namespace=self._namespace),
        )


class ConfigurableTaskWorker:
    """
    A taskworker that can be configured to fail/timeout while processing tasks.
    """

    def __init__(self, client: TaskWorkerClient, namespace: str | None = None, failure_rate: float = 0.0, timeout_rate: float = 0.0) -> None:
        self.client = client
        self._namespace: str | None = namespace
        self._failure_rate: float = failure_rate
        self._timeout_rate: float = timeout_rate

    def fetch_task(self) -> TaskActivation | None:
        try:
            activation = self.client.get_task(self._namespace)
        except grpc.RpcError:
            print("get_task failed. Retrying in 1 second")
            time.sleep(1)
            return None

        if not activation:
            print("No task fetched")
            return None

        return activation

    def process_task(self, activation: TaskActivation) -> TaskActivation | None:
        if self._timeout_rate and random.random() < self._timeout_rate:
            return None  # Pretend that the task was dropped

        if self._failure_rate and random.random() < self._failure_rate:
            update_status = TASK_ACTIVATION_STATUS_FAILURE
        else:
            update_status = TASK_ACTIVATION_STATUS_COMPLETE

        return self.client.update_task(
            task_id=activation.id,
            status=update_status,
            fetch_next_task=FetchNextTask(namespace=self._namespace),
        )
