Skip to content

Commit

Permalink
Rewrite 'yunohost storage disk infos' using udisks
Browse files Browse the repository at this point in the history
  • Loading branch information
christophehenry committed Sep 22, 2024
1 parent a1a394f commit 905eafa
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 79 deletions.
2 changes: 1 addition & 1 deletion debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Depends: ${python3:Depends}, ${misc:Depends}
, python3-miniupnpc, python3-dbus, python3-jinja2
, python3-toml, python3-packaging, python3-publicsuffix2
, python3-ldap, python3-zeroconf (>= 0.36), python3-lexicon,
, python3-pyudev
, python3-pyudev, udisks2,
, python-is-python3
, nginx, nginx-extras (>=1.18)
, apt, apt-transport-https, apt-utils, aptitude, dirmngr
Expand Down
166 changes: 100 additions & 66 deletions src/disks.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import operator
from collections import OrderedDict
import dataclasses
from glob import glob
from typing import Optional
from typing import Optional, Any

import pyudev
import psutil
import dbus

from moulinette.utils.log import getActionLogger

from yunohost.utils import bytearray_to_string

from yunohost.utils.disks import filter_device
logger = getActionLogger("yunohost.storage")


logger = getActionLogger("yunohost.storage")
UDISK_DRIVE_PATH = "/org/freedesktop/UDisks2/drives/"
UDISK_BLOCK_PATH = "/org/freedesktop/UDisks2/block_devices/"
UDISK_PART_TABLE_IFC = "org.freedesktop.UDisks2.PartitionTable"
UDISK_BLOCK_IFC = "org.freedesktop.UDisks2.Block"
UDISK_DRIVE_IFC = "org.freedesktop.UDisks2.Drive"
UDISK_ENCRYPTED_IFC = "org.freedesktop.UDisks2.Encrypted"
UDISK_FILESYSTEM_IFC = "org.freedesktop.UDisks2.Filesystem"


@dataclasses.dataclass
Expand All @@ -22,39 +29,6 @@ class DiskParts:
encrypted: bool
mountpoint: str

@staticmethod
def from_parent_device(device: pyudev.Device, partitions):
result = OrderedDict()
for child_dev in sorted(
filter(filter_device, device.children), key=lambda it: it.device_node
):
encrypted_provider = glob(f"/sys/block/dm-*/slaves/{child_dev.sys_name}")
if encrypted_provider:
# retrive the dm-x part
dm = encrypted_provider[0].split("/")[3]
enc_dev = pyudev.Devices.from_name(device.context, "block", dm)
# This work for LUKS, what about other partition mecanisms?
partname = f"/dev/mapper/{enc_dev.properties['DM_NAME']}"
encrypted = True
else:
partname = child_dev.device_node
encrypted = False

if partname not in partitions:
logger.warning(
f"{child_dev.device_node} not found by 'psutil.disk_partitions'"
)
continue

result[child_dev.sys_name] = DiskParts(
devname=device.device_node,
filesystem=partitions[partname].fstype,
encrypted=encrypted,
mountpoint=partitions[partname].mountpoint,
)

return result


@dataclasses.dataclass
class DiskInfos:
Expand All @@ -63,39 +37,99 @@ class DiskInfos:
serial: str
size: int
links: list[str]
partitions: Optional[list[DiskParts]]

@staticmethod
def from_device(device, partitions):
try:
dev_size = device.attributes.asint("size")
except (AttributeError, UnicodeError, ValueError):
dev_size = None

dev_links = list(sorted(it for it in device.device_links))
child_parts = DiskParts.from_parent_device(device, partitions)

return DiskInfos(
devname=device.device_node,
model=device.get("ID_MODEL", None),
serial=device.get("ID_SERIAL_SHORT", None),
size=dev_size,
links=dev_links,
partitions=child_parts or None,
)
partitions: Optional[dict[str, DiskParts]]


def infos():
context = pyudev.Context()
partitions = {it.device: it for it in psutil.disk_partitions()}
result = OrderedDict()

