Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

luks_device: add support for keyslots #664

Merged
merged 8 commits into from
Oct 29, 2023
138 changes: 131 additions & 7 deletions plugins/modules/luks_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@
value is a string with the passphrase."
type: str
version_added: '1.0.0'
keyslot:
description:
- "Adds the O(keyfile) or O(passphrase) to a specific keyslot when
creating a new container on O(device). Parameter value is the
number of the keyslot."
- "NOTE that a device of O(type) luks1 supports the keyslot numbers
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
0-7 and a device of O(type) luks2 supports the keyslot numbers
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
0-32. In order to use the keyslots 8-31 when creating a new
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
container, setting O(type) to luks2 is required."
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
type: int
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
keysize:
description:
- "Sets the key size only if LUKS container does not exist."
Expand Down Expand Up @@ -108,6 +118,15 @@
be used even if another keyslot already exists for this passphrase."
type: str
version_added: '1.0.0'
new_keyslot:
description:
- "Adds the additional O(new_keyfile) or O(new_passphrase) to a
specific keyslot on the given O(device). Parameter value is the number
of the keyslot."
- "NOTE that a device of O(type) luks1 supports the keyslot numbers
0-7 and a device of O(type) luks2 supports the keyslot numbers
0-32."
type: int
remove_keyfile:
description:
- "Removes given key from the container on O(device). Does not
Expand All @@ -133,6 +152,16 @@
to V(true)."
type: str
version_added: '1.0.0'
remove_keyslot:
description:
- "Removes the key in the given slot on O(device). Needs
O(keyfile) or O(passphrase) for authorization."
- "NOTE that a device of O(type) luks1 supports the keyslot numbers
0-7 and a device of O(type) luks2 supports the keyslot numbers
0-32."
- "NOTE that the given O(keyfile) or O(passphrase) must not be
in the slot to be removed"
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
type: int
force_remove_last_key:
description:
- "If set to V(true), allows removing the last key from a container."
Expand Down Expand Up @@ -377,6 +406,26 @@
state: "present"
keyfile: "/vault/keyfile"
type: luks2

- name: Create a container with key in slot 4
community.crypto.luks_device:
device: "/dev/loop0"
state: "present"
keyfile: "/vault/keyfile"
keyslot: 4

- name: Add a new key in slot 5
community.crypto.luks_device:
device: "/dev/loop0"
keyfile: "/vault/keyfile"
new_keyfile: "/vault/keyfile"
new_keyslot: 5

- name: Remove the key from slot 4 (given keyfile must not be slot 4)
community.crypto.luks_device:
device: "/dev/loop0"
keyfile: "/vault/keyfile"
remove_keyslot: 4
'''

RETURN = '''
Expand Down Expand Up @@ -523,6 +572,27 @@ def is_luks(self, device):
result = self._run_command([self._cryptsetup_bin, 'isLuks', device])
return result[RETURN_CODE] == 0

def get_luks_type(self, device):
''' get the luks type of a device
'''
if self.is_luks(device):
result = self._run_command([self._cryptsetup_bin, 'luksDump', device])
for line in result[STDOUT].splitlines():
if 'Version' in line:
version = line.split()[1]
return 'luks2' if version == '2' else 'luks1'
return None

def is_luks_slot_set(self, device, keyslot):
''' check if a keyslot is set
'''
luks_header = result = self._run_command([self._cryptsetup_bin, 'luksDump', device])
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
if result[RETURN_CODE] != 0:
raise ValueError('Error while dumping LUKS header from %s' % (device, ))
result_luks1 = 'Key Slot %d: ENABLED' % (keyslot) in luks_header[STDOUT]
result_luks2 = '%d: luks2' % (keyslot) in luks_header[STDOUT]
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
return result_luks1 or result_luks2

def _add_pbkdf_options(self, options, pbkdf):
if pbkdf['iteration_time'] is not None:
options.extend(['--iter-time', str(int(pbkdf['iteration_time'] * 1000))])
Expand All @@ -535,7 +605,7 @@ def _add_pbkdf_options(self, options, pbkdf):
if pbkdf['parallel'] is not None:
options.extend(['--pbkdf-parallel', str(pbkdf['parallel'])])

