Skip to content

Commit

Permalink
luks_device: add support for keyslots (#664)
Browse files Browse the repository at this point in the history
* luks_device: add support for keyslots

* luks_device: replace python3 format strings with python2 format strings, remove print statements

* luks_device: add missing copyright information in keyslot integration test files

* luks_device: updated failing unit tests for keyslot support

* luks_device: improve detection of luks version

* luks_device: Update documentation on keyslot parameters, minor code improvements

* luks_device: improve validation of keyslot parameters, fix tests for systems that do not support luks2

* luks_device: correct spelling and errors in documentation and output, check all possible locations for LUKS2 header
  • Loading branch information
zemdreg authored Oct 29, 2023
1 parent 4285501 commit 6504e67
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 21 deletions.
142 changes: 135 additions & 7 deletions plugins/modules/luks_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@
value is a string with the passphrase."
type: str
version_added: '1.0.0'
keyslot:
description:
- "Adds the O(keyfile) or O(passphrase) to a specific keyslot when
creating a new container on O(device). Parameter value is the
number of the keyslot."
- "B(Note) that a device of O(type=luks1) supports the keyslot numbers
V(0)-V(7) and a device of O(type=luks2) supports the keyslot numbers
V(0)-V(31). In order to use the keyslots V(8)-V(31) when creating a new
container, setting O(type) to V(luks2) is required."
type: int
version_added: '2.16.0'
keysize:
description:
- "Sets the key size only if LUKS container does not exist."
Expand Down Expand Up @@ -108,6 +119,16 @@
be used even if another keyslot already exists for this passphrase."
type: str
version_added: '1.0.0'
new_keyslot:
description:
- "Adds the additional O(new_keyfile) or O(new_passphrase) to a
specific keyslot on the given O(device). Parameter value is the number
of the keyslot."
- "B(Note) that a device of O(type=luks1) supports the keyslot numbers
V(0)-V(7) and a device of O(type=luks2) supports the keyslot numbers
V(0)-V(31)."
type: int
version_added: '2.16.0'
remove_keyfile:
description:
- "Removes given key from the container on O(device). Does not
Expand All @@ -133,6 +154,17 @@
to V(true)."
type: str
version_added: '1.0.0'
remove_keyslot:
description:
- "Removes the key in the given slot on O(device). Needs
O(keyfile) or O(passphrase) for authorization."
- "B(Note) that a device of O(type=luks1) supports the keyslot numbers
V(0)-V(7) and a device of O(type=luks2) supports the keyslot numbers
V(0)-V(31)."
- "B(Note) that the given O(keyfile) or O(passphrase) must not be
in the slot to be removed."
type: int
version_added: '2.16.0'
force_remove_last_key:
description:
- "If set to V(true), allows removing the last key from a container."
Expand Down Expand Up @@ -377,6 +409,26 @@
state: "present"
keyfile: "/vault/keyfile"
type: luks2
- name: Create a container with key in slot 4
community.crypto.luks_device:
device: "/dev/loop0"
state: "present"
keyfile: "/vault/keyfile"
keyslot: 4
- name: Add a new key in slot 5
community.crypto.luks_device:
device: "/dev/loop0"
keyfile: "/vault/keyfile"
new_keyfile: "/vault/keyfile"
new_keyslot: 5
- name: Remove the key from slot 4 (given keyfile must not be slot 4)
community.crypto.luks_device:
device: "/dev/loop0"
keyfile: "/vault/keyfile"
remove_keyslot: 4
'''

RETURN = '''
Expand Down Expand Up @@ -523,6 +575,29 @@ def is_luks(self, device):
result = self._run_command([self._cryptsetup_bin, 'isLuks', device])
return result[RETURN_CODE] == 0

def get_luks_type(self, device):
''' get the luks type of a device
'''
if self.is_luks(device):
with open(device, 'rb') as f:
for offset in LUKS2_HEADER_OFFSETS:
f.seek(offset)
data = f.read(LUKS_HEADER_L)
if data == LUKS2_HEADER2:
return 'luks2'
return 'luks1'
return None

def is_luks_slot_set(self, device, keyslot):
''' check if a keyslot is set
'''
result = self._run_command([self._cryptsetup_bin, 'luksDump', device])
if result[RETURN_CODE] != 0:
raise ValueError('Error while dumping LUKS header from %s' % (device, ))
result_luks1 = 'Key Slot %d: ENABLED' % (keyslot) in result[STDOUT]
result_luks2 = ' %d: luks2' % (keyslot) in result[STDOUT]
return result_luks1 or result_luks2

def _add_pbkdf_options(self, options, pbkdf):
if pbkdf['iteration_time'] is not None:
options.extend(['--iter-time', str(int(pbkdf['iteration_time'] * 1000))])
Expand All @@ -535,7 +610,7 @@ def _add_pbkdf_options(self, options, pbkdf):
if pbkdf['parallel'] is not None:
options.extend(['--pbkdf-parallel', str(pbkdf['parallel'])])

def run_luks_create(self, device, keyfile, passphrase, keysize, cipher, hash_, sector_size, pbkdf):
def run_luks_create(self, device, keyfile, passphrase, keyslot, keysize, cipher, hash_, sector_size, pbkdf):
# create a new luks container; use batch mode to auto confirm
luks_type = self._module.params['type']
label = self._module.params['label']
Expand All @@ -556,6 +631,8 @@ def run_luks_create(self, device, keyfile, passphrase, keysize, cipher, hash_, s
self._add_pbkdf_options(options, pbkdf)
if sector_size is not None:
options.extend(['--sector-size', str(sector_size)])
if keyslot is not None:
options.extend(['--key-slot', str(keyslot)])

args = [self._cryptsetup_bin, 'luksFormat']
args.extend(options)
Expand Down Expand Up @@ -615,7 +692,7 @@ def run_luks_remove(self, device):
raise ValueError('Error while wiping LUKS container signatures for %s: %s' % (device, exc))

def run_luks_add_key(self, device, keyfile, passphrase, new_keyfile,
new_passphrase, pbkdf):
new_passphrase, new_keyslot, pbkdf):
''' Add new key from a keyfile or passphrase to given 'device';
authentication done using 'keyfile' or 'passphrase'.
Raises ValueError when command fails.
Expand All @@ -625,6 +702,9 @@ def run_luks_add_key(self, device, keyfile, passphrase, new_keyfile,
if pbkdf is not None:
self._add_pbkdf_options(args, pbkdf)

if new_keyslot is not None:
args.extend(['--key-slot', str(new_keyslot)])

if keyfile:
args.extend(['--key-file', keyfile])
else:
Expand All @@ -640,7 +720,7 @@ def run_luks_add_key(self, device, keyfile, passphrase, new_keyfile,
raise ValueError('Error while adding new LUKS keyslot to %s: %s'
% (device, result[STDERR]))

def run_luks_remove_key(self, device, keyfile, passphrase,
def run_luks_remove_key(self, device, keyfile, passphrase, keyslot,
force_remove_last_key=False):
''' Remove key from given device
Raises ValueError when command fails
Expand Down Expand Up @@ -675,15 +755,18 @@ def run_luks_remove_key(self, device, keyfile, passphrase,
"To be able to remove a key, please set "
"`force_remove_last_key` to `true`." % device)

args = [self._cryptsetup_bin, 'luksRemoveKey', device, '-q']
if keyslot is None:
args = [self._cryptsetup_bin, 'luksRemoveKey', device, '-q']
else:
args = [self._cryptsetup_bin, 'luksKillSlot', device, '-q', str(keyslot)]
if keyfile:
args.extend(['--key-file', keyfile])
result = self._run_command(args, data=passphrase)
if result[RETURN_CODE] != 0:
raise ValueError('Error while removing LUKS key from %s: %s'
% (device, result[STDERR]))

def luks_test_key(self, device, keyfile, passphrase):
def luks_test_key(self, device, keyfile, passphrase, keyslot=None):
''' Check whether the keyfile or passphrase works.
Raises ValueError when command fails.
'''
Expand All @@ -695,12 +778,17 @@ def luks_test_key(self, device, keyfile, passphrase):
else:
data = passphrase

if keyslot is not None:
args.extend(['--key-slot', str(keyslot)])

result = self._run_command(args, data=data)
if result[RETURN_CODE] == 0:
return True
for output in (STDOUT, STDERR):
if 'No key available with this passphrase' in result[output]:
return False
if 'No usable keyslot is available.' in result[output]:
return False

raise ValueError('Error while testing whether keyslot exists on %s: %s'
% (device, result[STDERR]))
Expand Down Expand Up @@ -817,21 +905,44 @@ def luks_add_key(self):
def luks_remove_key(self):
if (self.device is None or
(self._module.params['remove_keyfile'] is None and
self._module.params['remove_passphrase'] is None)):
self._module.params['remove_passphrase'] is None and
self._module.params['remove_keyslot'] is None)):
# conditions for removing a key not fulfilled
return False

