-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy paths3_utils.py
109 lines (89 loc) · 3.39 KB
/
s3_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import json
import boto3
from dotenv import load_dotenv
import os
import logging
# Load variables from a .env file first so the environment lookup below
# (and the ones done later inside the helpers) can see them.
load_dotenv()

# Name of the S3 bucket every helper in this module operates on.
BUCKET = os.environ.get('BUCKET')

# Text of the error raised by get_object when the requested key is absent;
# compared against str(exception) to tell "not found" apart from real failures.
EXISTING_KEY_ERROR_MESSAGE = (
    'An error occurred (NoSuchKey) when calling the GetObject operation: '
    'The specified key does not exist.'
)

# Log through the logger registered under the name 'discord'.
# INFO by default; switch to logging.DEBUG for the full dump.
logger = logging.getLogger('discord')
logger.setLevel(logging.INFO)
def get_s3_client():
    """Create a boto3 S3 client using credentials from the environment.

    The ``ACCESS_KEY`` and ``SECRET_KEY`` environment variables are read at
    call time and handed to ``boto3.client`` as the access key id and the
    secret access key respectively.

    Returns:
        The client object produced by ``boto3.client('s3', ...)``.
    """
    credentials = {
        'aws_access_key_id': os.getenv('ACCESS_KEY'),
        'aws_secret_access_key': os.getenv('SECRET_KEY'),
    }
    return boto3.client('s3', **credentials)
def _is_missing_key_error(error):
    """Return True when *error* reports that the requested S3 key does not exist."""
    # Exceptions raised by the client may carry the parsed service error on
    # `.response`; its error code is more stable than the human-readable text.
    response = getattr(error, 'response', None)
    if isinstance(response, dict):
        details = response.get('Error')
        if isinstance(details, dict) and details.get('Code') == 'NoSuchKey':
            return True
    # Fall back to the exact-message comparison this module has always used.
    return f'{error}' == EXISTING_KEY_ERROR_MESSAGE


def info_from_s3(key, s3_client):
    """Fetch ``<key>.json`` from the bucket and return its decoded JSON content.

    If the fetched object's metadata has a non-empty ``replicate`` entry, the
    lookup is redirected: the second ``/``-separated segment of *key* is
    replaced by that metadata value, the object at the resulting key is
    fetched instead, and the returned dict gets an extra ``replicate_key``
    entry holding the redirected key (without the ``.json`` suffix).

    Args:
        key: Object key without the ``.json`` suffix. The redirect path
            indexes segments 0 and 2 of ``key.split("/")``.
        s3_client: Client object exposing ``get_object``.

    Returns:
        The decoded JSON document, or None when *key* is falsy, the object
        does not exist, or the response has no body.

    Raises:
        Exception: Any error other than "key does not exist" is logged once
            and re-raised unchanged.
    """
    if not key:
        return None
    try:
        s3_object = s3_client.get_object(Bucket=BUCKET, Key=f'{key}.json')

        # Handle replicate functionality: follow the redirect stored in metadata.
        replicate = s3_object['Metadata'].get('replicate')
        if replicate:
            split_key = key.split("/")
            key = split_key[0] + "/" + replicate + "/" + split_key[2]
            s3_object = s3_client.get_object(Bucket=BUCKET, Key=f'{key}.json')
            body = s3_object.get('Body')
            if body:
                info = json.loads(body.read())
                # Remember where the data really lives so a later upload can
                # write it back to the same place.
                info['replicate_key'] = key
                return info
            return None

        # Normal case: no redirect, return the document as stored.
        body = s3_object.get('Body')
        if body:
            return json.loads(body.read())
        return None
    except Exception as e:
        if _is_missing_key_error(e):
            return None
        # A single handler covers both fetches, so each failure is logged once.
        logger.error(f'An error occurred while interacting with s3:\n{e}')
        raise
def get_files_from_dir(key, s3_client):
    """List the objects stored under the ``<key>/`` prefix of the bucket.

    Args:
        key: Prefix without the trailing slash; a slash is appended here.
        s3_client: Client object exposing ``list_objects_v2``.

    Returns:
        The unmodified ``list_objects_v2`` response, or None when *key* is
        falsy (no request is made in that case).
    """
    if key:
        return s3_client.list_objects_v2(Bucket=BUCKET, Prefix=f'{key}/')
    return None
def get_char_files_from_dir(key, s3_client):
    """Yield the key of every object in the bucket whose name starts with *key*.

    This is a generator: no request is made until iteration starts. Unlike
    ``get_files_from_dir`` no trailing slash is appended to the prefix.

    All result pages are followed through continuation tokens, so listings
    longer than a single ``list_objects_v2`` page are not cut short. The
    per-object ``head_object`` request that used to be issued here has been
    dropped: its result was never used, so it only added one round-trip per
    key.

    Args:
        key: Key prefix to match; a falsy value yields nothing.
        s3_client: Client object exposing ``list_objects_v2``.

    Yields:
        The ``Key`` entry of each listed object.
    """
    if not key:
        return
    request = {'Bucket': BUCKET, 'Prefix': f'{key}'}
    while True:
        response = s3_client.list_objects_v2(**request)
        for content in response.get('Contents', []):
            yield content.get('Key')
        # Stop unless the service says more results exist and tells us where
        # to resume; the token check also guards against looping forever.
        next_token = response.get('NextContinuationToken')
        if not response.get('IsTruncated') or not next_token:
            return
        request['ContinuationToken'] = next_token
def get_bytes_from_json(json_to_parse):
    """Serialize *json_to_parse* to JSON text and return it as a bytearray.

    The text is encoded as latin-1. ``json.dumps`` is called with its default
    settings, which escape non-ASCII characters, so the encoding step receives
    ASCII-only text.
    """
    serialized = json.dumps(json_to_parse)
    return bytearray(serialized.encode('latin-1'))
def upload_to_s3(content, key, s3_client):
    """Serialize *content* as JSON and store it at ``<key>.json`` in the bucket.

    When *content* has a truthy ``replicate_key`` entry, that value replaces
    *key* as the upload destination, the entry is removed from *content*
    (the caller's dict is modified in place), and the second ``/``-separated
    segment of the replicate key is stored as the object's ``replicate``
    metadata.

    Args:
        content: Dict to upload; may carry a ``replicate_key`` entry.
        key: Destination key without the ``.json`` suffix.
        s3_client: Client object exposing ``put_object``.
    """
    if not content.get('replicate_key'):
        # Normal case for any other JSON update: upload to the given key.
        s3_client.put_object(
            Body=get_bytes_from_json(content), Bucket=BUCKET, Key=f'{key}.json'
        )
    else:
        # pop() reads the redirect target and strips it from the JSON that
        # gets uploaded, moving the information into object metadata instead.
        key = content.pop('replicate_key')
        key_parts = key.split("/")
        # Only the second path segment is kept as metadata (per the original
        # author's note, this is the channel id).
        s3_client.put_object(
            Body=get_bytes_from_json(content),
            Bucket=BUCKET,
            Key=f'{key}.json',
            Metadata={'replicate': key_parts[1]},
        )
    logger.info('Finished uploading')
def s3_delete(key, s3_client):
    """Delete ``<key>.json`` from the bucket, then log which key was removed.

    Args:
        key: Object key without the ``.json`` suffix; must be a string for
            the log message to be built.
        s3_client: Client object exposing ``delete_object``.
    """
    object_key = f'{key}.json'
    s3_client.delete_object(Bucket=BUCKET, Key=object_key)
    message = 'Deleting channel file ' + key
    logger.info(message)