forked from kalomaze/externalcolabcode
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbackups_test.py
138 lines (120 loc) · 6.6 KB
/
backups_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import shutil
import hashlib
import time
LOGS_FOLDER = '/content/Retrieval-based-Voice-Conversion-WebUI/logs'
WEIGHTS_FOLDER = '/content/Retrieval-based-Voice-Conversion-WebUI/weights'
GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup'
def import_google_drive_backup():
print("Importing Google Drive backup...")
GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup' # change this to your Google Drive path
LOGS_FOLDER = '/content/Retrieval-based-Voice-Conversion-WebUI/logs'
WEIGHTS_FOLDER = '/content/Retrieval-based-Voice-Conversion-WebUI/weights'
weights_exist = False
files_to_copy = []
weights_to_copy = []
def handle_files(root, files, is_weight_files=False):
for filename in files:
filepath = os.path.join(root, filename)
if filename.endswith('.pth') and is_weight_files:
weights_exist = True
backup_filepath = os.path.join(WEIGHTS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH))
else:
backup_filepath = os.path.join(LOGS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH))
backup_folderpath = os.path.dirname(backup_filepath)
if not os.path.exists(backup_folderpath):
os.makedirs(backup_folderpath)
print(f'Created folder: {backup_folderpath}', flush=True)
if is_weight_files:
weights_to_copy.append((filepath, backup_filepath))
else:
files_to_copy.append((filepath, backup_filepath))
for root, dirs, files in os.walk(os.path.join(GOOGLE_DRIVE_PATH, 'logs')):
handle_files(root, files)
for root, dirs, files in os.walk(os.path.join(GOOGLE_DRIVE_PATH, 'weights')):
handle_files(root, files, True)
# Copy files in batches
total_files = len(files_to_copy)
start_time = time.time()
for i, (source, dest) in enumerate(files_to_copy, start=1):
with open(source, 'rb') as src, open(dest, 'wb') as dst:
shutil.copyfileobj(src, dst, 1024*1024) # 1MB buffer size
# Report progress every 5 seconds or after every 100 files, whichever is less frequent
if time.time() - start_time > 5 or i % 100 == 0:
print(f'\rCopying file {i} of {total_files} ({i * 100 / total_files:.2f}%)', end="")
start_time = time.time()
print(f'\nImported {len(files_to_copy)} files from Google Drive backup')
# Copy weights in batches
total_weights = len(weights_to_copy)
start_time = time.time()
for i, (source, dest) in enumerate(weights_to_copy, start=1):
with open(source, 'rb') as src, open(dest, 'wb') as dst:
shutil.copyfileobj(src, dst, 1024*1024) # 1MB buffer size
# Report progress every 5 seconds or after every 100 files, whichever is less frequent
if time.time() - start_time > 5 or i % 100 == 0:
print(f'\rCopying weight file {i} of {total_weights} ({i * 100 / total_weights:.2f}%)', end="")
start_time = time.time()
if weights_exist:
print(f'\nImported {len(weights_to_copy)} weight files')
print("Copied weights from Google Drive backup to local weights folder.")
else:
print("\nNo weights found in Google Drive backup.")
print("Google Drive backup import completed.")
def backup_files():
print("\n Starting backup loop...")
last_backup_timestamps_path = os.path.join(LOGS_FOLDER, 'last_backup_timestamps.txt')
fully_updated = False # boolean to track if all files are up to date
try:
with open(last_backup_timestamps_path, 'r') as f:
last_backup_timestamps = dict(line.strip().split(':') for line in f)
except:
last_backup_timestamps = {}
while True:
updated = False
files_to_copy = []
files_to_delete = []
for root, dirs, files in os.walk(LOGS_FOLDER):
for filename in files:
if filename != 'last_backup_timestamps.txt':
filepath = os.path.join(root, filename)
if os.path.isfile(filepath):
backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER))
backup_folderpath = os.path.dirname(backup_filepath)
if not os.path.exists(backup_folderpath):
os.makedirs(backup_folderpath)
print(f'Created backup folder: {backup_folderpath}', flush=True)
# check if file has changed since last backup
last_backup_timestamp = last_backup_timestamps.get(filepath)
current_timestamp = os.path.getmtime(filepath)
if last_backup_timestamp is None or float(last_backup_timestamp) < current_timestamp:
files_to_copy.append((filepath, backup_filepath)) # add to list of files to copy
last_backup_timestamps[filepath] = str(current_timestamp) # update last backup timestamp
updated = True
fully_updated = False # if a file is updated, all files are not up to date
# check if any files were deleted in Colab and delete them from the backup drive
for filepath in list(last_backup_timestamps.keys()):
if not os.path.exists(filepath):
backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER))
if os.path.exists(backup_filepath):
files_to_delete.append(backup_filepath) # add to list of files to delete
del last_backup_timestamps[filepath]
updated = True
fully_updated = False # if a file is deleted, all files are not up to date
# Copy files in batches
if files_to_copy:
for source, dest in files_to_copy:
shutil.copy2(source, dest)
print(f'Copied or updated {len(files_to_copy)} files')
# Delete files in batches
if files_to_delete:
for file in files_to_delete:
os.remove(file)
print(f'Deleted {len(files_to_delete)} files')
if not updated and not fully_updated:
print("Files are up to date.")
fully_updated = True # if all files are up to date, set the boolean to True
copy_weights_folder_to_drive()
with open(last_backup_timestamps_path, 'w') as f:
for filepath, timestamp in last_backup_timestamps.items():
f.write(f'{filepath}:{timestamp}\n')
time.sleep(15) # wait for 15 seconds before checking again