Skip to content

Commit

Permalink
ref(subscriptions): Move time shift from worker to consumer (#1323)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Sep 14, 2020
1 parent f770d5c commit 8b72f40
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 29 deletions.
7 changes: 5 additions & 2 deletions snuba/subscriptions/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ class TickConsumer(Consumer[Tick]):
# between B and C, since the message B was the first message received by
# the consumer.

def __init__(self, consumer: Consumer[Any]) -> None:
def __init__(
self, consumer: Consumer[Any], time_shift: Optional[timedelta] = None
) -> None:
self.__consumer = consumer
self.__previous_messages: MutableMapping[Partition, MessageDetails] = {}
self.__time_shift = time_shift if time_shift is not None else timedelta()

def subscribe(
self,
Expand Down Expand Up @@ -139,7 +142,7 @@ def poll(self, timeout: Optional[float] = None) -> Optional[Message[Tick]]:
Tick(
Interval(previous_message.offset, message.offset),
time_interval,
),
).time_shift(self.__time_shift),
message.timestamp,
)
else:
Expand Down
5 changes: 1 addition & 4 deletions snuba/subscriptions/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import itertools
import time
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from datetime import timedelta
from typing import Mapping, NamedTuple, Optional, Sequence, Tuple

from snuba.datasets.dataset import Dataset
Expand Down Expand Up @@ -42,15 +41,13 @@ def __init__(
producer: Producer[SubscriptionTaskResult],
topic: Topic,
metrics: MetricsBackend,
time_shift: Optional[timedelta] = None,
) -> None:
self.__dataset = dataset
self.__executor = executor
self.__schedulers = schedulers
self.__producer = producer
self.__topic = topic
self.__metrics = metrics
self.__time_shift = time_shift if time_shift is not None else timedelta()

self.__concurrent_gauge = Gauge(self.__metrics, "executor.concurrent")

Expand Down Expand Up @@ -98,7 +95,7 @@ def process_message(
# waiting for them to complete during ``flush_batch`` may exceed the
# consumer poll timeout (or session timeout during consumer
# rebalancing) and cause the entire batch to have to be replayed.
tick = message.payload.time_shift(self.__time_shift)
tick = message.payload
return [
SubscriptionTaskResultFuture(
task, self.__executor.submit(self.__execute, task, tick)
Expand Down
30 changes: 24 additions & 6 deletions tests/subscriptions/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import Mapping
from typing import Mapping, Optional

import pytest

Expand All @@ -19,7 +19,16 @@ def test_tick_time_shift() -> None:
)


def test_tick_consumer(clock: Clock, broker: DummyBroker[int]) -> None:
@pytest.mark.parametrize(
"time_shift",
[
pytest.param(None, id="without time shift"),
pytest.param(timedelta(minutes=-5), id="with time shift"),
],
)
def test_tick_consumer(
clock: Clock, broker: DummyBroker[int], time_shift: Optional[timedelta]
) -> None:
epoch = datetime.fromtimestamp(clock.time())

topic = Topic("messages")
Expand All @@ -33,7 +42,10 @@ def test_tick_consumer(clock: Clock, broker: DummyBroker[int]) -> None:

inner_consumer = broker.get_consumer("group")

consumer = TickConsumer(inner_consumer)
consumer = TickConsumer(inner_consumer, time_shift=time_shift)

if time_shift is None:
time_shift = timedelta()

def assignment_callback(offsets: Mapping[Partition, int]) -> None:
assignment_callback.called = True
Expand Down Expand Up @@ -70,7 +82,9 @@ def assignment_callback(offsets: Mapping[Partition, int]) -> None:
assert consumer.poll() == Message(
Partition(topic, 0),
0,
Tick(offsets=Interval(0, 1), timestamps=Interval(epoch, epoch)),
Tick(offsets=Interval(0, 1), timestamps=Interval(epoch, epoch)).time_shift(
time_shift
),
epoch,
)

Expand All @@ -88,7 +102,9 @@ def assignment_callback(offsets: Mapping[Partition, int]) -> None:
assert consumer.poll() == Message(
Partition(topic, 0),
1,
Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)),
Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)).time_shift(
time_shift
),
epoch,
)

Expand Down Expand Up @@ -157,7 +173,9 @@ def assignment_callback(offsets: Mapping[Partition, int]) -> None:
assert consumer.poll() == Message(
Partition(topic, 0),
1,
Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)),
Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)).time_shift(
time_shift
),
epoch,
)

Expand Down
19 changes: 2 additions & 17 deletions tests/subscriptions/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Iterable, Iterator, MutableMapping, Optional, Tuple
from typing import Iterable, Iterator, MutableMapping, Tuple
from uuid import UUID, uuid1

import pytest
Expand Down Expand Up @@ -49,17 +49,8 @@ def dataset() -> Iterator[Dataset]:
yield dataset


@pytest.mark.parametrize(
"time_shift",
[
pytest.param(None, id="without time shift"),
pytest.param(timedelta(minutes=-5), id="with time shift"),
],
)
def test_subscription_worker(
dataset: Dataset,
broker: DummyBroker[SubscriptionTaskResult],
time_shift: Optional[timedelta],
dataset: Dataset, broker: DummyBroker[SubscriptionTaskResult],
) -> None:
result_topic = Topic("subscription-results")

Expand Down Expand Up @@ -91,7 +82,6 @@ def test_subscription_worker(
broker.get_producer(),
result_topic,
metrics,
time_shift=time_shift,
)

now = datetime(2000, 1, 1)
Expand All @@ -101,11 +91,6 @@ def test_subscription_worker(
timestamps=Interval(now - (frequency * evaluations), now),
)

# If we are utilizing time shifting, push the tick time into the future so
# that time alignment is otherwise preserved during our test case.
if time_shift is not None:
tick = tick.time_shift(time_shift * -1)

result_futures = worker.process_message(
Message(Partition(Topic("events"), 0), 0, tick, now)
)
Expand Down

0 comments on commit 8b72f40

Please sign in to comment.