Fast path for unique insertion
Updates the Python client to be compatible with the fast unique
insertion added to the main River in [1], which uses a unique index
instead of an advisory lock + fetch as long as uniqueness is constrained
to the default set of unique job states.

Also, not so much by design originally, but upgrade to sqlc v1.27.0,
which is what we have going in other River projects.

[1] riverqueue/river#451
brandur committed Aug 31, 2024
1 parent 6f06010 commit e07e966
Showing 13 changed files with 503 additions and 93 deletions.
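
At a glance, the client now dispatches between a fast path and the old advisory lock strategy. Below is a minimal sketch of that dispatch under assumed names (`executor`, `insert_params`, and `unique_opts` stand in for the real types, and the default state list is assumed from River's conventions; the actual logic is in `src/riverqueue/client.py` below):

```python
from hashlib import sha256

# Assumed default set of unique job states, kept sorted so it can be compared
# against a sorted user-supplied list.
UNIQUE_STATES_DEFAULT = ["available", "completed", "retryable", "running", "scheduled"]


def insert_unique(executor, insert_params, unique_opts, unique_key: str):
    # Fast path: uniqueness constrained to the default states stores a sha256
    # digest of the unique key in the new `unique_key` column, where a unique
    # index on (kind, unique_key) rejects duplicates in a single round trip,
    # with no advisory lock and no pre-insert fetch.
    if not unique_opts.by_state or sorted(unique_opts.by_state) == UNIQUE_STATES_DEFAULT:
        return executor.job_insert_unique(
            insert_params, sha256(unique_key.encode("utf-8")).digest()
        )

    # Slow path: a custom state list falls back to the old strategy: take a
    # transaction-scoped advisory lock, fetch any existing job, and insert
    # only when none is found.
    with executor.transaction():
        executor.advisory_lock(hash(unique_key))  # the real code uses an FNV-1 hash
        existing = executor.job_get_by_kind_and_unique_properties(insert_params)
        if existing:
            return existing, True
        return executor.job_insert(insert_params), False
```
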
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -142,7 +142,7 @@ jobs:

env:
BIN_PATH: /home/runner/bin
SQLC_VERSION: 1.26.0
SQLC_VERSION: 1.27.0

steps:
- name: Checkout
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #XXX](https://github.com/riverqueue/riverqueue-python/pull/XXX).

## [0.6.3] - 2024-07-08

### Fixed
2 changes: 1 addition & 1 deletion docs/development.md
@@ -51,7 +51,7 @@ $ rye lint
## Run type check (Mypy)

```shell
$ make typecheck
$ make type-check
```

## Format code
141 changes: 83 additions & 58 deletions src/riverqueue/client.py
@@ -1,13 +1,12 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from hashlib import sha256
import re
from typing import (
Awaitable,
Optional,
Protocol,
Tuple,
List,
Callable,
cast,
runtime_checkable,
)
@@ -206,12 +205,7 @@ def to_json(self) -> str:
insert_opts = InsertOpts()
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)

async def insert():
return InsertResult(await exec.job_insert(insert_params))

return await self.__check_unique_job(
exec, insert_params, unique_opts, insert
)
return await self.__insert_job_with_unique(exec, insert_params, unique_opts)

async def insert_tx(
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
@@ -253,10 +247,7 @@ async def insert_tx(
insert_opts = InsertOpts()
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)

async def insert():
return InsertResult(await exec.job_insert(insert_params))

return await self.__check_unique_job(exec, insert_params, unique_opts, insert)
return await self.__insert_job_with_unique(exec, insert_params, unique_opts)

async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
"""
@@ -327,33 +318,50 @@ async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> in
exec = self.driver.unwrap_executor(tx)
return await exec.job_insert_many(_make_driver_insert_params_many(args))

async def __check_unique_job(
async def __insert_job_with_unique(
self,
exec: AsyncExecutorProtocol,
insert_params: JobInsertParams,
unique_opts: Optional[UniqueOpts],
insert_func: Callable[[], Awaitable[InsertResult]],
) -> InsertResult:
"""
Checks for an existing unique job and runs `insert_func` if one is
present.
Inserts a job, accounting for unique jobs whose insertion may be skipped
if an equivalent job is already present.
"""

get_params, lock_key = _build_unique_get_params_and_lock_key(
self.advisory_lock_prefix, insert_params, unique_opts
get_params, unique_key = _build_unique_get_params_and_unique_key(
insert_params, unique_opts
)

if not get_params:
return await insert_func()
if not get_params or not unique_opts:
return InsertResult(await exec.job_insert(insert_params))

# fast path
if (
not unique_opts.by_state
or sorted(unique_opts.by_state) == UNIQUE_STATES_DEFAULT
):
job, unique_skipped_as_duplicate = await exec.job_insert_unique(
insert_params, sha256(unique_key.encode("utf-8")).digest()
)
return InsertResult(
job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate
)

async with exec.transaction():
await exec.advisory_lock(lock_key)
lock_key = "unique_key"
lock_key += "kind=#{insert_params.kind}"
lock_key += unique_key

await exec.advisory_lock(
_hash_lock_key(self.advisory_lock_prefix, lock_key)
)

existing_job = await exec.job_get_by_kind_and_unique_properties(get_params)
if existing_job:
return InsertResult(existing_job, unique_skipped_as_duplicated=True)

return await insert_func()
return InsertResult(await exec.job_insert(insert_params))


class Client:
@@ -451,10 +459,7 @@ def to_json(self) -> str:
insert_opts = InsertOpts()
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)

def insert():
return InsertResult(exec.job_insert(insert_params))

return self.__check_unique_job(exec, insert_params, unique_opts, insert)
return self.__insert_job_with_unique(exec, insert_params, unique_opts)

def insert_tx(
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
@@ -496,10 +501,7 @@ def insert_tx(
insert_opts = InsertOpts()
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)

def insert():
return InsertResult(exec.job_insert(insert_params))

return self.__check_unique_job(exec, insert_params, unique_opts, insert)
return self.__insert_job_with_unique(exec, insert_params, unique_opts)

def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
"""
@@ -570,58 +572,72 @@ def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int:
exec = self.driver.unwrap_executor(tx)
return exec.job_insert_many(_make_driver_insert_params_many(args))

def __check_unique_job(
def __insert_job_with_unique(
self,
exec: ExecutorProtocol,
insert_params: JobInsertParams,
unique_opts: Optional[UniqueOpts],
insert_func: Callable[[], InsertResult],
) -> InsertResult:
"""
Checks for an existing unique job and runs `insert_func` if one is
present.
Inserts a job, accounting for unique jobs whose insertion may be skipped
if an equivalent job is already present.
"""

get_params, lock_key = _build_unique_get_params_and_lock_key(
self.advisory_lock_prefix, insert_params, unique_opts
get_params, unique_key = _build_unique_get_params_and_unique_key(
insert_params, unique_opts
)

if not get_params:
return insert_func()
if not get_params or not unique_opts:
return InsertResult(exec.job_insert(insert_params))

# fast path
if (
not unique_opts.by_state
or sorted(unique_opts.by_state) == UNIQUE_STATES_DEFAULT
):
job, unique_skipped_as_duplicate = exec.job_insert_unique(
insert_params, sha256(unique_key.encode("utf-8")).digest()
)
return InsertResult(
job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate
)

with exec.transaction():
exec.advisory_lock(lock_key)
lock_key = "unique_key"
lock_key += "kind=#{insert_params.kind}"
lock_key += unique_key

exec.advisory_lock(_hash_lock_key(self.advisory_lock_prefix, lock_key))

existing_job = exec.job_get_by_kind_and_unique_properties(get_params)
if existing_job:
return InsertResult(existing_job, unique_skipped_as_duplicated=True)

return insert_func()
return InsertResult(exec.job_insert(insert_params))


def _build_unique_get_params_and_lock_key(
advisory_lock_prefix: Optional[int],
def _build_unique_get_params_and_unique_key(
insert_params: JobInsertParams,
unique_opts: Optional[UniqueOpts],
) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], int]:
) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], str]:
"""
Builds driver get params and a unique key from insert params and
unique options for use during a unique job insertion.
"""

if unique_opts is None:
return (None, 0)
return (None, "")

any_unique_opts = False
get_params = JobGetByKindAndUniquePropertiesParam(kind=insert_params.kind)

lock_str = f"unique_keykind={insert_params.kind}"
unique_key = ""

if unique_opts.by_args:
any_unique_opts = True
get_params.by_args = True
get_params.args = insert_params.args
lock_str += f"&args={insert_params.args}"
unique_key += f"&args={insert_params.args}"

if unique_opts.by_period:
lower_period_bound = _truncate_time(
@@ -634,33 +650,27 @@ def _build_unique_get_params_and_lock_key(
lower_period_bound,
lower_period_bound + timedelta(seconds=unique_opts.by_period),
]
lock_str += f"&period={lower_period_bound.strftime('%FT%TZ')}"
unique_key += f"&period={lower_period_bound.strftime('%FT%TZ')}"

if unique_opts.by_queue:
any_unique_opts = True
get_params.by_queue = True
get_params.queue = insert_params.queue
lock_str += f"&queue={insert_params.queue}"
unique_key += f"&queue={insert_params.queue}"

if unique_opts.by_state:
any_unique_opts = True
get_params.by_state = True
get_params.state = cast(list[str], unique_opts.by_state)
lock_str += f"&state={','.join(unique_opts.by_state)}"
unique_key += f"&state={','.join(unique_opts.by_state)}"
else:
get_params.state = UNIQUE_STATES_DEFAULT
lock_str += f"&state={','.join(UNIQUE_STATES_DEFAULT)}"
unique_key += f"&state={','.join(UNIQUE_STATES_DEFAULT)}"

if not any_unique_opts:
return (None, 0)
return (None, "")

if advisory_lock_prefix is None:
lock_key = fnv1_hash(lock_str.encode("utf-8"), 64)
else:
prefix = advisory_lock_prefix
lock_key = (prefix << 32) | fnv1_hash(lock_str.encode("utf-8"), 32)

return (get_params, _uint64_to_int64(lock_key))
return (get_params, unique_key)
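
To make the key format concrete, here is roughly what this function produces for a job unique by queue and period (values hypothetical, default state list assumed):

```python
# Hypothetical example: a `send_email` job unique by queue and a 15-minute
# period would yield a unique key string like:
unique_key = (
    "&period=2024-08-31T00:00:00Z"
    "&queue=default"
    "&state=available,completed,retryable,running,scheduled"
)

# The fast path sha256-hashes exactly this string; the job's kind is covered
# separately by the (kind, unique_key) unique index. The slow path prepends
# "unique_key" plus "kind=send_email" to reproduce the legacy lock string
# ("unique_keykind=send_email&period=...") before FNV-1 hashing it into an
# advisory lock key.
```
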


def _check_advisory_lock_prefix_bounds(
@@ -678,6 +688,21 @@ def _check_advisory_lock_prefix_bounds(
return advisory_lock_prefix


def _hash_lock_key(advisory_lock_prefix: Optional[int], lock_key: str) -> int:
"""
Generates an FNV-1 hash from the given lock key string suitable for use with
a PG advisory lock while checking for the existence of a unique job.
"""

if advisory_lock_prefix is None:
lock_key_hash = fnv1_hash(lock_key.encode("utf-8"), 64)
else:
prefix = advisory_lock_prefix
lock_key_hash = (prefix << 32) | fnv1_hash(lock_key.encode("utf-8"), 32)

return _uint64_to_int64(lock_key_hash)


def _make_driver_insert_params(
args: JobArgs,
insert_opts: InsertOpts,
10 changes: 10 additions & 0 deletions src/riverqueue/driver/driver_protocol.py
@@ -64,6 +64,11 @@ async def job_insert(self, insert_params: JobInsertParams) -> Job:
async def job_insert_many(self, all_params) -> int:
pass

async def job_insert_unique(
self, insert_params: JobInsertParams, unique_key: bytes
) -> tuple[Job, bool]:
pass

async def job_get_by_kind_and_unique_properties(
self, get_params: JobGetByKindAndUniquePropertiesParam
) -> Optional[Job]:
@@ -137,6 +142,11 @@ def job_insert(self, insert_params: JobInsertParams) -> Job:
def job_insert_many(self, all_params) -> int:
pass

def job_insert_unique(
self, insert_params: JobInsertParams, unique_key: bytes
) -> tuple[Job, bool]:
pass

def job_get_by_kind_and_unique_properties(
self, get_params: JobGetByKindAndUniquePropertiesParam
) -> Optional[Job]:
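Drivers back the new `job_insert_unique` protocol method with a query that exercises the unique index. Purely as an illustration (this query is not part of this diff; the real one is sqlc-generated and follows River #451), such an insert might look like:

```python
import sqlalchemy

# Sketch of a fast-path unique insert, assuming the partial unique index on
# (kind, unique_key) introduced in River #451. The no-op DO UPDATE makes
# RETURNING yield the existing row on conflict, and a nonzero xmax marks the
# returned row as one that already existed.
JOB_INSERT_UNIQUE = sqlalchemy.text(
    """
    INSERT INTO river_job (args, kind, max_attempts, priority, queue, state, unique_key)
    VALUES (:args, :kind, :max_attempts, :priority, :queue, :state, :unique_key)
    ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL
        DO UPDATE SET kind = EXCLUDED.kind
    RETURNING river_job.*, (xmax != 0) AS unique_skipped_as_duplicate
    """
)
```
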
3 changes: 2 additions & 1 deletion src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py
@@ -1,6 +1,6 @@
# Code generated by sqlc. DO NOT EDIT.
# versions:
# sqlc v1.26.0
# sqlc v1.27.0
import dataclasses
import datetime
import enum
@@ -36,3 +36,4 @@ class RiverJob:
state: RiverJobState
scheduled_at: datetime.datetime
tags: List[str]
unique_key: Optional[memoryview]
2 changes: 1 addition & 1 deletion src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py
@@ -1,6 +1,6 @@
# Code generated by sqlc. DO NOT EDIT.
# versions:
# sqlc v1.26.0
# sqlc v1.27.0
# source: pg_misc.sql
from typing import Any

(Diffs for the remaining changed files not shown.)
