Skip to content
This repository has been archived by the owner on Dec 1, 2021. It is now read-only.

Simple stats #45

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions linehaul/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ def cli(log_level, log_file):
show_default=True,
help="Maximum number of concurrent connections to BigQuery.",
)
@click.argument("table")
@click.argument("downloads-table")
@click.argument("simple-requests-table")
def server(
credentials_file,
credentials_blob,
Expand All @@ -279,7 +280,8 @@ def server(
retry_multiplier,
api_timeout,
api_max_connections,
table,
downloads_table,
simple_requests_table,
):
"""
Starts a server in the foreground that listens for incoming syslog events, processes
Expand Down Expand Up @@ -320,7 +322,8 @@ def server(
partial(
server_,
bq,
table,
downloads_table,
simple_requests_table,
bind=bind,
port=port,
tls_certificate=tls_certificate,
Expand Down Expand Up @@ -363,6 +366,9 @@ def migrate(credentials_file, credentials_blob, table):
TABLE is a BigQuery table identifier of the form ProjectId.DataSetId.TableId.
"""
bq = _configure_bigquery(credentials_file, credentials_blob)
schema = json.loads(importlib_resources.read_text("linehaul", "schema.json"))
_, table_name = table.rsplit(".", 1)
schema = json.loads(
importlib_resources.read_text("linehaul.schema", table_name + ".json")
)

trio.run(migrate_, bq, table, schema)
78 changes: 68 additions & 10 deletions linehaul/events/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import attr.validators
import cattr

from packaging.utils import canonicalize_name
from pyparsing import Literal as L, Word, Optional as OptionalItem
from pyparsing import printables as _printables, restOfLine
from pyparsing import ParseException
Expand Down Expand Up @@ -108,6 +109,20 @@ class NullValue:
USER_AGENT = USER_AGENT.setResultsName("user_agent")
USER_AGENT.setName("UserAgent")

SIMPLE_SIGIL = L("simple")
SIMPLE_SIGIL = SIMPLE_SIGIL.setResultsName("message_type")
SIMPLE_SIGIL.setName("Message Type")

SIMPLE_MESSAGE = SIMPLE_SIGIL + PIPE + REQUEST + PIPE + TLS + PIPE + USER_AGENT

DOWNLOAD_SIGIL = L("download")
DOWNLOAD_SIGIL = DOWNLOAD_SIGIL.setResultsName("message_type")
DOWNLOAD_SIGIL.setName("message_type")

DOWNLOAD_MESSAGE = (
DOWNLOAD_SIGIL + PIPE + REQUEST + PIPE + TLS + PIPE + PROJECT + PIPE + USER_AGENT
)

V1_HEADER = OptionalItem(L("1").suppress() + AT)

MESSAGE_v1 = V1_HEADER + REQUEST + PIPE + PROJECT + PIPE + USER_AGENT
Expand All @@ -118,7 +133,11 @@ class NullValue:
MESSAGE_v2 = V2_HEADER + REQUEST + PIPE + TLS + PIPE + PROJECT + PIPE + USER_AGENT
MESSAGE_v2.leaveWhitespace()

MESSAGE = MESSAGE_v2 | MESSAGE_v1
V3_HEADER = L("3").suppress() + AT

MESSAGE_v3 = V3_HEADER + (SIMPLE_MESSAGE | DOWNLOAD_MESSAGE)

MESSAGE = MESSAGE_v3 | MESSAGE_v2 | MESSAGE_v1


@enum.unique
Expand Down Expand Up @@ -163,19 +182,34 @@ class Download:
details = attr.ib(type=Optional[UserAgent], default=None)


@attr.s(slots=True, frozen=True)
class SimpleRequest:
timestamp = attr.ib(type=arrow.Arrow)
url = attr.ib(validator=attr.validators.instance_of(str))
project = attr.ib(validator=attr.validators.instance_of(str))
tls_protocol = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
tls_cipher = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
country_code = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
details = attr.ib(type=Optional[UserAgent], default=None)


def _value_or_none(value):
if value is NullValue or value == "":
return None
else:
return value


def parse(message):
try:
parsed = MESSAGE.parseString(message, parseAll=True)
except ParseException as exc:
raise UnparseableEvent("{!r} {}".format(message, exc)) from None

def _parse_download(parsed):
data = {}
data["timestamp"] = parsed.timestamp
data["tls_protocol"] = _value_or_none(parsed.tls_protocol)
Expand All @@ -188,7 +222,31 @@ def parse(message):
data["file"]["version"] = _value_or_none(parsed.version)
data["file"]["type"] = _value_or_none(parsed.package_type)

download = _cattr.structure(data, Download)
return _cattr.structure(data, Download)


def _parse_simple(parsed):
data = {}
data["timestamp"] = parsed.timestamp
data["tls_protocol"] = _value_or_none(parsed.tls_protocol)
data["tls_cipher"] = _value_or_none(parsed.tls_cipher)
data["country_code"] = _value_or_none(parsed.country_code)
data["url"] = parsed.url
data["project"] = canonicalize_name(posixpath.split(parsed.url.rstrip("/"))[-1])

return _cattr.structure(data, SimpleRequest)


_DISPATCH = {"download": _parse_download, "simple": _parse_simple}


def parse(message):
try:
parsed = MESSAGE.parseString(message, parseAll=True)
except ParseException as exc:
raise UnparseableEvent("{!r} {}".format(message, exc)) from None

event = _DISPATCH[parsed.message_type](parsed)

try:
ua = user_agents.parse(parsed.user_agent)
Expand All @@ -197,6 +255,6 @@ def parse(message):
except user_agents.UnknownUserAgentError:
logging.info("Unknown User agent: %r", parsed.user_agent)
else:
download = attr.evolve(download, details=ua)
event = attr.evolve(event, details=ua)

return download
return event
Empty file added linehaul/schema/__init__.py
Empty file.
File renamed without changes.
149 changes: 149 additions & 0 deletions linehaul/schema/simple_requests.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
[
{
"name": "timestamp",
"type": "TIMESTAMP",
"mode": "REQUIRED"
},
{
"name": "country_code",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "url",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "project",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "details",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "installer",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "version",
"type": "STRING",
"mode": "NULLABLE"
}
]
},
{
"name": "python",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "implementation",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "version",
"type": "STRING",
"mode": "NULLABLE"
}
]
},
{
"name": "distro",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "version",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "id",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "libc",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "lib",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "version",
"type": "STRING",
"mode": "NULLABLE"
}
]
}
]
},
{
"name": "system",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "release",
"type": "STRING",
"mode": "NULLABLE"
}
]
},
{
"name": "cpu",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "openssl_version",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "setuptools_version",
"type": "STRING",
"mode": "NULLABLE"
}
]
},
{
"name": "tls_protocol",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "tls_cipher",
"type": "STRING",
"mode": "NULLABLE"
}
]
Loading