diff --git a/lib/galaxy/config/sample/datatypes_conf.xml.sample b/lib/galaxy/config/sample/datatypes_conf.xml.sample
index 96be00d60aa7..9f654de4485a 100644
--- a/lib/galaxy/config/sample/datatypes_conf.xml.sample
+++ b/lib/galaxy/config/sample/datatypes_conf.xml.sample
@@ -931,10 +931,16 @@
+
+
+
+
+
+
@@ -1241,11 +1247,17 @@
-
+
+
+
+
+
+
+
diff --git a/lib/galaxy/datatypes/media.py b/lib/galaxy/datatypes/media.py
index 5be3dea9fbb0..9da37c423913 100644
--- a/lib/galaxy/datatypes/media.py
+++ b/lib/galaxy/datatypes/media.py
@@ -5,6 +5,7 @@
 import wave
 from functools import lru_cache
 from typing import (
+    cast,
     List,
     Tuple,
 )
@@ -32,6 +33,109 @@ def ffprobe(path):
     return data["format"], data["streams"]
 
 
+# Leading-byte signatures used to sniff media files when ffprobe is unavailable.
+# "offset" is where reading starts; "string"/"hex" list the accepted prefixes at that offset.
+magic_number = {
+    "mp4": {
+        "offset": 4,
+        "string": ["ftypisom", "ftypmp42", "ftypMSNV"],
+    },
+    "flv": {"offset": 0, "string": ["FLV"]},
+    "mkv": {"offset": 0, "hex": ["1A 45 DF A3"]},
+    "webm": {"offset": 0, "hex": ["1A 45 DF A3"]},
+    "mov": {"offset": 4, "string": ["ftypqt", "moov"]},
+    "wav": {"offset": 8, "string": ["WAVE"]},
+    "mp3": {
+        "offset": 0,
+        "hex": [
+            "49 44 33",
+            "FF E0",
+            "FF E1",
+            "FF E2",
+            "FF E3",
+            "FF E4",
+            "FF E5",
+            "FF E6",
+            "FF E7",
+            "FF E8",
+            "FF E9",
+            "FF EA",
+            "FF EB",
+            "FF EC",
+            "FF ED",
+            "FF EE",
+            "FF EF",
+            "FF F0",
+            "FF F1",
+            "FF F2",
+            "FF F3",
+            "FF F4",
+            "FF F5",
+            "FF F6",
+            "FF F7",
+            "FF F8",
+            "FF F9",
+            "FF FA",
+            "FF FB",
+            "FF FC",
+            "FF FD",
+            "FF FE",
+            "FF FF",
+        ],
+    },
+    "ogg": {"offset": 0, "string": ["OggS"]},
+    "wma": {"offset": 0, "hex": ["30 26 B2 75"]},
+    "wmv": {"offset": 0, "hex": ["30 26 B2 75"]},
+    "avi": {"offset": 8, "string": ["AVI"]},
+    "mpg": {
+        "offset": 0,
+        "hex": [
+            "00 00 01 B0",
+            "00 00 01 B1",
+            "00 00 01 B3",
+            "00 00 01 B4",
+            "00 00 01 B5",
+            "00 00 01 B6",
+            "00 00 01 B7",
+            "00 00 01 B8",
+            "00 00 01 B9",
+            "00 00 01 BA",
+            "00 00 01 BB",
+            "00 00 01 BC",
+            "00 00 01 BD",
+            "00 00 01 BE",
+            "00 00 01 BF",
+        ],
+    },
+}
+
+
+def _get_file_format_from_magic_number(filename: str, file_ext: str) -> bool:
+    """Check whether ``filename`` carries one of the signatures registered for ``file_ext``.
+
+    Up to 8 bytes are read at the offset given in ``magic_number[file_ext]`` and
+    compared against every ``"string"`` and ``"hex"`` prefix listed there.
+
+    :param filename: path of the file to inspect
+    :param file_ext: key into ``magic_number`` (raises ``KeyError`` if unknown)
+    :returns: ``True`` if any registered prefix matches, ``False`` otherwise
+    """
+    signature = magic_number[file_ext]
+    with open(filename, "rb") as f:
+        f.seek(cast(int, signature["offset"]))
+        head = f.read(8)
+    # Most entries define only one of "string"/"hex"; default the missing one to an
+    # empty list so neither check is left unbound (previously an UnboundLocalError).
+    string_check = any(
+        head.startswith(string_code.encode("iso-8859-1"))
+        for string_code in cast(List[str], signature.get("string", []))
+    )
+    hex_check = any(
+        head.startswith(bytes.fromhex(hex_code)) for hex_code in cast(List[str], signature.get("hex", []))
+    )
+    return string_check or hex_check
+
+
 class Audio(Binary):
     MetadataElement(
         name="duration",
@@ -183,8 +287,12 @@ class Mkv(Video):
     def sniff(self, filename: str) -> bool:
         if which("ffprobe"):
             metadata, streams = ffprobe(filename)
-            return "matroska" in metadata["format_name"].split(",")
-        return False
+            # Matroska files carrying AV1/VP8/VP9 video are left to the Webm sniffer.
+            vp_check = any(
+                stream["codec_name"] in ["av1", "vp8", "vp9"] for stream in streams if stream["codec_type"] == "video"
+            )
+            return "matroska" in metadata["format_name"].split(",") and not vp_check
+        return _get_file_format_from_magic_number(filename, "mkv")
 
 
 class Mp4(Video):
@@ -200,8 +308,9 @@ class Mp4(Video):
     def sniff(self, filename: str) -> bool:
         if which("ffprobe"):
             metadata, streams = ffprobe(filename)
-            return "mp4" in metadata["format_name"].split(",")
-        return False
+            # The ftyp brand check narrows the ffprobe match (Mov does the same with its own brands).
+            return "mp4" in metadata["format_name"].split(",") and _get_file_format_from_magic_number(filename, "mp4")
+        return _get_file_format_from_magic_number(filename, "mp4")
 
 
 class Flv(Video):
@@ -211,7 +320,7 @@ def sniff(self, filename: str) -> bool:
         if which("ffprobe"):
             metadata, streams = ffprobe(filename)
             return "flv" in metadata["format_name"].split(",")
-        return False
+        return _get_file_format_from_magic_number(filename, "flv")
 
 
 class Mpg(Video):
@@ -221,7 +330,7 @@ def sniff(self, filename: str) -> bool:
         if which("ffprobe"):
             metadata, streams = ffprobe(filename)
             return "mpegvideo" in metadata["format_name"].split(",")
-        return False
+        return _get_file_format_from_magic_number(filename, "mpg")
 
 
 class Mp3(Audio):
@@ -240,7 +349,7 @@ def sniff(self, filename: str) -> bool:
         if which("ffprobe"):
             metadata, streams = ffprobe(filename)
             return "mp3" in metadata["format_name"].split(",")
-        return False
+        return _get_file_format_from_magic_number(filename, "mp3")
 
 
 class Wav(Audio):
@@ -274,8 +383,7 @@ def get_mime(self) -> str:
         return "audio/wav"
 
     def sniff(self, filename: str) -> bool:
-        with wave.open(filename, "rb"):
-            return True
+        return _get_file_format_from_magic_number(filename, "wav")
 
     def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
         """Set the metadata for this dataset from the file contents."""
@@ -287,3 +395,74 @@ def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> N
                 dataset.metadata.nchannels = fd.getnchannels()
         except wave.Error:
             pass
+
+
+class Ogg(Audio):
+    """Ogg audio, sniffed with ffprobe or, without it, by the ``OggS`` signature."""
+
+    file_ext = "ogg"
+
+    def sniff(self, filename: str) -> bool:
+        if which("ffprobe"):
+            metadata, streams = ffprobe(filename)
+            return "ogg" in metadata["format_name"].split(",")
+        return _get_file_format_from_magic_number(filename, "ogg")
+
+
+class Webm(Video):
+    """WebM video: ffprobe must report ``webm`` plus an AV1/VP8/VP9 video stream."""
+
+    file_ext = "webm"
+
+    def sniff(self, filename: str) -> bool:
+        if which("ffprobe"):
+            metadata, streams = ffprobe(filename)
+            vp_check = any(
+                stream["codec_name"] in ["av1", "vp8", "vp9"] for stream in streams if stream["codec_type"] == "video"
+            )
+            return "webm" in metadata["format_name"].split(",") and vp_check
+        return _get_file_format_from_magic_number(filename, "webm")
+
+
+class Mov(Video):
+    """MOV video: the ``ftypqt``/``moov`` signature is required even when ffprobe matches."""
+
+    file_ext = "mov"
+
+    def sniff(self, filename: str) -> bool:
+        if which("ffprobe"):
+            metadata, streams = ffprobe(filename)
+            return "mov" in metadata["format_name"].split(",") and _get_file_format_from_magic_number(filename, "mov")
+        return _get_file_format_from_magic_number(filename, "mov")
+
+
+class Avi(Video):
+    """AVI video, sniffed with ffprobe or, without it, by the ``AVI`` signature at offset 8."""
+
+    file_ext = "avi"
+
+    def sniff(self, filename: str) -> bool:
+        if which("ffprobe"):
+            metadata, streams = ffprobe(filename)
+            return "avi" in metadata["format_name"].split(",")
+        return _get_file_format_from_magic_number(filename, "avi")
+
+
+class Wmv(Video):
+    """WMV video: an ``asf`` container that has at least one video stream."""
+
+    file_ext = "wmv"
+
+    def sniff(self, filename: str) -> bool:
+        if which("ffprobe"):
+            metadata, streams = ffprobe(filename)
+            is_video = "video" in [stream["codec_type"] for stream in streams]
+            return "asf" in metadata["format_name"].split(",") and is_video
+        return _get_file_format_from_magic_number(filename, "wmv")
+
+
+class Wma(Audio):
+    """WMA audio: an ``asf`` container that has no video stream."""
+
+    file_ext = "wma"
+
+    def sniff(self, filename: str) -> bool:
+        if which("ffprobe"):
+            metadata, streams = ffprobe(filename)
+            is_audio = "video" not in [stream["codec_type"] for stream in streams]
+            return "asf" in metadata["format_name"].split(",") and is_audio
+        return _get_file_format_from_magic_number(filename, "wma")