Skip to content

Commit

Permalink
feat(opensearch): fetch live logs from OpenSearch (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh authored and tiborsimko committed Oct 21, 2024
1 parent e36c6a2 commit ca38c69
Show file tree
Hide file tree
Showing 10 changed files with 596 additions and 40 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Lukas Heinrich](https://orcid.org/0000-0002-4048-7584)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
Expand Down
4 changes: 4 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@
"description": "Request succeeded. Info about workflow, including the status is returned.",
"examples": {
"application/json": {
"live_logs_enabled": false,
"logs": "{'workflow_logs': string, 'job_logs': { '256b25f4-4cfb-4684-b7a8-73872ef455a2': string, '256b25f4-4cfb-4684-b7a8-73872ef455a3': string, }, 'engine_specific': object, }",
"user": "00000000-0000-0000-0000-000000000000",
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
Expand All @@ -695,6 +696,9 @@
},
"schema": {
"properties": {
"live_logs_enabled": {
"type": "boolean"
},
"logs": {
"type": "string"
},
Expand Down
30 changes: 30 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars):
)
"""Common to all workflow engines environment variables for debug mode."""

# Boolean switches are enabled only by the literal string "true"
# (compared case-insensitively); any other value leaves them off.
REANA_OPENSEARCH_ENABLED = (
    os.environ.get("REANA_OPENSEARCH_ENABLED", "false").lower() == "true"
)
"""OpenSearch enabled flag."""

# Host name used when REANA_OPENSEARCH_HOST is not set in the environment.
_DEFAULT_OPENSEARCH_HOST = "reana-opensearch-master.default.svc.cluster.local"

REANA_OPENSEARCH_HOST = os.environ.get(
    "REANA_OPENSEARCH_HOST", _DEFAULT_OPENSEARCH_HOST
)
"""OpenSearch host."""

# The port is kept as a string, exactly as read from the environment.
REANA_OPENSEARCH_PORT = os.environ.get("REANA_OPENSEARCH_PORT", "9200")
"""OpenSearch port."""

REANA_OPENSEARCH_URL_PREFIX = os.environ.get("REANA_OPENSEARCH_URL_PREFIX", "")
"""OpenSearch URL prefix."""

REANA_OPENSEARCH_USER = os.environ.get("REANA_OPENSEARCH_USER", "admin")
"""OpenSearch user."""

REANA_OPENSEARCH_PASSWORD = os.environ.get("REANA_OPENSEARCH_PASSWORD", "admin")
"""OpenSearch password."""

REANA_OPENSEARCH_USE_SSL = (
    os.environ.get("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true"
)
"""OpenSearch SSL flag."""

# No default: the value is None when the variable is not set.
REANA_OPENSEARCH_CA_CERTS = os.environ.get("REANA_OPENSEARCH_CA_CERTS")
"""OpenSearch CA certificates."""


def _parse_interactive_sessions_environments(env_var):
config = {}
Expand Down
176 changes: 176 additions & 0 deletions reana_workflow_controller/opensearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""OpenSearch client and log fetcher."""

import logging
from opensearchpy import OpenSearch

from reana_workflow_controller.config import (
REANA_OPENSEARCH_CA_CERTS,
REANA_OPENSEARCH_HOST,
REANA_OPENSEARCH_PASSWORD,
REANA_OPENSEARCH_PORT,
REANA_OPENSEARCH_URL_PREFIX,
REANA_OPENSEARCH_USE_SSL,
REANA_OPENSEARCH_USER,
REANA_OPENSEARCH_ENABLED,
)


def build_opensearch_client(
    host: str = REANA_OPENSEARCH_HOST,
    port: str = REANA_OPENSEARCH_PORT,
    url_prefix: str = REANA_OPENSEARCH_URL_PREFIX,
    http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD),
    use_ssl: bool = REANA_OPENSEARCH_USE_SSL,
    ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS,
) -> OpenSearch:
    """
    Create an OpenSearch client from the given connection settings.

    Every argument defaults to the matching ``REANA_OPENSEARCH_*`` value
    imported from the workflow-controller configuration.

    :param host: OpenSearch host.
    :param port: OpenSearch port.
    :param url_prefix: URL prefix.
    :param http_auth: HTTP authentication credentials, as a
        ``(user, password)`` tuple, or ``None``.
    :param use_ssl: Use SSL/TLS for connection.
    :param ca_certs: Path to CA certificates.
    :return: OpenSearch client object.
    """
    connection_options = {
        # The client is given a single "host:port" string.
        "hosts": f"{host}:{port}",
        # enables gzip compression for request bodies
        "http_compress": True,
        "http_auth": http_auth,
        "use_ssl": use_ssl,
        "ca_certs": ca_certs,
        "url_prefix": url_prefix,
        # Certificate verification is always requested; it is not configurable.
        "verify_certs": True,
    }
    return OpenSearch(**connection_options)


class OpenSearchLogFetcher(object):
    """Retrieves job and workflow logs from OpenSearch API.

    Log retrieval is best effort: when a query fails, the error is logged and
    ``None`` is returned, so callers can fall back to another log source.
    """

    def __init__(
        self,
        os_client: "OpenSearch | None" = None,
        job_index: str = "fluentbit-job_log",
        workflow_index: str = "fluentbit-workflow_log",
        max_rows: int = 5000,
        log_key: str = "log",
        order: str = "asc",
        job_log_matcher: str = "kubernetes.labels.job-name.keyword",
        workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
        timeout: int = 5,
    ) -> None:
        """
        Initialize the OpenSearchLogFetcher object.

        :param os_client: OpenSearch client object. When ``None``, a client is
            built with ``build_opensearch_client()`` and its defaults.
        :param job_index: Index name for job logs.
        :param workflow_index: Index name for workflow logs.
        :param max_rows: Maximum number of rows to fetch per query.
        :param log_key: Key of the log message inside each hit's ``_source``.
        :param order: Sort order on ``@timestamp`` (asc/desc).
        :param job_log_matcher: Field matched against the job ID.
        :param workflow_log_matcher: Field matched against the workflow ID.
        :param timeout: Timeout passed to OpenSearch queries
            (presumably in seconds -- confirm against the client library).
        :return: None
        """
        # Annotations in this class are written as strings so that they are
        # not evaluated when the class body is executed.
        if os_client is None:
            os_client = build_opensearch_client()

        self.os_client = os_client
        self.job_index = job_index
        self.workflow_index = workflow_index
        self.max_rows = max_rows
        self.log_key = log_key
        self.order = order
        self.job_log_matcher = job_log_matcher
        self.workflow_log_matcher = workflow_log_matcher
        self.timeout = timeout

    def fetch_logs(self, id: str, index: str, match: str) -> "str | None":
        """
        Fetch logs of a specific job or workflow.

        :param id: Job or workflow ID.
        :param index: Index name for logs.
        :param match: Field whose value must match ``id``.
        :return: Concatenated log messages sorted by ``@timestamp``, or
            ``None`` if the search request raised an exception.
        """
        query = {
            "query": {"match": {match: id}},
            "sort": [{"@timestamp": {"order": self.order}}],
        }

        try:
            response = self.os_client.search(
                index=index, body=query, size=self.max_rows, timeout=self.timeout
            )
        except Exception as e:
            # Deliberately broad: any search failure is reported and turned
            # into "no logs" rather than propagated to the caller.
            logging.error("Failed to fetch logs for %s: %s", id, e)
            return None

        return self._concat_rows(response["hits"]["hits"])

    def fetch_job_logs(self, backend_job_id: str) -> "str | None":
        """
        Fetch logs of a specific job.

        :param backend_job_id: Job ID.
        :return: Job logs, or ``None`` if the search request failed.
        """
        return self.fetch_logs(
            backend_job_id,
            self.job_index,
            self.job_log_matcher,
        )

    def fetch_workflow_logs(self, workflow_id: str) -> "str | None":
        """
        Fetch logs of a specific workflow.

        :param workflow_id: Workflow ID.
        :return: Workflow logs, or ``None`` if the search request failed.
        """
        return self.fetch_logs(
            workflow_id,
            self.workflow_index,
            self.workflow_log_matcher,
        )

    def _concat_rows(self, rows: list) -> str:
        """
        Concatenate log messages from rows.

        Each message is followed by a newline. Hits that have no ``_source``
        or no ``log_key`` entry are skipped, so one unexpected record does not
        abort retrieval of all the other log lines.

        :param rows: List of search hits.
        :return: Concatenated log messages (empty string when there are none).
        """
        messages = []

        for hit in rows:
            source = hit.get("_source") or {}
            if self.log_key in source:
                messages.append(source[self.log_key] + "\n")

        return "".join(messages)


def build_opensearch_log_fetcher() -> OpenSearchLogFetcher | None:
    """
    Create a log fetcher when OpenSearch support is switched on.

    :return: A new OpenSearchLogFetcher built with its default arguments if
        ``REANA_OPENSEARCH_ENABLED`` is true, otherwise ``None``.
    """
    if not REANA_OPENSEARCH_ENABLED:
        return None
    return OpenSearchLogFetcher()
15 changes: 13 additions & 2 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -166,6 +166,8 @@ def is_uuid_v4(uuid_or_name: str) -> bool:

def build_workflow_logs(workflow, steps=None, paginate=None):
"""Return the logs for all jobs of a workflow."""
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher

query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
if steps:
query = query.filter(Job.job_name.in_(steps))
Expand All @@ -179,6 +181,15 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
finished_at = (
job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None
)

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)

item = {
"workflow_uuid": str(job.workflow_uuid) or "",
"job_name": job.job_name or "",
Expand All @@ -187,7 +198,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
"docker_img": job.docker_img or "",
"cmd": job.prettified_cmd or "",
"status": job.status.name or "",
"logs": job.logs or "",
"logs": logs or job.logs or "",
"started_at": started_at,
"finished_at": finished_at,
}
Expand Down
23 changes: 20 additions & 3 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022 CERN.
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -16,6 +16,7 @@
from reana_commons.errors import REANASecretDoesNotExist
from reana_db.utils import _get_workflow_with_uuid_or_name

from reana_workflow_controller.config import REANA_OPENSEARCH_ENABLED
from reana_workflow_controller.errors import (
REANAExternalCallError,
REANAWorkflowControllerError,
Expand Down Expand Up @@ -100,6 +101,8 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
type: string
user:
type: string
live_logs_enabled:
type: boolean
examples:
application/json:
{
Expand All @@ -112,7 +115,8 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
},
'engine_specific': object,
}",
"user": "00000000-0000-0000-0000-000000000000"
"user": "00000000-0000-0000-0000-000000000000",
"live_logs_enabled": false
}
400:
description: >-
Expand Down Expand Up @@ -150,8 +154,20 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
"engine_specific": None,
}
else:
from reana_workflow_controller.opensearch import (
build_opensearch_log_fetcher,
)

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_workflow_logs(workflow.id_)
if open_search_log_fetcher
else None
)

workflow_logs = {
"workflow_logs": workflow.logs,
"workflow_logs": logs or workflow.logs,
"job_logs": build_workflow_logs(workflow, paginate=paginate),
"engine_specific": workflow.engine_specific,
}
Expand All @@ -162,6 +178,7 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
"workflow_name": get_workflow_name(workflow),
"logs": json.dumps(workflow_logs),
"user": user_uuid,
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
}
),
200,
Expand Down
Loading

0 comments on commit ca38c69

Please sign in to comment.