diff --git a/demo.py b/demo.py index 758ca299f..72e615db8 100644 --- a/demo.py +++ b/demo.py @@ -54,6 +54,9 @@ # browser_param.callstack_instrument = True # Record DNS resolution browser_param.dns_instrument = True + # Set this value as appropriate for the size of your temp directory + # if you are running out of space + browser_param.maximum_profile_size = 50 * (10**20) # 50 MB = 50 * 2^20 Bytes # Update TaskManager configuration (use this for crawl-wide settings) manager_params.data_directory = Path("./datadir/") diff --git a/environment.yaml b/environment.yaml index a11aa9181..81fb1ff53 100644 --- a/environment.yaml +++ b/environment.yaml @@ -3,39 +3,40 @@ channels: - main dependencies: - beautifulsoup4=4.12.2 -- black=23.7.0 +- black=23.9.1 - click=8.1.7 - codecov=2.1.13 - dill=0.3.7 +- dill=0.3.7 - easyprocess=1.1 -- gcsfs=2023.9.0 +- gcsfs=2023.9.1 - geckodriver=0.33.0 - ipython=8.15.0 - isort=5.12.0 - leveldb=1.23 - multiprocess=0.70.15 - mypy=1.5.1 -- nodejs=20.6.0 +- nodejs=20.7.0 - pandas=2.1.0 -- pillow=10.0.0 +- pillow=10.0.1 - pip=23.2.1 - pre-commit=3.4.0 - psutil=5.9.5 - pyarrow=13.0.0 - pytest-asyncio=0.21.1 - pytest-cov=4.1.0 -- pytest=7.4.1 +- pytest=7.4.2 - python=3.11.5 - pyvirtualdisplay=3.0 - recommonmark=0.7.1 - redis-py=5.0.0 -- s3fs=2023.9.0 +- s3fs=2023.9.1 - selenium=4.12.0 -- sentry-sdk=1.30.0 +- sentry-sdk=1.31.0 - sphinx-markdown-tables=0.0.17 -- sphinx=7.2.5 +- sphinx=7.2.6 - tabulate=0.9.0 -- tblib=1.7.0 +- tblib=2.0.0 - wget=1.20.3 - pip: - dataclasses-json==0.6.0 @@ -44,6 +45,6 @@ dependencies: - plyvel==1.5.0 - tranco==0.6 - types-pyyaml==6.0.12.11 - - types-redis==4.6.0.5 + - types-redis==4.6.0.6 - types-tabulate==0.9.0.3 name: openwpm diff --git a/openwpm/browser_manager.py b/openwpm/browser_manager.py index af7685d17..775352371 100644 --- a/openwpm/browser_manager.py +++ b/openwpm/browser_manager.py @@ -34,6 +34,7 @@ kill_process_and_children, parse_traceback_for_sentry, ) +from .utilities.storage_watchdog import profile_size_exceeds_max_size pickling_support.install() @@ -42,7 +43,7 @@ class BrowserManagerHandle: - """The BrowserManagerHandle class is responsible for holding all of the + """The BrowserManagerHandle class is responsible for holding all the configuration and status information on BrowserManager process it corresponds to. It also includes a set of methods for managing the BrowserManager process and its child processes/threads. @@ -501,6 +502,16 @@ def execute_command_sequence( if task_manager.closing: return + # Allow StorageWatchdog to utilize built-in browser reset functionality + # which results in a graceful restart of the browser instance + if self.browser_params.maximum_profile_size: + assert self.current_profile_path is not None + + reset = profile_size_exceeds_max_size( + self.current_profile_path, + self.browser_params.maximum_profile_size, + ) + if self.restart_required or reset: success = self.restart_browser_manager(clear_profile=reset) if not success: @@ -564,7 +575,11 @@ def kill_browser_manager(self): "type %s" % (self.browser_id, str(self.display_pid)) ) if self.display_port is not None: # xvfb display lock - lockfile = "/tmp/.X%s-lock" % self.display_port + # lockfile = "/tmp/.X%s-lock" % self.display_port + lockfile = os.path.join( + self.browser_params.tmp_profile_dir, f".X{self.display_port}-lock" + ) + try: os.remove(lockfile) except OSError: diff --git a/openwpm/config.py b/openwpm/config.py index 5c0b811f6..1898d3901 100644 --- a/openwpm/config.py +++ b/openwpm/config.py @@ -1,4 +1,4 @@ -import os +import tempfile from dataclasses import dataclass, field from json import JSONEncoder from pathlib import Path @@ -99,6 +99,47 @@ class BrowserParams(DataClassJsonMixin): profile_archive_dir: Optional[Path] = field( default=None, metadata=DCJConfig(encoder=path_to_str, decoder=str_to_path) ) + + tmp_profile_dir: Path = field( + default=Path(tempfile.gettempdir()), + metadata=DCJConfig(encoder=path_to_str, decoder=str_to_path), + ) + """ + The tmp_profile_dir defaults to the OS's temporary file folder (typically /tmp) and is where the generated + browser profiles and residual files are stored. + """ + + maximum_profile_size: Optional[int] = None + """ + The total amount of on disk space the generated + browser profiles and residual files are allowed to consume in bytes. + If this option is not set, no checks will be performed + + Rationale + --------- + This option can serve as a happy medium between killing a browser after each + crawl and allowing the application to still perform quickly. + + Used as a way to save space + in a limited environment with minimal detriment to speed. + + If the maximum_profile_size is exceeded after a CommandSequence + is completed, the browser will be shut down and a new one will + be created. **Even with this setting you may temporarily have + more disk usage than the sum of all maximum_profile_sizes** + However, this will also ensure that a CommandSequence is + allowed to complete without undue interruptions. + + Sample values + ------------- + * 1073741824: 1GB + * 20971520: 20MB - for testing purposes + * 52428800: 50MB + * 73400320: 70MB + * 104857600: 100MB - IDEAL for 10+ browsers + + """ + recovery_tar: Optional[Path] = None donottrack: bool = False tracking_protection: bool = False @@ -133,8 +174,11 @@ class ManagerParams(DataClassJsonMixin): """A watchdog that tries to ensure that no Firefox instance takes up too much memory. It is mostly useful for long running cloud crawls""" process_watchdog: bool = False - """It is used to create another thread that kills off `GeckoDriver` (or `Xvfb`) instances that haven't been spawned by OpenWPM. (GeckoDriver is used by - Selenium to control Firefox and Xvfb a "virtual display" so we simulate having graphics when running on a server).""" + """It is used to create another thread that kills off `GeckoDriver` (or `Xvfb`) + instances that haven't been spawned by OpenWPM. (GeckoDriver is used by + Selenium to control Firefox and Xvfb a "virtual display" so we simulate having graphics when running on a server). + """ + num_browsers: int = 1 _failure_limit: Optional[int] = None """The number of command failures the platform will tolerate before raising a diff --git a/openwpm/deploy_browsers/deploy_firefox.py b/openwpm/deploy_browsers/deploy_firefox.py index 98c06cc56..543b236da 100755 --- a/openwpm/deploy_browsers/deploy_firefox.py +++ b/openwpm/deploy_browsers/deploy_firefox.py @@ -35,7 +35,9 @@ def deploy_firefox( root_dir = os.path.dirname(__file__) # directory of this file - browser_profile_path = Path(tempfile.mkdtemp(prefix="firefox_profile_")) + browser_profile_path = Path( + tempfile.mkdtemp(prefix="firefox_profile_", dir=browser_params.tmp_profile_dir) + ) status_queue.put(("STATUS", "Profile Created", browser_profile_path)) # Use Options instead of FirefoxProfile to set preferences since the @@ -167,8 +169,6 @@ def deploy_firefox( # Get browser process pid if hasattr(driver, "service") and hasattr(driver.service, "process"): pid = driver.service.process.pid - elif hasattr(driver, "binary") and hasattr(driver.options.binary, "process"): - pid = driver.options.binary.process.pid else: raise RuntimeError("Unable to identify Firefox process ID.") diff --git a/openwpm/task_manager.py b/openwpm/task_manager.py index c3d63a150..b1b1ac027 100644 --- a/openwpm/task_manager.py +++ b/openwpm/task_manager.py @@ -3,6 +3,7 @@ import pickle import threading import time +from functools import reduce from types import TracebackType from typing import Any, Dict, List, Optional, Set, Type @@ -29,6 +30,7 @@ ) from .utilities.multiprocess_utils import kill_process_and_children from .utilities.platform_utils import get_configuration_string, get_version +from .utilities.storage_watchdog import StorageLogger tblib.pickling_support.install() @@ -79,8 +81,8 @@ def __init__( manager_params.source_dump_path = manager_params.data_directory / "sources" - self.manager_params = manager_params - self.browser_params = browser_params + self.manager_params: ManagerParamsInternal = manager_params + self.browser_params: List[BrowserParamsInternal] = browser_params self._logger_kwargs = logger_kwargs # Create data directories if they do not exist @@ -108,7 +110,7 @@ def __init__( self.logging_server = MPLogger( self.manager_params.log_path, str(structured_storage_provider), - **self._logger_kwargs + **self._logger_kwargs, ) self.manager_params.logger_address = self.logging_server.logger_address self.logger = logging.getLogger("openwpm") @@ -128,6 +130,20 @@ def __init__( thread.name = "OpenWPM-watchdog" thread.start() + # Start the StorageLogger if a maximum storage value has been specified for any browser + if reduce( + lambda x, y: x or y, + map(lambda p: p.maximum_profile_size is not None, self.browser_params), + False, + ): + storage_logger = StorageLogger( + self.browser_params[0].tmp_profile_dir, + ) + + storage_logger.daemon = True + storage_logger.name = "OpenWPM-storage-logger" + + storage_logger.start() # Save crawl config information to database openwpm_v, browser_v = get_version() self.storage_controller_handle.save_configuration( @@ -363,6 +379,7 @@ def _start_thread( # Start command execution thread args = (self, command_sequence) thread = threading.Thread(target=browser.execute_command_sequence, args=args) + thread.name = f"BrowserManagerHandle-{browser.browser_id}" browser.command_thread = thread thread.daemon = True thread.start() diff --git a/openwpm/utilities/storage_watchdog.py b/openwpm/utilities/storage_watchdog.py new file mode 100644 index 000000000..d6ae29e38 --- /dev/null +++ b/openwpm/utilities/storage_watchdog.py @@ -0,0 +1,123 @@ +import logging +import math +import os +import subprocess +import time +from pathlib import Path +from threading import Thread +from typing import Optional + +# Nifty little function to prettyfi the output. Takes in a number of bytes and spits out the +# corresponding size in the largest unit it is able to convert to. + + +def convert_size(size_bytes: int) -> str: + if size_bytes == 0: + return "0B" + size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") + i: int = int(math.floor(math.log(size_bytes, 1024))) + p: float = math.pow(1024, i) + s: float = round(size_bytes / p, 2) + return "%s %s" % (s, size_name[i]) + + +def total_folder_size(startup: bool = False, root_dir: str = "/tmp") -> str: + """Generates a human-readable message about the current size of the directory + + Args: + startup (bool, optional): Runs the function on the total supplied folder. + root_dir (str, optional): The root directory that will be recursively checked. + """ + + running_total: int = 0 + if not startup: + for file in os.listdir(root_dir): + if "firefox" in file or ".xpi" in file or "owpm" in file or "Temp" in file: + path = os.path.join(root_dir, file) + try: + running_total += int( + subprocess.check_output(["du", "-s", "-b", path]) + .split()[0] + .decode("utf-8") + ) + except: + pass + return f"Currently using: {convert_size(running_total)} of storage on disk..." + + for file in os.listdir(root_dir): + path = os.path.join(root_dir, file) + try: + running_total += int( + subprocess.check_output( + ["du", "-s", "-b", path], stderr=subprocess.DEVNULL + ) + .split()[0] + .decode("utf-8") + ) + except: + pass + + return f"Readable files in {root_dir} folder take up {convert_size(running_total)} of storage on disk at start time..." + + +class StorageLogger(Thread): + """Logs the total amount of storage used in the supplied_dir""" + + def __init__(self, supplied_dir: Optional[Path] = None) -> None: + super().__init__() + self.dir_to_watch = supplied_dir + + def run(self) -> None: + logger = logging.getLogger("openwpm") + # Checks if the default dirsize and directory to watch were configured. + # If they are still the default, it exits because + # it would essentially work identically to setting the "reset" flag in the command sequence + if self.dir_to_watch is None: + logger.info("No dir_to_watch specified. StorageLogger shutting down") + return + + logger.info("Starting the StorageLogger...") + logger.info(total_folder_size(startup=True)) + try: + while True: + time.sleep(300) # Give storage updates every 5 minutes + logger.info(total_folder_size()) + except: + print("Error") + + +def profile_size_exceeds_max_size( + profile_path: Path, + max_dir_size: int, +) -> bool: + logger = logging.getLogger("openwpm") + # 1073741824: # 1GB + # 20971520: # 20MB - for testing purposes + # 52428800: # 50MB + # 73400320: # 70MB + # 104857600: 100MB - IDEAL for 10+ browsers + + readable_max_dir_size = convert_size(max_dir_size) + + dir_size = int( + subprocess.check_output(["du", "-s", "-b", profile_path]) + .split()[0] + .decode("utf-8") + ) + readable_dir_size = convert_size(dir_size) + + if dir_size < max_dir_size: + logger.info( + f"Current browser profile directory {profile_path} size is less than {readable_max_dir_size}: {readable_dir_size}" + ) + return False + else: + logger.info( + f"{profile_path}: Folder scheduled to be deleted and recover {readable_dir_size} of storage." + ) + return True + + +if __name__ == "__main__": + print("---Testing the StorageWatchdog folder size function---") + print(total_folder_size(startup=True)) diff --git a/test/conftest.py b/test/conftest.py index b46a41122..dc9212f23 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -2,7 +2,7 @@ import os import subprocess from pathlib import Path -from typing import Any, Callable, Generator, List, Literal, Tuple +from typing import Any, Callable, Generator, List, Literal, Protocol, Tuple, TypeAlias import pytest @@ -44,10 +44,11 @@ def server(): server_thread.join() +FullConfig: TypeAlias = tuple[ManagerParams, list[BrowserParams]] + + @pytest.fixture() -def default_params( - tmp_path: Path, num_browsers: int = NUM_BROWSERS -) -> Tuple[ManagerParams, List[BrowserParams]]: +def default_params(tmp_path: Path, num_browsers: int = NUM_BROWSERS) -> FullConfig: """Just a simple wrapper around task_manager.load_default_params""" manager_params = ManagerParams( @@ -63,17 +64,16 @@ def default_params( return manager_params, browser_params +TaskManagerCreator: TypeAlias = Callable[[FullConfig], Tuple[TaskManager, Path]] + + @pytest.fixture() -def task_manager_creator( - server: None, xpi: None -) -> Callable[[Tuple[ManagerParams, List[BrowserParams]]], Tuple[TaskManager, Path]]: +def task_manager_creator(server: None, xpi: None) -> TaskManagerCreator: """We create a callable that returns a TaskManager that has been configured with the Manager and BrowserParams""" # We need to create the fixtures like this because usefixtures doesn't work on fixtures - def _create_task_manager( - params: Tuple[ManagerParams, List[BrowserParams]] - ) -> Tuple[TaskManager, Path]: + def _create_task_manager(params: FullConfig) -> Tuple[TaskManager, Path]: manager_params, browser_params = params db_path = manager_params.data_directory / "crawl-data.sqlite" structured_provider = SQLiteStorageProvider(db_path) @@ -88,17 +88,24 @@ def _create_task_manager( return _create_task_manager +class HttpParams(Protocol): + def __call__( + self, display_mode: Literal["headless", "xvfb"] = "headless" + ) -> FullConfig: + ... + + @pytest.fixture() def http_params( - default_params: Tuple[ManagerParams, List[BrowserParams]], -) -> Callable[[Literal["headless", "xvfb"]], Tuple[ManagerParams, List[BrowserParams]]]: + default_params: FullConfig, +) -> HttpParams: manager_params, browser_params = default_params for browser_param in browser_params: browser_param.http_instrument = True def parameterize( display_mode: Literal["headless", "xvfb"] = "headless", - ) -> Tuple[ManagerParams, List[BrowserParams]]: + ) -> FullConfig: for browser_param in browser_params: browser_param.display_mode = display_mode return manager_params, browser_params diff --git a/test/test_callback.py b/test/test_callback.py index 51a6c3513..89725f456 100644 --- a/test/test_callback.py +++ b/test/test_callback.py @@ -3,10 +3,13 @@ from openwpm.command_sequence import CommandSequence +from .conftest import FullConfig, TaskManagerCreator from .utilities import BASE_TEST_URL -def test_local_callbacks(default_params, task_manager_creator): +def test_local_callbacks( + default_params: FullConfig, task_manager_creator: TaskManagerCreator +) -> None: """Test the storage controller as well as the entire callback machinery to see if all callbacks get correctly called""" manager, _ = task_manager_creator(default_params) diff --git a/test/test_custom_function_command.py b/test/test_custom_function_command.py index 64a083fbf..c35415e44 100644 --- a/test/test_custom_function_command.py +++ b/test/test_custom_function_command.py @@ -54,7 +54,7 @@ def execute( element.get_attribute("href") for element in webdriver.find_elements(By.TAG_NAME, "a") ) - if x.startswith(self.scheme + "://") + if x is not None and x.startswith(self.scheme + "://") ] current_url = webdriver.current_url diff --git a/test/test_http_instrumentation.py b/test/test_http_instrumentation.py index 842f98531..f98fa7474 100644 --- a/test/test_http_instrumentation.py +++ b/test/test_http_instrumentation.py @@ -6,8 +6,9 @@ import os from hashlib import sha256 from pathlib import Path +from sqlite3 import Row from time import sleep -from typing import List, Optional, Set, Tuple +from typing import List, Optional, Set, Tuple, Union from urllib.parse import urlparse import pytest @@ -21,6 +22,7 @@ from openwpm.utilities import db_utils from . import utilities +from .conftest import FullConfig, HttpParams, TaskManagerCreator from .openwpmtest import OpenWPMTest # Data for test_page_visit @@ -32,7 +34,7 @@ # loading_href, # is_XHR, is_tp_content, is_tp_window, # resource_type -HTTP_REQUESTS = { +HTTP_REQUESTS: set[tuple[Union[str, None, int], ...]] = { ( f"{utilities.BASE_TEST_URL}/http_test_page.html", f"{utilities.BASE_TEST_URL}/http_test_page.html", @@ -202,7 +204,7 @@ # format: (request_url, referrer, location) # TODO: webext instrumentation doesn't support referrer yet -HTTP_RESPONSES = { +HTTP_RESPONSES: set[tuple[str, str]] = { ( f"{utilities.BASE_TEST_URL}/http_test_page.html", # u'', @@ -256,7 +258,7 @@ } # format: (source_url, destination_url, location header) -HTTP_REDIRECTS = { +HTTP_REDIRECTS: set[tuple[str, str, str | None]] = { ( f"{utilities.BASE_TEST_URL_NOPATH}/MAGIC_REDIRECT/req1.png", f"{utilities.BASE_TEST_URL_NOPATH}/MAGIC_REDIRECT/req2.png", @@ -285,7 +287,7 @@ } # Data for test_cache_hits_recorded -HTTP_CACHED_REQUESTS = { +HTTP_CACHED_REQUESTS: set[tuple[Union[str, None, int], ...]] = { ( f"{utilities.BASE_TEST_URL}/http_test_page.html", f"{utilities.BASE_TEST_URL}/http_test_page.html", @@ -411,7 +413,7 @@ # format: (request_url, referrer, is_cached) # TODO: referrer isn't recorded by webext instrumentation yet. -HTTP_CACHED_RESPONSES = { +HTTP_CACHED_RESPONSES: set[tuple[str, int]] = { ( f"{utilities.BASE_TEST_URL}/http_test_page.html", # u'', @@ -441,7 +443,7 @@ } # format: (source_url, destination_url) -HTTP_CACHED_REDIRECTS = { +HTTP_CACHED_REDIRECTS: set[tuple[str, str]] = { ( f"{utilities.BASE_TEST_URL_NOPATH}/MAGIC_REDIRECT/frame1.png", f"{utilities.BASE_TEST_URL_NOPATH}/MAGIC_REDIRECT/frame2.png", @@ -465,7 +467,7 @@ } # Test URL attribution for worker script requests -HTTP_WORKER_SCRIPT_REQUESTS = { +HTTP_WORKER_SCRIPT_REQUESTS: set[tuple[Union[str, None, int], ...]] = { ( f"{utilities.BASE_TEST_URL}/http_worker_page.html", f"{utilities.BASE_TEST_URL}/http_worker_page.html", @@ -524,7 +526,7 @@ } # Test URL-attribution for Service Worker requests. -HTTP_SERVICE_WORKER_REQUESTS = { +HTTP_SERVICE_WORKER_REQUESTS: set[tuple[Union[str, None, int], ...]] = { ( "http://localhost:8000/test_pages/http_service_worker_page.html", "http://localhost:8000/test_pages/http_service_worker_page.html", @@ -586,9 +588,7 @@ class TestHTTPInstrument(OpenWPMTest): - def get_config( - self, data_dir: Optional[Path] - ) -> Tuple[ManagerParams, List[BrowserParams]]: + def get_config(self, data_dir: Optional[Path]) -> FullConfig: manager_params, browser_params = self.get_test_config(data_dir) browser_params[0].http_instrument = True return manager_params, browser_params @@ -814,7 +814,9 @@ def test_record_file_upload(self, task_manager_creator): @pytest.mark.parametrize("delayed", [True, False]) -def test_page_visit(task_manager_creator, http_params, delayed): +def test_page_visit( + task_manager_creator: TaskManagerCreator, http_params: HttpParams, delayed: bool +) -> None: test_url = utilities.BASE_TEST_URL + "/http_test_page.html" manager_params, browser_params = http_params() if delayed: @@ -836,9 +838,10 @@ def test_page_visit(task_manager_creator, http_params, delayed): # HTTP Requests rows = db_utils.query_db(db, "SELECT * FROM http_requests") - observed_records = set() + observed_requests = set() for row in rows: - observed_records.add( + assert isinstance(row, Row) + observed_requests.add( ( row["url"].split("?")[0], row["top_level_url"], @@ -853,13 +856,14 @@ def test_page_visit(task_manager_creator, http_params, delayed): ) request_id_to_url[row["request_id"]] = row["url"] - assert HTTP_REQUESTS == observed_records + assert HTTP_REQUESTS == observed_requests # HTTP Responses rows = db_utils.query_db(db, "SELECT * FROM http_responses") - observed_records: Set[Tuple[str, str]] = set() + observed_responses: Set[Tuple[str, str]] = set() for row in rows: - observed_records.add( + assert isinstance(row, Row) + observed_responses.add( ( row["url"].split("?")[0], # TODO: webext-instrumentation doesn't support referrer @@ -869,12 +873,13 @@ def test_page_visit(task_manager_creator, http_params, delayed): ) assert row["request_id"] in request_id_to_url assert request_id_to_url[row["request_id"]] == row["url"] - assert HTTP_RESPONSES == observed_records + assert HTTP_RESPONSES == observed_responses # HTTP Redirects rows = db_utils.query_db(db, "SELECT * FROM http_redirects") - observed_records = set() + observed_redirects: set[tuple[str, str, str | None]] = set() for row in rows: + assert isinstance(row, Row) # TODO: webext instrumentation doesn't support new_request_id yet # src = request_id_to_url[row['old_request_id']].split('?')[0] # dst = request_id_to_url[row['new_request_id']].split('?')[0] @@ -886,8 +891,8 @@ def test_page_visit(task_manager_creator, http_params, delayed): if header.lower() == "location": location = value break - observed_records.add((src, dst, location)) - assert HTTP_REDIRECTS == observed_records + observed_redirects.add((src, dst, location)) + assert HTTP_REDIRECTS == observed_redirects def test_javascript_saving(http_params, xpi, server):