-
Notifications
You must be signed in to change notification settings - Fork 0
/
train.py
423 lines (368 loc) · 17.6 KB
/
train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
import argparse
import json
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # turn off TF's tons of "debug info" and "warnings"
import os.path
import mlflow
from mlflow_callback import MlFlowCallback
from mlflow.models.signature import infer_signature
from mlflow.models import ModelSignature
from mlflow.types.schema import Schema
import mlflow.tensorflow
import numpy as np
import pandas as pd
import sqlalchemy as sa
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, Flatten, Input, Dropout, BatchNormalization
from tensorflow.keras.layers.experimental.preprocessing import RandomFlip
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.python.saved_model import signature_constants, tag_constants
from utils import CustomDataGenerator
# IMAGE_SHAPE is used globally: by the define_data_generator_*() functions,
# which resize images to this size (particularly for datasets with images of
# varying sizes); by define_network(), where the input layer is defined; and
# in the model signature definition for logging the model in the MLflow
# registry. Note that define_data_generator_tfdataset() overwrites this value
# with the shape it uses for its dataset.
IMAGE_SHAPE = (128, 128, 3)
def define_data_generator(batch_size, samples, train=True, aug=False, df=None,
                          source="tfdataset"):
    """ General wrapper for data generator definition: dispatch to the
    sub-function matching the desired data source (tf dataset, directory of
    images, dataframe of image filepaths and labels).

    Args:
        batch_size: batch size, forwarded to the selected sub-function.
        samples: sample count, forwarded to the selected sub-function.
        train: True for the training generator, False for the validation one.
        aug: augmentation setting; only forwarded for "dataframe_custom".
        df: dataframe of filepaths/labels; only forwarded for
            "dataframe_custom".
        source: which data source to use, one of "tfdataset" (the default,
            matching the previous hard-coded choice), "imagedir",
            "dataframe" or "dataframe_custom".

    Returns:
        Whatever the selected define_data_generator_***() function returns.

    Raises:
        ValueError: if source is not one of the names listed above.
    """
    if source == "tfdataset":
        return define_data_generator_tfdataset(batch_size, samples, train)
    if source == "imagedir":
        return define_data_generator_imagedir(batch_size, samples, train)
    if source == "dataframe":
        return define_data_generator_dataframe(batch_size, samples, train)
    if source == "dataframe_custom":
        # Forward the caller's aug/df rather than hard-coded False/None.
        return define_data_generator_dataframe_custom(
            batch_size, samples, train, aug=aug, df=df)
    raise ValueError(
        f"Unknown data source {source!r}; expected one of 'tfdataset', "
        "'imagedir', 'dataframe', 'dataframe_custom'."
    )
def define_data_generator_dataframe(batch_size, samples, train=True):
    """ Create generator based on image path/label contents of a pandas
    dataframe (eg from a database, but here for example just loaded from the
    small files_dataframe.json file).

    Args:
        batch_size: number of images per generated batch.
        samples: not used by this function; kept so all the
            define_data_generator_***() functions share a signature.
        train: True returns the augmented training-subset generator, False
            returns the un-augmented validation-subset generator.

    Returns:
        The iterator returned by ImageDataGenerator.flow_from_dataframe().
    """
    # This datadict and df are just to demonstrate without a database connected;
    # in real usage populate df with contents pulled from a database query.
    print("NOTE IF YOU HAVEN'T UPDATED THE CONTENTS OF files_dataframe.json")
    print("FROM ITS EXAMPLE ENTRIES TO REAL IMAGES THAT EXIST AT THOSE PATHS")
    print("THEN YOU WILL GET INVALID-IMAGE-FILENAME AND FOLLOW-ON ERRORS BELOW.")
    with open("files_dataframe.json", "r", encoding="utf-8") as f:
        datadict = json.load(f)
    df = pd.DataFrame(datadict)

    # Both generators must use the SAME validation_split and select their part
    # via subset=...; without subset= the split is ignored and both generators
    # would yield every row of df (i.e. validation data leaks into training).
    val_fraction = 0.2

    # Note the training datagen includes image augmentations and the
    # validation one does not, otherwise we could just use one
    # ImageDataGenerator for the two generators.
    if train:
        datagen = ImageDataGenerator(
            rescale=1. / 255,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            validation_split=val_fraction,
        )
        subset = "training"
    else:
        datagen = ImageDataGenerator(
            rescale=1. / 255,
            validation_split=val_fraction,
        )
        subset = "validation"

    return datagen.flow_from_dataframe(
        dataframe=df,
        x_col="imagepath",  # no directory arg so path is absolute
        y_col="label",
        subset=subset,
        batch_size=batch_size,
        shuffle=False,
        class_mode="raw",
        target_size=(IMAGE_SHAPE[0], IMAGE_SHAPE[1]),
    )