if self._module.params['state'] == 'absent':
self._module.fail_json(msg="Contradiction in setup: Asking to "
"remove a key from absent LUKS.")

if self._module.params['remove_keyslot']:
if not self._crypthandler.is_luks_slot_set(self.device, self._module.params['remove_keyslot']):
return False
result = self._crypthandler.luks_test_key(self.device, self._module.params['keyfile'], self._module.params['passphrase'])
if self._crypthandler.luks_test_key(self.device, self._module.params['keyfile'], self._module.params['passphrase'],
self._module.params['remove_keyslot']):
self._module.fail_json(msg='Cannot remove keyslot with keyfile or passphrase in same slot.')
return result

return self._crypthandler.luks_test_key(self.device, self._module.params['remove_keyfile'], self._module.params['remove_passphrase'])

def luks_remove(self):
return (self.device is not None and
self._module.params['state'] == 'absent' and
self._crypthandler.is_luks(self.device))

def validate_keyslot(self, param, luks_type):
if self._module.params[param] is not None:
if luks_type is None and param == 'keyslot':
if 8 <= self._module.params[param] <= 31:
self._module.fail_json(msg="You must specify type=luks2 when creating a new LUKS device to use keyslots 8-31.")
elif not (0 <= self._module.params[param] <= 7):
self._module.fail_json(msg="When not specifying a type, only the keyslots 0-7 are allowed.")

