Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add_user_to_shared_folder: team membership #1244

Merged
merged 1 commit into from
May 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 85 additions & 21 deletions examples/add_user_to_shared_folder.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import argparse
import logging
import sys
from typing import Optional, List

from keepercommander import api, utils, crypto
from keepercommander.__main__ import get_params_from_config
from keepercommander.proto import folder_pb2

parser = argparse.ArgumentParser(description='Add user to shared folder')
parser.add_argument('--debug', action='store_true', help='Enables debug logging')
parser.add_argument('--config', dest='config', action='store', help='Config file to use')
parser.add_argument('--debug', dest='debug', action='store_true', help='Enables debug logging')
parser.add_argument('--teams', dest='teams', action='store_true', help='Load teams')
parser.add_argument('-p', '--manage-records', dest='manage_records', action='store', choices=['on', 'off'],
help='account permission: can manage records.')
parser.add_argument('-o', '--manage-users', dest='manage_users', action='store', choices=['on', 'off'],
Expand All @@ -19,40 +22,101 @@

logging.basicConfig(level=logging.DEBUG if opts.debug is True else logging.WARNING, format='%(message)s')

my_params = get_params_from_config('')
my_params = get_params_from_config(opts.config or '')

api.login(my_params)
if not my_params.session_token:
exit(1)

shared_folder_uid = opts.shared_folder
team_uids = [] # type: List[str]
shared_folder_key = None # type: Optional[bytes]
can_manage_users = False

if opts.teams is True:
sd_rq = {
'command': 'sync_down',
'includes': ['teams', 'explicit']
}
sd_rs = api.communicate(my_params, sd_rq)
for team in sd_rs.get('teams', []):
if 'shared_folder_keys' in team:
sf_key = next((x for x in team['shared_folder_keys'] if x['shared_folder_uid'] == shared_folder_uid), None)
if sf_key:
team_uids.append(team['team_uid'])
if not shared_folder_key:
encrypted_key = utils.base64_url_decode(team['team_key'])
team_key_type = team['team_key_type']
try:
if team_key_type == 1:
team_key = crypto.decrypt_aes_v1(encrypted_key, my_params.data_key)
elif team_key_type == 2:
team_key = crypto.decrypt_rsa(encrypted_key, my_params.rsa_key2)
elif team_key_type == 3:
team_key = crypto.decrypt_aes_v2(encrypted_key, my_params.data_key)
elif team_key_type == 4:
team_key = crypto.decrypt_ec(encrypted_key, my_params.ecc_key)
else:
continue
except:
continue
encrypted_key = utils.base64_url_decode(sf_key['shared_folder_key'])
key_type = sf_key['key_type']
try:
if key_type == 1:
shared_folder_key = crypto.decrypt_aes_v1(encrypted_key, team_key)
elif key_type == 2:
if 'team_private_key' in team:
encrypted_rsa_key = utils.base64_url_decode(team['team_private_key'])
rsa_key = crypto.load_rsa_private_key(crypto.decrypt_aes_v1(encrypted_rsa_key, team_key))
shared_folder_key = crypto.decrypt_rsa(encrypted_key, rsa_key)
elif key_type == 3:
shared_folder_key = crypto.decrypt_aes_v2(encrypted_key, team_key)
elif key_type == 4:
if 'team_ecc_private_key' in team:
encrypted_ec_key = utils.base64_url_decode(team['team_ecc_private_key'])
ec_key = crypto.load_ec_private_key(crypto.decrypt_aes_v2(encrypted_ec_key, team_key))
shared_folder_key = crypto.decrypt_ec(encrypted_key, ec_key)
except:
continue

sf_rq = {
'shared_folder_uid': shared_folder_uid
}
if len(team_uids) > 0:
sf_rq['team_uid'] = team_uids[0]
rq = {
'command': 'get_shared_folders',
'shared_folders': [
{
'shared_folder_uid': shared_folder_uid
}
],
'include': ['sfheaders', 'sfusers']
'shared_folders': [sf_rq],
'include': ['sfheaders', 'sfusers', 'sfteams']
}

sf_rs = api.communicate(my_params, sf_rq)
sf_rs = api.communicate(my_params, rq)
if len(sf_rs['shared_folders']) == 0:
raise ValueError(f'Shared folder UID "{shared_folder_uid}" not found')

shared_folder_info = sf_rs['shared_folders'][0]
shared_folder_key = utils.base64_url_decode(shared_folder_info['shared_folder_key'])
key_type = shared_folder_info["key_type"]
if key_type == 1:
shared_folder_key = crypto.decrypt_aes_v1(shared_folder_key, my_params.data_key)
elif key_type == 2:
shared_folder_key = crypto.decrypt_rsa(shared_folder_key, my_params.rsa_key2)
elif key_type == 3:
shared_folder_key = crypto.decrypt_aes_v2(shared_folder_key, my_params.data_key)
elif key_type == 4:
shared_folder_key = crypto.decrypt_ec(shared_folder_key, my_params.data_key)

if not shared_folder_key:
if 'shared_folder_key' in shared_folder_info:
encrypted_key = utils.base64_url_decode(shared_folder_info['shared_folder_key'])
key_type = shared_folder_info["key_type"]
if key_type == 1:
shared_folder_key = crypto.decrypt_aes_v1(encrypted_key, my_params.data_key)
elif key_type == 2:
shared_folder_key = crypto.decrypt_rsa(encrypted_key, my_params.rsa_key2)
elif key_type == 3:
shared_folder_key = crypto.decrypt_aes_v2(encrypted_key, my_params.data_key)
elif key_type == 4:
shared_folder_key = crypto.decrypt_ec(encrypted_key, my_params.data_key)

if not shared_folder_key:
raise ValueError(f'Key cannot be resolved: shared folder UID "{shared_folder_uid}"')

team_uid = None # type: Optional[str]
permissions = next((x for x in shared_folder_info.get('users', []) if x['username'] == my_params.user and x.get('manage_users') is True), None)
if permissions and len(team_uids) > 0:
permissions = next((x for x in shared_folder_info.get('teams', []) if x['team_uid'] in team_uids and x.get('manage_users') is True), None)
if permissions:
team_uid = permissions['team_uid']

existing_users = set()
if isinstance(shared_folder_info.get('users'), list):
Expand Down
Loading