def define_data_generator_dataframe_custom(batch_size, samples, train=True, aug=False, df=None):
    """ Build a CustomDataGenerator over the rows of a pandas dataframe.

    The training and validation generators are configured identically except
    for the dataframe column the image path is read from: 'datafilename' when
    train is True, 'previewname' otherwise. `samples` is not used here.
    """
    height, width = IMAGE_SHAPE[0], IMAGE_SHAPE[1]
    path_column = 'datafilename' if train else 'previewname'
    return CustomDataGenerator(
        df,
        X_col={'path': path_column, 'bbox': 'bbox'},
        y_col={'label': 'label'},
        batch_size=batch_size,
        input_size=(height, width),
        shuffle=True,
        augmentation=aug,
    )
def define_data_generator_imagedir(batch_size, samples, train=True):
    """ Create data generator based on contents of an image directory.

    Training images are read from "data/train" (with augmentation) and
    validation images from "data/validation" (rescaling only).

    Args:
        batch_size: number of images per generated batch.
        samples: not used by this function; kept so all the
            define_data_generator_***() functions share a signature.
        train: True for the training generator, False for the validation one.

    Returns:
        The iterator returned by ImageDataGenerator.flow_from_directory().
    """
    print("WARNING, THIS FUNCTION HAS NOT YET BEEN TESTED; MAY NEED DEBUGGING.")
    # Note the training datagen includes image augmentations and the
    # validation one does not, so two separate ImageDataGenerators are needed.
    # No validation_split is set: train/validation are already separated by
    # directory, and the split has no effect unless subset= is also passed.
    if train:
        datagen = ImageDataGenerator(
            rescale=1. / 255,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
        )
        directory = "data/train"
    else:
        datagen = ImageDataGenerator(rescale=1. / 255)
        directory = "data/validation"

    return datagen.flow_from_directory(
        directory,
        target_size=(IMAGE_SHAPE[0], IMAGE_SHAPE[1]),
        batch_size=batch_size,
        # "raw" is only valid for flow_from_dataframe; flow_from_directory
        # rejects it. "binary" yields 1D binary labels, matching the single
        # sigmoid output / BinaryCrossentropy loss used in define_network().
        class_mode="binary",
    )