for it in sorted(
filter(filter_device, context.list_devices(subsystem="block", DEVTYPE="disk")),
key=lambda it: it.device_node,
):
result[it.sys_name] = dataclasses.asdict(
DiskInfos.from_device(it, partitions), dict_factory=OrderedDict
bus = dbus.SystemBus()
manager = bus.get_object("org.freedesktop.UDisks2", "/org/freedesktop/UDisks2")

drives = {}
devices = {}
partitions = {}

for k, v in manager.get_dbus_method(
"GetManagedObjects", "org.freedesktop.DBus.ObjectManager"
)().items():
if k.startswith(UDISK_DRIVE_PATH):
# These are hard drives
drives[k.removeprefix(UDISK_DRIVE_PATH)] = v
elif UDISK_PART_TABLE_IFC in v:
# These are block container partition tables (/dev/sda, /dev/sdb, etc.)
devices[k.removeprefix(UDISK_BLOCK_PATH)] = v
elif UDISK_BLOCK_IFC in v:
# These are partitions (/dev/sda1, /dev/dm-1, etc.). Here, we try to
# associate partitions with as much keys as possible to easier search
# These will be, for instance sdb1 and /dev/sdb1, dm-1 and /dev/dm-1, etc.
_dev = bytearray_to_string(v[UDISK_BLOCK_IFC]["Device"])
_pref_dev = bytearray_to_string(v[UDISK_BLOCK_IFC]["PreferredDevice"])
partitions[_dev] = partitions[_dev.split("/")[-1]] = v
partitions[_pref_dev] = partitions[_pref_dev.split("/")[-1]] = v
partitions[k.removeprefix(UDISK_BLOCK_PATH)] = v

for key, device in sorted(devices.items(), key=operator.itemgetter(0)):
drive = drives[device[UDISK_BLOCK_IFC]["Drive"].removeprefix(UDISK_DRIVE_PATH)][
UDISK_DRIVE_IFC
]
devname = bytearray_to_string(device[UDISK_BLOCK_IFC]["Device"])

device_partitions = OrderedDict()

for partition_key in map(
lambda p: p.removeprefix(UDISK_BLOCK_PATH),
sorted(device[UDISK_PART_TABLE_IFC]["Partitions"]),
):
partition_obj = partitions[partition_key]
partition_devname = bytearray_to_string(
partition_obj[UDISK_BLOCK_IFC]["Device"]
)
encrypted = False

if UDISK_ENCRYPTED_IFC in partition_obj:
encrypted = True
partition_obj = partitions[
partition_obj[UDISK_ENCRYPTED_IFC]["CleartextDevice"].removeprefix(
UDISK_BLOCK_PATH
)
]
else:
# If partition is a device mapper, it's not easy to associate the
# virtual device with its underlying FS. If we can find an actual
# partition (i.e. sda5) in /sys/block/dm-*/slaves/, we can then
# search the FS using the corresponding dm-X in the partitions dict.
mapper = glob(f"/sys/block/dm-*/slaves/{partition_key}")
if mapper and (mapper_key := mapper[0].split("/")[3]) in partitions:
partition_obj = partitions[mapper_key]

block = partition_obj[UDISK_BLOCK_IFC]

if UDISK_FILESYSTEM_IFC in partition_obj:
device_partitions[partition_key] = DiskParts(
devname=partition_devname,
filesystem=block["IdType"],
encrypted=encrypted,
mountpoint=bytearray_to_string(
partition_obj[UDISK_FILESYSTEM_IFC]["MountPoints"][0]
),
)

result[key] = dataclasses.asdict(
DiskInfos(
devname=devname,
model=drive["Model"],
serial=drive["Serial"],
size=drive["Size"],
links=list(
sorted(
bytearray_to_string(it)
for it in device[UDISK_BLOCK_IFC]["Symlinks"]
)
),
partitions=device_partitions or None,
),
dict_factory=OrderedDict,
)

return result
4 changes: 4 additions & 0 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#


def bytearray_to_string(ba):
return bytearray(ba).decode("utf-8").removesuffix("\x00")
12 changes: 0 additions & 12 deletions src/utils/disks.py

This file was deleted.

0 comments on commit 905eafa

Please sign in to comment.