From a43a17fbecf566a1e7d43160ffafce3c5441d5ab Mon Sep 17 00:00:00 2001 From: Donald Stufft Date: Wed, 23 Jan 2019 09:26:35 -0500 Subject: [PATCH 1/3] Rename schema.json to schema/downloads.json --- linehaul/cli.py | 5 ++++- linehaul/schema/__init__.py | 0 linehaul/{schema.json => schema/downloads.json} | 0 setup.py | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 linehaul/schema/__init__.py rename linehaul/{schema.json => schema/downloads.json} (100%) diff --git a/linehaul/cli.py b/linehaul/cli.py index 3ccf0ae..8aac503 100644 --- a/linehaul/cli.py +++ b/linehaul/cli.py @@ -363,6 +363,9 @@ def migrate(credentials_file, credentials_blob, table): TABLE is a BigQuery table identifier of the form ProjectId.DataSetId.TableId. """ bq = _configure_bigquery(credentials_file, credentials_blob) - schema = json.loads(importlib_resources.read_text("linehaul", "schema.json")) + _, table_name = table.rsplit(".", 1) + schema = json.loads( + importlib_resources.read_text("linehaul.schema", table_name + ".json") + ) trio.run(migrate_, bq, table, schema) diff --git a/linehaul/schema/__init__.py b/linehaul/schema/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/linehaul/schema.json b/linehaul/schema/downloads.json similarity index 100% rename from linehaul/schema.json rename to linehaul/schema/downloads.json diff --git a/setup.py b/setup.py index 585272b..5b0e65e 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ "version_scheme": lambda v: "3.{.distance}.0".format(v), }, packages=find_packages(exclude=["tests*"]), - package_data={"linehaul": ["schema.json"]}, + package_data={"linehaul.schema": ["*.json"]}, entry_points={"console_scripts": ["linehaul = linehaul.cli:cli"]}, install_requires=install_requires, setup_requires=["setuptools_scm"], From 12e122140114b75564d5e6ad90b0d709dacd04db Mon Sep 17 00:00:00 2001 From: Donald Stufft Date: Wed, 23 Jan 2019 09:50:51 -0500 Subject: [PATCH 2/3] Add a table for logging requests to /simple/* --- linehaul/schema/simple_requests.json | 149 +++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 linehaul/schema/simple_requests.json diff --git a/linehaul/schema/simple_requests.json b/linehaul/schema/simple_requests.json new file mode 100644 index 0000000..a3108bc --- /dev/null +++ b/linehaul/schema/simple_requests.json @@ -0,0 +1,149 @@ +[ + { + "name": "timestamp", + "type": "TIMESTAMP", + "mode": "REQUIRED" + }, + { + "name": "country_code", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "REQUIRED" + }, + { + "name": "project", + "type": "STRING", + "mode": "REQUIRED" + }, + { + "name": "details", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "installer", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "version", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "python", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "implementation", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "version", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "distro", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "version", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "id", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "libc", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "lib", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "version", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "system", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "release", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "cpu", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "openssl_version", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "setuptools_version", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "tls_protocol", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "tls_cipher", + "type": "STRING", + "mode": "NULLABLE" + } +] From 94b35ae769caa8e860694b954e9427ff6748ac41 Mon Sep 17 00:00:00 2001 From: Donald Stufft Date: Wed, 23 Jan 2019 11:30:54 -0500 Subject: [PATCH 3/3] Support parsing of simple requests --- linehaul/cli.py | 9 +++-- linehaul/events/parser.py | 78 ++++++++++++++++++++++++++++++++++----- linehaul/server.py | 55 +++++++++++++++++++++++---- 3 files changed, 121 insertions(+), 21 deletions(-) diff --git a/linehaul/cli.py b/linehaul/cli.py index 8aac503..8ce8242 100644 --- a/linehaul/cli.py +++ b/linehaul/cli.py @@ -260,7 +260,8 @@ def cli(log_level, log_file): show_default=True, help="Maximum number of concurrent connections to BigQuery.", ) -@click.argument("table") +@click.argument("downloads-table") +@click.argument("simple-requests-table") def server( credentials_file, credentials_blob, @@ -279,7 +280,8 @@ def server( retry_multiplier, api_timeout, api_max_connections, - table, + downloads_table, + simple_requests_table, ): """ Starts a server in the foreground that listens for incoming syslog events, processes @@ -320,7 +322,8 @@ def server( partial( server_, bq, - table, + downloads_table, + simple_requests_table, bind=bind, port=port, tls_certificate=tls_certificate, diff --git a/linehaul/events/parser.py b/linehaul/events/parser.py index 313b26e..f7e2d9a 100644 --- a/linehaul/events/parser.py +++ b/linehaul/events/parser.py @@ -21,6 +21,7 @@ import attr.validators import cattr +from packaging.utils import canonicalize_name from pyparsing import Literal as L, Word, Optional as OptionalItem from pyparsing import printables as _printables, restOfLine from pyparsing import ParseException @@ -108,6 +109,20 @@ class NullValue: USER_AGENT = USER_AGENT.setResultsName("user_agent") USER_AGENT.setName("UserAgent") +SIMPLE_SIGIL = L("simple") +SIMPLE_SIGIL = SIMPLE_SIGIL.setResultsName("message_type") +SIMPLE_SIGIL.setName("Message Type") + +SIMPLE_MESSAGE = SIMPLE_SIGIL + PIPE + REQUEST + PIPE + TLS + PIPE + USER_AGENT + +DOWNLOAD_SIGIL = L("download") +DOWNLOAD_SIGIL = DOWNLOAD_SIGIL.setResultsName("message_type") +DOWNLOAD_SIGIL.setName("message_type") + +DOWNLOAD_MESSAGE = ( + DOWNLOAD_SIGIL + PIPE + REQUEST + PIPE + TLS + PIPE + PROJECT + PIPE + USER_AGENT +) + V1_HEADER = OptionalItem(L("1").suppress() + AT) MESSAGE_v1 = V1_HEADER + REQUEST + PIPE + PROJECT + PIPE + USER_AGENT @@ -118,7 +133,11 @@ class NullValue: MESSAGE_v2 = V2_HEADER + REQUEST + PIPE + TLS + PIPE + PROJECT + PIPE + USER_AGENT MESSAGE_v2.leaveWhitespace() -MESSAGE = MESSAGE_v2 | MESSAGE_v1 +V3_HEADER = L("3").suppress() + AT + +MESSAGE_v3 = V3_HEADER + (SIMPLE_MESSAGE | DOWNLOAD_MESSAGE) + +MESSAGE = MESSAGE_v3 | MESSAGE_v2 | MESSAGE_v1 @enum.unique @@ -163,6 +182,26 @@ class Download: details = attr.ib(type=Optional[UserAgent], default=None) +@attr.s(slots=True, frozen=True) +class SimpleRequest: + timestamp = attr.ib(type=arrow.Arrow) + url = attr.ib(validator=attr.validators.instance_of(str)) + project = attr.ib(validator=attr.validators.instance_of(str)) + tls_protocol = attr.ib( + default=None, + validator=attr.validators.optional(attr.validators.instance_of(str)), + ) + tls_cipher = attr.ib( + default=None, + validator=attr.validators.optional(attr.validators.instance_of(str)), + ) + country_code = attr.ib( + default=None, + validator=attr.validators.optional(attr.validators.instance_of(str)), + ) + details = attr.ib(type=Optional[UserAgent], default=None) + + def _value_or_none(value): if value is NullValue or value == "": return None @@ -170,12 +209,7 @@ def _value_or_none(value): return value -def parse(message): - try: - parsed = MESSAGE.parseString(message, parseAll=True) - except ParseException as exc: - raise UnparseableEvent("{!r} {}".format(message, exc)) from None - +def _parse_download(parsed): data = {} data["timestamp"] = parsed.timestamp data["tls_protocol"] = _value_or_none(parsed.tls_protocol) @@ -188,7 +222,31 @@ def parse(message): data["file"]["version"] = _value_or_none(parsed.version) data["file"]["type"] = _value_or_none(parsed.package_type) - download = _cattr.structure(data, Download) + return _cattr.structure(data, Download) + + +def _parse_simple(parsed): + data = {} + data["timestamp"] = parsed.timestamp + data["tls_protocol"] = _value_or_none(parsed.tls_protocol) + data["tls_cipher"] = _value_or_none(parsed.tls_cipher) + data["country_code"] = _value_or_none(parsed.country_code) + data["url"] = parsed.url + data["project"] = canonicalize_name(posixpath.split(parsed.url.rstrip("/"))[-1]) + + return _cattr.structure(data, SimpleRequest) + + +_DISPATCH = {"download": _parse_download, "simple": _parse_simple} + + +def parse(message): + try: + parsed = MESSAGE.parseString(message, parseAll=True) + except ParseException as exc: + raise UnparseableEvent("{!r} {}".format(message, exc)) from None + + event = _DISPATCH[parsed.message_type](parsed) try: ua = user_agents.parse(parsed.user_agent) @@ -197,6 +255,6 @@ def parse(message): except user_agents.UnknownUserAgentError: logging.info("Unknown User agent: %r", parsed.user_agent) else: - download = attr.evolve(download, details=ua) + event = attr.evolve(event, details=ua) - return download + return event diff --git a/linehaul/server.py b/linehaul/server.py index 60c5719..2dc180b 100644 --- a/linehaul/server.py +++ b/linehaul/server.py @@ -114,7 +114,13 @@ def log_it(retry_obj, sleep, last_result): async def handle_connection( - stream, q, token=None, max_line_size=None, recv_size=None, cleanup_timeout=None + stream, + download_queue, + simple_queue, + token=None, + max_line_size=None, + recv_size=None, + cleanup_timeout=None, ): if recv_size is None: recv_size = 8192 @@ -136,6 +142,11 @@ async def handle_connection( lr = LineReceiver(partial(parse_line, token=token), max_line_size=max_line_size) + queues = { + _event_parser.Download: download_queue, + _event_parser.SimpleRequest: simple_queue, + } + try: while True: try: @@ -145,7 +156,12 @@ async def handle_connection( for event in lr.receive_data(data): logger.log(log_SPEW, "{%s}: Received Event: %r", peer_id, event) - await q.put(event) + try: + queue = queues[type(event)] + except KeyError: + logger.error("{%s}: Unknown event type: %r", peer_id, event) + else: + await queue.put(event) if not data: logger.debug("{%s}: Connection lost from %r.", peer_id, peer) @@ -270,7 +286,8 @@ async def sender( async def server( bq, - table, + download_table, + simple_table, bind="0.0.0.0", port=512, tls_certificate=None, @@ -298,15 +315,30 @@ async def server( # boxed, so this won't grow forever. It will not however, apply any backpressure # to the sender (we can't meaningfully apply backpressure, since these are download # events being streamed to us). - q = trio.Queue(qsize) + download_queue = trio.Queue(qsize) + simple_queue = trio.Queue(qsize) async with trio.open_nursery() as nursery: nursery.start_soon( partial( sender, bq, - table, - q, + download_table, + download_queue, + batch_size=batch_size, + batch_timeout=batch_timeout, + retry_max_attempts=retry_max_attempts, + retry_max_wait=retry_max_wait, + retry_multiplier=retry_multiplier, + api_timeout=api_timeout, + ) + ) + nursery.start_soon( + partial( + sender, + bq, + simple_table, + simple_queue, batch_size=batch_size, batch_timeout=batch_timeout, retry_max_attempts=retry_max_attempts, @@ -318,7 +350,8 @@ async def server( handler = partial( handle_connection, - q=q, + download_queue=download_queue, + simple_queue=simple_queue, token=token, max_line_size=max_line_size, recv_size=recv_size, @@ -332,5 +365,11 @@ async def server( await nursery.start(trio.serve_ssl_over_tcp, handler, port, ctx) - logging.info("Listening on %s:%d and sending to %r", bind, port, table) + logging.info( + "Listening on %s:%d and sending to %r and %r", + bind, + port, + download_table, + simple_table, + ) task_status.started()