Skip to content

Commit

Permalink
Fixes and large enhancements
Browse files Browse the repository at this point in the history
Highlights:
- added 4K sectors support in VHDX images
- FAT format code merged into a single fat_mkfs function
- 128K and 256K clusters allowed with 4K sectors (FAT)
- really working cylinder alignment for new partitions
- partition type code is now correctly updated in MBR after format
- better compatibility switches with traditional DOS/Windows FORMAT

disk.py:
- new type() member returns a string type name

exFAT.py:
- VBR checksum is checked at boot sector from disk initialization

FAT.py:
- mostly cosmetic changes in some member names
- FAT() initialization accepts and uses disk sector size

gptutils.py:
- pack() honours sector size

mkfat.py:
- new single fat_mkfs function for all FAT12/16/32 formats with better documentation
- support for 4K sectors
- simplified cluster auto-selection (FAT)
- enhanced CHS geometry handling
- return codes and optional messages
- better FAT media byte handling
- duplicated format app tool (__main__) is removed from here

partutils.py:
- LBA-CHS conversions moved in utils.py
- new (definitive?) partition type code calculator
- 4K sector support
- partitioning algorithms revised to do proper cylinder (old DOS) or MB alignment, and to honor disk geometry whenever possible

scripts/mkfat.py:
- new options for FAT tuning (FAT32 limits, FAT12 exclusion with HDDs, no 64K clusters in old systems)
- partition type code is updated in MBR according to applied format

scripts/mkvdisk.py:
- added 4K sectors support (VHDX)

utils.py:
- LBA-CHS conversions moved from partutils.py

vdi/vhd(x)/vmdkutils.py:
- added 4K sectors support (actually effective for VHDX only)

vhdutils.py:
- removed block alignment check in BAT (Windows does not care, and blocks could be unaligned in some scenarios)
- parent's timestamp is checked no more (like Windows 11)
- fixed a bug in dwMaxTableEntries calculation
- fixed a bug in parent timestamp assignment
- fixed a bug in parent's locator encoding

Volume.py:
- added 4K sectors support
- copy_tree_in can optionally convert file names in upper case passing uppercase=1
  • Loading branch information
