Abstracted Prediction Pipeline

anishapai edited this page May 2, 2021 · 9 revisions

Because video, image, and audio files are processed very similarly, they have been abstracted into a single "object" pipeline. Text prediction is processed slightly differently (as strings rather than files), and so has its own pipeline. An issue has been created to merge the text pipeline into the object pipeline.

File Changes:

| File Name | Description |
| --- | --- |
| server/dependency.py | Added object_collection & UniversalMLPredictionObject object with a type-checked "type" field |
| server/db_connection.py | Added object_collection functions, with type-checking as needed |
| server/routers/prediction.py | Modified endpoint predict, and get result, added endpoint for listing models by video/image |
| server/prediction_worker/utility/main.py | Updated predict function to be object agnostic |
| prediction/models/PredictionVideoTemplate | An example video template using opencv2 and a custom dockerfile |
| prediction/models/**/config.py | Added a requirement for model_type field. |
| client/src/views/ImportVideo | Added two files to copy the Image Upload page in the back-end |
| client/src/components/VideoDropzone/VideoDropzone.js | Mimics the ImageDropzone, but for video |

server/routers/prediction.py

Predict Function for Object

Summary: This is an endpoint at localhost:5000/model/predict. It receives a list of file objects, enqueues a prediction job for each file on each of the given models, and stores a UniversalMLPredictionObject for each file in the object_collection. Note: Postman will let you upload any file, but the function itself checks that each file's type matches model_type.

Params:

  • objects: A list of files, of a particular type
  • models: A list of strings which are model names, all for the same type
  • model_type: A string indicating whether the user is uploading files and models for the "audio", "video", or "image" pipeline.
  • current_user: the username of the current user

Return: A list of MD5 hashes, one per uploaded file. Example below:

{
    "prediction objects": [
        "bd931a6a2262fbab85c18b5a9bfa5a78",
        "cd931a6a2widjfidf5c18b5a9bfa5a78"
    ]
}

# Imports used by the excerpt below. Project-local names (model_router, redis, User,
# current_user_investigator, UniversalMLPredictionObject and the *_db helpers) are
# defined elsewhere in the server package. Queue is assumed to come from RQ.
import hashlib
import os
import shutil
import uuid
from typing import List

import filetype
from fastapi import Depends, File, Form, HTTPException, UploadFile
from rq import Queue

@model_router.post("/predict")
def create_new_prediction(models: List[str] = (),
                          model_type: str = Form(...),
                          objects: List[UploadFile] = File(...),
                          current_user: User = Depends(current_user_investigator)):
    """
    Create a new prediction request for any number of objects on any number of models. This will enqueue the jobs
    and a worker will process them and get the results. Once this is complete, a user may later query the job
    status by the unique key that is returned from this method for each object uploaded.
    :param current_user: User object who is logged in
    :param objects: List of file objects that will be used by the models for prediction
    :param models: List of models to run on objects
    :param model_type: Pipeline type for the upload: "audio", "video" or "image"
    :return: Unique keys for each object uploaded in objects.
    """

    # Start with error checking on the models list.
    # Ensure that all desired models are valid.
    if not models:
        raise HTTPException(status_code=400, detail="You must specify models to process objects with")

    invalid_models = []
    for model in models:
        # A model is invalid if it is unknown or does not match the requested model_type
        if (model not in get_available_prediction_models()
                or model not in get_models_by_type(model_type)):
            invalid_models.append(model)

    if invalid_models:
        error_message = "Invalid Models Specified: " + ', '.join(sorted(set(invalid_models)))
        raise HTTPException(status_code=400, detail=error_message)

    # Now we must hash each uploaded object
    # After hashing, we will store the object file on the server.

    buffer_size = 65536  # Read object data in 64KB Chunks for hashlib
    hashes_md5 = {}

    # Process uploaded objects
    for upload_file in objects:
        file = upload_file.file
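        kind = filetype.guess(file)
        # filetype.guess returns None when it cannot identify the file
        file_type = kind.mime.split("/")[0] if kind else "unknown"
        if model_type != file_type:
            error_message = "Invalid type for object: " + upload_file.filename + " is type: " + file_type
            raise HTTPException(status_code=400, detail=error_message)
        file.seek(0)  # Hash from the start of the file, wherever type detection left the cursor
        md5 = hashlib.md5()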
        while True:
            data = file.read(buffer_size)
            if not data:
                break
            md5.update(data)

        # Process object
        hash_md5 = md5.hexdigest()
        hashes_md5[upload_file.filename] = hash_md5

        file.seek(0)

        # Look the object up once and reuse the result
        prediction_obj = get_object_by_md5_hash_db(hash_md5)
        if not prediction_obj:  # If object does not already exist in db

            # Create a UniversalMLPredictionObject object to store data
            prediction_obj = UniversalMLPredictionObject(**{
                'file_names': [upload_file.filename],
                'hash_md5': hash_md5,
                'type': file_type,
                'users': [current_user.username],
                'models': {},
                'user_role_able_to_tag': ['admin']
            })

            # Add created object to database
            add_object_db(prediction_obj)

        # Associate the current user with the object that was uploaded
        add_user_to_object(prediction_obj, current_user.username)

        # Associate the name the file was uploaded under to the object
        add_filename_to_object(prediction_obj, upload_file.filename)

        # Copy object to the temporary storage volume for prediction
        new_filename = hash_md5 + os.path.splitext(upload_file.filename)[1]
        stored_object_path = "/app/prediction/" + new_filename
        # Use a context manager so the stored file is flushed and closed before jobs are enqueued
        with open(stored_object_path, 'wb+') as stored_object:
            shutil.copyfileobj(file, stored_object)

        for model in models:
            Queue(name=model, connection=redis).enqueue(
                'utility.main.predict_object', hash_md5, new_filename, job_id=hash_md5+model+str(uuid.uuid4())
            )

    return {"prediction objects": [hashes_md5[key] for key in hashes_md5]}
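
The hashing and storage-naming steps in the function above can be pulled out into small helpers, which would also make them easier to unit-test. The following is a minimal sketch, not the project's code: hash_file_md5 and stored_filename are hypothetical names, and only the 64KB chunk size and the hash-plus-extension naming scheme are taken from the endpoint.

```python
import hashlib
import io
import os

BUFFER_SIZE = 65536  # 64KB chunks, matching the endpoint above


def hash_file_md5(file_obj, buffer_size=BUFFER_SIZE):
    """Return the MD5 hex digest of a binary file-like object.

    Hypothetical helper: reads from the start in fixed-size chunks so large
    uploads are never loaded into memory at once, then rewinds the file so
    it can be copied to storage afterwards.
    """
    file_obj.seek(0)
    md5 = hashlib.md5()
    for chunk in iter(lambda: file_obj.read(buffer_size), b""):
        md5.update(chunk)
    file_obj.seek(0)
    return md5.hexdigest()


def stored_filename(hash_md5, original_filename):
    """Build the on-disk name: the content hash plus the original extension."""
    return hash_md5 + os.path.splitext(original_filename)[1]


# Example with an in-memory stand-in for an uploaded file
upload = io.BytesIO(b"example video bytes" * 10000)
digest = hash_file_md5(upload)
name = stored_filename(digest, "clip.mp4")
```

Because the stored name depends only on the file's contents and extension, uploading the same file twice maps to the same stored object, which is what lets the endpoint reuse an existing database entry.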

Things to consider / Future work

  • Tests have not been written for this pipeline. Things to try breaking for tests:
    • Uploading different files/model types via both the Postman endpoint and the front-end.
    • Changing the model_type parameter in config.py to be something other than video, audio, image.
    • Uploading broken files and making sure the POST request fails.
  • Currently there are two different endpoints for /list/image and /list/video. These can be combined and can take a type parameter.
  • Currently there are also two different front-end views for Image and Video. These can also be combined, and the front-end Upload page can be re-designed to swap intuitively between different model types.
  • There is type checking in place for the UniversalMLPredictionObject and MicroserviceConnection (may soon be deleted) in dependency.py. There is an opportunity to refine this by changing the available_types list into an enum.
  • There is no check in place to ensure the model_type parameter in a microservice's config.py is constrained to relevant types. This can be added. See the bullet above for guidance.
  • We can modify the /predict endpoint so that, at the user's request, an incoming video is also converted to audio and text. This would involve designing the front-end UI first (deciding the most intuitive way to present the options to the investigator) before tackling the back-end. There is an issue for this here.
  • We can un-abstract the Dockerfile and have each microservice supply its own. This makes sense if the majority of models need their own Docker requirements, which may happen as we incorporate video and audio models.
  • We made use of the filetype library here. Its accuracy has not been tested.
  • If filetype proves highly accurate, we could use it to let users upload several different file types at once and sort them in the back-end. This raises ethical questions around security versus privacy that need to be carefully considered before implementing this feature.
  • In Postman, the /predict endpoint sometimes fails because the Content-Type header is inaccurate, especially with video. We have fixed this in the past by showing the hidden headers in Postman, then un-checking and re-checking Content-Type so that Postman regenerates the correct header.
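
Several of the items above (turning available_types into an enum, constraining model_type in each microservice's config.py, and combining the /list endpoints behind a type parameter) could share one definition. The following is a minimal sketch under that assumption, covering only the three types named on this page. ModelType, validate_model_type and list_models_by_type are hypothetical names, and the model names in the example registry are made up for illustration.

```python
from enum import Enum


class ModelType(str, Enum):
    """Hypothetical enum replacing a plain available_types list."""
    IMAGE = "image"
    VIDEO = "video"
    AUDIO = "audio"


def validate_model_type(value):
    """Return the matching ModelType, or raise ValueError listing the allowed values."""
    try:
        return ModelType(value)
    except ValueError:
        allowed = ", ".join(t.value for t in ModelType)
        raise ValueError(f"model_type must be one of: {allowed} (got {value!r})") from None


def list_models_by_type(registry, model_type):
    """Filter a {model_name: model_type} mapping by type.

    Sketch of a single listing function that could back one combined
    /list endpoint instead of separate image and video endpoints.
    """
    wanted = validate_model_type(model_type)
    return sorted(name for name, kind in registry.items()
                  if validate_model_type(kind) is wanted)


# Made-up registry for illustration only
example_registry = {
    "example_image_model": "image",
    "example_video_model": "video",
    "another_image_model": "image",
}
image_models = list_models_by_type(example_registry, "image")
```

Calling validate_model_type on the model_type value when a microservice's config.py is loaded would reject a misconfigured model at startup rather than at prediction time.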