Skip to content

Commit

Permalink
feat(otel): wip, needs async context manager support
Browse files Browse the repository at this point in the history
  • Loading branch information
vringar committed Jan 24, 2024
1 parent 3f627f7 commit 149d0b4
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 175 deletions.
41 changes: 23 additions & 18 deletions demo.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import argparse
import os
from pathlib import Path
from typing import Literal

import tranco
from honeycomb.opentelemetry import HoneycombOptions, configure_opentelemetry
from opentelemetry import trace

from custom_command import LinkCountingCommand
from openwpm.command_sequence import CommandSequence
Expand All @@ -29,7 +32,6 @@
latest_list = t.list()
sites = ["http://" + x for x in latest_list.top(10)]


display_mode: Literal["native", "headless", "xvfb"] = "native"
if args.headless:
display_mode = "headless"
Expand Down Expand Up @@ -67,6 +69,7 @@
# manager_params.memory_watchdog = True
# manager_params.process_watchdog = True

_tracer = trace.get_tracer(__name__)

# Commands time out by default after 60 seconds
with TaskManager(
Expand All @@ -77,23 +80,25 @@
) as manager:
# Visits the sites
for index, site in enumerate(sites):

def callback(success: bool, val: str = site) -> None:
print(
f"CommandSequence for {val} ran {'successfully' if success else 'unsuccessfully'}"
with _tracer.start_as_current_span(name="command_issuing"):
span = trace.get_current_span()

def callback(success: bool, val: str = site) -> None:
print(
f"CommandSequence for {val} ran {'successfully' if success else 'unsuccessfully'}"
)

# Parallelize sites over all number of browsers set above.
command_sequence = CommandSequence(
site,
site_rank=index,
callback=callback,
)

# Parallelize sites over all number of browsers set above.
command_sequence = CommandSequence(
site,
site_rank=index,
callback=callback,
)

# Start by visiting the page
command_sequence.append_command(GetCommand(url=site, sleep=3), timeout=60)
# Have a look at custom_command.py to see how to implement your own command
command_sequence.append_command(LinkCountingCommand())
# Start by visiting the page
command_sequence.append_command(GetCommand(url=site, sleep=3), timeout=60)
# Have a look at custom_command.py to see how to implement your own command
command_sequence.append_command(LinkCountingCommand())

# Run commands across all browsers (simple parallelization)
manager.execute_command_sequence(command_sequence)
# Run commands across all browsers (simple parallelization)
manager.execute_command_sequence(command_sequence)
1 change: 1 addition & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies:
- domain-utils==0.7.1
- honeycomb-opentelemetry==0.2.3b0
- jsonschema==4.21.1
- opentelemetry-api==1.20.0
- tranco==0.7.1
- types-pyyaml==6.0.12.12
- types-redis==4.6.0.20240106
Expand Down
216 changes: 117 additions & 99 deletions openwpm/browser_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, Union

import psutil
from multiprocess import Process, Queue
from multiprocess import Queue
from opentelemetry import trace
from selenium.common.exceptions import WebDriverException
from tblib import Traceback, pickling_support

Expand All @@ -31,6 +32,7 @@
from .storage.storage_providers import TableName
from .types import BrowserId, VisitId
from .utilities.multiprocess_utils import (
Process,
kill_process_and_children,
parse_traceback_for_sentry,
)
Expand All @@ -41,6 +43,8 @@
if TYPE_CHECKING:
from .task_manager import TaskManager

_tracer = trace.get_tracer(__name__)


class BrowserManagerHandle:
"""The BrowserManagerHandle class is responsible for holding all the
Expand Down Expand Up @@ -340,6 +344,7 @@ def close_browser_manager(self, force: bool = False) -> None:
if not shutdown_complete:
self.kill_browser_manager()

@_tracer.start_as_current_span("execute_command_sequence")
def execute_command_sequence(
self,
# Quoting to break cyclic import, see https://stackoverflow.com/a/39757388
Expand All @@ -351,6 +356,10 @@ def execute_command_sequence(
"""
assert self.browser_id is not None
assert self.curr_visit_id is not None
trace.get_current_span().set_attributes(
{"browser_id": self.browser_id, "visit_id": self.curr_visit_id}
)

task_manager.sock.store_record(
TableName("site_visits"),
self.curr_visit_id,
Expand All @@ -374,71 +383,70 @@ def execute_command_sequence(
assert self.status_queue is not None

for command_and_timeout in command_sequence.get_commands_with_timeout():
command, timeout = command_and_timeout
command.set_visit_browser_id(self.curr_visit_id, self.browser_id)
command.set_start_time(time.time())
self.current_timeout = timeout

# Adding timer to track performance of commands
t1 = time.time_ns()

# passes off command and waits for a success (or failure signal)
self.command_queue.put(command)

# received reply from BrowserManager, either success or failure
error_text = None
tb = None
status = None
try:
status = self.status_queue.get(True, self.current_timeout)
except EmptyQueue:
self.logger.info(
"BROWSER %i: Timeout while executing command, %s, killing "
"browser manager" % (self.browser_id, repr(command))
)
command: BaseCommand = command_and_timeout[0]
timeout: int = command_and_timeout[1]
with _tracer.start_as_current_span(type(command).__name__):
trace.get_current_span().set_attributes({"timeout": timeout})
command.set_visit_browser_id(self.curr_visit_id, self.browser_id)
command.set_start_time(time.time())
self.current_timeout = timeout

# Adding timer to track performance of commands
t1 = time.time_ns()

# passes off command and waits for a success (or failure signal)
self.command_queue.put(command)

# received reply from BrowserManager, either success or failure
error_text = None
tb = None
status = None
try:
status = self.status_queue.get(True, self.current_timeout)
except EmptyQueue:
self.logger.info(
"BROWSER %i: Timeout while executing command, %s, killing "
"browser manager" % (self.browser_id, repr(command))
)

if status is None:
# allows us to skip this entire block without having to bloat
# every if statement
command_status = "timeout"
pass
elif status == "OK":
command_status = "ok"
elif status[0] == "CRITICAL":
command_status = "critical"
self.logger.critical(
"BROWSER %i: Received critical error from browser "
"process while executing command %s. Setting failure "
"status." % (self.browser_id, str(command))
)
task_manager.failure_status = {
"ErrorType": "CriticalChildException",
"CommandSequence": command_sequence,
"Exception": status[1],
}
error_text, tb = self._unpack_pickled_error(status[1])
elif status[0] == "FAILED":
command_status = "error"
error_text, tb = self._unpack_pickled_error(status[1])
self.logger.info(
"BROWSER %i: Received failure status while executing "
"command: %s" % (self.browser_id, repr(command))
)
elif status[0] == "NETERROR":
command_status = "neterror"
error_text, tb = self._unpack_pickled_error(status[1])
error_text = parse_neterror(error_text)
self.logger.info(
"BROWSER %i: Received neterror %s while executing "
"command: %s" % (self.browser_id, error_text, repr(command))
)
else:
raise ValueError("Unknown browser status message %s" % status)
if status is None:
# allows us to skip this entire block without having to bloat
# every if statement
command_status = "timeout"
elif status == "OK":
command_status = "ok"
elif status[0] == "CRITICAL":
command_status = "critical"
self.logger.critical(
"BROWSER %i: Received critical error from browser "
"process while executing command %s. Setting failure "
"status." % (self.browser_id, str(command))
)
task_manager.failure_status = {
"ErrorType": "CriticalChildException",
"CommandSequence": command_sequence,
"Exception": status[1],
}
error_text, tb = self._unpack_pickled_error(status[1])
elif status[0] == "FAILED":
command_status = "error"
error_text, tb = self._unpack_pickled_error(status[1])
self.logger.info(
"BROWSER %i: Received failure status while executing "
"command: %s" % (self.browser_id, repr(command))
)
elif status[0] == "NETERROR":
command_status = "neterror"
error_text, tb = self._unpack_pickled_error(status[1])
error_text = parse_neterror(error_text)
self.logger.info(
"BROWSER %i: Received neterror %s while executing "
"command: %s" % (self.browser_id, error_text, repr(command))
)
else:
raise ValueError("Unknown browser status message %s" % status)

task_manager.sock.store_record(
TableName("crawl_history"),
self.curr_visit_id,
{
table_entry = {
"browser_id": self.browser_id,
"visit_id": self.curr_visit_id,
"command": type(command).__name__,
Expand All @@ -450,44 +458,54 @@ def execute_command_sequence(
"error": error_text,
"traceback": tb,
"duration": int((time.time_ns() - t1) / 1000000),
},
)

if command_status == "critical":
task_manager.sock.finalize_visit_id(
success=False,
visit_id=self.curr_visit_id,
}
task_manager.sock.store_record(
TableName("crawl_history"),
self.curr_visit_id,
table_entry,
)
return

if command_status != "ok":
with task_manager.threadlock:
task_manager.failure_count += 1
if task_manager.failure_count > task_manager.failure_limit:
self.logger.critical(
"BROWSER %i: Command execution failure pushes failure "
"count above the allowable limit. Setting "
"failure_status." % self.browser_id
for k in ["browser_id", "visit_id", "command", "duration"]:
table_entry.pop(k, None)
for k in list(table_entry.keys()):
if table_entry[k] is None:
del table_entry[k]
trace.get_current_span().set_attributes(table_entry)

if command_status == "critical":
task_manager.sock.finalize_visit_id(
success=False,
visit_id=self.curr_visit_id,
)
task_manager.failure_status = {
"ErrorType": "ExceedCommandFailureLimit",
"CommandSequence": command_sequence,
}
return
self.restart_required = True
self.logger.debug(
"BROWSER %i: Browser restart required" % self.browser_id
)
# Reset failure_count at the end of each successful command sequence
elif type(command) is FinalizeCommand:
with task_manager.threadlock:
task_manager.failure_count = 0

if self.restart_required:
task_manager.sock.finalize_visit_id(
success=False, visit_id=self.curr_visit_id
)
break

if command_status != "ok":
with task_manager.threadlock:
task_manager.failure_count += 1
if task_manager.failure_count > task_manager.failure_limit:
self.logger.critical(
"BROWSER %i: Command execution failure pushes failure "
"count above the allowable limit. Setting "
"failure_status." % self.browser_id
)
task_manager.failure_status = {
"ErrorType": "ExceedCommandFailureLimit",
"CommandSequence": command_sequence,
}
return
self.restart_required = True
self.logger.debug(
"BROWSER %i: Browser restart required" % self.browser_id
)
# Reset failure_count at the end of each successful command sequence
elif type(command) is FinalizeCommand:
with task_manager.threadlock:
task_manager.failure_count = 0

if self.restart_required:
task_manager.sock.finalize_visit_id(
success=False, visit_id=self.curr_visit_id
)
break

self.logger.info(
"Finished working on CommandSequence with "
Expand Down
Loading

0 comments on commit 149d0b4

Please sign in to comment.