Skip to content

Commit

Permalink
Merge pull request #197 from olcf/ngin_uximprove
Browse files Browse the repository at this point in the history
Improve user experience
  • Loading branch information
Noah Ginsburg authored Feb 3, 2022
2 parents a1f8e35 + 8765c87 commit 6a671ef
Show file tree
Hide file tree
Showing 24 changed files with 271 additions and 210 deletions.
2 changes: 1 addition & 1 deletion libpkpass/commands/card.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def _run_command_execution(self):
####################################################################
return print_card_info(
self.args["card_slot"],
self.identity,
self.iddb.id,
2,
self.args["color"],
self.args["theme_map"],
Expand Down
19 changes: 13 additions & 6 deletions libpkpass/commands/clip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from libpkpass import LOGGER
from libpkpass.commands.command import Command
from libpkpass.password import PasswordEntry
from libpkpass.errors import CliArgumentError
from libpkpass.errors import CliArgumentError, PasswordIOError
from libpkpass.models.recipient import Recipient


Expand Down Expand Up @@ -35,18 +35,18 @@ def _run_command_execution(self):
password.read_password_data(
path.join(self.args["pwstore"], self.args["pwname"])
)
distributor = password.recipients[self.identity["name"]]["distributor"]
distributor = password.recipients[self.iddb.id["name"]]["distributor"]
plaintext_pw = password.decrypt_entry(
identity=self.identity,
identity=self.iddb.id,
passphrase=self.passphrase,
card_slot=self.args["card_slot"],
)
if not self.args["noverify"]:
result = password.verify_entry(
self.identity["name"],
self.identities,
self.iddb.id["name"],
self.iddb,
distributor,
self.session.query(Recipient)
self.iddb.session.query(Recipient)
.filter(Recipient.name == distributor)
.first()
.certs,
Expand All @@ -73,3 +73,10 @@ def _validate_args(self):
for argument in ["keypath"]:
if argument not in self.args or self.args[argument] is None:
raise CliArgumentError(f"'{argument}' is a required argument")

def _pre_check(self):
if path.exists(path.join(self.args["pwstore"], self.args["pwname"])):
return True
raise PasswordIOError(
f"{path.join(self.args['pwstore'], self.args['pwname'])} does not exist"
)
134 changes: 36 additions & 98 deletions libpkpass/commands/command.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
"""This module is a generic for all pkpass commands"""
import getpass
from os import getcwd, path, sep, remove, rename
from os import getcwd, path, sep
from sqlalchemy.orm import sessionmaker
from libpkpass import LOGGER
from libpkpass.commands.arguments import ARGUMENTS as arguments
from libpkpass.crypto import print_card_info
from libpkpass.errors import (
NullRecipientError,
CliArgumentError,
GroupDefinitionError,
PasswordIOError,
NotThePasswordOwnerError,
)
from libpkpass.identities import IdentityDB
from libpkpass.passworddb import PasswordDB
from libpkpass.password import PasswordEntry
from libpkpass.util import collect_args, color_prepare
from libpkpass.util import collect_args, color_prepare, build_recipient_list
from libpkpass.models.recipient import Recipient


Expand All @@ -42,16 +40,13 @@ def __init__(self, cli, iddb=None, pwdb=None):
##################################################################
"""Intialization function for class. Register with argparse"""
##################################################################
self.cli = cli
# default certpath to none because connect string is allowed
self.args = {}
self.recipient_list = []
self.escrow_and_recipient_list = []
self.identities = iddb if iddb else IdentityDB()
self.iddb = iddb if iddb else IdentityDB()
self.pwdbcached = pwdb is not None
self.passworddb = pwdb if pwdb else PasswordDB()
self.session = None
self.identity = None
cli.register(self, self.name, self.description)

def register(self, parser):
Expand All @@ -66,37 +61,51 @@ def run(self, parsedargs):
"""Passes the argparse Namespace object of parsed arguments"""
##################################################################
self._run_command_setup(parsedargs)
self._pre_check()
self._passphrase_check()
return self._run_command_execution()

def _passphrase_check(self):
if "nopassphrase" in self.selected_args and not self.args["nopassphrase"]:
for mesg in print_card_info(
self.args["card_slot"],
self.iddb.id,
self.args["verbosity"],
self.args["color"],
self.args["theme_map"],
):
LOGGER.info(mesg)
self.passphrase = getpass.getpass("Enter Pin/Passphrase: ")

def _run_command_setup(self, parsedargs):
##################################################################
"""Passes the argparse Namespace object of parsed arguments"""
##################################################################
self.args = collect_args(parsedargs)
self.session = sessionmaker(bind=self.args["db"]["engine"])()
self.iddb.session = sessionmaker(bind=self.args["db"]["engine"])()
self._validate_combinatorial_args()
self._validate_args()

# Build the list of recipients that this command will act on
self._build_recipient_list()
self.recipient_list, self.escrow_and_recipient_list = build_recipient_list(self.args)

# If there are defined repositories of keys and certificates, load them
self.identities.args = self.args
self.identities.cabundle = self.args["cabundle"]
self.identities.load_certs_from_directory(
self.iddb.args = self.args
self.iddb.cabundle = self.args["cabundle"]
self.iddb.load_certs_from_directory(
self.args["certpath"],
connectmap=self.args["connect"],
nocache=self.args["no_cache"],
)
if self.args["keypath"]:
self.identities.load_keys_from_directory(self.args["keypath"])
self.identity = (
self.session.query(Recipient)
self.iddb.load_keys_from_directory(self.args["keypath"])
self.iddb.id = (
self.iddb.session.query(Recipient)
.filter(Recipient.name == self.args["identity"])
.first()
)
self._validate_identities()
self.identity = dict(self.identity)
self.iddb.id = dict(self.iddb.id)

if (
self.args["subparser_name"]
Expand All @@ -107,16 +116,6 @@ def _run_command_setup(self, parsedargs):
if "pwname" in self.args and self.args["pwname"]:
self._resolve_directory_path()
self.args["card_slot"] = self.args["card_slot"] if self.args["card_slot"] else 0
if "nopassphrase" in self.selected_args and not self.args["nopassphrase"]:
for mesg in print_card_info(
self.args["card_slot"],
self.identity,
self.args["verbosity"],
self.args["color"],
self.args["theme_map"],
):
LOGGER.info(mesg)
self.passphrase = getpass.getpass("Enter Pin/Passphrase: ")

def _resolve_directory_path(self):
####################################################################
Expand Down Expand Up @@ -157,7 +156,7 @@ def update_pass(self, pass_value):
secret=pass_value,
distributor=self.args["identity"],
recipients=[self.args["identity"]],
session=self.session,
session=self.iddb.session,
passphrase=self.passphrase,
card_slot=self.args["card_slot"],
escrow_users=self.args["escrow_users"],
Expand Down Expand Up @@ -192,7 +191,7 @@ def create_pass(self, password1, description, authorizer, recipient_list=None):
secret=password1,
distributor=self.args["identity"],
recipients=recipient_list,
session=self.session,
session=self.iddb.session,
passphrase=self.passphrase,
card_slot=self.args["card_slot"],
escrow_users=self.args["escrow_users"],
Expand Down Expand Up @@ -221,80 +220,19 @@ def create_or_update_pass(
self.args["identity"], owner, self.args["pwname"]
)

def delete_pass(self):
##################################################################
"""This deletes a password that the user has created, useful for testing"""
##################################################################
filepath = path.join(self.args["pwstore"], self.args["pwname"])
try:
remove(filepath)
except OSError as err:
raise PasswordIOError(
f"Password '{self.args['pwname']}' not found"
) from err

def rename_pass(self):
##################################################################
"""This renames a password that the user has created"""
##################################################################
oldpath = path.join(self.args["pwstore"], self.args["pwname"])
newpath = path.join(self.args["pwstore"], self.args["rename"])
try:
rename(oldpath, newpath)
password = PasswordEntry()
password.read_password_data(newpath)
password["metadata"]["name"] = self.args["rename"]
password.write_password_data(newpath)

except OSError as err:
raise PasswordIOError(
f"Password '{self.args['pwname']}' not found"
) from err

def _run_command_execution(self):
##################################################################
"""Passes the argparse Namespace object of parsed arguments"""
##################################################################
raise NotImplementedError

def _build_recipient_list(self):
##################################################################
"""take groups and users and make a SOA for the recipients"""
##################################################################
try:
if "groups" in self.args and self.args["groups"]:
self.recipient_list += self._parse_group_membership()
if "users" in self.args and self.args["users"]:
self.recipient_list += self.args["users"]
self.recipient_list = [x.strip() for x in list(set(self.recipient_list))]
self.escrow_and_recipient_list = (
self.recipient_list + self.args["escrow_users"]
)
for user in self.escrow_and_recipient_list:
if str(user) == "":
raise NullRecipientError
except KeyError: # If this is a command with no users, don't worry about it
pass

def _parse_group_membership(self):
##################################################################
"""Concat membership of supplied groups"""
##################################################################
member_list = []
try:
for group in self.args["groups"]:
member_list += [
user.strip()
for user in self.args[group.strip()].split(",")
if user.strip()
]
return member_list
except KeyError as err:
raise GroupDefinitionError(str(err)) from err

def _validate_args(self):
raise NotImplementedError

def _pre_check(self):
"""Pre check likely won't happen on all commands so we'll allow a
pass"""

def _validate_combinatorial_args(self):
##################################################################
"""This is a weird function name so: combinatorial in this case
Expand Down Expand Up @@ -324,16 +262,16 @@ def _validate_identities(self, swap_list=None):
if not swap_list:
swap_list = self.escrow_and_recipient_list
for recipient in swap_list:
self.identities.verify_identity(recipient)
self.iddb.verify_identity(recipient)
if recipient not in [
x[0] for x in self.session.query(Recipient.name).all()
x[0] for x in self.iddb.session.query(Recipient.name).all()
]:
raise CliArgumentError(
f"Error: Recipient '{recipient}' is not in the recipient database"
)

if self.identity:
self.identities.verify_identity(self.args["identity"])
if self.iddb.id:
self.iddb.verify_identity(self.args["identity"])
else:
raise CliArgumentError(
f"Error: Your user '{self.args['identity']}' is not in the recipient database"
Expand Down
15 changes: 14 additions & 1 deletion libpkpass/commands/delete.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""This module allows for the deletion of passwords"""
from os import path, remove
from sys import exit as sysexit
from libpkpass.commands.command import Command
from libpkpass.errors import CliArgumentError, NotThePasswordOwnerError
from libpkpass.errors import CliArgumentError, NotThePasswordOwnerError, PasswordIOError


class Delete(Command):
Expand Down Expand Up @@ -33,6 +34,18 @@ def _run_command_execution(self):
# necessary for print statement
yield ""

def delete_pass(self):
##################################################################
"""This deletes a password that the user has created, useful for testing"""
##################################################################
filepath = path.join(self.args["pwstore"], self.args["pwname"])
try:
remove(filepath)
except OSError as err:
raise PasswordIOError(
f"Password '{self.args['pwname']}' not found"
) from err

def _confirmation(self):
####################################################################
"""Verify password to modify"""
Expand Down
6 changes: 3 additions & 3 deletions libpkpass/commands/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ def _run_command_execution(self):
password.read_password_data(dist_pass)
if self.args["identity"] in password.recipients.keys():
plaintext_pw = password.decrypt_entry(
self.identity,
self.iddb.id,
passphrase=self.passphrase,
card_slot=self.args["card_slot"],
)
password.add_recipients(
secret=plaintext_pw,
distributor=self.args["identity"],
recipients=self.recipient_list,
session=self.session,
session=self.iddb.session,
passphrase=self.passphrase,
card_slot=self.args["card_slot"],
escrow_users=self.args["escrow_users"],
Expand Down Expand Up @@ -84,7 +84,7 @@ def _confirm_pdb(self):

def _confirm_recipients(self):
not_in_db = []
in_db = [x.name for x in self.session.query(Recipient).all()]
in_db = [x.name for x in self.iddb.session.query(Recipient).all()]
for recipient in self.recipient_list:
if recipient not in in_db:
not_in_db.append(recipient)
Expand Down
4 changes: 2 additions & 2 deletions libpkpass/commands/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ def _iterate_pdb(self, passworddb, crypt_pass=False):
####################################################################
"""Iterate through the passwords that we can decrypt"""
####################################################################
uid = self.identity["name"]
uid = self.iddb.id["name"]
all_passwords = {
k: v for (k, v) in passworddb.pwdb.items() if uid in v.recipients.keys()
}
for _, password in tqdm(all_passwords.items()):
plaintext_pw = password.decrypt_entry(
identity=self.identity,
identity=self.iddb.id,
passphrase=self.passphrase,
card_slot=self.args["card_slot"],
)
Expand Down
2 changes: 1 addition & 1 deletion libpkpass/commands/fileimport.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _yaml_file(self, password):
####################################################################
"""This function handles the yaml format of pkpass"""
####################################################################
uid = self.identity["name"]
uid = self.iddb.id["name"]
pwstore = self.args["pwstore"]

self.args["pwname"] = password["metadata"]["name"]
Expand Down
Loading

0 comments on commit 6a671ef

Please sign in to comment.