diff --git a/lib/galaxy/config/sample/datatypes_conf.xml.sample b/lib/galaxy/config/sample/datatypes_conf.xml.sample
index 96be00d60aa7..9f654de4485a 100644
--- a/lib/galaxy/config/sample/datatypes_conf.xml.sample
+++ b/lib/galaxy/config/sample/datatypes_conf.xml.sample
@@ -931,10 +931,16 @@
+
+
+
+
+
+
@@ -1241,11 +1247,17 @@
-
+
+
+
+
+
+
+
diff --git a/lib/galaxy/datatypes/media.py b/lib/galaxy/datatypes/media.py
index 5be3dea9fbb0..9da37c423913 100644
--- a/lib/galaxy/datatypes/media.py
+++ b/lib/galaxy/datatypes/media.py
@@ -5,6 +5,7 @@
import wave
from functools import lru_cache
from typing import (
+ cast,
List,
Tuple,
)
@@ -32,6 +33,97 @@ def ffprobe(path):
return data["format"], data["streams"]
+magic_number = {
+ "mp4": {
+ "offset": 4,
+ "string": ["ftypisom", "ftypmp42", "ftypMSNV"],
+ },
+ "flv": {"offset": 0, "string": ["FLV"]},
+ "mkv": {"offset": 0, "hex": ["1A 45 DF A3"]},
+ "webm": {"offset": 0, "hex": ["1A 45 DF A3"]},
+ "mov": {"offset": 4, "string": ["ftypqt", "moov"]},
+ "wav": {"offset": 8, "string": ["WAVE"]},
+ "mp3": {
+ "offset": 0,
+ "hex": [
+ "49 44 33",
+ "FF E0",
+ "FF E1",
+ "FF E2",
+ "FF E3",
+ "FF E4",
+ "FF E5",
+ "FF E6",
+ "FF E7",
+ "FF E8",
+ "FF E9",
+ "FF EA",
+ "FF EB",
+ "FF EC",
+ "FF ED",
+ "FF EE",
+ "FF EF",
+ "FF F0",
+ "FF F1",
+ "FF F2",
+ "FF F3",
+ "FF F4",
+ "FF F5",
+ "FF F6",
+ "FF F7",
+ "FF F8",
+ "FF F9",
+ "FF FA",
+ "FF FB",
+ "FF FC",
+ "FF FD",
+ "FF FE",
+ "FF FF",
+ ],
+ },
+ "ogg": {"offset": 0, "string": ["OggS"]},
+ "wma": {"offset": 0, "hex": ["30 26 B2 75"]},
+ "wmv": {"offset": 0, "hex": ["30 26 B2 75"]},
+ "avi": {"offset": 8, "string": ["AVI"]},
+ "mpg": {
+ "offset": 0,
+ "hex": [
+ "00 00 01 B0",
+ "00 00 01 B1",
+ "00 00 01 B3",
+ "00 00 01 B4",
+ "00 00 01 B5",
+ "00 00 01 B6",
+ "00 00 01 B7",
+ "00 00 01 B8",
+ "00 00 01 B9",
+ "00 00 01 BA",
+ "00 00 01 BB",
+ "00 00 01 BC",
+ "00 00 01 BD",
+ "00 00 01 BE",
+ "00 00 01 BF",
+ ],
+ },
+}
+
+
+def _get_file_format_from_magic_number(filename: str, file_ext: str):
+ with open(filename, "rb") as f:
+ f.seek(cast(int, magic_number[file_ext]["offset"]))
+ head = f.read(8)
+ if "string" in magic_number[file_ext]:
+ string_check = any(
+ head.startswith(string_code.encode("iso-8859-1"))
+ for string_code in cast(List[str], magic_number[file_ext]["string"])
+ )
+ if "hex" in magic_number[file_ext]:
+ hex_check = any(
+ head.startswith(bytes.fromhex(hex_code)) for hex_code in cast(List[str], magic_number[file_ext]["hex"])
+ )
+ return string_check or hex_check
+
+
class Audio(Binary):
MetadataElement(
name="duration",
@@ -183,8 +275,11 @@ class Mkv(Video):
def sniff(self, filename: str) -> bool:
if which("ffprobe"):
metadata, streams = ffprobe(filename)
- return "matroska" in metadata["format_name"].split(",")
- return False
+ vp_check = any(
+ stream["codec_name"] in ["av1", "vp8", "vp9"] for stream in streams if stream["codec_type"] == "video"
+ )
+ return "matroska" in metadata["format_name"].split(",") and not vp_check
+ return _get_file_format_from_magic_number(filename, "mkv")
class Mp4(Video):
@@ -200,8 +295,8 @@ class Mp4(Video):
def sniff(self, filename: str) -> bool:
if which("ffprobe"):
metadata, streams = ffprobe(filename)
- return "mp4" in metadata["format_name"].split(",")
- return False
+ return "mp4" in metadata["format_name"].split(",") and _get_file_format_from_magic_number(filename, "mp4")
+ return _get_file_format_from_magic_number(filename, "mp4")
class Flv(Video):
@@ -211,7 +306,7 @@ def sniff(self, filename: str) -> bool:
if which("ffprobe"):
metadata, streams = ffprobe(filename)
return "flv" in metadata["format_name"].split(",")
- return False
+ return _get_file_format_from_magic_number(filename, "flv")
class Mpg(Video):
@@ -221,7 +316,7 @@ def sniff(self, filename: str) -> bool:
if which("ffprobe"):
metadata, streams = ffprobe(filename)
return "mpegvideo" in metadata["format_name"].split(",")
- return False
+ return _get_file_format_from_magic_number(filename, "mpg")
class Mp3(Audio):
@@ -240,7 +335,7 @@ def sniff(self, filename: str) -> bool:
if which("ffprobe"):
metadata, streams = ffprobe(filename)
return "mp3" in metadata["format_name"].split(",")
- return False
+ return _get_file_format_from_magic_number(filename, "mp3")
class Wav(Audio):
@@ -274,8 +369,7 @@ def get_mime(self) -> str:
return "audio/wav"
def sniff(self, filename: str) -> bool:
- with wave.open(filename, "rb"):
- return True
+ return _get_file_format_from_magic_number(filename, "wav")
def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
"""Set the metadata for this dataset from the file contents."""
@@ -287,3 +381,68 @@ def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> N
dataset.metadata.nchannels = fd.getnchannels()
except wave.Error:
pass
+
+
+class Ogg(Audio):
+ file_ext = "ogg"
+
+ def sniff(self, filename: str) -> bool:
+ if which("ffprobe"):
+ metadata, streams = ffprobe(filename)
+ return "ogg" in metadata["format_name"].split(",")
+ return _get_file_format_from_magic_number(filename, "ogg")
+
+
+class Webm(Video):
+ file_ext = "webm"
+
+ def sniff(self, filename: str) -> bool:
+ if which("ffprobe"):
+ metadata, streams = ffprobe(filename)
+ vp_check = any(
+ stream["codec_name"] in ["av1", "vp8", "vp9"] for stream in streams if stream["codec_type"] == "video"
+ )
+ return "webm" in metadata["format_name"].split(",") and vp_check
+ return _get_file_format_from_magic_number(filename, "webm")
+
+
+class Mov(Video):
+ file_ext = "mov"
+
+ def sniff(self, filename: str) -> bool:
+ if which("ffprobe"):
+ metadata, streams = ffprobe(filename)
+ return "mov" in metadata["format_name"].split(",") and _get_file_format_from_magic_number(filename, "mov")
+ return _get_file_format_from_magic_number(filename, "mov")
+
+
+class Avi(Video):
+ file_ext = "avi"
+
+ def sniff(self, filename: str) -> bool:
+ if which("ffprobe"):
+ metadata, streams = ffprobe(filename)
+ return "avi" in metadata["format_name"].split(",")
+ return _get_file_format_from_magic_number(filename, "avi")
+
+
+class Wmv(Video):
+ file_ext = "wmv"
+
+ def sniff(self, filename: str) -> bool:
+ if which("ffprobe"):
+ metadata, streams = ffprobe(filename)
+ is_video = "video" in [stream["codec_type"] for stream in streams]
+ return "asf" in metadata["format_name"].split(",") and is_video
+ return _get_file_format_from_magic_number(filename, "wmv")
+
+
+class Wma(Audio):
+ file_ext = "wma"
+
+ def sniff(self, filename: str) -> bool:
+ if which("ffprobe"):
+ metadata, streams = ffprobe(filename)
+ is_audio = "video" not in [stream["codec_type"] for stream in streams]
+ return "asf" in metadata["format_name"].split(",") and is_audio
+ return _get_file_format_from_magic_number(filename, "wma")