From 8b72f403c0bd38e7df899104466b5124d08dd767 Mon Sep 17 00:00:00 2001 From: ted kaemming Date: Mon, 14 Sep 2020 16:13:56 -0700 Subject: [PATCH] ref(subscriptions): Move time shift from worker to consumer (#1323) --- snuba/subscriptions/consumer.py | 7 +++++-- snuba/subscriptions/worker.py | 5 +---- tests/subscriptions/test_consumer.py | 30 ++++++++++++++++++++++------ tests/subscriptions/test_worker.py | 19 ++---------------- 4 files changed, 32 insertions(+), 29 deletions(-) diff --git a/snuba/subscriptions/consumer.py b/snuba/subscriptions/consumer.py index b14b5972f2..76a22d795d 100644 --- a/snuba/subscriptions/consumer.py +++ b/snuba/subscriptions/consumer.py @@ -88,9 +88,12 @@ class TickConsumer(Consumer[Tick]): # between B and C, since the message B was the first message received by # the consumer. - def __init__(self, consumer: Consumer[Any]) -> None: + def __init__( + self, consumer: Consumer[Any], time_shift: Optional[timedelta] = None + ) -> None: self.__consumer = consumer self.__previous_messages: MutableMapping[Partition, MessageDetails] = {} + self.__time_shift = time_shift if time_shift is not None else timedelta() def subscribe( self, @@ -139,7 +142,7 @@ def poll(self, timeout: Optional[float] = None) -> Optional[Message[Tick]]: Tick( Interval(previous_message.offset, message.offset), time_interval, - ), + ).time_shift(self.__time_shift), message.timestamp, ) else: diff --git a/snuba/subscriptions/worker.py b/snuba/subscriptions/worker.py index eb67fb4a35..2042cf2f74 100644 --- a/snuba/subscriptions/worker.py +++ b/snuba/subscriptions/worker.py @@ -4,7 +4,6 @@ import itertools import time from concurrent.futures import Future, ThreadPoolExecutor, as_completed -from datetime import timedelta from typing import Mapping, NamedTuple, Optional, Sequence, Tuple from snuba.datasets.dataset import Dataset @@ -42,7 +41,6 @@ def __init__( producer: Producer[SubscriptionTaskResult], topic: Topic, metrics: MetricsBackend, - time_shift: Optional[timedelta] = None, ) -> None: self.__dataset = dataset self.__executor = executor @@ -50,7 +48,6 @@ def __init__( self.__producer = producer self.__topic = topic self.__metrics = metrics - self.__time_shift = time_shift if time_shift is not None else timedelta() self.__concurrent_gauge = Gauge(self.__metrics, "executor.concurrent") @@ -98,7 +95,7 @@ def process_message( # waiting for them to complete during ``flush_batch`` may exceed the # consumer poll timeout (or session timeout during consumer # rebalancing) and cause the entire batch to be have to be replayed. - tick = message.payload.time_shift(self.__time_shift) + tick = message.payload return [ SubscriptionTaskResultFuture( task, self.__executor.submit(self.__execute, task, tick) diff --git a/tests/subscriptions/test_consumer.py b/tests/subscriptions/test_consumer.py index 8d85b9e1cf..69da2c320e 100644 --- a/tests/subscriptions/test_consumer.py +++ b/tests/subscriptions/test_consumer.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import Mapping +from typing import Mapping, Optional import pytest @@ -19,7 +19,16 @@ def test_tick_time_shift() -> None: ) -def test_tick_consumer(clock: Clock, broker: DummyBroker[int]) -> None: +@pytest.mark.parametrize( + "time_shift", + [ + pytest.param(None, id="without time shift"), + pytest.param(timedelta(minutes=-5), id="with time shift"), + ], +) +def test_tick_consumer( + clock: Clock, broker: DummyBroker[int], time_shift: Optional[timedelta] +) -> None: epoch = datetime.fromtimestamp(clock.time()) topic = Topic("messages") @@ -33,7 +42,10 @@ def test_tick_consumer(clock: Clock, broker: DummyBroker[int]) -> None: inner_consumer = broker.get_consumer("group") - consumer = TickConsumer(inner_consumer) + consumer = TickConsumer(inner_consumer, time_shift=time_shift) + + if time_shift is None: + time_shift = timedelta() def assignment_callback(offsets: Mapping[Partition, int]) -> None: assignment_callback.called = True @@ -70,7 +82,9 @@ def assignment_callback(offsets: Mapping[Partition, int]) -> None: assert consumer.poll() == Message( Partition(topic, 0), 0, - Tick(offsets=Interval(0, 1), timestamps=Interval(epoch, epoch)), + Tick(offsets=Interval(0, 1), timestamps=Interval(epoch, epoch)).time_shift( + time_shift + ), epoch, ) @@ -88,7 +102,9 @@ def assignment_callback(offsets: Mapping[Partition, int]) -> None: assert consumer.poll() == Message( Partition(topic, 0), 1, - Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)), + Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)).time_shift( + time_shift + ), epoch, ) @@ -157,7 +173,9 @@ def assignment_callback(offsets: Mapping[Partition, int]) -> None: assert consumer.poll() == Message( Partition(topic, 0), 1, - Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)), + Tick(offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)).time_shift( + time_shift + ), epoch, ) diff --git a/tests/subscriptions/test_worker.py b/tests/subscriptions/test_worker.py index 0decbe0e9f..f7ac936d20 100644 --- a/tests/subscriptions/test_worker.py +++ b/tests/subscriptions/test_worker.py @@ -1,6 +1,6 @@ from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta -from typing import Iterable, Iterator, MutableMapping, Optional, Tuple +from typing import Iterable, Iterator, MutableMapping, Tuple from uuid import UUID, uuid1 import pytest @@ -49,17 +49,8 @@ def dataset() -> Iterator[Dataset]: yield dataset -@pytest.mark.parametrize( - "time_shift", - [ - pytest.param(None, id="without time shift"), - pytest.param(timedelta(minutes=-5), id="with time shift"), - ], -) def test_subscription_worker( - dataset: Dataset, - broker: DummyBroker[SubscriptionTaskResult], - time_shift: Optional[timedelta], + dataset: Dataset, broker: DummyBroker[SubscriptionTaskResult], ) -> None: result_topic = Topic("subscription-results") @@ -91,7 +82,6 @@ def test_subscription_worker( broker.get_producer(), result_topic, metrics, - time_shift=time_shift, ) now = datetime(2000, 1, 1) @@ -101,11 +91,6 @@ def test_subscription_worker( timestamps=Interval(now - (frequency * evaluations), now), ) - # If we are utilizing time shifting, push the tick time into the future so - # that time alignment is otherwise preserved during our test case. - if time_shift is not None: - tick = tick.time_shift(time_shift * -1) - result_futures = worker.process_message( Message(Partition(Topic("events"), 0), 0, tick, now) )