Skip to content

Commit

Permalink
WIP: markdown embed
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Oct 30, 2024
1 parent 03307f6 commit f44230d
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 59 deletions.
108 changes: 72 additions & 36 deletions lib/galaxy/managers/markdown_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,25 @@ class DynamicArguments:
"invocation_outputs": [],
"invocation_inputs": [],
}
# Directive names accepted by the embed regex built below
# (GALAXY_FLAVORED_MARKDOWN_EMBED_CONTAIN_REGEX).
EMBED_CAPABLE_DIRECTIVES = [
"history_dataset_name",
"history_dataset_type",
"workflow_license",
"invocation_time",
"generate_time",
"generate_galaxy_version",
"instance_access_link",
"instance_resources_link",
"instance_help_link",
"instance_support_link",
"instance_citation_link",
"instance_terms_link",
"instance_organization_link",
]

# Every key of VALID_ARGUMENTS is treated as a directive (container) name.
GALAXY_FLAVORED_MARKDOWN_CONTAINERS = list(VALID_ARGUMENTS.keys())
# Alternation over all directive names, captured in the named group "container".
GALAXY_FLAVORED_MARKDOWN_CONTAINER_REGEX = r"(?P<container>{})".format("|".join(GALAXY_FLAVORED_MARKDOWN_CONTAINERS))
# Same named group, but limited to the names in EMBED_CAPABLE_DIRECTIVES.
GALAXY_FLAVORED_MARKDOWN_EMBED_CONTAIN_REGEX = r"(?P<container>{})".format("|".join(EMBED_CAPABLE_DIRECTIVES))

# An argument value: a bare run of word characters / hyphens, or a non-empty
# double-quoted or single-quoted string.
ARG_VAL_REGEX = r"""[\w_\-]+|\"[^\"]+\"|\'[^\']+\'"""
# A single ``name=value`` argument with optional surrounding whitespace; the
# name may contain word characters and ``|``.
FUNCTION_ARG = rf"\s*[\w\|]+\s*=\s*(?:{ARG_VAL_REGEX})\s*"
Expand All @@ -82,21 +99,15 @@ class DynamicArguments:
# Compiled directive-call pattern covering every known directive name.
GALAXY_MARKDOWN_FUNCTION_CALL_LINE = re.compile(FUNCTION_CALL_LINE_TEMPLATE % GALAXY_FLAVORED_MARKDOWN_CONTAINER_REGEX)
# Matches a non-empty string made up solely of whitespace.
WHITE_SPACE_ONLY_PATTERN = re.compile(r"^[\s]+$")

# Directive-call pattern text restricted to the embed-capable directive names
# (kept as a string so it can be spliced into EMBED_DIRECTIVE_REGEX below).
GALAXY_MARKDOWN_EMBED_FUNCTION_CALL_LINE = FUNCTION_CALL_LINE_TEMPLATE % GALAXY_FLAVORED_MARKDOWN_EMBED_CONTAIN_REGEX
GALAXY_MARKDOWN_EMBED_FUNCTION_CALL_LINE_PATT = re.compile(GALAXY_MARKDOWN_EMBED_FUNCTION_CALL_LINE)
# A well-formed embedded directive: ``${galaxy <embed-capable directive call>}``.
EMBED_DIRECTIVE_REGEX = re.compile(r"\$\{galaxy\s+%s\}" % GALAXY_MARKDOWN_EMBED_FUNCTION_CALL_LINE)
# Anything that looks like ``${galaxy ...}`` regardless of its contents. The
# ``.*`` is greedy, so it extends to the last ``}`` on the searched text.
EMBED_DIRECTIVE_REGEX_ANY = re.compile(r"\$\{galaxy\s+.*\}")


def validate_galaxy_markdown(galaxy_markdown, internal=True):
"""Validate the supplied markdown and throw an ValueError with reason if invalid."""

def invalid_line(template, line_no, **kwd):
if "line" in kwd:
kwd["line"] = kwd["line"].rstrip("\r\n")
raise ValueError("Invalid line %d: %s" % (line_no + 1, template.format(**kwd)))

def _validate_arg(arg_str, valid_args, line_no):
if arg_str is not None:
arg_name = arg_str.split("=", 1)[0].strip()
if arg_name not in valid_args and arg_name not in SHARED_ARGUMENTS:
invalid_line("Invalid argument to Galaxy directive [{argument}]", line_no, argument=arg_name)

expecting_container_close_for = None
last_line_no = 0
function_calls = 0
Expand All @@ -105,23 +116,33 @@ def _validate_arg(arg_str, valid_args, line_no):

