From 49265e88ed64966f1863c6b2eaed177cde3fbcf8 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Tue, 12 Mar 2024 16:20:09 +0100
Subject: [PATCH] Improve unique key generation logic (#193)

---
 CHANGELOG.md                        |   7 +-
 pyproject.toml                      |   2 +-
 src/apify/_utils.py                 | 122 ++++++++++++++++++++++++++--
 src/apify/scrapy/requests.py        |  19 ++++-
 src/apify/scrapy/scheduler.py       |   7 +-
 src/apify/storages/request_queue.py |  52 +++++++++---
 tests/unit/test_utils.py            | 100 +++++++++++++++++++++++
 7 files changed, 288 insertions(+), 21 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f36ffc96..fe313f71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,15 @@
 # Changelog

-## [1.6.1](../../releases/tag/v1.6.1) - Unreleased
+## [1.7.0](../../releases/tag/v1.7.0) - Unreleased
+
+### Added
+
+- Add a new way of generating the `uniqueKey` field of the request, aligning it with Crawlee.

 ### Fixed

 - Improve error handling for `to_apify_request` serialization failures
+- Scrapy's `Request.dont_filter` is now respected: such requests get a random `uniqueKey`, so they are not deduplicated in the request queue.

 ## [1.6.0](../../releases/tag/v1.6.0) - 2024-02-23

diff --git a/pyproject.toml b/pyproject.toml
index 62f46776..c2aefaaa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "apify"
-version = "1.6.1"
+version = "1.7.0"
 description = "Apify SDK for Python"
 readme = "README.md"
 license = { text = "Apache Software License" }
diff --git a/src/apify/_utils.py b/src/apify/_utils.py
index d31c62ba..6ebcff20 100644
--- a/src/apify/_utils.py
+++ b/src/apify/_utils.py
@@ -1,11 +1,9 @@
 from __future__ import annotations

 import asyncio
-import base64
 import builtins
 import contextlib
 import functools
-import hashlib
 import inspect
 import json
 import mimetypes
@@ -13,10 +11,13 @@
 import re
 import sys
 import time
+from base64 import b64encode
 from collections import OrderedDict
 from collections.abc import MutableMapping
 from datetime import datetime, timezone
+from hashlib import sha256
 from importlib import metadata
+from logging import getLogger
 from typing import (
     Any,
     Callable,
@@ -30,6 +31,7 @@
     overload,
 )
 from typing import OrderedDict as OrderedDictType
+from urllib.parse import parse_qsl, urlencode, urlparse

 import aioshutil
 import psutil
@@ -59,6 +61,7 @@
 from apify.consts import REQUEST_ID_LENGTH, StorageTypes

 T = TypeVar('T')
+logger = getLogger(__name__)


 def get_system_info() -> dict:
@@ -292,9 +295,8 @@ def maybe_parse_body(body: bytes, content_type: str) -> Any:

 def unique_key_to_request_id(unique_key: str) -> str:
     """Generate request ID based on unique key in a deterministic way."""
-    id = re.sub(r'(\+|\/|=)', '', base64.b64encode(hashlib.sha256(unique_key.encode('utf-8')).digest()).decode('utf-8'))  # noqa: A001
-
-    return id[:REQUEST_ID_LENGTH] if len(id) > REQUEST_ID_LENGTH else id
+    request_id = re.sub(r'(\+|\/|=)', '', b64encode(sha256(unique_key.encode('utf-8')).digest()).decode('utf-8'))
+    return request_id[:REQUEST_ID_LENGTH] if len(request_id) > REQUEST_ID_LENGTH else request_id


 async def force_rename(src_dir: str, dst_dir: str) -> None:
@@ -410,3 +412,113 @@ def validate_single(field_value: Any, expected_type: type, required: bool, name:
 PARSE_DATE_FIELDS_MAX_DEPTH = 3
 PARSE_DATE_FIELDS_KEY_SUFFIX = 'At'
 ListOrDictOrAny = TypeVar('ListOrDictOrAny', list, dict, Any)
+
+
+def compute_short_hash(data: bytes, *, length: int = 8) -> str:
+    """Computes a hexadecimal SHA-256 hash of the provided data and returns a substring (prefix) of it.
+
+    Args:
+        data: The binary data to be hashed.
+        length: The length of the hash prefix to be returned.
+
+    Returns:
+        A substring (prefix) of the hexadecimal hash of the data.
+    """
+    hash_object = sha256(data)
+    return hash_object.hexdigest()[:length]
+
+
+def normalize_url(url: str, *, keep_url_fragment: bool = False) -> str:
+    """Normalizes a URL.
+
+    This function cleans and standardizes a URL by removing leading and trailing whitespaces,
+    converting it to lower case, stripping unwanted tracking parameters
+    (specifically those beginning with 'utm_'), sorting the remaining query parameters alphabetically,
+    and optionally retaining the URL fragment. The goal is to ensure that URLs that are functionally
+    identical but differ in trivial ways (such as parameter order or casing) are treated as the same.
+
+    Args:
+        url: The URL to be normalized.
+        keep_url_fragment: Flag to determine whether the fragment part of the URL should be retained.
+
+    Returns:
+        A string containing the normalized URL.
+    """
+    # Parse the URL
+    parsed_url = urlparse(url.strip())
+    search_params = dict(parse_qsl(parsed_url.query))  # Convert query to a dict
+
+    # Remove any 'utm_' parameters
+    search_params = {k: v for k, v in search_params.items() if not k.startswith('utm_')}
+
+    # Construct the new query string
+    sorted_keys = sorted(search_params.keys())
+    sorted_query = urlencode([(k, search_params[k]) for k in sorted_keys])
+
+    # Construct the final URL
+    new_url = (
+        parsed_url._replace(
+            query=sorted_query,
+            scheme=parsed_url.scheme,
+            netloc=parsed_url.netloc,
+            path=parsed_url.path.rstrip('/'),
+        )
+        .geturl()
+        .lower()
+    )
+
+    # Retain the URL fragment if required
+    if not keep_url_fragment:
+        new_url = new_url.split('#')[0]
+
+    return new_url
+
+
+def compute_unique_key(
+    url: str,
+    method: str = 'GET',
+    payload: bytes | None = None,
+    *,
+    keep_url_fragment: bool = False,
+    use_extended_unique_key: bool = False,
+) -> str:
+    """Computes a unique key for caching & deduplication of requests.
+
+    This function computes a unique key by normalizing the provided URL and method.
+    If 'use_extended_unique_key' is True and a payload is provided, the payload is hashed and
+    included in the key. Otherwise, the unique key is just the normalized URL.
+
+    Args:
+        url: The request URL.
+        method: The HTTP method, defaults to 'GET'.
+        payload: The request payload, defaults to None.
+        keep_url_fragment: A flag indicating whether to keep the URL fragment, defaults to False.
+        use_extended_unique_key: A flag indicating whether to include a hashed payload in the key, defaults to False.
+
+    Returns:
+        A string representing the unique key for the request.
+    """
+    # Normalize the URL and method.
+    try:
+        normalized_url = normalize_url(url, keep_url_fragment=keep_url_fragment)
+    except Exception as exc:
+        logger.warning(f'Failed to normalize URL: {exc}')
+        normalized_url = url
+
+    normalized_method = method.upper()
+
+    # Compute and return the extended unique key if required.
+    if use_extended_unique_key:
+        payload_hash = compute_short_hash(payload) if payload else ''
+        return f'{normalized_method}({payload_hash}):{normalized_url}'
+
+    # Log information if there is a non-GET request with a payload.
+    if normalized_method != 'GET' and payload:
+        logger.info(
+            f'We have encountered a {normalized_method} Request with a payload. This is fine. Just letting you know '
+            'that if your requests point to the same URL and differ only in method and payload, you should consider '
+            'using the "use_extended_unique_key" option.'
+        )
+
+    # Return the normalized URL as the unique key.
+    return normalized_url
diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py
index fec91ad7..688c2bc4 100644
--- a/src/apify/scrapy/requests.py
+++ b/src/apify/scrapy/requests.py
@@ -13,6 +13,7 @@
 ) from exc

 from apify._crypto import crypto_random_object_id
+from apify._utils import compute_unique_key
 from apify.actor import Actor


@@ -45,25 +46,37 @@ def to_apify_request(scrapy_request: Request, spider: Spider) -> dict | None:
         apify_request = {
             'url': scrapy_request.url,
             'method': scrapy_request.method,
+            'payload': scrapy_request.body,
             'userData': scrapy_request.meta.get('userData', {}),
         }

+        # Convert Scrapy's headers to a dictionary and store them in the apify_request
         if isinstance(scrapy_request.headers, Headers):
             apify_request['headers'] = dict(scrapy_request.headers.to_unicode_dict())
         else:
             Actor.log.warning(f'Invalid scrapy_request.headers type, not scrapy.http.headers.Headers: {scrapy_request.headers}')

+        # If the request was produced by the middleware (e.g. retry or redirect), we must compute the unique key here
         if _is_request_produced_by_middleware(scrapy_request):
-            apify_request['uniqueKey'] = scrapy_request.url
+            apify_request['uniqueKey'] = compute_unique_key(
+                url=scrapy_request.url,
+                method=scrapy_request.method,
+                payload=scrapy_request.body,
+                use_extended_unique_key=True,
+            )
+        # Otherwise, we can use the unique key (also the id) from the meta
         else:
-            # Add 'id' to the apify_request
             if scrapy_request.meta.get('apify_request_id'):
                 apify_request['id'] = scrapy_request.meta['apify_request_id']

-            # Add 'uniqueKey' to the apify_request
             if scrapy_request.meta.get('apify_request_unique_key'):
                 apify_request['uniqueKey'] = scrapy_request.meta['apify_request_unique_key']

+        # If the request's dont_filter field is set, we must generate a random `uniqueKey` to avoid deduplication
+        # of the request in the Request Queue.
+        if scrapy_request.dont_filter:
+            apify_request['uniqueKey'] = crypto_random_object_id(8)
+
         # Serialize the Scrapy Request and store it in the apify_request.
         # - This process involves converting the Scrapy Request object into a dictionary, encoding it to base64,
         #   and storing it as 'scrapy_request' within the 'userData' dictionary of the apify_request.
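For reference, a minimal sketch of how the new helpers behave (not part of the patch; it assumes the patched `apify` package is importable, and the expected values mirror the cases covered in `tests/unit/test_utils.py` below):

```python
from apify._utils import compute_unique_key, normalize_url

# Tracking parameters are stripped and the remaining query parameters are sorted.
print(normalize_url('https://example.com/?utm_source=test&b=2&a=1'))
# -> 'https://example.com?a=1&b=2'

# Without the extended key, method and payload are ignored, so this POST would be
# deduplicated against a plain GET to the same URL.
print(compute_unique_key('http://example.com', method='POST', payload=b'data'))
# -> 'http://example.com'

# With use_extended_unique_key=True, the method and a short payload hash become part of the key.
print(compute_unique_key('http://example.com', method='POST', payload=b'data', use_extended_unique_key=True))
# -> 'POST(3a6eb079):http://example.com'
```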
diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py
index 5679e33b..4b280d8a 100644
--- a/src/apify/scrapy/scheduler.py
+++ b/src/apify/scrapy/scheduler.py
@@ -95,7 +95,12 @@ def enqueue_request(self: ApifyScheduler, request: Request) -> bool:
             raise TypeError('self._rq must be an instance of the RequestQueue class')

         try:
-            result = nested_event_loop.run_until_complete(self._rq.add_request(apify_request))
+            result = nested_event_loop.run_until_complete(
+                self._rq.add_request(
+                    apify_request,
+                    use_extended_unique_key=True,
+                )
+            )
         except BaseException:
             traceback.print_exc()
             raise
diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py
index cb0ccfb8..79d64b5e 100644
--- a/src/apify/storages/request_queue.py
+++ b/src/apify/storages/request_queue.py
@@ -9,7 +9,7 @@
 from apify_shared.utils import ignore_docs

 from apify._crypto import crypto_random_object_id
-from apify._utils import LRUCache, budget_ow, unique_key_to_request_id
+from apify._utils import LRUCache, budget_ow, compute_unique_key, unique_key_to_request_id
 from apify.consts import REQUEST_QUEUE_HEAD_MAX_LIMIT
 from apify.log import logger
 from apify.storages.base_storage import BaseStorage
@@ -140,15 +140,43 @@ def _get_storage_collection_client(
     ) -> RequestQueueCollectionClientAsync | RequestQueueCollectionClient:
         return client.request_queues()

-    async def add_request(self: RequestQueue, request: dict, *, forefront: bool = False) -> dict:
-        """Add a request to the queue.
+    async def add_request(
+        self: RequestQueue,
+        request: dict,
+        *,
+        forefront: bool = False,
+        keep_url_fragment: bool = False,
+        use_extended_unique_key: bool = False,
+    ) -> dict:
+        """Adds a request to the `RequestQueue` while managing deduplication and positioning within the queue.
+
+        The deduplication of requests relies on the `uniqueKey` field within the request dictionary. If `uniqueKey`
+        exists, it remains unchanged; if it does not, it is generated based on the request's `url`, `method`,
+        and `payload` fields. The generation of `uniqueKey` can be influenced by the `keep_url_fragment` and
+        `use_extended_unique_key` flags, which dictate whether to include the URL fragment and the request's method
+        and payload, respectively, in its computation.
+
+        The request can be added to the forefront (beginning) or the back of the queue based on the `forefront`
+        parameter. Information about the request's addition to the queue, including whether it was already present or
+        handled, is returned in an output dictionary.

         Args:
-            request (dict): The request to add to the queue
-            forefront (bool, optional): Whether to add the request to the head or the end of the queue
+            request: The request object to be added to the queue. Must include at least the `url` key.
+                Optionally it can include the `method`, `payload` and `uniqueKey` keys.

-        Returns:
-            dict: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
+            forefront: If True, adds the request to the forefront of the queue; otherwise, adds it to the end.
+
+            keep_url_fragment: Determines whether the URL fragment (the part of the URL after '#') should be retained
+                in the unique key computation.
+
+            use_extended_unique_key: Determines whether to use an extended unique key, incorporating the request's
+                method and payload into the unique key computation.
+
+        Returns: A dictionary containing information about the operation, including:
+            - `requestId` (str): The ID of the request.
+            - `uniqueKey` (str): The unique key associated with the request.
+            - `wasAlreadyPresent` (bool): Indicates whether the request was already in the queue.
+            - `wasAlreadyHandled` (bool): Indicates whether the request was already processed.
         """
         budget_ow(
             request,
@@ -159,9 +187,13 @@ async def add_request(self: RequestQueue, request: dict, *, forefront: bool = Fa
         self._last_activity = datetime.now(timezone.utc)

         if request.get('uniqueKey') is None:
-            # TODO: Check Request class in crawlee and replicate uniqueKey generation logic...
-            # https://github.com/apify/apify-sdk-python/issues/141
-            request['uniqueKey'] = request['url']
+            request['uniqueKey'] = compute_unique_key(
+                url=request['url'],
+                method=request.get('method', 'GET'),
+                payload=request.get('payload'),
+                keep_url_fragment=keep_url_fragment,
+                use_extended_unique_key=use_extended_unique_key,
+            )

         cache_key = unique_key_to_request_id(request['uniqueKey'])
         cached_info = self._requests_cache.get(cache_key)
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index ab46cac9..ac9b3567 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -14,6 +14,8 @@

 from apify._utils import (
     budget_ow,
+    compute_short_hash,
+    compute_unique_key,
     fetch_and_parse_env_var,
     force_remove,
     force_rename,
@@ -23,6 +25,7 @@
     maybe_parse_bool,
     maybe_parse_datetime,
     maybe_parse_int,
+    normalize_url,
     raise_on_duplicate_storage,
     raise_on_non_existing_storage,
     run_func_at_interval_async,
@@ -266,3 +269,100 @@ def test__budget_ow() -> None:
             'ordered_dict': (dict, False),
         },
     )
+
+
+def test_get_short_base64_hash_with_known_input() -> None:
+    data = b'Hello world!'
+    expected_hash = 'c0535e4b'
+    assert compute_short_hash(data) == expected_hash, 'The hash does not match the expected output'
+
+
+def test_get_short_base64_hash_with_empty_input() -> None:
+    data = b''
+    expected_hash = 'e3b0c442'
+    assert compute_short_hash(data) == expected_hash, 'The hash for an empty input should follow the expected pattern'
+
+
+def test_get_short_base64_hash_output_length() -> None:
+    data = b'some random data'
+    assert len(compute_short_hash(data)) == 8, 'The output hash should be 8 characters long'
+
+
+def test_get_short_base64_hash_differentiates_input() -> None:
+    data1 = b'input 1'
+    data2 = b'input 2'
+    assert compute_short_hash(data1) != compute_short_hash(data2), 'Different inputs should produce different hashes'
+
+
+@pytest.mark.parametrize(
+    ('url', 'expected_output', 'keep_url_fragment'),
+    [
+        ('https://example.com/?utm_source=test&utm_medium=test&key=value', 'https://example.com?key=value', False),
+        ('http://example.com/?key=value&another_key=another_value', 'http://example.com?another_key=another_value&key=value', False),
+        ('HTTPS://EXAMPLE.COM/?KEY=VALUE', 'https://example.com?key=value', False),
+        ('', '', False),
+        ('http://example.com/#fragment', 'http://example.com#fragment', True),
+        ('http://example.com/#fragment', 'http://example.com', False),
+        ('  https://example.com/  ', 'https://example.com', False),
+        ('http://example.com/?b=2&a=1', 'http://example.com?a=1&b=2', False),
+    ],
+    ids=[
+        'remove_utm_params',
+        'retain_sort_non_utm_params',
+        'convert_scheme_netloc_to_lowercase',
+        'handle_empty_url',
+        'retain_fragment',
+        'remove_fragment',
+        'trim_whitespace',
+        'sort_query_params',
+    ],
+)
+def test_normalize_url(url: str, expected_output: str, *, keep_url_fragment: bool) -> None:
+    output = normalize_url(url, keep_url_fragment=keep_url_fragment)
+    assert output == expected_output
+
+
+@pytest.mark.parametrize(
+    ('url', 'method', 'payload', 'keep_url_fragment', 'use_extended_unique_key', 'expected_output'),
+    [
+        ('http://example.com', 'GET', None, False, False, 'http://example.com'),
+        ('http://example.com', 'POST', None, False, False, 'http://example.com'),
+        ('http://example.com', 'GET', b'data', False, False, 'http://example.com'),
+        ('http://example.com', 'GET', b'data', False, True, 'GET(3a6eb079):http://example.com'),
+        ('http://example.com', 'POST', b'data', False, True, 'POST(3a6eb079):http://example.com'),
+        ('http://example.com#fragment', 'GET', None, True, False, 'http://example.com#fragment'),
+        ('http://example.com#fragment', 'GET', None, False, False, 'http://example.com'),
+        ('http://example.com', 'DELETE', b'test', False, True, 'DELETE(9f86d081):http://example.com'),
+        ('https://example.com?utm_content=test', 'GET', None, False, False, 'https://example.com'),
+        ('https://example.com?utm_content=test', 'GET', None, True, False, 'https://example.com'),
+    ],
+    ids=[
+        'simple_get',
+        'simple_post',
+        'get_with_payload',
+        'get_with_payload_extended',
+        'post_with_payload_extended',
+        'get_with_fragment',
+        'get_remove_fragment',
+        'delete_with_payload_extended',
+        'get_remove_utm',
+        'get_keep_utm_fragment',
+    ],
+)
+def test_compute_unique_key(
+    url: str,
+    method: str,
+    payload: bytes | None,
+    *,
+    keep_url_fragment: bool,
+    use_extended_unique_key: bool,
+    expected_output: str,
+) -> None:
+    output = compute_unique_key(
+        url,
+        method,
+        payload,
+        keep_url_fragment=keep_url_fragment,
+        use_extended_unique_key=use_extended_unique_key,
+    )
+    assert output == expected_output
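Beyond the Scrapy integration, the new flags can also be passed directly when enqueueing requests from an Actor. A usage sketch (not part of the patch; the URL and payload are made-up placeholders, and the snippet assumes the SDK with this change, i.e. `apify` 1.7.0, is installed):

```python
import asyncio

from apify import Actor


async def main() -> None:
    async with Actor:
        rq = await Actor.open_request_queue()

        # No 'uniqueKey' is provided, so it is computed from the URL, method and payload.
        # With use_extended_unique_key=True, requests to the same URL that differ only
        # in method or payload are no longer collapsed into a single queue entry.
        info = await rq.add_request(
            {'url': 'https://example.com/search?q=laptop', 'method': 'POST', 'payload': b'{"page": 1}'},
            use_extended_unique_key=True,
        )
        Actor.log.info(f'uniqueKey={info["uniqueKey"]}, wasAlreadyPresent={info["wasAlreadyPresent"]}')


if __name__ == '__main__':
    asyncio.run(main())
```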