-
Notifications
You must be signed in to change notification settings - Fork 1
Abstracted Prediction Pipeline
anishapai edited this page May 4, 2021
·
9 revisions
Because video, image, and audio files are processed very similarly, these have been abstracted into an "object" pipeline. Text prediction is processed slightly differently (as strings as opposed to files), and so have their own pipeline. An issue has been created to merge the text pipeline into the object pipeline.
File Name | Description |
---|---|
server/dependency.py | Added object_collection & UniversalMLPredictionObject object with a type-checked "type" field |
server/db_connection.py | Added object_collection functions, with type-checking as needed |
server/routers/prediction.py | Modified endpoint predict, and get result, added endpoint for listing models by video/image |
server/prediction_worker/utility/main.py | Updated predict function to be object agnostic |
prediction/models/PredictionVideoTemplate | An example video template using opencv2 and a custom dockerfile |
prediction/models/**/config.py | Added a requirement for model_type field. |
client/src/views/ImportVideo | Added two files to copy the Image Upload page in the front-end |
client/src/components/VideoDropzone/VideoDropzone.js | Mimics the ImageDropzone, but for video |
Summary: This is an endpoint at localhost:5000/model/predict
, which receives a list of file objects and predicts using given models, then store UniversalMLPredictionObject object in the object_collection. Note: You can upload any files in Postman but type checking is done within the function.
Params:
- objects: A list of files, of a particular type
- models: A list of strings which are model names, all for the same type
- model_type: A string indicating if the user is uploading files and models for "audio", "video" or "image" pipeline.
- current_user: the username of the current user
Return: List of hashes for different files, example below:
prediction objects: {[
"bd931a6a2262fbab85c18b5a9bfa5a78",
"cd931a6a2widjfidf5c18b5a9bfa5a78"
]}
@model_router.post("/predict")
def create_new_prediction(models: List[str] = (),
model_type: str = Form(...),
objects: List[UploadFile] = File(...),
current_user: User = Depends(current_user_investigator)):
"""
Create a new prediction request for any number of objects on any number of models. This will enqueue the jobs
and a worker will process them and get the results. Once this is complete, a user may later query the job
status by the unique key that is returned from this method for each object uploaded.
:param current_user: User object who is logged in
:param objects: List of file objects that will be used by the models for prediction
:param models: List of models to run on objects
:return: Unique keys for each object uploaded in objects.
"""
# Start with error checking on the models list.
# Ensure that all desired models are valid.
if not models:
return HTTPException(status_code=400, detail="You must specify models to process objects with")
invalid_models = []
for model in models:
if model not in get_available_prediction_models():
invalid_models.append(model)
# make sure models alighn with the model_type
if not model in get_models_by_type(model_type):
invalid_models.append(model)
if invalid_models:
error_message = "Invalid Models Specified: " + ''.join(list(set(invalid_models)))
return HTTPException(status_code=400, detail=error_message)
# Now we must hash each uploaded object
# After hashing, we will store the object file on the server.
buffer_size = 65536 # Read object data in 64KB Chunks for hashlib
hashes_md5 = {}
# Process uploaded objects
for upload_file in objects:
file = upload_file.file
file_type = filetype.guess(file).mime.split("/")[0]
if model_type != file_type:
error_message = "Invalid type for object: " + upload_file.filename + 'is type:' + file_type
return HTTPException(status_code=400, detail=error_message)
md5 = hashlib.md5()
while True:
data = file.read(buffer_size)
if not data:
break
md5.update(data)
# Process object
hash_md5 = md5.hexdigest()
hashes_md5[upload_file.filename] = hash_md5
file.seek(0)
if get_object_by_md5_hash_db(hash_md5):
prediction_obj = get_object_by_md5_hash_db(hash_md5)
else: # If object does not already exist in db
# Create a UniversalMLPredictionObject object to store data
prediction_obj = UniversalMLPredictionObject(**{
'file_names': [upload_file.filename],
'hash_md5': hash_md5,
'type': file_type,
'users': [current_user.username],
'models': {},
'user_role_able_to_tag': ['admin']
})
# Add created object to database
add_object_db(prediction_obj)
# Associate the current user with the object that was uploaded
add_user_to_object(prediction_obj, current_user.username)
# Associate the name the file was uploaded under to the object
add_filename_to_object(prediction_obj, upload_file.filename)
# Copy object to the temporary storage volume for prediction
new_filename = hash_md5 + os.path.splitext(upload_file.filename)[1]
stored_object_path = "/app/prediction/" + new_filename
stored_object = open(stored_object_path, 'wb+')
shutil.copyfileobj(file, stored_object)
for model in models:
Queue(name=model, connection=redis).enqueue(
'utility.main.predict_object', hash_md5, new_filename, job_id=hash_md5+model+str(uuid.uuid4())
)
return {"prediction objects": [hashes_md5[key] for key in hashes_md5]}
- Tests have not been written for this pipeline. Things to try breaking for tests:
- Uploading different files/model types via both the postman endpoint and the front-end.
- Changing the
model_type
parameter inconfig.py
to be something other than video, audio, image. - Uploading broken files and making sure the POST request fails.
- There may be opportunity to speed up certain aspects of this pipeline, especially for larger (video) files. e.g. when hashing, or storing/loading from the docker volume.
- Currently there are two different endpoints for
/list/image
and/list/video
. These can be combined and can take atype
parameter. - Currently there are also two different front-end views for Image and Video. These can also be combined, and the front-end
Upload
page can be re-designed to swap intuitively between different model types. - There is type checking in place for the
UniversalMLPredictionObject
andMicroserviceConnection
(may soon be deleted) independency.py
. There is opportunity to refine this by changing theavailable_types
list into an enum object. - There is no check in place to ensure the
model_type
parameter in a microservice'sconfig.py
is constrained to relevant types. This can be added. See above bullet for guidance. - We can modify the
/predict
endpoint so that when a video comes in, it is converted to audio and text as well, as per the user's request (this would involve thinking about the front-end UI first, and what is the most intuitive way to present options to the investigator before tackling the back-end). There is an issue for this here. - Currently the
/predict
endpoint takes the parametermodel_type
. This is to make sure that when objects are uploaded, they are of the correct type. We may be able to remove this parameter and re-think how to do type-checking in this function, especially if the above is implemented. - We can un-abstract the Docker file, and have each Microservice require their own docker file. This will work if the majority of models require their own docker requirements, which may happen as we incorporate video and audio models.
- We made use of the
filetype
library here. Its accuracy has not been tested. - If
filetype
is highly accurate, we could use it to enable users to upload several different filetypes at once, and sort them in the back-end. This gets into issues of ethics around security vs privacy, that need to be carefully considered before implementing this feature. - In Postman, the
/predict
endpoint sometimes fails because the content-type header is inaccurate. Especially with video. We have fixed this in the past by showing the hidden headers in postman and un-checking and re-checkingcontent-type
so that it refactors to the correct header.