def define_data_generator_tfdataset(batch_size, samples, train=True):
    """ Create a batched, endlessly repeating tf.data.Dataset from a prefab
    Tensorflow dataset. There aren't a ton of such datasets but they have
    their uses.

    Side effect: overwrites the module-level IMAGE_SHAPE with the shape used
    for the selected dataset. `samples` is not used here.
    """
    global IMAGE_SHAPE

    # Settings for some other datasets (match with the yield lines below):
    # Note /storage/tf_data is volume mapped from host file system
    #   splits = tfds.load("celeb_a", split=["train", "test"], data_dir="/storage/tf_data/")
    #   IMAGE_SHAPE = (218, 178, 3)  # celeb_a
    #   splits = tfds.load("beans", split=["train", "test"], data_dir="/storage/tf_data/")
    #   IMAGE_SHAPE = (500, 500, 3)  # beans
    #   splits = tfds.load("patch_camelyon", split=["train", "test"], data_dir="/storage/tf_data/")
    #   IMAGE_SHAPE = (96, 96, 3)  # patch_camelyon

    # Unlike the above datasets, 'malaria' only has a 'train' section so we
    # split that. Using slicing form (like [:50%]) rather than even_splits()
    # for later flexibility. The splits are addressed by position below
    # (0 = training half, 1 = validation half), so their names don't matter.
    splits = tfds.load(
        "malaria",
        split=["train[:50%]", "train[50%:]"],
        data_dir="/storage/tf_data/",
    )
    IMAGE_SHAPE = (100, 100, 3)  # malaria

    split_index = 1
    if train:
        split_index = 0

    def sample_stream():
        """ Yield (scaled image, label) pairs from the chosen split. """
        for record in splits[split_index]:
            # Yield lines for some other datasets (match with settings above):
            #   yield record["image"] / 255, tf.map_fn(lambda label: 1 if label else 0, record["attributes"]["Smiling"], dtype=tf.int32)  # celeb_a
            #   yield record["image"] / 255, tf.map_fn(lambda label: 1 if label == 2 else 0, record["label"], dtype=tf.int32)  # beans
            # An earlier attempt to bring images to a common size with
            # tf.image.crop_and_resize didn't work as intended - model.fit()
            # still complained of varying image sizes - so a plain resize is
            # used instead.
            scaled = tf.image.resize(
                record["image"], [IMAGE_SHAPE[0], IMAGE_SHAPE[1]]
            )
            yield scaled / 255, record["label"]  # malaria

    # The keras model.fit() function doesn't like the form of this generator
    # directly, but accepts a dataset created from it, so creating that here.
    element_shapes = (
        tf.TensorShape([IMAGE_SHAPE[0], IMAGE_SHAPE[1], IMAGE_SHAPE[2]]),
        tf.TensorShape([]),
    )
    batched = tf.data.Dataset.from_generator(
        generator=sample_stream,
        output_types=(tf.float32, tf.uint8),
        output_shapes=element_shapes,
    ).batch(batch_size)
    return batched.repeat()
def define_dataframe(samples=1000):
    """ Encapsulating the definition/query of the dataframe of filepaths/labels.

    Args:
        samples: requested number of rows per class; at least 1000 rows per
            class are always requested (see N below). Defaults to 1000 so the
            function can still be called without arguments.

    Returns:
        A pandas dataframe of the query results in randomized row order,
        restricted to rows whose datafilename exists on disk. It carries an
        extra boolean "fileexists" column (all True after filtering).
    """
    N = max(int(samples), 1000)  # samples per class to pull equally from database
    engine = sa.create_engine("postgresql://myusername@mydbserver/mydatabase")
    # For a database in which table mydata has a json tags column containing key
    # 'mylabel' with boolean values, randomly pull equal number (N) of Trues and
    # Falses. N is coerced to int above, so interpolating it is safe.
    sql = f"""SELECT * FROM (
SELECT (tags->>'mylabel')::boolean::int as label,
dataid,
datafilename,
null as bbox, -- placeholder to fit code for now
entrytime,
row_number() OVER (PARTITION BY tags->'mylabel'
ORDER BY random() DESC NULLS LAST) AS entries
FROM mydata WHERE tags@>'{{"have_file":true}}') AS p
WHERE entries<={N} and label is not null;"""
    df = pd.read_sql(sql, engine)
    df = df.sample(frac=1).reset_index(drop=True)  # randomize row order
    # Verify data file exists to prevent crashing downstream. Series.apply is
    # used directly (the .str accessor has no apply()); non-string entries
    # such as NULL filenames count as missing rather than raising.
    df["fileexists"] = df["datafilename"].apply(
        lambda path: isinstance(path, str) and os.path.isfile(path)
    ).astype(bool)
    # print("Dataframe size before fileexists filtering:", df.shape)
    df = df.loc[df["fileexists"], :]
    # print("Dataframe size after fileexists filtering:", df.shape)
    return df