maxpat78 committed Jun 19, 2023
1 parent 65ee8ed commit 460a946
Show file tree
Hide file tree
Showing 16 changed files with 760 additions and 878 deletions.
36 changes: 18 additions & 18 deletions FATtools/FAT.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@ class boot_fat32(object):
0x03: ('chOemID', '8s'),
0x0B: ('wBytesPerSector', '<H'),
0x0D: ('uchSectorsPerCluster', 'B'),
0x0E: ('wSectorsCount', '<H'), # reserved sectors (min 32?)
0x0E: ('wReservedSectors', '<H'), # reserved sectors before 1st FAT (min 9)
0x10: ('uchFATCopies', 'B'),
0x11: ('wMaxRootEntries', '<H'),
0x13: ('wTotalSectors', '<H'),
0x15: ('uchMediaDescriptor', 'B'),
0x13: ('wTotalSectors', '<H'), # volume sectors if < 65536, or zero
0x15: ('uchMediaDescriptor', 'B'), # F8h HDD, F0h=1.44M floppy, F9h=720K floppy
0x16: ('wSectorsPerFAT', '<H'), # not used, see 24h instead
0x18: ('wSectorsPerTrack', '<H'),
0x1A: ('wHeads', '<H'),
0x1C: ('wHiddenSectors', '<H'),
0x1E: ('wTotalHiddenSectors', '<H'),
0x20: ('dwTotalLogicalSectors', '<I'),
0x1C: ('dwHiddenSectors', '<I'), # disk sectors preceding this boot sector
0x20: ('dwTotalSectors', '<I'), # volume sectors if > 65535, or zero
0x24: ('dwSectorsPerFAT', '<I'),
0x28: ('wMirroringFlags', '<H'), # bits 0-3: active FAT, it bit 7 set; else: mirroring as usual
0x2A: ('wVersion', '<H'),
Expand Down Expand Up @@ -69,11 +68,11 @@ def __init2__(self):
# Cluster size (bytes)
self.cluster = self.wBytesPerSector * self.uchSectorsPerCluster
# Offset of the 1st FAT copy
self.fatoffs = self.wSectorsCount * self.wBytesPerSector + self._pos
self.fatoffs = self.wReservedSectors * self.wBytesPerSector + self._pos
# Data area offset (=cluster #2)
self.dataoffs = self.fatoffs + self.uchFATCopies * self.dwSectorsPerFAT * self.wBytesPerSector + self._pos
# Number of clusters represented in this FAT (if valid buffer)
self.fatsize = self.dwTotalLogicalSectors//self.uchSectorsPerCluster
self.fatsize = self.dwTotalSectors//self.uchSectorsPerCluster
if self.stream:
self.fsinfo = fat32_fsinfo(stream=self.stream, offset=self.wFSISector*self.cluster)
else:
Expand All @@ -94,7 +93,7 @@ def pack(self):
def clusters(self):
"Returns the number of clusters in the data area"
# Total sectors minus sectors preceding the data area
return (self.dwTotalLogicalSectors - (self.dataoffs//self.wBytesPerSector)) // self.uchSectorsPerCluster
return (self.dwTotalSectors - (self.dataoffs//self.wBytesPerSector)) // self.uchSectorsPerCluster

def cl2offset(self, cluster):
"Returns the real offset of a cluster"
Expand Down Expand Up @@ -150,18 +149,18 @@ class boot_fat16(object):
0x03: ('chOemID', '8s'),
0x0B: ('wBytesPerSector', '<H'),
0x0D: ('uchSectorsPerCluster', 'B'),
0x0E: ('wSectorsCount', '<H'),
0x0E: ('wReservedSectors', '<H'), # reserved sectors before 1st FAT (min 1, the boot; Windows often defaults to 8)
0x10: ('uchFATCopies', 'B'),
0x11: ('wMaxRootEntries', '<H'),
0x13: ('wTotalSectors', '<H'),
0x15: ('uchMediaDescriptor', 'B'),
0x16: ('wSectorsPerFAT', '<H'), #DWORD in FAT32
0x18: ('wSectorsPerTrack', '<H'),
0x1A: ('wHeads', '<H'),
0x1C: ('dwHiddenSectors', '<I'), # Here differs from FAT32
0x20: ('dwTotalLogicalSectors', '<I'),
0x1C: ('dwHiddenSectors', '<I'),
0x20: ('dwTotalSectors', '<I'),
0x24: ('chPhysDriveNumber', 'B'),
0x25: ('uchCurrentHead', 'B'),
0x25: ('uchCurrentHead', 'B'), # unused
0x26: ('uchSignature', 'B'), # 0x28 or 0x29
0x27: ('dwVolumeID', '<I'),
0x2B: ('sVolumeLabel', '11s'),
Expand All @@ -185,10 +184,10 @@ def __init2__(self):
# Cluster size (bytes)
self.cluster = self.wBytesPerSector * self.uchSectorsPerCluster
# Offset of the 1st FAT copy
self.fatoffs = self.wSectorsCount * self.wBytesPerSector + self._pos
self.fatoffs = self.wReservedSectors * self.wBytesPerSector + self._pos
# Number of clusters represented in this FAT
# Here the DWORD field seems to be set only if WORD one is too small
self.fatsize = (self.dwTotalLogicalSectors or self.wTotalSectors)//self.uchSectorsPerCluster
self.fatsize = (self.dwTotalSectors or self.wTotalSectors)//self.uchSectorsPerCluster
# Offset of the fixed root directory table (immediately after the FATs)
self.rootoffs = self.fatoffs + self.uchFATCopies * self.wSectorsPerFAT * self.wBytesPerSector + self._pos
# Data area offset (=cluster #2)
Expand All @@ -211,7 +210,7 @@ def pack(self):
def clusters(self):
"Returns the number of clusters in the data area"
# Total sectors minus sectors preceding the data area
return ((self.dwTotalLogicalSectors or self.wTotalSectors) - (self.dataoffs//self.wBytesPerSector)) // self.uchSectorsPerCluster
return ((self.dwTotalSectors or self.wTotalSectors) - (self.dataoffs//self.wBytesPerSector)) // self.uchSectorsPerCluster

def cl2offset(self, cluster):
"Returns the real offset of a cluster"
Expand All @@ -230,13 +229,14 @@ def fat(self, fatcopy=0):
# NOTE: limit decoded dictionary size! Zero or {}.popitem()?
class FAT(object):
"Decodes a FAT (12, 16, 32 o EX) table on disk"
def __init__ (self, stream, offset, clusters, bitsize=32, exfat=0):
def __init__ (self, stream, offset, clusters, bitsize=32, exfat=0, sector=512):
self.sector = sector # physical sector size
self.stream = stream
self.size = clusters # total clusters in the data area (max = 2^x - 11)
self.bits = bitsize # cluster slot bits (12, 16 or 32)
self.offset = offset # relative FAT offset (1st copy)
# CAVE! This accounts the 0-1 unused cluster index?
self.offset2 = offset + (((clusters*bitsize+7)//8)+511)//512*512 # relative FAT offset (2nd copy)
self.offset2 = offset + (((clusters*bitsize+7)//8)+(self.sector-1))//self.sector*self.sector # relative FAT offset (2nd copy)
self.exfat = exfat # true if exFAT (aka FAT64)
self.reserved = 0x0FF7
self.bad = 0x0FF7
Expand Down
42 changes: 24 additions & 18 deletions FATtools/Volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@
import os, time, sys, re, glob, fnmatch
DEBUG=int(os.getenv('FATTOOLS_DEBUG', '0'))
from io import BytesIO
from FATtools import disk, utils, FAT, exFAT, partutils
from FATtools import vhdutils, vhdxutils, vdiutils, vmdkutils
from FATtools import disk, utils, FAT, exFAT
from FATtools.partutils import MBR, GPT
from FATtools import utils, vhdutils, vhdxutils, vdiutils, vmdkutils
from FATtools.debug import log



def vopen(path, mode='rb', what='auto'):
"""Opens a disk, partition or volume according to 'what' parameter: 'auto'
selects the volume in the first partition or disk; 'disk' selects the raw disk;
'partitionN' tries to open partition number N; 'volume' tries to open a file
system. 'path' can be: 1) a file or device path; 2) a FATtools disk or virtual
disk object; 3) a BytesIO object if mode is 'ramdisk'."""
if DEBUG&2: log("vopen in '%s' mode", what)
PHYS_SECTOR = 512 # defualt physical sector size
if type(path) in (disk.disk, vhdutils.Image, vhdxutils.Image, vdiutils.Image, vmdkutils.Image, BytesIO):
if isinstance(path, BytesIO):
# Opens a Ram Disk with a BytesIO object
Expand All @@ -38,18 +39,19 @@ def vopen(path, mode='rb', what='auto'):
d = vhdutils.Image(path, mode)
elif path.lower().endswith('.vhdx'): # VHDX image
d = vhdxutils.Image(path, mode)
PHYS_SECTOR = d.metadata.physical_sector_size
elif path.lower().endswith('.vdi'): # VDI image
d = vdiutils.Image(path, mode)
elif path.lower().endswith('.vmdk'): # VMDK image
d = vmdkutils.Image(path, mode)
else:
d = disk.disk(path, mode) # disk or disk image
if DEBUG&2: log("Opened disk: %s", d)
if DEBUG&2: log("Opened disk type '%s', size %Xh (%Xh sectors) ", d, d.size, d.size//PHYS_SECTOR)
d.seek(0)
if what == 'disk':
return d
# Tries to access a partition
mbr = partutils.MBR(d.read(512), disksize=d.size)
mbr = MBR(d.read(PHYS_SECTOR), disksize=d.size)
if DEBUG&2: log("Opened MBR: %s", mbr)
valid_mbr=1
n = mbr.partitions[0].size()
Expand Down Expand Up @@ -80,21 +82,21 @@ def vopen(path, mode='rb', what='auto'):
return 'EINVMBR'
# Tries to open MBR or GPT partition
if DEBUG&2: log("Ok, valid MBR")
partition=0
partition=0 # Windows 11 makes a reserved part first
if what.startswith('partition'):
partition = int(re.match('partition(\d+)', what).group(1))
if DEBUG&2: log("Trying to open partition #%d", partition)
part = None
if mbr.partitions[0].bType == 0xEE: # GPT
d.seek(512)
gpt = partutils.GPT(d.read(512), 512)
d.seek(PHYS_SECTOR)
gpt = GPT(d.read(PHYS_SECTOR), PHYS_SECTOR)
if DEBUG&2: log("Opened GPT Header: %s", gpt)
d.seek(gpt.u64PartitionEntryLBA*512)
d.seek(gpt.u64PartitionEntryLBA*PHYS_SECTOR)
blk = d.read(gpt.dwNumberOfPartitionEntries * gpt.dwNumberOfPartitionEntries)
gpt.parse(blk)
blocks = gpt.partitions[partition].u64EndingLBA - gpt.partitions[partition].u64StartingLBA + 1
if DEBUG&2: log("Opening Partition #%d: %s", partition, gpt.partitions[partition])
part = disk.partition(d, gpt.partitions[partition].u64StartingLBA*512, blocks*512)
part = disk.partition(d, gpt.partitions[partition].u64StartingLBA*PHYS_SECTOR, blocks*PHYS_SECTOR)
part.seek(0)
part.mbr = mbr
part.gpt = gpt
Expand All @@ -103,30 +105,32 @@ def vopen(path, mode='rb', what='auto'):
index=0
if partition > 0:
index = 1 # opens Extended Partition
if DEBUG&2: log("Opening partition @%016x (size %016x)", mbr.partitions[index].offset(), mbr.partitions[index].size())
if DEBUG&2: log("Last sector CHS: %d-%d-%d", *utils.raw2chs(mbr.partitions[index].sLastSectorCHS))
part = disk.partition(d, mbr.partitions[index].offset(), mbr.partitions[index].size())
if DEBUG&2: log("Opened %s partition @%016x (LBA %016x) %s", ('Primary', 'Extended')[index], mbr.partitions[index].chsoffset(), mbr.partitions[index].lbaoffset(), partutils.raw2chs(mbr.partitions[index].sFirstSectorCHS))
if DEBUG&2: log("Opened %s partition @%016xh (LBA %016xh) %s", ('Primary', 'Extended')[index], mbr.partitions[index].chsoffset(), mbr.partitions[index].lbaoffset(), utils.raw2chs(mbr.partitions[index].sFirstSectorCHS))
if partition > 0:
wanted = 1
extpart = part
while wanted <=partition:
bs = extpart.read(512)
ebr = partutils.MBR(bs, disksize=d.size) # reads Extended Boot Record
bs = extpart.read(PHYS_SECTOR)
ebr = MBR(bs, disksize=d.size) # reads Extended Boot Record
if DEBUG&2: log("Opened EBR: %s", ebr)
if ebr.wBootSignature != 0xAA55:
if DEBUG&2: log("Invalid Extended Boot Record")
if what == 'auto':
return d
else:
return 'EINV'
if DEBUG&2: log("Got partition @%016x (@%016x rel.) %s", ebr.partitions[0].chsoffset(), ebr.partitions[0].lbaoffset(), partutils.raw2chs(ebr.partitions[0].sFirstSectorCHS))
if DEBUG&2: log("Next logical partition @%016x (@%016x rel.) %s", ebr.partitions[1].chsoffset(), ebr.partitions[1].lbaoffset(), partutils.raw2chs(ebr.partitions[1].sFirstSectorCHS))
if DEBUG&2: log("Got partition @%016xh (@%016xh rel.) %s", ebr.partitions[0].chsoffset(), ebr.partitions[0].lbaoffset(), utils.raw2chs(ebr.partitions[0].sFirstSectorCHS))
if DEBUG&2: log("Next logical partition @%016xh (@%016xh rel.) %s", ebr.partitions[1].chsoffset(), ebr.partitions[1].lbaoffset(), utils.raw2chs(ebr.partitions[1].sFirstSectorCHS))
if wanted == partition:
if DEBUG&2: log("Opening Logical Partition #%d @%016x %s", partition, ebr.partitions[0].offset(), partutils.raw2chs(ebr.partitions[0].sFirstSectorCHS))
if DEBUG&2: log("Opening Logical Partition #%d @%016xh %s", partition, ebr.partitions[0].offset(), utils.raw2chs(ebr.partitions[0].sFirstSectorCHS))
part = disk.partition(d, ebr.partitions[0].offset(), ebr.partitions[0].size())
part.seek(0)
break
if ebr.partitions[1].dwFirstSectorLBA and ebr.partitions[1].dwTotalSectors:
if DEBUG&2: log("Scanning next Logical Partition @%016x %s size %.02f MiB", ebr.partitions[1].offset(), partutils.raw2chs(ebr.partitions[1].sFirstSectorCHS), ebr.partitions[1].size()//(1<<20))
if DEBUG&2: log("Scanning next Logical Partition @%016xh %s size %.02f MiB", ebr.partitions[1].offset(), utils.raw2chs(ebr.partitions[1].sFirstSectorCHS), ebr.partitions[1].size()//(1<<20))
extpart = disk.partition(d, ebr.partitions[1].offset(), ebr.partitions[1].size())
else:
break
Expand Down Expand Up @@ -178,6 +182,7 @@ def openvolume(part):
part.seek(0)
bs = part.read(512)

if DEBUG&2: log("Boot sector:\n%s", FAT.boot_fat16(bs))
fstyp = utils.FSguess(FAT.boot_fat16(bs)) # warning: if we call this a second time on the same Win32 disk, handle is unique and seek set already!
if DEBUG&2: log("FSguess guessed FS type: %s", fstyp)

Expand Down Expand Up @@ -319,7 +324,7 @@ def copy_in(src_list, dest, callback=None, attributes=None, chunk_size=1<<20):
else:
pass

def copy_tree_in(base, dest, callback=None, attributes=None, chunk_size=1<<20):
def copy_tree_in(base, dest, callback=None, attributes=None, chunk_size=1<<20, uppercase=0):
"""Copy recursively files and directories under real 'base' path into
virtual 'dest' directory table, 'chunk_size' bytes at a time, calling callback function if provided
and preserving date and times if desired."""
Expand All @@ -345,6 +350,7 @@ def copy_tree_in(base, dest, callback=None, attributes=None, chunk_size=1<<20):
src = os.path.join(root, file)
fp = open(src, 'rb')
st = os.stat(src)
if uppercase: file = file.upper() # force name to upper case
# Create target, preallocating all clusters
dst = target_dir.create(file, (st.st_size+dest.boot.cluster-1)//dest.boot.cluster)
if callback: callback(src[len(base)+1:]) # strip base path
Expand Down
6 changes: 6 additions & 0 deletions FATtools/disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class win32_disk(object):
"Handles a Win32 disk. PLEASE NOTE: due to locking mechanism, the Win32 HANDLE is here UNIQUE - closing once, closes it everywhere."
open_handles = {}

def type(self): return 'win32disk'

def __str__ (self):
return "Win32 Disk Handle %Xh for %s, mode '%s'" % (self.handle, self.name, self.mode)

Expand Down Expand Up @@ -127,6 +129,8 @@ class disk(object):
end followed by read returns no error."""
def __str__ (self):
return "Python disk '%s' (mode '%s') @%016Xh" % (self._file.name, self.mode, self.pos)

def type(self): return 'disk'

def __init__(self, name, mode='rb', buffering=0):
"'name' is the name of a file or device to open or, if mode is 'ramdisk', a BytesIO object with raw disk data"
Expand Down Expand Up @@ -370,6 +374,8 @@ class partition(object):
"Emulates a partition using disk object"
def __str__ (self):
return "Python partition '%s' (offset=%016Xh, size=%d, mode '%s') @%016Xh" % (self.disk._file.name, self.offset, self.size, self.disk.mode, self.pos)

def type(self): return 'partition'

def __init__(self, disk, offset, size):
assert size != 0
Expand Down
13 changes: 13 additions & 0 deletions FATtools/exFAT.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init2__(self):
self.fatsize = self.dwDataRegionLength
# Data region offset (=cluster #2)
self.dataoffs = self.dwDataRegionOffset * (1 << self.uchBytesPerSector) + self._pos
self.checkvbr()

__getattr__ = utils.common_getattr

Expand All @@ -93,6 +94,18 @@ def root(self):
"Root offset"
return self.cl2offset(self.dwRootCluster)

def checkvbr(self):
if not self.stream: return 0
sector = 1 << self.uchBytesPerSector
self.stream.seek(0)
s = self.stream.read(sector*11)
calc_crc = self.GetChecksum(s)
s = self.stream.read(sector)
stored_crc = struct.unpack('<I',s[:4])[0]
print(calc_crc, stored_crc)
if calc_crc != stored_crc:
raise exFATException("FATAL: exFAT Volume Boot Region is corrupted, bad checksum!")

@staticmethod
def GetChecksum(s, UpCase=False):
"Computates the checksum for the VBR sectors (the first 11) or the UpCase table"
Expand Down
4 changes: 2 additions & 2 deletions FATtools/gptutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__ (self, s=None, offset=0, stream=None):

__getattr__ = utils.common_getattr

def pack(self):
def pack(self, sector=512):
"Updates internal buffer"
for i in self.partitions:
for k, v in list(i._kv.items()):
Expand All @@ -61,7 +61,7 @@ def pack(self):
for k, v in list(self._kv.items()):
self._buf[k:k+struct.calcsize(v[1])] = struct.pack(v[1], getattr(self, v[0]))
self._crc32()
return self._buf
return self._buf+bytearray(sector-len(self._buf))

def __str__ (self):
return utils.class2str(self, "GPT Header @%X\n" % self._pos)
Expand Down
Loading

0 comments on commit 460a946

Please sign in to comment.