Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Configurable batch size and max wait limit for targets #1876

Open
wants to merge 97 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 78 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
11c56e3
add _perftimer file to helpers folder
BuzzCutNorman Jul 24, 2023
550ecff
add sink_timer, batch_size_rows, and batch_wait_limit_seconds
BuzzCutNorman Jul 25, 2023
68c3484
calcuation fix
BuzzCutNorman Jul 25, 2023
66f750e
correct class hint for sink_timer
BuzzCutNorman Jul 25, 2023
62c2c32
mypy fixes round 1
BuzzCutNorman Jul 25, 2023
541a46d
mypy fixes round 2
BuzzCutNorman Jul 25, 2023
028e3d9
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jul 25, 2023
4309c8e
Merge branch 'main' of https://github.com/meltano/sdk into 1626-confi…
BuzzCutNorman Aug 4, 2023
dc2a2d8
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 8, 2023
01f4539
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 8, 2023
8a591c1
simple tests for PerfTimer and BatchPerfTimer
BuzzCutNorman Aug 8, 2023
eb6c9e7
clear _stop_time after lap is calculated
BuzzCutNorman Aug 8, 2023
f6bbf0c
wider variation when testing perftimer lap_time
BuzzCutNorman Aug 8, 2023
74b1653
Apply suggestions from code review
BuzzCutNorman Aug 10, 2023
bb0c8fb
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 14, 2023
c151f84
comment out batch timer additions
BuzzCutNorman Aug 14, 2023
49c014c
add BatchPerfTimer and supporting properties
BuzzCutNorman Aug 14, 2023
a425774
add BatchTimer start and finish to drain_one
BuzzCutNorman Aug 14, 2023
379f814
remove commented out code
BuzzCutNorman Aug 14, 2023
0303f25
change perf_diff_allowed_max to .25 better
BuzzCutNorman Aug 14, 2023
1fa9752
update test match perf_diff_allowed_max change
BuzzCutNorman Aug 14, 2023
d2db4fb
added tests for batch_size_rows and max_size
BuzzCutNorman Aug 14, 2023
99a9fb5
add batch_wait_limit_seconds tests
BuzzCutNorman Aug 15, 2023
059c392
added _lap_manager
BuzzCutNorman Aug 15, 2023
7ac5a6c
calling _lap_manager in drain_one
BuzzCutNorman Aug 15, 2023
0420db8
mypy fix: moved sink_timer check into _lap_manager
BuzzCutNorman Aug 15, 2023
4b5b3c6
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 15, 2023
1aec608
Apply suggestions from code review
BuzzCutNorman Aug 15, 2023
b5e93a6
added batch_wait_limit_seconds value test
BuzzCutNorman Aug 15, 2023
57b7f37
add tests for new settings
BuzzCutNorman Aug 15, 2023
f5874c9
BATCH_SIZE_ROWS_CONFIG added
BuzzCutNorman Aug 15, 2023
3b73e64
BATCH_WAIT_LIMIT_SECONDS_CONFIG added
BuzzCutNorman Aug 15, 2023
b48f3d0
move logging and update formatting.
BuzzCutNorman Aug 16, 2023
6c465f7
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 16, 2023
3cee381
add pertimer on_the_clock tests
BuzzCutNorman Aug 17, 2023
7025280
add batchperftimer is_too_old tests
BuzzCutNorman Aug 17, 2023
8bbb346
add PerfTimer.on_the_clock()
BuzzCutNorman Aug 17, 2023
fd8f171
add BatchPerfTimer.is_too_old
BuzzCutNorman Aug 17, 2023
2cfa516
add Sink.is_too_old tests
BuzzCutNorman Aug 17, 2023
da2808a
added property Sink.is_too_old
BuzzCutNorman Aug 17, 2023
01962e0
remove is_too_old tests
BuzzCutNorman Aug 18, 2023
c89fd74
move is_too_old to Sink
BuzzCutNorman Aug 18, 2023
2f51fd4
Add is_too_old drain to _process_record_message
BuzzCutNorman Aug 18, 2023
bf582e0
fix AttributeError NoneType has no start_time
BuzzCutNorman Aug 18, 2023
01705c6
remove counter_based_max_size and logging
BuzzCutNorman Aug 18, 2023
29ae3de
clean up test_batch_wait_limit_seconds
BuzzCutNorman Aug 18, 2023
1b5cc6a
update test for batch_dynamic_managment
BuzzCutNorman Aug 18, 2023
3d19a03
BatchPerfTimer take in a max and internally sink_max_size internally
BuzzCutNorman Aug 18, 2023
50d00c2
add batch_dynamic_management to Sink
BuzzCutNorman Aug 18, 2023
fb5d6f2
updated perf_diff tests to utilize allowed min and max
BuzzCutNorman Aug 18, 2023
e574003
add call to counter_based_max_size and logging to _lap_manager
BuzzCutNorman Aug 18, 2023
7f48881
expand is_full to utilize sink_timer.sink_max_size
BuzzCutNorman Aug 18, 2023
7a54c69
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 22, 2023
1261c04
mypy fixes
BuzzCutNorman Aug 22, 2023
0436a74
added defualt for when only batch management is set
BuzzCutNorman Aug 22, 2023
b92d8c1
add about_info test for batch_dynamic_management
BuzzCutNorman Aug 23, 2023
f78cc0a
added batch_dynamic_management to capabilities
BuzzCutNorman Aug 23, 2023
ec57105
add batch_dynamic_management to config if missing
BuzzCutNorman Aug 23, 2023
68a520c
add and or update tests for updated is_full utilizing _drain_function
BuzzCutNorman Aug 24, 2023
b36acf8
update is_full to utilize _drain_function and set_drain_function and …
BuzzCutNorman Aug 24, 2023
933fa21
remove is_too_old check from _process_record_message and update is_fu…
BuzzCutNorman Aug 24, 2023
c342e44
mypy fixes
BuzzCutNorman Aug 24, 2023
9ce87da
update docs implementation index to include target_batch_full
BuzzCutNorman Aug 24, 2023
af84ecd
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 25, 2023
31f6ef5
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 28, 2023
9dae576
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Aug 28, 2023
b2697ee
added documentation for updated is_full and associated new functions
BuzzCutNorman Aug 28, 2023
e874f73
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 28, 2023
f36a79f
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Aug 29, 2023
2952513
edits to target batch full documentation
BuzzCutNorman Aug 29, 2023
c79cf64
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 29, 2023
4f75636
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Aug 29, 2023
879995a
removed cached_property decorator from max_size
BuzzCutNorman Aug 29, 2023
1c81669
Edits and formatting changes.
BuzzCutNorman Aug 29, 2023
7a1df1d
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 31, 2023
c804b76
further edits and formatting
BuzzCutNorman Sep 5, 2023
648656b
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Sep 5, 2023
dc8481d
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Sep 5, 2023
ffabffa
Apply suggestions from code review
BuzzCutNorman Sep 5, 2023
50e82ae
Apply suggestion from code review
BuzzCutNorman Sep 6, 2023
f977ca2
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Oct 6, 2023
b2181e7
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Oct 16, 2023
484c7cb
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Oct 24, 2023
2a870af
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Dec 5, 2023
fd787f2
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Dec 5, 2023
e159ae6
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Dec 8, 2023
7c4756c
added return type annotations to start_time, stop_time, and lap_time
BuzzCutNorman Dec 8, 2023
e50f1a2
resolve mypy unsupported operand types error
BuzzCutNorman Dec 8, 2023
f4c2c38
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 9, 2024
5c1b5ea
change private Sink._lap_manager() to public Sink.lap_manager()
BuzzCutNorman Jan 9, 2024
39dd6e7
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Jan 11, 2024
ab2782d
added settings to default_settings in test_target_about_info
BuzzCutNorman Jan 12, 2024
d06481d
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 22, 2024
295802f
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 22, 2024
17f143c
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 29, 2024
a0e34c9
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Feb 2, 2024
49a64e6
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Feb 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/implementation/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ metrics
logging
state
at_least_once
target_batch_full
```

## How to use the implementation reference material
Expand Down
58 changes: 58 additions & 0 deletions docs/implementation/target_batch_full.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Target: Batch Full

The SDK automatically handles creating and releasing properly sized batches.

## The Basics

A Tap sends records messages to the Target. Those messages are grabbed, decoded, and sent to the Target method `_process_record_message`. After each record is processed one question is aksed.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

1. Is the batch full, `Sink.is_full`
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

If the answer is `True` then the records currently held in the `Sink._pending_batch` dict are drained. The drain process is managed by the `Target.drain_one` method. Records get written, counters get reset, and on to filling the next batch with records.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

## How a Batch is Measured and Full is Defined
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

You need to know three things to determine if somthing is full
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

1. The unit of measure
2. The level at which an object is determined full
3. The current mesurment of the object
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

The units of measure used with `Sink._pending_batch` are **rows** and **time** in **seconds**. The full level marks come from the Meltano Target configuration options of `batch_size_rows`, `batch_wait_limit_seconds`. If the configuration options are not present, the defined constants of `MAX_SIZE_DEFAULT` and `WAIT_LIMIT_SECONDS_DEFAULT` are used. How is `Sink._pending_batch` measured? To measure **rows** the count of records read from the Tap is used. The current row count is available via the property `Sink.current_size`. To measure **seconds** a batch timer is utilized.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

There are four “is full” scenarios and each one has a function that looks at the batch and returns `True` if it is full or `False` if the batch can take more records.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

1. Row limit has been reached. `Sink.is_full_rows`
2. Wait limit is seconds has been reached. `Sink.is_too_old`
3. Row limit or Wait limit is seconds has been reached. `Sink.is_full_rows_and_too_old`
4. Row limit managed by the batch timer has been reached. `Sink.is_full_dynamic`

Based on the Meltano Target configuration option(s) given the function `set_drain_function` places the appropriate function into the internal variable`_batch_drain_fucntion`. This variable is what is run when the attribute `Sink.is_full` is called at the end of each Target method `_process_record_message` cycle. When `Sink.is_full` is checked and returns `True` the message:
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved
```
"Target sink for [Stream Name] is full. Current size is [Current Size]. Draining...",
```
is logged to the console and `Target.drain_one(Sink)` is called.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

## Explanation of the Four "is full" Scenarios

### Rows limit has been reached. `Sink.is_full_rows`
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

To know if something is full you need to know how much it currently holds and at what point to consider it full. Sinks have the property `current_size` which gives you the number of records read and placed into the pending batch. The `current_size` gives us how much the current batch is holding. Now that we know the `current_size` we need to determine if the current size is considered full. The Sink property of `max_size` gives us the integer that defines the full mark. The property `max_size` returns the Sink’s internal variable `_batch_size_rows` if not `None` or the `DEFAULT_MAX_SIZE` constant which is `10000`.

Both the `Sink.current_size` and `Sink.max_size` are used to calculate the `bool` value returned by the property `Sink.is_full`. If `Sink.current_size` is greater than or equal to `Sink.max_size` the batch is full and `Sink.is_full` would return `True`. When `Sink.is_full` is checked at the end of `Target._process_record_message` and returns `True` the message "Target sink for 'stream_name' is full. Current size is 'current_size'Draining..." is logged to the console and `Target.drain_one(Sink)` is called.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

### Wait limit in seconds has been reached. `Sink.is_too_old`
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

To know if something is too old you need to know how much time has passed and how much time needs to pass to be considered old. When the Meltano Target configuration option of `batch_wait_limit_seconds` is present and set the internal variable `_sink_timer` is initialized with an instance of a `BatchPerfTimer`. The `BatchPerfTimer` Class is a batch specific stop watch. The timer is accessible via the property `Sink.sink_timer`. Right after the timer is initialized the stop watch is started `Sink.sink_timer.start()`. We can see how much time has passed by running `Sink.sink_timer.on_the_clock()`. The property `Sink.batch_wait_limit_seconds` hold how much time needs to pass to be considered old.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

Both the `Sink.sink_timer.on_the_clock()` and `Sink.batch_wait_limit_seconds` are used to calculate the `bool` value returned by the function`Sink.is_too_old`. If `Sink.sink_timer.on_the_clock()` is greater than or equal to `Sink.batch_wait_limit_seconds` the batch is full and `Sink.is_full` would return `True`. When `Sink.is_full` is checked at the end of `Target._process_record_message` and returns `True` the message ""Target sink for 'stream_name' is to full. Current size is 'current_size'. Draining..."" is logged to the console and `Target.drain_one(Sink)` is called. The `Target.drain_one(Sink)` method calls `Sink._lap_manager` which stops the timer, calculates the lap time, and starts the timer again.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

### Rows or Wait limit has been reached. `Sink.is_full_rows_and_too_old`
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

The previously described `is_full_rows` and `is_too_old` functions are run and their results are held in a tuple. If `True` is present in the tuple the function returns `True` so `is_full` will return `True`. When `Sink.is_full` is checked at the end of `Target._process_record_message` and returns `True` Then the message "Target sink for 'stream_name' is full. Current size is 'current_size' Draining..." is logged to the console and `Target.drain_one()` is called. The `Target.drain_one(Sink)` method calls `Sink._lap_manager` which stops the timer, calculates the lap time, and starts the timer again.
BuzzCutNorman marked this conversation as resolved.
Show resolved Hide resolved

### Rows limit managed by `Sink.sink_timer.counter_based_max_size` has been reached. `Sink.is_full_dynamic`

When the Meltano Target configuration option `batch_dynamic_management` is set to `True` you are asking the `Sink.sink_timer` to find the maximum rows is full mark that keeps the time to fill a batch with records and write those records to the Target's target within the time in seconds given.

The `Sink.sink_timer` is passed the given `batch_size_rows` or the `DEFAULT_MAX_SIZE` constant which is `10000` if it is `None` and is also passed the given `batch_wait_limit_seconds` if present or the `WAIT_LIMIT_SECONDS_DEFAULT` constant which is `30` if it is `None`. Internally the `rows` passed turns into `Sink.sink_timer.SINK_MAX_SIZE_CEILING` which is the max size a batch can reach. The `time` in `seconds` passed turns into `Sink.sink_timer.max_perf_counter` which is the time in seconds a full cycle should take. The attribute `Sink.sink_timer.sink_max_size` starts at a predefined size of `100`. During the `Target.drain_one(Sink)` process `Sink._lap_manager` is called and the timer method `counter_based_max_size` runs and checks if `Sink.sink_timer.perf_diff`, which is `max_perf_counter` - `lap_time`, is greater than `Sink.sink_timer.perf_diff_allowed_max` or less than `Sink.sink_timer.perf_diff_allowed_min`. If `Sink.sink_timer.perf_diff` is greater than `Sink.sink_timer.perf_diff_allowed_max` the `Sink.sink_timer.sink_max_size` is increased as long as the `Sink.sink_timer.sink_max_size` is less than `Sink.sink_timer.SINK_MAX_SIZE_CEILING`. If `Sink.sink_timer.perf_diff` is less than `Sink.sink_timer.perf_diff_allowed_min` the `Sink.sink_timer.sink_max_size` is reduced. If the `Sink.sink_timer.perf_diff` is between `Sink.sink_timer.perf_diff_allowed_max` and `Sink.sink_timer.perf_diff_allowed_min` no correction to `Sink.sink_timer.sink_max_size` is made since the optimal rows size has been reached. This process is repeated when each `Sink` is initialized and starts processing records.
Comment on lines +57 to +59
Copy link
Collaborator

@edgarrmondragon edgarrmondragon Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still finding this a bit confusing. I think we want the max batch size to have a default value, and the max wait time to default to None so that in the default case, only the current batch size is checked.

Essentially:

flowchart TD
    A[Process next row] --> B{current_size ><br/>max_size?}:::q
    B -- Yes --> C[Drain]:::drain
    B -- No ---> D{is max_wait_time<br/>set?}:::q
    D -- No ---> A
    D -- Yes --> F{elapsed ><br/>max_wait_time?}:::q
    F -- No ---> A
    F -- Yes --> C
    C --> G[Reset counter<br/>and timer]
    G --> A
    classDef q fill:#FFE666
    classDef drain fill:#FF6680
Loading

Does it make sense? I'm sorry, I feel like I'm missing something 🙏

Copy link
Contributor Author

@BuzzCutNorman BuzzCutNorman Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diagram makes sense. I will need to work through it again tomorrow. I think you feeling like you are missing something is a result of my poor writing. Below is the best flowchart for batch_dynamic_management I could make.

flowchart TD
    C1-->A2
    D2-->A3
    E3-->E2
    C3-->A4
    G4-->D3
    F2-->A1
    subgraph Target._process_record_message
    A1[Process next row] --> B1{is_full<p>_drain_function calls is_full_dynamic<p>sink.current_size > = sink_timer.sink_max_size}
    B1-->|True| C1[drain]
    B1-->|False| A1
   end
   subgraph Target.drain_one
   A2([start])-->B2[sink.start_drain]
   B2-->C2[sink.process_batch]
   C2-->D2[sink._lap_manager]
   D2~~~E2[sink.mark_drained]
   E2-->F2([end])
   end
   subgraph Sink.lap_manager
   A3([start])-->B3[sink_timer.stop]
   B3-->C3[sink_timer.counter_based_max_size]
   C3~~~D3[sink_timer.start]
   D3-->E3([end])  
   end
   subgraph sink_timer. counter_based_max_size
   A4([start])-->B4{perf_diff < self.perf_diff_allowed_min}
   B4-->|True| C4[correction = decrease amount]
   B4-->D4{perf_diff >= perf_diff_allowed_max<p>and<p>sink_max_size < SINK_MAX_SIZE_CEILING}
   D4-->|True| E4[correction = increase amount]
   D4-->F4[sink_max_size += correction]
   F4-->G4([end])
   end
Loading

Copy link
Contributor Author

@BuzzCutNorman BuzzCutNorman Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the default case,

This would be when batch_dynamic_management is set to True in the target config and batch_size_rows and batch_wait_limit_seconds are not preset in the target config?

or

Are you meaning default as in none of the three target config settings batch_size_rows, batch_wait_limit_seconds, and batch_dynamic_management are present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgarrmondragon When you have a free moment would you please provide me clarification on what you see the "default case" to be. 🙏😃

130 changes: 130 additions & 0 deletions singer_sdk/helpers/_perftimer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Performance timers which deal with dynamic timing events."""

from __future__ import annotations

import time


class PerfTimerError(Exception):
"""A custom exception used to report errors in use of BatchPerfTimer class."""


class PerfTimer:
"""A Basic Performance Timer Class."""

_start_time: float | None = None
_stop_time: float | None = None
_lap_time: float | None = None

@property
def start_time(self):
return self._start_time

@property
def stop_time(self):
return self._stop_time

@property
def lap_time(self):
return self._lap_time

def start(self) -> None:
"""Start the timer."""
if self._start_time is not None:
msg = "Timer is running. Use .stop() to stop it"
raise PerfTimerError(msg)

self._start_time = time.perf_counter()

def on_the_clock(self) -> float:
"""Give the time on the clock."""
if self._start_time is None:
msg = "Timer is not running. Use .start() to start it"
raise PerfTimerError(msg)

return (
time.perf_counter() - self._start_time
if self._start_time is not None
else 0.0
)

def stop(self) -> None:
"""Stop the timer, Stores the elapsed time, and reset."""
if self._start_time is None:
msg = "Timer is not running. Use .start() to start it"
raise PerfTimerError(msg)

self._stop_time = time.perf_counter()
self._lap_time = self._stop_time - self._start_time
self._start_time = None
self._stop_time = None


class BatchPerfTimer(PerfTimer):
"""The Performance Timer for Target bulk inserts."""

def __init__(
self,
max_size: int,
max_perf_counter: float,
) -> None:
self.SINK_MAX_SIZE_CEILING: int = max_size
self._max_perf_counter: float = max_perf_counter

SINK_MAX_SIZE_CEILING: int
"""The max size a bulk insert can be"""

_sink_max_size: int = 100
"""Hold the calculated batch size"""

@property
def sink_max_size(self) -> int:
"""The current MAX_SIZE_DEFAULT."""
return self._sink_max_size

@property
def max_perf_counter(self) -> float:
"""How many seconds can pass before a insert."""
return self._max_perf_counter

@property
def perf_diff_allowed_min(self) -> float:
"""The minimum negative variance allowed, 1/3 worse than wanted."""
return -1.0 * (self.max_perf_counter * 0.33)

@property
def perf_diff_allowed_max(self) -> float:
"""The maximum positive variance allowed, 1/4 better than wanted."""
return self.max_perf_counter * 0.25

@property
def perf_diff(self) -> float:
"""Difference between wanted elapsed time and actual elapsed time."""
diff = self.max_perf_counter - self.lap_time if self._lap_time else 0
return float(diff)

def counter_based_max_size(self) -> None: # noqa: C901
"""Calculate performance based batch size."""
correction = 0
if self.perf_diff < self.perf_diff_allowed_min:
if self.sink_max_size >= 15000: # noqa: PLR2004
correction = -5000
elif self.sink_max_size >= 10000: # noqa: PLR2004
correction = -1000
elif self.sink_max_size >= 1000: # noqa: PLR2004
correction = -100
elif self.sink_max_size > 10: # noqa: PLR2004
correction = -10
if (
self.perf_diff >= self.perf_diff_allowed_max
and self.sink_max_size < self.SINK_MAX_SIZE_CEILING
):
if self.sink_max_size >= 10000: # noqa: PLR2004
correction = 10000
elif self.sink_max_size >= 1000: # noqa: PLR2004
correction = 1000
elif self.sink_max_size >= 100: # noqa: PLR2004
correction = 100
elif self.sink_max_size >= 10: # noqa: PLR2004
correction = 10
self._sink_max_size += correction
24 changes: 24 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,30 @@
description="Add metadata to records.",
),
).to_dict()
BATCH_SIZE_ROWS_CONFIG = PropertiesList(
Property(
"batch_size_rows",
IntegerType,
description="Maximum number of rows in each batch.",
),
).to_dict()
BATCH_WAIT_LIMIT_SECONDS_CONFIG = PropertiesList(
Property(
"batch_wait_limit_seconds",
IntegerType,
description="Maximum time to elapse for a batch to fill, drain, and load.",
),
).to_dict()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you the other setting here?

BATCH_DYNAMIC_MANAGEMENT_CONFIG = PropertiesList(
Property(
"batch_dynamic_management",
BooleanType,
description=(
"Manages sink_max_size per stream to be"
"the largest size for the batch wait limit."
),
),
).to_dict()


class DeprecatedEnum(Enum):
Expand Down
Loading