def define_network(randomize_images, convolutions):
    """ Build and compile the Sequential binary-classification network.

    randomize_images: when truthy, a RandomFlip layer follows the input.
    convolutions: 0 is a flag selecting an ImageNet-weighted VGG16 base;
        any other value is the number of Conv2D/MaxPool2D stages to stack.

    Returns the compiled model (Adam, binary cross-entropy, and the
    accuracy/AUC/Precision/Recall metrics).
    """
    net = Sequential()
    net.add(Input(shape=IMAGE_SHAPE))
    if randomize_images:
        net.add(RandomFlip())

    use_vgg16 = convolutions == 0  # this 0 is flag to use vgg16
    if use_vgg16:
        vgg16_base = VGG16(
            input_shape=IMAGE_SHAPE,
            include_top=False,
            weights="imagenet"
        )
        # vgg16_base.trainable=False  # pin all layers
        # vgg16_base.layers[n].trainable=False  # pin layer n
        net.add(vgg16_base)
        net.add(Dense(256, activation="relu"))
        net.add(Dropout(0.2))
        net.add(BatchNormalization())
    else:
        # Filter count doubles with each stage: 64, 128, 256, ...
        for stage in range(convolutions):
            filters = 64 * (2**stage)
            net.add(Conv2D(filters, (3, 3), padding="same", activation="relu"))
            net.add(MaxPool2D(strides=(2, 2)))
        net.add(Dropout(0.5))

    # Classification head shared by both variants.
    net.add(Dense(128, activation="relu"))
    net.add(Dropout(0.2))
    net.add(BatchNormalization())
    net.add(Flatten())
    net.add(Dense(1, activation="sigmoid"))

    net.compile(
        optimizer=Adam(learning_rate=0.001),
        loss=BinaryCrossentropy(),
        metrics=["accuracy", "AUC", "Precision", "Recall"]
    )
    return net
