diff --git a/docs/implementation/index.md b/docs/implementation/index.md index 06a2cec9a..361c61934 100644 --- a/docs/implementation/index.md +++ b/docs/implementation/index.md @@ -13,6 +13,7 @@ metrics logging state at_least_once +target_batch_full ``` ## How to use the implementation reference material diff --git a/docs/implementation/target_batch_full.md b/docs/implementation/target_batch_full.md new file mode 100644 index 000000000..09501c0d6 --- /dev/null +++ b/docs/implementation/target_batch_full.md @@ -0,0 +1,59 @@ +# Target: Batch Full + +The SDK automatically handles creating and releasing properly sized batches. + +## The Basics + +A Tap sends record messages to the Target. Those messages are deserialized and passed to the Target method `_process_record_message`. After each record is processed, one question is asked: + +> Is the batch full? (see [`Sink.is_full`](singer_sdk.Sink.is_full)) + +If the answer is `True`, the records currently held in the `Sink._pending_batch` dict are drained. The drain process is managed by the `Target.drain_one` method. Records get written, counters get reset, and the next batch starts filling with records. + +## How a Batch is Measured and "Full" is Defined + +You need to know three things to determine whether something is full: + +1. The unit of measure +2. The level at which the object is considered full +3. The current measurement of the object + +The units of measure used with `Sink._pending_batch` are **rows** and **time** in **seconds**. The full marks come from the Meltano Target configuration options `batch_size_rows` and `batch_wait_limit_seconds`. If those configuration options are not present, the defined constants `MAX_SIZE_DEFAULT` and `WAIT_LIMIT_SECONDS_DEFAULT` are used. How is `Sink._pending_batch` measured? To measure **rows**, the count of records read from the Tap is used; the current row count is available via the property `Sink.current_size`. To measure **seconds**, a batch timer is used. + +There are four “is full” scenarios, and each one has a function that looks at the current batch and returns `True` if it is full or `False` if the batch can take more records. + +1. Row limit has been reached. `Sink.is_full_rows` +2. Wait limit in seconds has been reached. `Sink.is_too_old` +3. Row limit or wait limit in seconds has been reached. `Sink.is_full_rows_and_too_old` +4. Row limit managed by the batch timer has been reached. `Sink.is_full_dynamic` + +Based on the Meltano Target configuration option(s), `Sink.set_drain_function` sets the `_drain_function` attribute to the appropriate method. This method is then called by `Sink.is_full` at the end of each `_process_record_message` cycle. When `Sink.is_full` is checked and returns `True`, the message: +``` +"Target sink for [Stream Name] is full. Current size is [Current Size]. Draining...", +``` + +is logged to the console and `Target.drain_one(Sink)` is called. + +## Explanation of the Four "is full" Scenarios + +### Row limit has been reached (`Sink.is_full_rows`) + +To know whether something is full you need to know how much it currently holds and at what point it is considered full. Sinks have the property `current_size`, which gives the number of records read and placed into the pending batch, i.e. how much the current batch is holding. The Sink property `max_size` gives the integer that defines the full mark: it returns the Sink's internal variable `_batch_size_rows` if it is not `None`, otherwise the `MAX_SIZE_DEFAULT` constant, which is `10000`. + +Both `Sink.current_size` and `Sink.max_size` are used to calculate the `bool` value returned by `Sink.is_full_rows`. If `Sink.current_size` is greater than or equal to `Sink.max_size`, the batch is full and `Sink.is_full` returns `True`. When `Sink.is_full` is checked at the end of `Target._process_record_message` and returns `True`, the "Target sink for [Stream Name] is full" message above is logged to stderr and `Target.drain_one(Sink)` is called.
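+As a condensed sketch, the row-based check is just a comparison of those two values. The standalone functions below are illustrative only (the names mirror the Sink members described above); this is not the SDK's exact source:
+
+```python
+# Illustrative sketch of the row-count check, not the SDK's exact source.
+from __future__ import annotations
+
+MAX_SIZE_DEFAULT = 10000
+
+
+def max_size(batch_size_rows: int | None) -> int:
+    # Configured row limit, falling back to the default full mark.
+    return batch_size_rows if batch_size_rows is not None else MAX_SIZE_DEFAULT
+
+
+def is_full_rows(current_size: int, batch_size_rows: int | None) -> bool:
+    # The batch is full once the records read reach the row limit.
+    return current_size >= max_size(batch_size_rows)
+
+
+print(is_full_rows(10000, None))  # True: default full mark reached
+print(is_full_rows(500, 1000))    # False: room for more records
+```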
+### Wait limit in seconds has been reached (`Sink.is_too_old`) + +To know whether something is too old you need to know how much time has passed and how much time must pass for it to be considered old. When the Meltano Target configuration option `batch_wait_limit_seconds` is present and set, the internal variable `_sink_timer` is initialized with an instance of the `BatchPerfTimer` class, a batch-specific stopwatch. The timer is accessible via the property [`Sink.sink_timer`](singer_sdk.Sink.sink_timer). Right after the timer is initialized, the stopwatch is started with `Sink.sink_timer.start()`. You can see how much time has passed by calling `Sink.sink_timer.on_the_clock()`. The property `Sink.batch_wait_limit_seconds` indicates how much time must pass for a batch to be considered old. + +Both the `Sink.sink_timer.on_the_clock()` method and the `Sink.batch_wait_limit_seconds` attribute are used to calculate the boolean value returned by [`Sink.is_too_old`](singer_sdk.Sink.is_too_old). If `Sink.sink_timer.on_the_clock()` is greater than `Sink.batch_wait_limit_seconds`, the batch is considered full and `Sink.is_full` returns `True`. Again, a log message is emitted to stderr and `Target.drain_one(Sink)` is called. The [`Target.drain_one(Sink)`](singer_sdk.Target.drain_one) method calls `Sink.lap_manager`, which stops the timer, calculates the lap time, and restarts the timer. + +### Row limit or wait limit has been reached (`Sink.is_full_rows_and_too_old`) + +The previously described `is_full_rows` and `is_too_old` methods are both run and their results are held in a tuple. If `True` is present in the tuple, the function returns `True`, so `is_full` returns `True`. When `Sink.is_full` is checked at the end of `Target._process_record_message` and returns `True`, a log message is emitted to stderr and [`Target.drain_one(Sink)`](singer_sdk.Target.drain_one) is called. The [`Target.drain_one(Sink)`](singer_sdk.Target.drain_one) method calls `Sink.lap_manager`, which stops the timer, calculates the lap time, and restarts the timer.
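+The time-based checks reduce to a `perf_counter` comparison. The following condensed sketch is illustrative only (the names mirror the Sink and BatchPerfTimer members described above); it is not the SDK's exact source:
+
+```python
+# Illustrative sketch of the time-based checks, not the SDK's exact source.
+import time
+
+
+def on_the_clock(start_time: float) -> float:
+    # Seconds elapsed since the batch timer was started.
+    return time.perf_counter() - start_time
+
+
+def is_too_old(start_time: float, batch_wait_limit_seconds: int) -> bool:
+    # The batch is full once more time has elapsed than the wait limit.
+    return on_the_clock(start_time) > batch_wait_limit_seconds
+
+
+def is_full_rows_and_too_old(
+    current_size: int,
+    max_size: int,
+    start_time: float,
+    batch_wait_limit_seconds: int,
+) -> bool:
+    # Hitting either limit marks the batch as full.
+    return current_size >= max_size or is_too_old(start_time, batch_wait_limit_seconds)
+```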
+### Row limit managed by `Sink.sink_timer.counter_based_max_size` has been reached (`Sink.is_full_dynamic`) + +When the Meltano Target configuration option `batch_dynamic_management` is set to `True`, you are asking `Sink.sink_timer` to find the largest row-based full mark that still allows a batch to be filled with records and written to the target within the given time in seconds. + +The `Sink.sink_timer` is passed the given `batch_size_rows`, or the `MAX_SIZE_DEFAULT` constant (`10000`) if it is `None`, and the given `batch_wait_limit_seconds`, or the `WAIT_LIMIT_SECONDS_DEFAULT` constant (`30`) if it is `None`. Internally, the rows value becomes `Sink.sink_timer.SINK_MAX_SIZE_CEILING`, the maximum size a batch can reach, and the time in seconds becomes `Sink.sink_timer.max_perf_counter`, the time a full cycle should take. The attribute `Sink.sink_timer.sink_max_size` starts at a predefined size of `100`. + +During the `Target.drain_one(Sink)` process, `Sink.lap_manager` is called and the timer method `counter_based_max_size` checks whether `Sink.sink_timer.perf_diff`, which is `max_perf_counter` minus `lap_time`, is greater than or equal to `Sink.sink_timer.perf_diff_allowed_max` or less than `Sink.sink_timer.perf_diff_allowed_min`. If `Sink.sink_timer.perf_diff` is greater than or equal to `Sink.sink_timer.perf_diff_allowed_max`, `Sink.sink_timer.sink_max_size` is increased, as long as it is still less than `Sink.sink_timer.SINK_MAX_SIZE_CEILING`. If `Sink.sink_timer.perf_diff` is less than `Sink.sink_timer.perf_diff_allowed_min`, `Sink.sink_timer.sink_max_size` is reduced. If `Sink.sink_timer.perf_diff` falls between `Sink.sink_timer.perf_diff_allowed_min` and `Sink.sink_timer.perf_diff_allowed_max`, no correction is made, since the optimal row size has been reached. This process repeats on every drain, and starts over for each `Sink` as it is initialized and begins processing records.
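+The correction step can be pictured with the simplified sketch below. The thresholds match the ones described above, but this is not the SDK's exact source: the real `counter_based_max_size` adjusts `sink_max_size` in tiered fixed steps (+/-10, +/-100, +/-1000, ...) rather than by doubling or halving:
+
+```python
+# Simplified sketch of the dynamic batch-size correction, not the SDK's exact
+# source (the SDK uses tiered fixed steps instead of doubling/halving).
+def adjust_sink_max_size(
+    sink_max_size: int,
+    ceiling: int,
+    max_perf_counter: float,
+    lap_time: float,
+) -> int:
+    perf_diff = max_perf_counter - lap_time         # positive: the drain beat the time budget
+    allowed_max = max_perf_counter * 0.25           # finished at least 25% early -> grow
+    allowed_min = -1.0 * (max_perf_counter * 0.33)  # overshot by more than 33% -> shrink
+
+    if perf_diff >= allowed_max and sink_max_size < ceiling:
+        return min(sink_max_size * 2, ceiling)      # batches fill and write quickly: allow more rows
+    if perf_diff < allowed_min:
+        return max(sink_max_size // 2, 10)          # batches take too long: allow fewer rows
+    return sink_max_size                            # within tolerance: optimal size reached
+```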
Use .start() to start it" + raise PerfTimerError(msg) + + self._stop_time = time.perf_counter() + self._lap_time = self._stop_time - self._start_time + self._start_time = None + self._stop_time = None + + +class BatchPerfTimer(PerfTimer): + """The Performance Timer for Target bulk inserts.""" + + def __init__( + self, + max_size: int, + max_perf_counter: float, + ) -> None: + self.SINK_MAX_SIZE_CEILING: int = max_size + self._max_perf_counter: float = max_perf_counter + + SINK_MAX_SIZE_CEILING: int + """The max size a bulk insert can be""" + + _sink_max_size: int = 100 + """Hold the calculated batch size""" + + @property + def sink_max_size(self) -> int: + """The current MAX_SIZE_DEFAULT.""" + return self._sink_max_size + + @property + def max_perf_counter(self) -> float: + """How many seconds can pass before a insert.""" + return self._max_perf_counter + + @property + def perf_diff_allowed_min(self) -> float: + """The minimum negative variance allowed, 1/3 worse than wanted.""" + return -1.0 * (self.max_perf_counter * 0.33) + + @property + def perf_diff_allowed_max(self) -> float: + """The maximum positive variance allowed, 1/4 better than wanted.""" + return self.max_perf_counter * 0.25 + + @property + def perf_diff(self) -> float: + """Difference between wanted elapsed time and actual elapsed time.""" + diff = self.max_perf_counter - self.lap_time if self.lap_time else 0 + return float(diff) + + def counter_based_max_size(self) -> None: # noqa: C901 + """Calculate performance based batch size.""" + correction = 0 + if self.perf_diff < self.perf_diff_allowed_min: + if self.sink_max_size >= 15000: # noqa: PLR2004 + correction = -5000 + elif self.sink_max_size >= 10000: # noqa: PLR2004 + correction = -1000 + elif self.sink_max_size >= 1000: # noqa: PLR2004 + correction = -100 + elif self.sink_max_size > 10: # noqa: PLR2004 + correction = -10 + if ( + self.perf_diff >= self.perf_diff_allowed_max + and self.sink_max_size < self.SINK_MAX_SIZE_CEILING + ): + if self.sink_max_size >= 10000: # noqa: PLR2004 + correction = 10000 + elif self.sink_max_size >= 1000: # noqa: PLR2004 + correction = 1000 + elif self.sink_max_size >= 100: # noqa: PLR2004 + correction = 100 + elif self.sink_max_size >= 10: # noqa: PLR2004 + correction = 10 + self._sink_max_size += correction diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index b48d290a4..805edd65b 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -152,6 +152,30 @@ default=True, ), ).to_dict() +BATCH_SIZE_ROWS_CONFIG = PropertiesList( + Property( + "batch_size_rows", + IntegerType, + description="Maximum number of rows in each batch.", + ), +).to_dict() +BATCH_WAIT_LIMIT_SECONDS_CONFIG = PropertiesList( + Property( + "batch_wait_limit_seconds", + IntegerType, + description="Maximum time to elapse for a batch to fill, drain, and load.", + ), +).to_dict() +BATCH_DYNAMIC_MANAGEMENT_CONFIG = PropertiesList( + Property( + "batch_dynamic_management", + BooleanType, + description=( + "Manages sink_max_size per stream to be" + "the largest size for the batch wait limit." 
+ ), + ), +).to_dict() class TargetLoadMethods(str, Enum): diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e35353789..504bb7119 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -33,6 +33,7 @@ datetime_fromisoformat, time_fromisoformat, ) +from singer_sdk.helpers._perftimer import BatchPerfTimer from singer_sdk.helpers._typing import ( DatetimeErrorTreatmentEnum, get_datelike_property_type, @@ -131,6 +132,7 @@ class Sink(metaclass=abc.ABCMeta): logger: Logger MAX_SIZE_DEFAULT = 10000 + WAIT_LIMIT_SECONDS_DEFAULT = 30 validate_field_string_format = False """Enable JSON schema format validation, for example `date-time` string fields.""" @@ -138,6 +140,8 @@ class Sink(metaclass=abc.ABCMeta): fail_on_record_validation_exception: bool = True """Interrupt the target execution when a record fails schema validation.""" + _drain_function: t.Callable[[], bool] | None = None + def __init__( self, target: Target, @@ -182,6 +186,31 @@ def __init__( self._batch_records_read: int = 0 self._batch_dupe_records_merged: int = 0 + # Batch Performance Timer + self._batch_size_rows: int | None = target.config.get( + "batch_size_rows", + ) + self._batch_wait_limit_seconds: int | None = target.config.get( + "batch_wait_limit_seconds", + ) + self._batch_dynamic_management: bool = target.config.get( + "batch_dynamic_management", + False, + ) + + self._sink_timer: BatchPerfTimer | None = None + + if self._batch_wait_limit_seconds is not None or self._batch_dynamic_management: + self._sink_timer = BatchPerfTimer( + self._batch_size_rows + if self._batch_size_rows is not None + else self.MAX_SIZE_DEFAULT, + self.WAIT_LIMIT_SECONDS_DEFAULT + if self._batch_wait_limit_seconds is None + else self._batch_wait_limit_seconds, + ) + self._sink_timer.start() + self._validator: BaseJSONSchemaValidator | None = self.get_validator() @cached_property @@ -248,16 +277,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002 return {} # Size properties - - @property - def max_size(self) -> int: - """Get max batch size. - - Returns: - Max number of records to batch before `is_full=True` - """ - return self.MAX_SIZE_DEFAULT - @property def current_size(self) -> int: """Get current batch size. @@ -269,13 +288,122 @@ def current_size(self) -> int: @property def is_full(self) -> bool: - """Check against size limit. + """Call the size limit check function. + + Returns: + True if the sink needs to be drained. + """ + if self._drain_function is None: + self.set_drain_function() + + return self._drain_function() if self._drain_function is not None else True + + def is_full_rows(self) -> bool: + """Check against limit in rows. Returns: True if the sink needs to be drained. """ return self.current_size >= self.max_size + def is_too_old(self) -> bool: + """Check against limit in seconds. + + Returns: + True if the sink needs to be drained. + """ + return ( + ( + self.sink_timer.on_the_clock() > self.batch_wait_limit_seconds + if self.sink_timer.start_time is not None + and self.batch_wait_limit_seconds is not None + else False + ) + if self.sink_timer is not None + else False + ) + + def is_full_rows_and_too_old(self) -> bool: + """Check against limit in rows and seconds. + + Returns: + True if the sink needs to be drained. + """ + return True in (self.is_full_rows(), self.is_too_old()) + + def is_full_dynamic(self) -> bool: + """Check against the calculated limit in rows. + + Returns: + True if the sink needs to be drained.
+ """ + return ( + self.current_size >= self.sink_timer.sink_max_size + if self.sink_timer is not None + else False + ) + + def set_drain_function(self) -> None: + """Return the function to use in is_full.""" + if self.batch_dynamic_management: + self._drain_function = self.is_full_dynamic + elif self.batch_wait_limit_seconds is not None: + if self.batch_size_rows is not None: + self._drain_function = self.is_full_rows_and_too_old + else: + self._drain_function = self.is_too_old + else: + self._drain_function = self.is_full_rows + + @property + def batch_size_rows(self) -> int | None: + """Get batch_size_rows object. + + Returns: + A batch_size_rows object. + """ + return self._batch_size_rows + + @property + def batch_wait_limit_seconds(self) -> int | None: + """Get batch_wait_limit_seconds object. + + Returns: + A batch_wait_limit_seconds object. + """ + return self._batch_wait_limit_seconds + + @property + def batch_dynamic_management(self) -> bool: + """Get batch_dynamic_management object. + + Returns: + A batch_dynamic_management object. + """ + return self._batch_dynamic_management + + @property + def sink_timer(self) -> BatchPerfTimer | None: + """Get sink_timer object. + + Returns: + A sink_timer object. + """ + return self._sink_timer + + @property + def max_size(self) -> int: + """Get max batch size. + + Returns: + Max number of records to batch before `is_full=True` + """ + return ( + self.batch_size_rows + if self.batch_size_rows is not None + else self.MAX_SIZE_DEFAULT + ) + # Tally methods @t.final @@ -366,6 +494,30 @@ def key_properties(self) -> t.Sequence[str]: """ return self._key_properties + # Timer Management + + def lap_manager(self) -> None: + """Start and Stop the Perf Time during drain. + + This method is called when the target triggers a drain on this sink. + """ + if self.sink_timer is not None: + if self.sink_timer.start_time is not None: + self.sink_timer.stop() + timer_msg: str = ( + "max_size: %.0f, min_diff: %.2f, lap_diff: %.4f, max_diff: %.2f" + ) + if self.batch_dynamic_management: + self.logger.info( + timer_msg, + self.sink_timer.sink_max_size, + self.sink_timer.perf_diff_allowed_min, + self.sink_timer.perf_diff, + self.sink_timer.perf_diff_allowed_max, + ) + self.sink_timer.counter_based_max_size() + self.sink_timer.start() + # Record processing def _add_sdc_metadata_to_record( diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 5a78bf362..74b8d4100 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -18,6 +18,9 @@ from singer_sdk.helpers.capabilities import ( ADD_RECORD_METADATA_CONFIG, BATCH_CONFIG, + BATCH_DYNAMIC_MANAGEMENT_CONFIG, + BATCH_SIZE_ROWS_CONFIG, + BATCH_WAIT_LIMIT_SECONDS_CONFIG, TARGET_HARD_DELETE_CONFIG, TARGET_LOAD_METHOD_CONFIG, TARGET_SCHEMA_CONFIG, @@ -363,8 +366,9 @@ def _process_record_message(self, message_dict: dict) -> None: if sink.is_full: self.logger.info( - "Target sink for '%s' is full. Draining...", + "Target sink for '%s' is full. Current size is '%s'. 
Draining...", sink.stream_name, + sink.current_size, ) self.drain_one(sink) @@ -508,6 +512,7 @@ def drain_one(self, sink: Sink) -> None: draining_status = sink.start_drain() sink.process_batch(draining_status) + sink.lap_manager() sink.mark_drained() def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None: @@ -609,6 +614,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: target_jsonschema["properties"][k] = v _merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema) + _merge_missing(BATCH_SIZE_ROWS_CONFIG, config_jsonschema) + _merge_missing(BATCH_WAIT_LIMIT_SECONDS_CONFIG, config_jsonschema) + _merge_missing(BATCH_DYNAMIC_MANAGEMENT_CONFIG, config_jsonschema) _merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema) capabilities = cls.capabilities diff --git a/tests/core/targets/test_target_sql.py b/tests/core/targets/test_target_sql.py index fd71c0aeb..7a5911b2f 100644 --- a/tests/core/targets/test_target_sql.py +++ b/tests/core/targets/test_target_sql.py @@ -46,5 +46,11 @@ class MyTarget(SQLTargetMock, capabilities=capabilities): pass about = MyTarget._get_about_info() - default_settings = {"add_record_metadata", "load_method"} + default_settings = { + "add_record_metadata", + "load_method", + "batch_dynamic_management", + "batch_size_rows", + "batch_wait_limit_seconds", + } assert set(about.settings["properties"]) == expected_settings | default_settings diff --git a/tests/core/test_perf_timer.py b/tests/core/test_perf_timer.py new file mode 100644 index 000000000..c3abce8df --- /dev/null +++ b/tests/core/test_perf_timer.py @@ -0,0 +1,117 @@ +"""Perf Timer tests.""" + +from __future__ import annotations + +import time + +import pytest + +from singer_sdk.helpers._perftimer import BatchPerfTimer, PerfTimer, PerfTimerError + + +def test_perftimer_properties(): + timer: PerfTimer = PerfTimer() + timer._start_time = 1.1 + timer._stop_time = 1.10 + timer._lap_time = 0.09 + assert timer.start_time is timer._start_time + assert timer.stop_time is timer.stop_time + assert timer._lap_time is timer.lap_time + assert timer.start_time == 1.1 + assert timer.stop_time == 1.10 + assert timer.lap_time == 0.09 + + +def test_perftimer_actions(): + timer: PerfTimer = PerfTimer() + timer.start() + assert timer.start_time is not None + assert timer.stop_time is None + assert timer.lap_time is None + time.sleep(1.1) + assert timer.on_the_clock() >= 1 + timer.stop() + assert timer.lap_time >= 1 + assert timer.lap_time < 1.5 + assert timer.start_time is None + assert timer.stop_time is None + + +def test_perftimer_errors(): + timer: PerfTimer = PerfTimer() + with pytest.raises( + PerfTimerError, + match=r"Timer is not running. Use .start\(\) to start it", + ): + timer.stop() + with pytest.raises( + PerfTimerError, + match=r"Timer is not running. Use .start\(\) to start it", + ): + timer.on_the_clock() + # starting a timer to test start() error + timer.start() + with pytest.raises( + PerfTimerError, + match=r"Timer is running. 
Use .stop\(\) to stop it", + ): + timer.start() + # stopping the timer at the end of the test + timer.stop() + + +def test_batchperftimer_properties(): + batchtimer: BatchPerfTimer = BatchPerfTimer(100000, 30) + batchtimer._lap_time = 20 + assert batchtimer._sink_max_size is batchtimer.sink_max_size + assert batchtimer._max_perf_counter is batchtimer.max_perf_counter + assert batchtimer.SINK_MAX_SIZE_CEILING == 100000 + assert batchtimer.sink_max_size == 100 + assert batchtimer.max_perf_counter == 30 + assert batchtimer.perf_diff_allowed_max == 7.5 + assert batchtimer.perf_diff_allowed_min == -9.9 + assert batchtimer.perf_diff == 10 + + +def test_batchperftimer_counter_based_max_size_additive(): + batchtimer: BatchPerfTimer = BatchPerfTimer(100000, 1) + batchtimer._lap_time = 0.24 + assert batchtimer.perf_diff > batchtimer.perf_diff_allowed_max + batchtimer._sink_max_size = 10 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 20 + batchtimer._sink_max_size = 100 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 200 + batchtimer._sink_max_size = 1000 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 2000 + batchtimer._sink_max_size = 10000 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 20000 + batchtimer._sink_max_size = 100000 + assert batchtimer.sink_max_size == batchtimer.SINK_MAX_SIZE_CEILING + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 100000 + + +def test_batchperftimer_counter_based_max_size_reductive(): + batchtimer: BatchPerfTimer = BatchPerfTimer(100000, 1) + batchtimer._lap_time = 1.5 + batchtimer._sink_max_size = 100000 + assert batchtimer.perf_diff < batchtimer.perf_diff_allowed_min + assert batchtimer.sink_max_size == batchtimer.SINK_MAX_SIZE_CEILING + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 95000 + batchtimer._sink_max_size = 10000 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 9000 + batchtimer._sink_max_size = 1000 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 900 + batchtimer._sink_max_size = 100 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 90 + batchtimer._sink_max_size = 10 + batchtimer.counter_based_max_size() + assert batchtimer.sink_max_size == 10 diff --git a/tests/core/test_target_base.py b/tests/core/test_target_base.py index eaff6d6a1..0c135f96d 100644 --- a/tests/core/test_target_base.py +++ b/tests/core/test_target_base.py @@ -1,6 +1,7 @@ from __future__ import annotations import copy +import time import pytest @@ -77,6 +78,9 @@ def test_target_about_info(): assert "flattening_max_depth" in about.settings["properties"] assert "batch_config" in about.settings["properties"] assert "add_record_metadata" in about.settings["properties"] + assert "batch_size_rows" in about.settings["properties"] + assert "batch_wait_limit_seconds" in about.settings["properties"] + assert "batch_dynamic_management" in about.settings["properties"] def test_sql_get_sink(): @@ -142,3 +146,231 @@ def test_add_sqlsink_and_get_sink(): target.get_sink( "bar", ) + + +def test_batch_size_rows_and_max_size(): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + key_properties = [] + target_default = TargetMock() + sink_default = BatchSinkMock( + target=target_default, + stream_name="foo", + schema=input_schema_1, + 
key_properties=key_properties, + ) + target_set = TargetMock(config={"batch_size_rows": 100000}) + sink_set = BatchSinkMock( + target=target_set, + stream_name="bar", + schema=input_schema_1, + key_properties=key_properties, + ) + assert sink_default.stream_name == "foo" + assert sink_default._batch_size_rows is None + assert sink_default.batch_size_rows is None + assert sink_default.max_size == 10000 + assert sink_set.stream_name == "bar" + assert sink_set._batch_size_rows == 100000 + assert sink_set.batch_size_rows == 100000 + assert sink_set.max_size == 100000 + + +def test_batch_wait_limit_seconds(): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + key_properties = [] + target_set = TargetMock(config={"batch_wait_limit_seconds": 1}) + sink_set = BatchSinkMock( + target=target_set, + stream_name="foo", + schema=input_schema_1, + key_properties=key_properties, + ) + assert sink_set.stream_name == "foo" + assert sink_set.batch_wait_limit_seconds == 1 + assert sink_set.sink_timer is not None + assert sink_set.sink_timer.start_time is not None + assert sink_set.batch_size_rows is None + assert sink_set.max_size == 10000 + assert sink_set.is_too_old() is False + time.sleep(1.1) + assert sink_set.is_too_old() is True + assert sink_set.sink_timer.start_time > 1.1 + assert sink_set.sink_timer.stop_time is None + assert sink_set.sink_timer.lap_time is None + assert sink_set.sink_timer.perf_diff == 0.0 + sink_set.lap_manager() + assert sink_set.sink_timer.start_time > 0.0 + assert sink_set.sink_timer.stop_time is None + assert sink_set.sink_timer.lap_time > 1.0 + assert sink_set.sink_timer.perf_diff < 0.0 + + +def test_batch_dynamic_management(): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + key_properties = [] + target_set = TargetMock( + config={ + "batch_size_rows": 100000, + "batch_wait_limit_seconds": 2, + "batch_dynamic_management": True, + }, + ) + sink_set = BatchSinkMock( + target=target_set, + stream_name="foo", + schema=input_schema_1, + key_properties=key_properties, + ) + assert sink_set.stream_name == "foo" + assert sink_set.batch_dynamic_management is True + assert sink_set.batch_wait_limit_seconds == 2 + assert sink_set.sink_timer is not None + assert sink_set.sink_timer.start_time is not None + assert sink_set.batch_size_rows == 100000 + assert sink_set.max_size == 100000 + assert sink_set.sink_timer.sink_max_size == 100 + time.sleep(1.1) + sink_set.lap_manager() + assert sink_set.sink_timer.start_time > 0.0 + assert sink_set.sink_timer.stop_time is None + assert sink_set.sink_timer.lap_time > 1.0 + assert sink_set.sink_timer.lap_time < 2.0 + assert sink_set.sink_timer.perf_diff > sink_set.sink_timer.perf_diff_allowed_max + assert sink_set.sink_timer.sink_max_size == 200 + time.sleep(3.1) + sink_set.lap_manager() + assert sink_set.sink_timer.start_time > 0.0 + assert sink_set.sink_timer.stop_time is None + assert sink_set.sink_timer.lap_time > 1.0 + assert sink_set.sink_timer.perf_diff < sink_set.sink_timer.perf_diff_allowed_min + assert sink_set.sink_timer.sink_max_size == 190 + + +@pytest.mark.parametrize( + "config,rows,wait_time,expected", + [ + pytest.param( + {}, + 10001, + None, + True, + id="default_config_true", + ), + pytest.param( + {}, + 100, + None, + False, + id="default_config_false", + ), + pytest.param( + {"batch_size_rows": 
100000}, + 100001, + None, + True, + id="batch_size_rows_only_true", + ), + pytest.param( + {"batch_size_rows": 100000}, + 1, + None, + False, + id="batch_size_rows_only_false", + ), + pytest.param( + {"batch_wait_limit_seconds": 1}, + None, + 2, + True, + id="wait_limit_seconds_only_true", + ), + pytest.param( + {"batch_wait_limit_seconds": 2}, + None, + 1, + False, + id="wait_limit_seconds_only_false", + ), + pytest.param( + {"batch_size_rows": 100000, "batch_wait_limit_seconds": 2}, + 100001, + 1, + True, + id="batch_rows_and_wait_limit_seconds_true_rows", + ), + pytest.param( + {"batch_size_rows": 100000, "batch_wait_limit_seconds": 1}, + 100, + 2, + True, + id="batch_rows_and_wait_limit_seconds_true_wait", + ), + pytest.param( + {"batch_size_rows": 100000, "batch_wait_limit_seconds": 2}, + 100, + 1, + False, + id="batch_rows_and_wait_limit_seconds_false", + ), + ], +) +def test_is_full( + config: str, + rows: int | None, + wait_time: int | None, + expected: bool, +): + input_schema_1 = { + "properties": { + "id": { + "type": ["string", "null"], + }, + "col_ts": { + "format": "date-time", + "type": ["string", "null"], + }, + }, + } + key_properties = [] + target = TargetMock(config=config) + sink = BatchSinkMock( + target=target, + stream_name="foo", + schema=input_schema_1, + key_properties=key_properties, + ) + if rows is not None: + sink._batch_records_read = rows + if wait_time is not None: + time.sleep(wait_time) + assert sink.is_full == expected