# fileAutomator.py
# Forked from tuomaskivioja/File-Downloads-Automator.
from os import scandir, rename
from os.path import splitext, exists, join
from shutil import move
from time import sleep
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# ! FILL IN BELOW
# ? folder to track e.g. Windows: "C:\\Users\\UserName\\Downloads"
source_dir = ""
# ? destination folders, one per category
# ? (audio under 10 MB or with "SFX" in its name goes to dest_dir_sfx, other audio to dest_dir_music)
dest_dir_sfx = ""
dest_dir_music = ""
dest_dir_video = ""
dest_dir_image = ""
dest_dir_documents = ""

# * Extensions are matched against the END of the file name, so every entry
# * must include its leading dot and should appear only once per list.

# ? supported image types
image_extensions = [
    ".jpg", ".jpeg", ".jpe", ".jif", ".jfif", ".jfi", ".png", ".gif", ".webp",
    ".tiff", ".tif", ".psd", ".raw", ".arw", ".cr2", ".nrw", ".k25", ".bmp",
    ".dib", ".heif", ".heic", ".ind", ".indd", ".indt", ".jp2", ".j2k", ".jpf",
    ".jpx", ".jpm", ".mj2", ".svg", ".svgz", ".ai", ".eps", ".ico",
]
# ? supported Video types
video_extensions = [
    ".webm", ".mpg", ".mp2", ".mpeg", ".mpe", ".mpv", ".ogg", ".mp4", ".mp4v",
    ".m4v", ".avi", ".wmv", ".mov", ".qt", ".flv", ".swf", ".avchd",
]
# ? supported Audio types
audio_extensions = [".m4a", ".flac", ".mp3", ".wav", ".wma", ".aac"]
# ? supported Document types
document_extensions = [
    ".doc", ".docx", ".odt", ".pdf", ".xls", ".xlsx", ".ppt", ".pptx",
]
def make_unique(dest, name):
    """Return a file name based on ``name`` that is not yet taken in ``dest``.

    If ``dest/name`` does not exist, ``name`` is returned unchanged.
    Otherwise a counter in parentheses is inserted before the extension
    ("song.mp3" -> "song(1).mp3", "song(2).mp3", ...) until a free name
    is found.

    Args:
        dest: Directory in which the name must be unique.
        name: Desired file name, extension included.

    Returns:
        The first candidate name for which no path exists under ``dest``.
    """
    stem, suffix = splitext(name)
    candidate = name
    attempt = 0
    # * Keep bumping the numeric suffix while the candidate is already taken
    while exists(f"{dest}/{candidate}"):
        attempt += 1
        candidate = f"{stem}({attempt}){suffix}"
    return candidate
def move_file(dest, entry, name):
    """Move ``entry`` into ``dest`` without overwriting a same-named file.

    When ``dest`` already holds a file called ``name``, that existing file
    is first renamed to the unique name produced by ``make_unique``; the
    incoming file is then moved in under its original name.

    Args:
        dest: Destination directory.
        entry: Path (or path-like object) of the file to move.
        name: File name of ``entry``, used for the collision check.
    """
    already_there = exists(f"{dest}/{name}")
    if already_there:
        # * Push the older file aside so the incoming one keeps its name
        rename(join(dest, name), join(dest, make_unique(dest, name)))
    move(entry, dest)
class MoverHandler(FileSystemEventHandler):
    """Event handler that sorts the contents of ``source_dir`` by file type.

    Each ``check_*`` method tests one category (audio, video, image,
    document), moves the entry to that category's destination folder when
    its name ends with a known extension, and reports whether it did so.
    """

    # ? THIS FUNCTION WILL RUN WHENEVER THERE IS A CHANGE IN "source_dir"
    def on_modified(self, event):
        """Rescan ``source_dir`` and move every entry with a known extension.

        Args:
            event: The triggering event. It is not inspected; the whole
                top level of ``source_dir`` is rescanned each time.
        """
        checks = (
            self.check_audio_files,
            self.check_video_files,
            self.check_image_files,
            self.check_document_files,
        )
        with scandir(source_dir) as entries:
            for entry in entries:
                name = entry.name
                for check in checks:
                    # * Once a category has moved the file it no longer
                    # * exists in source_dir, so skip the remaining checks.
                    if check(entry, name):
                        break

    @staticmethod
    def _has_extension(name, extensions):
        """Return True if ``name`` ends with any of ``extensions``, ignoring case."""
        # ? Comparing lower-cased text also catches mixed-case names such as
        # ? "photo.Jpg", not only all-lower or all-upper extensions.
        suffixes = tuple(extension.lower() for extension in extensions)
        return name.lower().endswith(suffixes)

    def check_audio_files(self, entry, name):  # * Checks all Audio Files
        """Move an audio file to the SFX or music folder.

        Files smaller than 10,000,000 bytes, or with "SFX" in their name,
        go to ``dest_dir_sfx``; all other audio goes to ``dest_dir_music``.

        Returns:
            True if the entry was moved, False if it is not an audio file.
        """
        if not self._has_extension(name, audio_extensions):
            return False
        if entry.stat().st_size < 10_000_000 or "SFX" in name:  # ? 10Megabytes
            dest = dest_dir_sfx
        else:
            dest = dest_dir_music
        move_file(dest, entry, name)
        logging.info(f"Moved audio file: {name}")
        return True

    def check_video_files(self, entry, name):  # * Checks all Video Files
        """Move a video file to ``dest_dir_video``; return True if moved."""
        if not self._has_extension(name, video_extensions):
            return False
        move_file(dest_dir_video, entry, name)
        logging.info(f"Moved video file: {name}")
        return True

    def check_image_files(self, entry, name):  # * Checks all Image Files
        """Move an image file to ``dest_dir_image``; return True if moved."""
        if not self._has_extension(name, image_extensions):
            return False
        move_file(dest_dir_image, entry, name)
        logging.info(f"Moved image file: {name}")
        return True

    def check_document_files(self, entry, name):  # * Checks all Document Files
        """Move a document to ``dest_dir_documents``; return True if moved."""
        if not self._has_extension(name, document_extensions):
            return False
        move_file(dest_dir_documents, entry, name)
        logging.info(f"Moved document file: {name}")
        return True
# ! NO NEED TO CHANGE BELOW CODE
if __name__ == "__main__":
    # * Log lines look like "2024-01-31 12:00:00 - Moved image file: x.png"
    logging.basicConfig(
        format='%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.INFO,
    )

    # * Watch source_dir (and its subfolders) with the sorting handler
    observer = Observer()
    observer.schedule(MoverHandler(), source_dir, recursive=True)
    observer.start()

    # * Keep the main thread alive until the user presses Ctrl+C
    try:
        while True:
            sleep(10)
    except KeyboardInterrupt:
        observer.stop()
    # * Wait for the observer thread to finish before exiting
    observer.join()