def run_luks_create(self, device, keyfile, passphrase, keysize, cipher, hash_, sector_size, pbkdf):
def run_luks_create(self, device, keyfile, passphrase, keyslot, keysize, cipher, hash_, sector_size, pbkdf):
# create a new luks container; use batch mode to auto confirm
luks_type = self._module.params['type']
label = self._module.params['label']
Expand All @@ -556,6 +626,8 @@ def run_luks_create(self, device, keyfile, passphrase, keysize, cipher, hash_, s
self._add_pbkdf_options(options, pbkdf)
if sector_size is not None:
options.extend(['--sector-size', str(sector_size)])
if keyslot is not None:
options.extend(['--key-slot', str(keyslot)])

args = [self._cryptsetup_bin, 'luksFormat']
args.extend(options)
Expand Down Expand Up @@ -615,7 +687,7 @@ def run_luks_remove(self, device):
raise ValueError('Error while wiping LUKS container signatures for %s: %s' % (device, exc))

def run_luks_add_key(self, device, keyfile, passphrase, new_keyfile,
new_passphrase, pbkdf):
new_passphrase, new_keyslot, pbkdf):
''' Add new key from a keyfile or passphrase to given 'device';
authentication done using 'keyfile' or 'passphrase'.
Raises ValueError when command fails.
Expand All @@ -625,6 +697,9 @@ def run_luks_add_key(self, device, keyfile, passphrase, new_keyfile,
if pbkdf is not None:
self._add_pbkdf_options(args, pbkdf)

if new_keyslot is not None:
args.extend(['--key-slot', str(new_keyslot)])

if keyfile:
args.extend(['--key-file', keyfile])
else:
Expand All @@ -640,7 +715,7 @@ def run_luks_add_key(self, device, keyfile, passphrase, new_keyfile,
raise ValueError('Error while adding new LUKS keyslot to %s: %s'
% (device, result[STDERR]))

def run_luks_remove_key(self, device, keyfile, passphrase,
def run_luks_remove_key(self, device, keyfile, passphrase, keyslot,
force_remove_last_key=False):
''' Remove key from given device
Raises ValueError when command fails
Expand Down Expand Up @@ -675,15 +750,18 @@ def run_luks_remove_key(self, device, keyfile, passphrase,
"To be able to remove a key, please set "
"`force_remove_last_key` to `true`." % device)

args = [self._cryptsetup_bin, 'luksRemoveKey', device, '-q']
if keyslot is None:
args = [self._cryptsetup_bin, 'luksRemoveKey', device, '-q']
else:
args = [self._cryptsetup_bin, 'luksKillSlot', device, '-q', str(keyslot)]
if keyfile:
args.extend(['--key-file', keyfile])
result = self._run_command(args, data=passphrase)
if result[RETURN_CODE] != 0:
raise ValueError('Error while removing LUKS key from %s: %s'
% (device, result[STDERR]))

def luks_test_key(self, device, keyfile, passphrase):
def luks_test_key(self, device, keyfile, passphrase, keyslot=None):
''' Check whether the keyfile or passphrase works.
Raises ValueError when command fails.
'''
Expand All @@ -695,12 +773,17 @@ def luks_test_key(self, device, keyfile, passphrase):
else:
data = passphrase

if keyslot is not None:
args.extend(['--key-slot', str(keyslot)])

result = self._run_command(args, data=data)
if result[RETURN_CODE] == 0:
return True
for output in (STDOUT, STDERR):
if 'No key available with this passphrase' in result[output]:
return False
if 'No usable keyslot is available.' in result[output]:
return False

raise ValueError('Error while testing whether keyslot exists on %s: %s'
% (device, result[STDERR]))
Expand Down Expand Up @@ -817,14 +900,24 @@ def luks_add_key(self):
def luks_remove_key(self):
if (self.device is None or
(self._module.params['remove_keyfile'] is None and
self._module.params['remove_passphrase'] is None)):
self._module.params['remove_passphrase'] is None and
self._module.params['remove_keyslot'] is None)):
# conditions for removing a key not fulfilled
return False

if self._module.params['state'] == 'absent':
self._module.fail_json(msg="Contradiction in setup: Asking to "
"remove a key from absent LUKS.")

