Improve unique key generation logic (#193)
vdusek authored Mar 12, 2024
1 parent 63a7172 commit 49265e8
Showing 7 changed files with 288 additions and 21 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -1,10 +1,15 @@
# Changelog

## [1.6.1](../../releases/tag/v1.6.1) - Unreleased
## [1.7.0](../../releases/tag/v1.7.0) - Unreleased

### Added

- Add a new way of generating the `uniqueKey` field of the request, aligning it with Crawlee.

### Fixed

- Improve error handling for `to_apify_request` serialization failures.
- Fix handling of Scrapy's `Request.dont_filter` so that such requests are not deduplicated in the Request Queue.

## [1.6.0](../../releases/tag/v1.6.0) - 2024-02-23

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "apify"
version = "1.6.1"
version = "1.7.0"
description = "Apify SDK for Python"
readme = "README.md"
license = { text = "Apache Software License" }
122 changes: 117 additions & 5 deletions src/apify/_utils.py
@@ -1,22 +1,23 @@
from __future__ import annotations

import asyncio
import base64
import builtins
import contextlib
import functools
import hashlib
import inspect
import json
import mimetypes
import os
import re
import sys
import time
from base64 import b64encode
from collections import OrderedDict
from collections.abc import MutableMapping
from datetime import datetime, timezone
from hashlib import sha256
from importlib import metadata
from logging import getLogger
from typing import (
Any,
Callable,
@@ -30,6 +31,7 @@
overload,
)
from typing import OrderedDict as OrderedDictType
from urllib.parse import parse_qsl, urlencode, urlparse

import aioshutil
import psutil
@@ -59,6 +61,7 @@
from apify.consts import REQUEST_ID_LENGTH, StorageTypes

T = TypeVar('T')
logger = getLogger(__name__)


def get_system_info() -> dict:
@@ -292,9 +295,8 @@ def maybe_parse_body(body: bytes, content_type: str) -> Any:

def unique_key_to_request_id(unique_key: str) -> str:
"""Generate request ID based on unique key in a deterministic way."""
id = re.sub(r'(\+|\/|=)', '', base64.b64encode(hashlib.sha256(unique_key.encode('utf-8')).digest()).decode('utf-8')) # noqa: A001

return id[:REQUEST_ID_LENGTH] if len(id) > REQUEST_ID_LENGTH else id
request_id = re.sub(r'(\+|\/|=)', '', b64encode(sha256(unique_key.encode('utf-8')).digest()).decode('utf-8'))
return request_id[:REQUEST_ID_LENGTH] if len(request_id) > REQUEST_ID_LENGTH else request_id
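For context, a minimal sketch of how this helper behaves; the unique key value is made up, and the helper is imported from the private `apify._utils` module purely for illustration:

from apify._utils import unique_key_to_request_id
from apify.consts import REQUEST_ID_LENGTH

# The mapping is deterministic: the same unique key always yields the same request ID,
# which is what lets the queue cache request state by ID.
request_id = unique_key_to_request_id('https://example.com/path?a=1&b=2')
assert request_id == unique_key_to_request_id('https://example.com/path?a=1&b=2')
assert len(request_id) <= REQUEST_ID_LENGTH  # IDs are truncated to at most this length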


async def force_rename(src_dir: str, dst_dir: str) -> None:
@@ -410,3 +412,113 @@ def validate_single(field_value: Any, expected_type: type, required: bool, name:
PARSE_DATE_FIELDS_MAX_DEPTH = 3
PARSE_DATE_FIELDS_KEY_SUFFIX = 'At'
ListOrDictOrAny = TypeVar('ListOrDictOrAny', list, dict, Any)


def compute_short_hash(data: bytes, *, length: int = 8) -> str:
"""Computes a hexadecimal SHA-256 hash of the provided data and returns a substring (prefix) of it.
Args:
data: The binary data to be hashed.
length: The length of the hash to be returned.
Returns:
A substring (prefix) of the hexadecimal hash of the data.
"""
hash_object = sha256(data)
return hash_object.hexdigest()[:length]
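A quick usage sketch of the helper above, with a hypothetical payload (again importing from the private `apify._utils` module purely for illustration):

from apify._utils import compute_short_hash

payload = b'{"foo": "bar"}'  # hypothetical request payload
print(compute_short_hash(payload))  # deterministic 8-character prefix of the hex SHA-256 digest
print(compute_short_hash(payload, length=16))  # a longer prefix when more of the digest is needed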


def normalize_url(url: str, *, keep_url_fragment: bool = False) -> str:
"""Normalizes a URL.
This function cleans and standardizes a URL by removing leading and trailing whitespaces,
converting the scheme and netloc to lower case, stripping unwanted tracking parameters
(specifically those beginning with 'utm_'), sorting the remaining query parameters alphabetically,
and optionally retaining the URL fragment. The goal is to ensure that URLs that are functionally
identical but differ in trivial ways (such as parameter order or casing) are treated as the same.
Args:
url: The URL to be normalized.
keep_url_fragment: Flag to determine whether the fragment part of the URL should be retained.
Returns:
A string containing the normalized URL.
"""
# Parse the URL
parsed_url = urlparse(url.strip())
search_params = dict(parse_qsl(parsed_url.query)) # Convert query to a dict

# Remove any 'utm_' parameters
search_params = {k: v for k, v in search_params.items() if not k.startswith('utm_')}

# Construct the new query string
sorted_keys = sorted(search_params.keys())
sorted_query = urlencode([(k, search_params[k]) for k in sorted_keys])

# Construct the final URL
new_url = (
parsed_url._replace(
query=sorted_query,
scheme=parsed_url.scheme,
netloc=parsed_url.netloc,
path=parsed_url.path.rstrip('/'),
)
.geturl()
.lower()
)

# Retain the URL fragment if required
if not keep_url_fragment:
new_url = new_url.split('#')[0]

return new_url
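To illustrate the normalization rules above, a small hedged example (the input URL is made up):

from apify._utils import normalize_url

url = 'HTTPS://Example.COM/Path/?b=2&a=1&utm_source=newsletter#section'

print(normalize_url(url))
# -> 'https://example.com/path?a=1&b=2'
#    (lowercased, trailing slash and 'utm_' parameters stripped, query sorted, fragment dropped)

print(normalize_url(url, keep_url_fragment=True))
# -> 'https://example.com/path?a=1&b=2#section'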


def compute_unique_key(
url: str,
method: str = 'GET',
payload: bytes | None = None,
*,
keep_url_fragment: bool = False,
use_extended_unique_key: bool = False,
) -> str:
"""Computes a unique key for caching & deduplication of requests.
This function computes a unique key by normalizing the provided URL and method.
If 'use_extended_unique_key' is True and a payload is provided, the payload is hashed and
included in the key. Otherwise, the unique key is just the normalized URL.
Args:
url: The request URL.
method: The HTTP method, defaults to 'GET'.
payload: The request payload, defaults to None.
keep_url_fragment: A flag indicating whether to keep the URL fragment, defaults to False.
use_extended_unique_key: A flag indicating whether to include a hashed payload in the key, defaults to False.
Returns:
A string representing the unique key for the request.
"""
# Normalize the URL and method.
try:
normalized_url = normalize_url(url, keep_url_fragment=keep_url_fragment)
except Exception as exc:
logger.warning(f'Failed to normalize URL: {exc}')
normalized_url = url

normalized_method = method.upper()

# Compute and return the extended unique key if required.
if use_extended_unique_key:
payload_hash = compute_short_hash(payload) if payload else ''
return f'{normalized_method}({payload_hash}):{normalized_url}'

# Log information if there is a non-GET request with a payload.
if normalized_method != 'GET' and payload:
logger.info(
f'We have encountered a {normalized_method} Request with a payload. This is fine. Just letting you know '
'that if your requests point to the same URL and differ only in method and payload, you should consider '
'using the "use_extended_unique_key" option.'
)

# Return the normalized URL as the unique key.
return normalized_url
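A short sketch of the two resulting key formats (values are illustrative; the placeholder stands for the 8-character prefix produced by `compute_short_hash`):

from apify._utils import compute_unique_key

url = 'https://example.com/api/items?utm_campaign=x&b=2&a=1'
payload = b'{"name": "item"}'

# Default: the unique key is just the normalized URL (the log message above hints
# at 'use_extended_unique_key' for non-GET requests that carry a payload).
compute_unique_key(url, method='POST', payload=payload)
# -> 'https://example.com/api/items?a=1&b=2'

# Extended: the method and the hashed payload become part of the key.
compute_unique_key(url, method='POST', payload=payload, use_extended_unique_key=True)
# -> 'POST(<payload hash>):https://example.com/api/items?a=1&b=2'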
19 changes: 16 additions & 3 deletions src/apify/scrapy/requests.py
@@ -13,6 +13,7 @@
) from exc

from apify._crypto import crypto_random_object_id
from apify._utils import compute_unique_key
from apify.actor import Actor


@@ -45,25 +46,37 @@ def to_apify_request(scrapy_request: Request, spider: Spider) -> dict | None:
apify_request = {
'url': scrapy_request.url,
'method': scrapy_request.method,
'payload': scrapy_request.body,
'userData': scrapy_request.meta.get('userData', {}),
}

# Convert Scrapy's headers to a dictionary and store them in the apify_request
if isinstance(scrapy_request.headers, Headers):
apify_request['headers'] = dict(scrapy_request.headers.to_unicode_dict())
else:
Actor.log.warning(f'Invalid scrapy_request.headers type, not scrapy.http.headers.Headers: {scrapy_request.headers}')

# If the request was produced by the middleware (e.g. retry or redirect), we must compute the unique key here
if _is_request_produced_by_middleware(scrapy_request):
apify_request['uniqueKey'] = scrapy_request.url
apify_request['uniqueKey'] = compute_unique_key(
url=scrapy_request.url,
method=scrapy_request.method,
payload=scrapy_request.body,
use_extended_unique_key=True,
)
# Otherwise, we can use the unique key (also the id) from the meta
else:
# Add 'id' to the apify_request
if scrapy_request.meta.get('apify_request_id'):
apify_request['id'] = scrapy_request.meta['apify_request_id']

# Add 'uniqueKey' to the apify_request
if scrapy_request.meta.get('apify_request_unique_key'):
apify_request['uniqueKey'] = scrapy_request.meta['apify_request_unique_key']

# If the request's dont_filter field is set, we must generate a random `uniqueKey` to avoid deduplication
# of the request in the Request Queue.
if scrapy_request.dont_filter:
apify_request['uniqueKey'] = crypto_random_object_id(8)

# Serialize the Scrapy Request and store it in the apify_request.
# - This process involves converting the Scrapy Request object into a dictionary, encoding it to base64,
# and storing it as 'scrapy_request' within the 'userData' dictionary of the apify_request.
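To make the branching above concrete, here is a hedged sketch of what `to_apify_request` might produce for a POST request re-issued by Scrapy's retry middleware; all field values are illustrative, not taken from the diff:

apify_request = {
    'url': 'https://example.com/api/items',
    'method': 'POST',
    'payload': b'{"name": "item"}',
    'userData': {},
    'headers': {'Content-Type': 'application/json'},
    # Computed on the spot, because middleware-produced requests carry no Apify metadata:
    'uniqueKey': 'POST(<payload hash>):https://example.com/api/items',
}

# With dont_filter=True the unique key is randomized instead, so the Request Queue
# never deduplicates the request, e.g.:
# apify_request['uniqueKey'] = crypto_random_object_id(8)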
7 changes: 6 additions & 1 deletion src/apify/scrapy/scheduler.py
@@ -95,7 +95,12 @@ def enqueue_request(self: ApifyScheduler, request: Request) -> bool:
raise TypeError('self._rq must be an instance of the RequestQueue class')

try:
result = nested_event_loop.run_until_complete(self._rq.add_request(apify_request))
result = nested_event_loop.run_until_complete(
self._rq.add_request(
apify_request,
use_extended_unique_key=True,
)
)
except BaseException:
traceback.print_exc()
raise
52 changes: 42 additions & 10 deletions src/apify/storages/request_queue.py
@@ -9,7 +9,7 @@
from apify_shared.utils import ignore_docs

from apify._crypto import crypto_random_object_id
from apify._utils import LRUCache, budget_ow, unique_key_to_request_id
from apify._utils import LRUCache, budget_ow, compute_unique_key, unique_key_to_request_id
from apify.consts import REQUEST_QUEUE_HEAD_MAX_LIMIT
from apify.log import logger
from apify.storages.base_storage import BaseStorage
@@ -140,15 +140,43 @@ def _get_storage_collection_client(
) -> RequestQueueCollectionClientAsync | RequestQueueCollectionClient:
return client.request_queues()

async def add_request(self: RequestQueue, request: dict, *, forefront: bool = False) -> dict:
"""Add a request to the queue.
async def add_request(
self: RequestQueue,
request: dict,
*,
forefront: bool = False,
keep_url_fragment: bool = False,
use_extended_unique_key: bool = False,
) -> dict:
"""Adds a request to the `RequestQueue` while managing deduplication and positioning within the queue.
The deduplication of requests relies on the `uniqueKey` field within the request dictionary. If `uniqueKey`
exists, it remains unchanged; if it does not, it is generated based on the request's `url`, `method`,
and `payload` fields. The generation of `uniqueKey` can be influenced by the `keep_url_fragment` and
`use_extended_unique_key` flags, which dictate whether to include the URL fragment and the request's method
and payload, respectively, in its computation.
The request can be added to the forefront (beginning) or the back of the queue based on the `forefront`
parameter. Information about the request's addition to the queue, including whether it was already present or
handled, is returned in an output dictionary.
Args:
request (dict): The request to add to the queue
forefront (bool, optional): Whether to add the request to the head or the end of the queue
request: The request object to be added to the queue. Must include at least the `url` key.
Optionally it can include the `method`, `payload` and `uniqueKey` keys.
Returns:
dict: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
forefront: If True, adds the request to the forefront of the queue; otherwise, adds it to the end.
keep_url_fragment: Determines whether the URL fragment (the part of the URL after '#') should be retained
in the unique key computation.
use_extended_unique_key: Determines whether to use an extended unique key, incorporating the request's
method and payload into the unique key computation.

Returns:
A dictionary containing information about the operation, including:
- `requestId` (str): The ID of the request.
- `uniqueKey` (str): The unique key associated with the request.
- `wasAlreadyPresent` (bool): Indicates whether the request was already in the queue.
- `wasAlreadyHandled` (bool): Indicates whether the request was already processed.
"""
budget_ow(
request,
@@ -159,9 +187,13 @@ async def add_request(self: RequestQueue, request: dict, *, forefront: bool = Fa
self._last_activity = datetime.now(timezone.utc)

if request.get('uniqueKey') is None:
# TODO: Check Request class in crawlee and replicate uniqueKey generation logic...
# https://github.com/apify/apify-sdk-python/issues/141
request['uniqueKey'] = request['url']
request['uniqueKey'] = compute_unique_key(
url=request['url'],
method=request.get('method', 'GET'),
payload=request.get('payload'),
keep_url_fragment=keep_url_fragment,
use_extended_unique_key=use_extended_unique_key,
)

cache_key = unique_key_to_request_id(request['uniqueKey'])
cached_info = self._requests_cache.get(cache_key)
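A brief usage sketch of the extended method, assuming an open Actor context and the default request queue (URLs and payload are illustrative):

from apify import Actor

async def main() -> None:
    async with Actor:
        request_queue = await Actor.open_request_queue()

        # uniqueKey is derived from the normalized URL only.
        await request_queue.add_request({'url': 'https://example.com/items?b=2&a=1'})

        # The method and payload are folded into uniqueKey, so two POSTs to the same
        # URL with different bodies stay distinct in the queue.
        await request_queue.add_request(
            {
                'url': 'https://example.com/api/items',
                'method': 'POST',
                'payload': b'{"name": "item"}',
            },
            use_extended_unique_key=True,
        )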