Skip to content

Commit

Permalink
Merge branch 'y-scope:main' into main-exact-lib-version
Browse files Browse the repository at this point in the history
  • Loading branch information
jackluo923 authored Jan 6, 2025
2 parents 3dc9eb7 + 61f9902 commit db45967
Show file tree
Hide file tree
Showing 88 changed files with 2,657 additions and 556 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/clp-pr-title-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ name: "clp-pr-title-checks"

on:
pull_request_target:
# NOTE: Workflows triggered by this event give the workflow access to secrets and grant the
# `GITHUB_TOKEN` read/write repository access by default. So we need to ensure:
# - This workflow doesn't inadvertently check out, build, or execute untrusted code from the
# pull request triggered by this event.
# - Each job has `permissions` set to only those necessary.
types: ["edited", "opened", "reopened"]
branches: ["main"]

permissions: {}

concurrency:
group: "${{github.workflow}}-${{github.ref}}"

Expand Down
26 changes: 20 additions & 6 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.core import (
get_config_value,
Expand Down Expand Up @@ -239,17 +241,17 @@ def generate_container_config(
DockerMountType.BIND, clp_config.logs_directory, container_clp_config.logs_directory
)

container_clp_config.archive_output.directory = pathlib.Path("/") / "mnt" / "archive-output"
container_clp_config.archive_output.set_directory(pathlib.Path("/") / "mnt" / "archive-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
):
docker_mounts.archives_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
Expand All @@ -268,6 +270,18 @@ def generate_container_config(
return container_clp_config, docker_mounts


def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
    """
    Builds a WorkerConfig containing only the subset of `clp_config` that
    worker components need.

    The `package` and `archive_output` sections are deep-copied so that later
    mutations of the returned config cannot leak back into `clp_config`.

    :param clp_config: The full package configuration.
    :return: A new WorkerConfig populated from `clp_config`.
    """
    config = WorkerConfig()

    # Deep copies: these sections are nested models that the worker may modify.
    config.package = clp_config.package.copy(deep=True)
    config.archive_output = clp_config.archive_output.copy(deep=True)

    config.data_directory = clp_config.data_directory
    config.stream_output_dir = clp_config.stream_output.directory
    config.stream_collection_name = clp_config.results_cache.stream_collection_name

    return config


def dump_container_config(
container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str
) -> Tuple[pathlib.Path, pathlib.Path]:
Expand Down Expand Up @@ -482,7 +496,7 @@ def validate_results_cache_config(

def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_stream_output_dir()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig
from clp_py_utils.clp_config import CLPConfig, StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
Expand Down Expand Up @@ -81,6 +81,11 @@ def handle_extract_file_cmd(
if clp_config is None:
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"File extraction is not supported for archive storage type: {storage_type}.")
return -1

container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down Expand Up @@ -156,6 +161,13 @@ def handle_extract_stream_cmd(
if clp_config is None:
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(
f"Stream extraction is not supported for archive storage type: {storage_type}."
)
return -1

container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import sys
from pathlib import Path

from clp_py_utils.clp_config import StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
Expand Down Expand Up @@ -57,6 +59,11 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
return -1

# Validate the input timestamp
begin_ts = parsed_args.begin_ts
end_ts = parsed_args.end_ts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def validate_and_load_config_file(
"""
try:
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_logs_dir()
return clp_config
except Exception:
Expand Down Expand Up @@ -207,7 +207,7 @@ def handle_extract_file_cmd(
list_path = parsed_args.files_from

logs_dir = clp_config.logs_directory
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()

# Generate database config file for clp
db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def main(argv):
return -1

database_config = clp_config.database
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()
if not archives_dir.exists():
logger.error("`archive_output.directory` doesn't exist.")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,31 @@ def create_and_monitor_job_in_db(
logger.error(f"job {job_id} finished with unexpected status: {job_status}")


async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b"" == buf:
# Worker closed
return
unpacker.feed(buf)
def get_worker_connection_handler(raw_output: bool):
async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
unpacker = msgpack.Unpacker()
while True:
# Read some data from the worker and feed it to msgpack
buf = await reader.read(1024)
if b"" == buf:
# Worker closed
return
unpacker.feed(buf)

# Print out any messages we can decode in the form of ORIG_PATH: MSG
for unpacked in unpacker:
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
writer.close()
# Print out any messages we can decode in the form of ORIG_PATH: MSG, or simply MSG
# if raw output is enabled.
for unpacked in unpacker:
if raw_output:
print(f"{unpacked[1]}", end="")
else:
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
writer.close()

return worker_connection_handler


async def do_search_without_aggregation(
Expand All @@ -112,6 +119,7 @@ async def do_search_without_aggregation(
end_timestamp: int | None,
ignore_case: bool,
path_filter: str | None,
raw_output: bool,
):
ip_list = socket.gethostbyname_ex(socket.gethostname())[2]
if len(ip_list) == 0:
Expand All @@ -125,7 +133,7 @@ async def do_search_without_aggregation(
break

server = await asyncio.start_server(
client_connected_cb=worker_connection_handler,
client_connected_cb=get_worker_connection_handler(raw_output),
host=host,
port=0,
family=socket.AF_INET,
Expand Down Expand Up @@ -184,6 +192,7 @@ async def do_search(
path_filter: str | None,
do_count_aggregation: bool | None,
count_by_time_bucket_size: int | None,
raw_output: bool,
):
if do_count_aggregation is None and count_by_time_bucket_size is None:
await do_search_without_aggregation(
Expand All @@ -195,6 +204,7 @@ async def do_search(
end_timestamp,
ignore_case,
path_filter,
raw_output,
)
else:
await run_function_in_process(
Expand Down Expand Up @@ -226,12 +236,12 @@ def main(argv):
args_parser.add_argument(
"--begin-time",
type=int,
help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-time",
type=int,
help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--ignore-case",
Expand All @@ -250,6 +260,9 @@ def main(argv):
type=int,
help="Count the number of results in each time span of the given size (ms).",
)
args_parser.add_argument(
"--raw", action="store_true", help="Output the search results as raw logs."
)
parsed_args = args_parser.parse_args(argv[1:])

if (
Expand Down Expand Up @@ -281,6 +294,7 @@ def main(argv):
parsed_args.file_path,
parsed_args.count,
parsed_args.count_by_time,
parsed_args.raw,
)
)
except asyncio.CancelledError:
Expand Down
15 changes: 13 additions & 2 deletions components/clp-package-utils/clp_package_utils/scripts/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid

import yaml
from clp_py_utils.clp_config import StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
Expand Down Expand Up @@ -41,12 +42,12 @@ def main(argv):
args_parser.add_argument(
"--begin-time",
type=int,
help="Time range filter lower-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-time",
type=int,
help="Time range filter upper-bound (inclusive) as milliseconds" " from the UNIX epoch.",
help="Time range filter upper-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--ignore-case",
Expand All @@ -60,6 +61,9 @@ def main(argv):
type=int,
help="Count the number of results in each time span of the given size (ms).",
)
args_parser.add_argument(
"--raw", action="store_true", help="Output the search results as raw logs."
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
Expand All @@ -74,6 +78,11 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Search is not supported for archive storage type: {storage_type}.")
return -1

container_name = generate_container_name(str(JobType.SEARCH))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
Expand Down Expand Up @@ -113,6 +122,8 @@ def main(argv):
if parsed_args.count_by_time is not None:
search_cmd.append("--count-by-time")
search_cmd.append(str(parsed_args.count_by_time))
if parsed_args.raw:
search_cmd.append("--raw")
cmd = container_start_cmd + search_cmd
subprocess.run(cmd, check=True)

Expand Down
Loading

0 comments on commit db45967

Please sign in to comment.