-
Notifications
You must be signed in to change notification settings - Fork 1
/
image_processing.py
163 lines (125 loc) · 5.02 KB
/
image_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from __future__ import print_function
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseUpload
from PIL import Image
from io import BytesIO
import json
from controls import receipt_access, receipt_folder_id
def get_service(api_name='drive', api_version='v3', scopes='https://www.googleapis.com/auth/drive'):
"""Get a service that communicates to a Google API.
Args:
api_name: The name of the api to connect to.
api_version: The api version to connect to.
scopes: A list auth scopes to authorize for the application.
key_file_location: The path to a valid service account JSON key file.
Returns:
A service that is connected to the specified API.
"""
service_account_info = json.loads(receipt_access)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
scoped_credentials = credentials.with_scopes(scopes)
# Build the service object.
service = build(api_name, api_version, credentials=scoped_credentials)
return service
def upload(file, file_name):
"""Shows basic usage of the Drive v3 API.
Prints the names and ids of the first 10 files the user has access to.
"""
# Define the auth scopes to request.
scopes = ['https://www.googleapis.com/auth/drive']
try:
file_metadata = {
# name of file to be uploaded
'name': file_name,
# folder to which file is uploaded
# I'm too lazy to make it dynamic so if you share it a new folder
# list the files and choose change this to that folder id
'parents': [receipt_folder_id]
}
# Authenticate and construct service.
service = get_service(
api_name='drive',
api_version='v3',
scopes=scopes)
# Call the Drive v3 API
media = MediaIoBaseUpload(file, mimetype='image/png', resumable=True)
fileCreate = service.files().create(body=file_metadata, media_body=media).execute()
fileID = fileCreate['id']
return fileID
# list_files(key_file_location)
except HttpError as error:
print(f'An error occurred: {error}')
# Use only to clear files in root that cannot be manually deleted elsewhere
def purge(key_file_location, scope='https://www.googleapis.com/auth/drive'):
try:
# Authenticate and construct service.
service = get_service(
api_name='drive',
api_version='v3',
scopes=[scope],
)
# Call the Drive v3 API
results = service.files().list(
pageSize=10, fields="nextPageToken, files(id, name)").execute()
items = results.get('files', [])
if not items:
print('No files found.')
return
print('==deleting files==:')
for item in items:
try:
service.files().delete(fileId=item['id']).execute()
print("{} ({}) deleted successfully".format(item['name'], item['id']))
except Exception as exc:
print("Could not delete file: {} ({})".format(item['name'], item['id']))
except HttpError as error:
print(f'An error occurred: {error}')
def compress_file(file):
"""
:param file: receipt (FileStorage object)
:return: IO stream of compressed image as BytesIO object
"""
# change file to bytes
image_bytes = BytesIO(file.stream.read())
# make PIL image object
img = Image.open(image_bytes)
# convert to black and white
img = img.convert('1')
# compress pixel size
pixels = img.size[0] * img.size[1]
if pixels > 921600:
factor = 921600/pixels
img = img.resize((round(img.size[0]*factor), round(img.size[1]*factor)))
image_stream = BytesIO()
img.save(image_stream, format='PNG')
return image_stream
def list_files(key_file_location):
"""Shows basic usage of the Drive v3 API.
Prints the names and ids of the first 10 files the user has access to.
"""
# Define the auth scopes to request.
scope = 'https://www.googleapis.com/auth/drive.metadata.readonly'
try:
# Authenticate and construct service.
service = get_service(
api_name='drive',
api_version='v3',
scopes=[scope])
# Call the Drive v3 API
results = service.files().list(
pageSize=10, fields="nextPageToken, files(id, name)").execute()
items = results.get('files', [])
if not items:
print('No files found.')
return
print('Files:')
for item in items:
print(u'{0} ({1})'.format(item['name'], item['id']))
except HttpError as error:
print(f'An error occurred: {error}')
if __name__ == '__main__':
# this will delete everything, use at your own risk
# purge(key_file_location='secret_data_lol.txt')
list_files(key_file_location='secret_data_lol.txt')