if luks_type == 'luks1' and not 0 <= self._module.params[param] <= 7:
self._module.fail_json(msg="%s must be between 0 and 7 when using LUKS1." % self._module.params[param])
elif luks_type == 'luks2' and not 0 <= self._module.params[param] <= 31:
self._module.fail_json(msg="%s must be between 0 and 31 when using LUKS2." % self._module.params[param])


def run_module():
# available arguments/parameters that a user can pass
Expand All @@ -845,6 +956,9 @@ def run_module():
passphrase=dict(type='str', no_log=True),
new_passphrase=dict(type='str', no_log=True),
remove_passphrase=dict(type='str', no_log=True),
keyslot=dict(type='int', no_log=False),
new_keyslot=dict(type='int', no_log=False),
remove_keyslot=dict(type='int', no_log=False),
force_remove_last_key=dict(type='bool', default=False),
keysize=dict(type='int'),
label=dict(type='str'),
Expand Down Expand Up @@ -874,7 +988,7 @@ def run_module():
mutually_exclusive = [
('keyfile', 'passphrase'),
('new_keyfile', 'new_passphrase'),
('remove_keyfile', 'remove_passphrase')
('remove_keyfile', 'remove_passphrase', 'remove_keyslot')
]

# seed the result dict in the object
Expand Down Expand Up @@ -904,6 +1018,17 @@ def run_module():
if module.params['label'] is not None and module.params['type'] == 'luks1':
module.fail_json(msg='You cannot combine type luks1 with the label option.')

if module.params['keyslot'] is not None or module.params['new_keyslot'] is not None or module.params['remove_keyslot'] is not None:
luks_type = crypt.get_luks_type(conditions.get_device_name())
if luks_type is None and module.params['type'] is not None:
luks_type = module.params['type']
for param in ['keyslot', 'new_keyslot', 'remove_keyslot']:
conditions.validate_keyslot(param, luks_type)

for param in ['new_keyslot', 'remove_keyslot']:
if module.params[param] is not None and module.params['keyfile'] is None and module.params['passphrase'] is None:
module.fail_json(msg="Removing a keyslot requires the passphrase or keyfile of another slot.")

# The conditions are in order to allow more operations in one run.
# (e.g. create luks and add a key to it)

Expand All @@ -914,6 +1039,7 @@ def run_module():
crypt.run_luks_create(conditions.device,
module.params['keyfile'],
module.params['passphrase'],
module.params['keyslot'],
module.params['keysize'],
module.params['cipher'],
module.params['hash'],
Expand Down Expand Up @@ -986,6 +1112,7 @@ def run_module():
module.params['passphrase'],
module.params['new_keyfile'],
module.params['new_passphrase'],
module.params['new_keyslot'],
module.params['pbkdf'])
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
Expand All @@ -1001,6 +1128,7 @@ def run_module():
crypt.run_luks_remove_key(conditions.device,
module.params['remove_keyfile'],
module.params['remove_passphrase'],
module.params['remove_keyslot'],
force_remove_last_key=last_key)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
Expand Down
Loading

0 comments on commit 6504e67

Please sign in to comment.