if self._module.params['remove_keyslot']:
if not self._crypthandler.is_luks_slot_set(self.device, self._module.params['remove_keyslot']):
return False
result = self._crypthandler.luks_test_key(self.device, self._module.params['keyfile'], self._module.params['passphrase'])
if self._crypthandler.luks_test_key(self.device, self._module.params['keyfile'], self._module.params['passphrase'],
self._module.params['remove_keyslot']):
self._module.fail_json(msg='Cannot remove keyslot with keyfile or passphrase in same slot')
return result

return self._crypthandler.luks_test_key(self.device, self._module.params['remove_keyfile'], self._module.params['remove_passphrase'])

def luks_remove(self):
Expand All @@ -833,6 +926,15 @@ def luks_remove(self):
self._crypthandler.is_luks(self.device))


def validate_keyslot(keyslot, luks_type):
if luks_type == 'luks1' and not 0 <= keyslot <= 7:
return False
elif luks_type == 'luks2' and not 0 <= keyslot <= 31:
return False
else:
return True


def run_module():
# available arguments/parameters that a user can pass
module_args = dict(
Expand All @@ -845,6 +947,9 @@ def run_module():
passphrase=dict(type='str', no_log=True),
new_passphrase=dict(type='str', no_log=True),
remove_passphrase=dict(type='str', no_log=True),
keyslot=dict(type='int', no_log=False),
new_keyslot=dict(type='int', no_log=False),
remove_keyslot=dict(type='int', no_log=False),
force_remove_last_key=dict(type='bool', default=False),
keysize=dict(type='int'),
label=dict(type='str'),
Expand Down Expand Up @@ -874,7 +979,7 @@ def run_module():
mutually_exclusive = [
('keyfile', 'passphrase'),
('new_keyfile', 'new_passphrase'),
('remove_keyfile', 'remove_passphrase')
('remove_keyfile', 'remove_passphrase', 'remove_keyslot')
]

# seed the result dict in the object
Expand Down Expand Up @@ -904,6 +1009,22 @@ def run_module():
if module.params['label'] is not None and module.params['type'] == 'luks1':
module.fail_json(msg='You cannot combine type luks1 with the label option.')

if module.params['remove_keyslot'] is not None:
if module.params['keyfile'] is None and module.params['passphrase'] is None:
module.fail_json(msg='Removing a keyslot requires the passphrase or keyfile of another slot')

luks_type = crypt.get_luks_type(conditions.get_device_name())
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
for parameter in ['keyslot', 'new_keyslot', 'remove_keyslot']:
if module.params[parameter] is not None and not validate_keyslot(module.params[parameter], luks_type):
if luks_type == 'luks1':
module.fail_json(msg='%s must be between 0 and 7 when using luks1' % (parameter))
else:
module.fail_json(msg='%s must be between 0 and 31 when using luks2' % (parameter))
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
if conditions.luks_create() and module.params['keyslot'] is not None and module.params['type'] is None:
keyslot = module.params['keyslot']
if validate_keyslot(keyslot, 'luks2') and not validate_keyslot(keyslot, 'luks1'):
module.fail_json(msg='You must specify type=luks2 when creating a new luks device to use keyslots 8-31')

# The conditions are in order to allow more operations in one run.
# (e.g. create luks and add a key to it)

Expand All @@ -914,6 +1035,7 @@ def run_module():
crypt.run_luks_create(conditions.device,
module.params['keyfile'],
module.params['passphrase'],
module.params['keyslot'],
module.params['keysize'],
module.params['cipher'],
module.params['hash'],
Expand Down Expand Up @@ -986,6 +1108,7 @@ def run_module():
module.params['passphrase'],
module.params['new_keyfile'],
module.params['new_passphrase'],
module.params['new_keyslot'],
module.params['pbkdf'])
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
Expand All @@ -1001,6 +1124,7 @@ def run_module():
crypt.run_luks_remove_key(conditions.device,
module.params['remove_keyfile'],
module.params['remove_passphrase'],
module.params['remove_keyslot'],
force_remove_last_key=last_key)
except ValueError as e:
module.fail_json(msg="luks_device error: %s" % e)
Expand Down
Loading
Loading