def train_model(
    batch_size,
    epochs,
    convolutions,
    train_samples,
    val_samples,
    randomize_images,
    run_name,
    experiment_name,
    augmentation,
):
    """ Run the model training and log performance and model to mlflow.

    Args:
        batch_size: batch size for both the training and validation data.
        epochs: number of training epochs.
        convolutions: passed to define_network() (0 selects VGG16).
        train_samples: number of training samples per epoch; together with
            batch_size determines steps_per_epoch.
        val_samples: number of validation samples per epoch; together with
            batch_size determines validation_steps.
        randomize_images: passed to define_network().
        run_name: MLflow run name.
        experiment_name: accepted for the commandline interface but not used
            inside this function.
        augmentation: passed as `aug` to define_data_generator().
    """
    # Generic call to define data; any changes for different data sources are
    # found up in define_data_generator() (or the functions that it wraps).
    # df = define_dataframe()  # get the dataframe of filepaths and labels from database
    df = None  # using a tensorflow dataset instead
    train_gen = define_data_generator(batch_size, train_samples, train=True, aug=augmentation, df=df)
    valid_gen = define_data_generator(batch_size, val_samples, train=False, aug=augmentation, df=df)

    # Keras documents steps_per_epoch / validation_steps as integers, so use
    # integer ceiling division (a final partial batch still counts as a step)
    # instead of passing the float result of true division.
    steps_per_epoch = max(1, (train_samples + batch_size - 1) // batch_size)
    validation_steps = max(1, (val_samples + batch_size - 1) // batch_size)

    with mlflow.start_run(run_name=run_name):
        mlflow.tensorflow.autolog(
            # registered_model_name=run_name + "_model",
            # log_models=True,
            log_datasets=True,
            log_input_examples=True,
            log_model_signatures=True,
        )
        # Alas log_models and registered_model_name args STILL not working in
        # autolog above, even as of MLflow v2.4.1. So "manually" specifying via
        # log_model at end instead. Also, setting run_name via arg
        # in mlflow.start_run() STILL does not work either, so must set
        # explicitly here. Leaving this note as a reminder to recheck in future.
        mlflow.set_tags({"mlflow.runName": run_name})

        # Define the network
        model = define_network(randomize_images, convolutions)

        # Log the model architecture summary as a run artifact.
        stringlist = []
        model.summary(line_length=78, print_fn=lambda x: stringlist.append(x))
        model_summary = "\n".join(stringlist)
        mlflow.log_text(model_summary, "model/model_arch.txt")

        print("starting model.fit...", flush=True)
        model.fit(
            train_gen,
            validation_data=valid_gen,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            validation_steps=validation_steps,
            callbacks=[MlFlowCallback()],
        )

        # Create an example batch with batch_size instances
        # example_batch = np.random.rand(batch_size, IMAGE_SHAPE[0], IMAGE_SHAPE[1], IMAGE_SHAPE[2])
        # input_schema = Schema([tf.TensorSpec(shape=example_batch.shape, dtype=tf.float32)])
        # output_schema = Schema([tf.TensorSpec(shape=(batch_size, 1), dtype=tf.float32)])
        # signature = ModelSignature(inputs=input_schema, outputs=output_schema)
        # Infer the signature
        # signature = tf.keras.models.infer_signature(model, [input_signature])
        mlflow.tensorflow.log_model(
            model=model,
            artifact_path="model",
            pip_requirements="requirements.txt",
            # signature=signature,
        )

        # Save and register the model in the registry.
        # (weird, fyi while current mlflow version requires saved_model_dir to
        # be set in tf.keras.models.save_model and mlflow.tensorflow.log_model,
        # whatever I set there gets overwritten by "tfmodel". Since must have
        # something set, just simply using "tfmodel" then, no problem it's just
        # a subdir created in the mlflow run artifacts directory.)
        # tag = [tag_constants.SERVING]
        # key = signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
        # tf.keras.models.save_model(model, "tfmodel")
        # mlflow.tensorflow.log_model(tf_saved_model_dir="tfmodel",
        #                             tf_meta_graph_tags=tag,
        #                             tf_signature_def_key=key,
        #                             artifact_path="model",
        #                             registered_model_name=run_name + "_model")
def _str_to_bool(value):
    """ Convert a commandline string such as "True"/"false"/"1"/"0" to bool.

    Plain bool() cannot be used for this: every non-empty string, including
    "False", is truthy. Raises argparse.ArgumentTypeError for unrecognized
    values so argparse reports a clear usage error.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "t", "yes", "y", "on", "1"):
        return True
    if text in ("false", "f", "no", "n", "off", "0", "", "none"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_args(argv=None):
    """ Parse the commandline arguments for train_model().

    Args:
        argv: list of argument strings; None (the default) means sys.argv[1:].

    Returns:
        The argparse.Namespace of parsed arguments, with the numeric options
        already converted to int and --randomize-images converted to bool.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-size", type=int, required=True)
    parser.add_argument("--epochs", type=int, required=True)
    parser.add_argument("--convolutions", type=int, required=True)
    parser.add_argument("--training-samples", type=int, required=True)
    parser.add_argument("--validation-samples", type=int, required=True)
    parser.add_argument("--randomize-images", type=_str_to_bool, default=False)
    parser.add_argument("--run-name")
    parser.add_argument("--experiment-name")
    # NOTE(review): augmentation is still passed through as the raw string;
    # if the data generator expects a boolean here, parse it with
    # _str_to_bool as well - confirm against CustomDataGenerator.
    parser.add_argument("--augmentation")
    return parser.parse_args(argv)


if __name__ == "__main__":
    # Call train_model() from the commandline, as the default entrypoint
    # per standard usage of the MLflow projects call.
    args = parse_args()
    train_model(
        batch_size=args.batch_size,
        epochs=args.epochs,
        convolutions=args.convolutions,
        train_samples=args.training_samples,
        val_samples=args.validation_samples,
        randomize_images=args.randomize_images,
        run_name=args.run_name,
        experiment_name=args.experiment_name,
        augmentation=args.augmentation,
    )