expecting_container_close = expecting_container_close_for is not None
if not fenced and expecting_container_close:
invalid_line(
_invalid_line(
"[{line}] is not expected close line for [{expected_for}]",
line_no,
line=line,
expected_for=expecting_container_close_for,
)
continue
elif not fenced:
continue
first_match_any = EMBED_DIRECTIVE_REGEX_ANY.search(line)
first_match = EMBED_DIRECTIVE_REGEX.search(line)
if first_match_any:
if not first_match:
_invalid_line(
"[{line}] contains invalid template expansion",
line_no,
line=line,
)
else:
_check_func_call(first_match, line_no)
elif fenced and expecting_container_close and BLOCK_FENCE_END.match(line):
# reset
expecting_container_close_for = None
function_calls = 0
elif open_fence and GALAXY_FLAVORED_MARKDOWN_CONTAINER_LINE_PATTERN.match(line):
if expecting_container_close:
if not VALID_CONTAINER_END_PATTERN.match(line):
invalid_line(
_invalid_line(
"Invalid command close line [{line}] for [{expected_for}]",
line_no,
line=line,
Expand All @@ -139,29 +160,10 @@ def _validate_arg(arg_str, valid_args, line_no):
if func_call_match:
function_calls += 1
if function_calls > 1:
invalid_line("Only one Galaxy directive is allowed per fenced Galaxy block (```galaxy)", line_no)
container = func_call_match.group("container")
valid_args_raw = VALID_ARGUMENTS[container]
if isinstance(valid_args_raw, DynamicArguments):
continue
valid_args = cast(List[str], valid_args_raw)

first_arg_call = func_call_match.group("firstargcall")

_validate_arg(first_arg_call, valid_args, line_no)
rest = func_call_match.group("restargcalls")
while rest:
rest = rest.strip().split(",", 1)[1]
arg_match = FUNCTION_MULTIPLE_ARGS_PATTERN.match(rest)
if not arg_match:
break
first_arg_call = arg_match.group("firstargcall")
_validate_arg(first_arg_call, valid_args, line_no)
rest = arg_match.group("restargcalls")

continue
_invalid_line("Only one Galaxy directive is allowed per fenced Galaxy block (```galaxy)", line_no)
_check_func_call(func_call_match, line_no)
else:
invalid_line("Invalid embedded Galaxy markup line [{line}]", line_no, line=line)
_invalid_line("Invalid embedded Galaxy markup line [{line}]", line_no, line=line)

# Markdown unrelated to Galaxy object containers.
continue
Expand All @@ -172,6 +174,40 @@ def _validate_arg(arg_str, valid_args, line_no):
raise ValueError(msg)


def _invalid_line(template: str, line_no: int, **kwd):
if "line" in kwd:
kwd["line"] = kwd["line"].rstrip("\r\n")
raise ValueError("Invalid line %d: %s" % (line_no + 1, template.format(**kwd)))


def _validate_arg(arg_str: str, valid_args, line_no: int):
if arg_str is not None:
arg_name = arg_str.split("=", 1)[0].strip()
if arg_name not in valid_args and arg_name not in SHARED_ARGUMENTS:
_invalid_line("Invalid argument to Galaxy directive [{argument}]", line_no, argument=arg_name)


def _check_func_call(func_call_match, line_no):
    """Validate the argument names used in a matched directive call.

    :param func_call_match: regex match exposing the named groups
        ``container``, ``firstargcall`` and ``restargcalls``.
    :param line_no: zero-based line index forwarded to ``_validate_arg``.

    Directives whose ``VALID_ARGUMENTS`` entry is a ``DynamicArguments``
    instance are not checked. For all others each argument is passed to
    ``_validate_arg`` in order of appearance.
    """
    directive = func_call_match.group("container")
    allowed_raw = VALID_ARGUMENTS[directive]
    if isinstance(allowed_raw, DynamicArguments):
        return
    allowed = cast(List[str], allowed_raw)

    _validate_arg(func_call_match.group("firstargcall"), allowed, line_no)

    remaining = func_call_match.group("restargcalls")
    while remaining:
        # Drop everything up to and including the first comma, then parse the
        # next argument from what is left.
        remaining = remaining.strip().split(",", 1)[1]
        next_match = FUNCTION_MULTIPLE_ARGS_PATTERN.match(remaining)
        if next_match is None:
            break
        _validate_arg(next_match.group("firstargcall"), allowed, line_no)
        remaining = next_match.group("restargcalls")


def _split_markdown_lines(markdown):
"""Yield lines of a markdown document line-by-line keeping track of fencing.
Expand Down
153 changes: 131 additions & 22 deletions lib/galaxy/managers/markdown_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import re
import shutil
import tempfile
from datetime import datetime
from typing import (
Any,
cast,
Dict,
List,
Match,
Expand Down Expand Up @@ -60,7 +62,9 @@
from galaxy.util.resources import resource_string
from galaxy.util.sanitize_html import sanitize_html
from .markdown_parse import (
EMBED_DIRECTIVE_REGEX,
GALAXY_MARKDOWN_FUNCTION_CALL_LINE,
VALID_ARGUMENTS,
validate_galaxy_markdown,
)

Expand Down Expand Up @@ -97,7 +101,19 @@ def _remap(container, line):
line = line.replace(id_match.group(), "%s=%d" % (id_match.group(1), decoded_id))
return (line, False)

def _remap_embed_container(match):
    """Return the matched embedded directive with its encoded ID decoded.

    When ``ENCODED_ID_PATTERN`` is found in the matched text, the
    ``<name>=<encoded id>`` fragment is replaced by ``<name>=<decoded id>``
    (decoded through ``trans.security.decode_id``). Text without an encoded
    ID is returned unchanged.
    """
    directive_text = match.group()
    id_match = re.search(ENCODED_ID_PATTERN, directive_text)
    if not id_match:
        return directive_text

    encoded_id: str = id_match.group(2)
    decoded_id = trans.security.decode_id(encoded_id)
    decoded_argument = "%s=%d" % (id_match.group(1), decoded_id)
    return directive_text.replace(id_match.group(), decoded_argument)

internal_markdown = _remap_galaxy_markdown_calls(_remap, external_galaxy_markdown)
internal_markdown = _remap_galaxy_markdown_embedded_containers(_remap_embed_container, internal_markdown)
return internal_markdown


Expand All @@ -109,10 +125,6 @@ def walk(self, trans, internal_galaxy_markdown):
job_manager = JobManager(trans.app)
collection_manager = trans.app.dataset_collection_manager

def _check_object(object_id, line):
if object_id is None:
raise MalformedContents(f"Missing object identifier [{line}].")

def _remap(container, line):
line, object_id, encoded_id = self._encode_line(trans, line)
if container == "history_link":
Expand Down Expand Up @@ -235,7 +247,63 @@ def _remap_container(container, line):
line, *_ = self._encode_line(trans, line)
return self.handle_error(container, line, str(e))

def _remap_embed_container(match):
    """Expand one embedded ``${galaxy ...}`` directive into its replacement text.

    :param match: regex match with a ``container`` named group holding the
        directive name.
    :returns: the string that replaces the whole matched directive.
    :raises MalformedContents: if an ID-requiring directive has no ID
        (via ``_check_object``) or the directive name is not handled here.
    """
    container = match.group("container")
    # NOTE(review): the looked-up value is never read; the lookup is kept so a
    # directive missing from VALID_ARGUMENTS still fails here as before.
    valid_args_raw = VALID_ARGUMENTS[container]
    object_id: Optional[int] = None
    encoded_id: Optional[str] = None

    id_match = re.search(UNENCODED_ID_PATTERN, match.group())
    if id_match:
        object_id = int(id_match.group(2))
        encoded_id = trans.security.encode_id(object_id)

    if container in ("history_dataset_type", "history_dataset_name"):
        _check_object(object_id, match.group(0))
        hda = hda_manager.get_accessible(object_id, trans.user)
        if container == "history_dataset_type":
            return hda.extension or "data"
        return hda.name or ""
    if container == "workflow_license":
        _check_object(object_id, match.group(0))
        stored_workflow = workflow_manager.get_stored_accessible_workflow(trans, encoded_id)
        return _workflow_license_as_simple_markdown(stored_workflow)
    if container == "invocation_time":
        _check_object(object_id, match.group(0))
        invocation = workflow_manager.get_invocation(trans, object_id)
        return _database_time_to_str(invocation.create_time)
    if container == "generate_time":
        return now().isoformat()
    if container == "generate_galaxy_version":
        return trans.app.config.version_major
    if container == "instance_organization_link":
        # The only link directive that also supplies a title.
        title = trans.app.config.organization_name
        url = trans.app.config.organization_url
        return _link_to_markdown(url, title)

    # Remaining link directives each render a single configured URL.
    url_attribute_by_directive = {
        "instance_access_link": "instance_access_url",
        "instance_resources_link": "instance_resource_url",
        "instance_help_link": "helpsite_url",
        "instance_support_link": "support_url",
        "instance_citation_link": "citation_url",
        "instance_terms_link": "terms_url",
    }
    url_attribute = url_attribute_by_directive.get(container)
    if url_attribute is not None:
        return _link_to_markdown(getattr(trans.app.config, url_attribute))

    raise MalformedContents(f"Unknown embedded Galaxy Markdown directive encountered [{container}].")

export_markdown = _remap_galaxy_markdown_calls(_remap_container, internal_galaxy_markdown)
export_markdown = _remap_galaxy_markdown_embedded_containers(_remap_embed_container, export_markdown)
return export_markdown

def _encode_line(self, trans, line):
Expand Down Expand Up @@ -460,8 +528,8 @@ def handle_instance_organization_link(self, line, title, url):
pass

def handle_invocation_time(self, line, invocation):
self.ensure_rendering_data_for("invocations", invocation)["create_time"] = invocation.create_time.strftime(
"%Y-%m-%d, %H:%M:%S"
self.ensure_rendering_data_for("invocations", invocation)["create_time"] = _database_time_to_str(
invocation.create_time
)

def handle_dataset_type(self, line, hda):
Expand Down Expand Up @@ -599,15 +667,7 @@ def handle_workflow_display(self, line, stored_workflow, workflow_version: Optio
return (markdown, True)

def handle_workflow_license(self, line, stored_workflow):
# workflow_manager = self.trans.app.workflow_manager
license_manager = LicensesManager()
markdown = "*No license specified.*"
if license_id := stored_workflow.latest_workflow.license:
try:
license_metadata = license_manager.get_license_by_id(license_id)
markdown = f"[{license_metadata.name}]({license_metadata.url})"
except ObjectNotFound:
markdown = f"Unknown license ({license_id})"
markdown = _workflow_license_as_simple_markdown(stored_workflow)
return (f"\n\n{markdown}\n\n", True)

def handle_workflow_image(self, line, stored_workflow, workflow_version: Optional[int]):
Expand Down Expand Up @@ -714,15 +774,11 @@ def handle_instance_organization_link(self, line, title, url):
return self._handle_link(url, title)

def _handle_link(self, url, title=None):
if not url:
content = "*Not configured, please contact Galaxy admin*"
return (content, True)
elif not title:
title = url
return (f"[{title}]({url})", True)
content = _link_to_markdown(url, title)
return (content, True)

def handle_invocation_time(self, line, invocation):
content = literal_via_fence(invocation.create_time.strftime("%Y-%m-%d, %H:%M:%S"))
content = literal_via_fence(_database_time_to_str(invocation.create_time))
return (content, True)

def handle_dataset_name(self, line, hda):
Expand Down Expand Up @@ -953,6 +1009,36 @@ def find_non_empty_group(match):
return galaxy_markdown


def _workflow_license_as_simple_markdown(stored_workflow):
    """Describe the license of a stored workflow's latest version as markdown.

    :param stored_workflow: object exposing ``latest_workflow.license``.
    :returns: ``*No license specified.*`` when no license is set, a
        ``[name](url)`` link when the license ID is known to
        ``LicensesManager``, or ``Unknown license (<id>)`` when the lookup
        raises ``ObjectNotFound``.
    """
    licenses = LicensesManager()
    license_id = stored_workflow.latest_workflow.license
    if not license_id:
        return "*No license specified.*"
    try:
        metadata = licenses.get_license_by_id(license_id)
        return f"[{metadata.name}]({metadata.url})"
    except ObjectNotFound:
        return f"Unknown license ({license_id})"


def _check_object(object_id: Optional[int], line: str) -> None:
if object_id is None:
raise MalformedContents(f"Missing object identifier [{line}].")


def _database_time_to_str(database_time: datetime) -> str:
return database_time.strftime("%Y-%m-%d, %H:%M:%S")


def _link_to_markdown(url: Optional[str], title: Optional[str] = None):
if not url:
content = "*Link not configured, please contact Galaxy admin*"
return content
elif not title:
title = url
return f"[{title}]({url})"


def _remap_galaxy_markdown_containers(func, markdown):
new_markdown = markdown

Expand All @@ -979,6 +1065,29 @@ def _remap_galaxy_markdown_containers(func, markdown):
return new_markdown


def _remap_galaxy_markdown_embedded_containers(func, markdown):
    """Replace every embedded ``${galaxy ...}`` directive found in ``markdown``.

    :param func: callable that receives the regex match for one directive and
        returns the string that should replace it.
    :param markdown: the markdown text to process.
    :returns: the markdown with each ``EMBED_DIRECTIVE_REGEX`` match replaced.

    Matches are handled left to right. After each substitution the search
    resumes immediately after the inserted text, so replacement output is
    never re-scanned for further directives.
    """
    new_markdown = markdown
    searching_from = 0
    while True:
        # Search only the not-yet-processed tail; match offsets are therefore
        # relative to ``searching_from``.
        match = EMBED_DIRECTIVE_REGEX.search(new_markdown[searching_from:])
        if match is None:
            break
        replacement = func(match)
        start_pos = searching_from + match.start()
        end_pos = searching_from + match.end()
        new_markdown = new_markdown[:start_pos] + replacement + new_markdown[end_pos:]
        searching_from = start_pos + len(replacement)

    return new_markdown


def _parse_directive_argument_value(arg_name: str, line: str) -> Optional[str]:
arg_pattern = re.compile(rf"{arg_name}=\s*{ARG_VAL_CAPTURED_REGEX}\s*")
match = re.search(arg_pattern, line)
Expand Down
Loading

0 comments on commit f44230d

Please sign in to comment.