
Commit

Adding digest_unanalyzed route + debug Operation being linked to Readset when ingesting transfer and GenPipes
paulstretenowich committed Oct 19, 2023
1 parent 67a6aaf commit 588803a
Showing 5 changed files with 131 additions and 20 deletions.
34 changes: 26 additions & 8 deletions project_tracking/api/project.py
@@ -433,13 +433,14 @@ def ingest_transfer(project_id: str):
     Add new location to file that has already been moved before
     the db was created
     """
-    try:
-        ingest_data = request.get_json(force=True)
-    except:
-        flash('Data does not seems to be json')
-        return redirect(request.url)
+    if request.method == 'POST':
+        try:
+            ingest_data = request.get_json(force=True)
+        except:
+            flash('Data does not seems to be json')
+            return redirect(request.url)
 
-    return [i.flat_dict for i in db_action.ingest_transfer(project_id=project_id, ingest_data=ingest_data)]
+        return [i.flat_dict for i in db_action.ingest_transfer(project_id=project_id, ingest_data=ingest_data)]
 
 @bp.route('/<string:project_id>/ingest_genpipes', methods=['GET', 'POST'])
 # @capitalize
@@ -463,13 +464,30 @@ def ingest_genpipes(project_id: str):
         return redirect(request.url)
 
     project_id_from_name = db_action.name_to_id("Project", ingest_data[vc.PROJECT_NAME].upper())
-    if project_id != project_id_from_name:
+    if [int(project_id)] != project_id_from_name:
         return abort(
             400,
-            f"project name in POST {ingest_data[vc.PROJECT_NAME].upper()} not Valid, {project_id} requires"
+            f"project name in POST {ingest_data[vc.PROJECT_NAME].upper()} not in the database, {project_id} required"
             )
 
     output = db_action.ingest_genpipes(project_id=project_id, ingest_data=ingest_data)
     operation = output[0].flat_dict
     jobs = [job.flat_dict for job in output[1]]
     return [operation, jobs]
+
+@bp.route('/<string:project_id>/digest_unanalyzed', methods=['POST'])
+def digest_unanalyzed(project_id: str):
+    """
+    POST: list of Readset/Sample Name or id
+    return: Readsets or Samples unanalyzed
+    """
+    logger.debug(f"\n\n{project_id}\n\n")
+    if request.method == 'POST':
+        try:
+            ingest_data = request.get_json(force=True)
+        except:
+            flash('Data does not seems to be json')
+            return redirect(request.url)
+
+        return db_action.digest_unanalyzed(project_id=project_id, digest_data=ingest_data)
+        # return [i.flat_dict for i in db_action.digest_unanalyzed(project_id=project_id, digest_data=ingest_data)]
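
For context, the new route can be exercised with any HTTP client. Below is a minimal sketch, not part of the commit: the server URL, the /project blueprint prefix, the run name, and the endpoint label are all placeholder assumptions, while the payload keys mirror the ones db_action.digest_unanalyzed reads further down.

import requests  # assumed HTTP client; not a project dependency

# Hypothetical dev server and blueprint prefix; adjust to the deployment.
url = "http://localhost:5000/project/1/digest_unanalyzed"

payload = {
    "sample_name": False,   # exactly one of the four flags should be truthy
    "sample_id": False,
    "readset_name": True,   # ask for the names of unanalyzed readsets
    "readset_id": False,
    "run_id": None,
    "run_name": "placeholder_run",  # resolved to a run_id when set
    "experiment_sequencing_technology": None,
    "location_endpoint": "placeholder_endpoint",
}

response = requests.post(url, json=payload)
response.raise_for_status()
# The view returns the JSON string built by db_action.digest_unanalyzed,
# e.g. {"location_endpoint": ..., "readset_name": [...]}.
print(response.json())
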
90 changes: 80 additions & 10 deletions project_tracking/db_action.py
@@ -521,7 +521,7 @@ def create_project(project_name, fms_id=None, session=None):
     return session.scalars(select(Project).where(Project.name == project_name)).one()
 
 
-def ingest_run_processing(project_id, ingest_data, session=None):
+def ingest_run_processing(project_id: str, ingest_data, session=None):
     """Ingesting run for MoH"""
     if not isinstance(ingest_data, dict):
         ingest_data = json.loads(ingest_data)
@@ -658,7 +658,7 @@ def ingest_run_processing(project_id, ingest_data, session=None):
     return [operation, job]
 
 
-def ingest_transfer(project_id, ingest_data, session=None, check_readset_name=True):
+def ingest_transfer(project_id: str, ingest_data, session=None, check_readset_name=True):
     """Ingesting transfer"""
     if not isinstance(ingest_data, dict):
         ingest_data = json.loads(ingest_data)
@@ -682,18 +682,19 @@ def ingest_transfer(project_id, ingest_data, session=None, check_readset_name=Tr
         stop=datetime.now(),
         operation=operation
     )
-
+    readset_list = []
     for readset_json in ingest_data[vb.READSET]:
         readset_name = readset_json[vb.READSET_NAME]
+        readset_list.append(session.scalars(select(Readset).where(Readset.name == readset_name)).unique().first())
         for file_json in readset_json[vb.FILE]:
             src_uri = file_json[vb.SRC_LOCATION_URI]
             dest_uri = file_json[vb.DEST_LOCATION_URI]
             if check_readset_name:
                 file = session.scalars(
                     select(File)
                     .join(File.readsets)
-                    .where(Readset.name == readset_name )
-                    .join(File.locations )
+                    .where(Readset.name == readset_name)
+                    .join(File.locations)
                     .where(Location.uri == src_uri)
                 ).unique().first()
                 if not file:
@@ -712,6 +713,7 @@ def ingest_transfer(project_id, ingest_data, session=None, check_readset_name=Tr
             new_location = Location.from_uri(uri=dest_uri, file=file, session=session)
             file.jobs.append(job)
             session.add(new_location)
+    operation.readsets = readset_list
 
     session.add(job)
     session.flush()
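
The operation.readsets assignment above (and its twin in ingest_genpipes below) is the fix for the Operation/Readset linking named in the commit title. A self-contained sketch of the SQLAlchemy many-to-many pattern it relies on, using a hypothetical association table and trimmed-down models rather than the project's real schema:

from sqlalchemy import Column, ForeignKey, Integer, String, Table, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

# Hypothetical association table; the real model defines its own equivalent.
operation_readset = Table(
    "operation_readset", Base.metadata,
    Column("operation_id", ForeignKey("operation.id"), primary_key=True),
    Column("readset_id", ForeignKey("readset.id"), primary_key=True),
)

class Operation(Base):
    __tablename__ = "operation"
    id = Column(Integer, primary_key=True)
    readsets = relationship("Readset", secondary=operation_readset, backref="operations")

class Readset(Base):
    __tablename__ = "readset"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    operation = Operation()
    # Assigning the collection populates the association table on flush,
    # which is what ingest_transfer and ingest_genpipes now do with readset_list.
    operation.readsets = [Readset(name="rs1"), Readset(name="rs2")]
    session.add(operation)
    session.commit()
    print([readset.name for readset in operation.readsets])  # ['rs1', 'rs2']
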
@@ -733,7 +735,7 @@ def ingest_transfer(project_id, ingest_data, session=None, check_readset_name=Tr
     return [operation, job]
 
 
-def digest_readset_file(project_id, digest_data, session=None):
+def digest_readset_file(project_id: str, digest_data, session=None):
     """Digesting readset file fields for GenPipes"""
     if not session:
         session = database.get_session()
@@ -840,7 +842,7 @@ def digest_readset_file(project_id, digest_data, session=None):
         output.append(readset_line)
     return json.dumps(output)
 
-def digest_pair_file(project_id, digest_data, session=None):
+def digest_pair_file(project_id: str, digest_data, session=None):
     """Digesting pair file fields for GenPipes"""
     if not session:
         session = database.get_session()
@@ -904,7 +906,7 @@ def digest_pair_file(project_id, digest_data, session=None):
 
     return json.dumps(output)
 
-def ingest_genpipes(project_id, ingest_data, session=None):
+def ingest_genpipes(project_id: str, ingest_data, session=None):
     """Ingesting GenPipes run"""
     if not isinstance(ingest_data, dict):
         ingest_data = json.loads(ingest_data)
@@ -914,7 +916,7 @@ def ingest_genpipes(project_id: str, ingest_data, session=None):
 
     project = projects(project_id=project_id, session=session)[0]
 
-    operation_config = OperationConfig(
+    operation_config = OperationConfig.from_attributes(
         name=ingest_data[vb.OPERATION_CONFIG_NAME],
         version=ingest_data[vb.OPERATION_CONFIG_VERSION],
         md5sum=ingest_data[vb.OPERATION_CONFIG_MD5SUM],
@@ -930,6 +932,7 @@ def ingest_genpipes(project_id: str, ingest_data, session=None):
         operation_config=operation_config
     )
 
+    readset_list = []
     for sample_json in ingest_data[vb.SAMPLE]:
         sample = session.scalars(
             select(Sample)
@@ -942,6 +945,7 @@ def ingest_genpipes(project_id: str, ingest_data, session=None):
                 select(Readset)
                 .where(Readset.name == readset_json[vb.READSET_NAME])
             ).unique().first()
+            readset_list.append(readset)
             if not readset:
                 raise DidNotFindError(f"No readset named {readset_json[vb.READSET_NAME]}")
             if readset.sample != sample:
@@ -1007,7 +1011,7 @@ def ingest_genpipes(project_id: str, ingest_data, session=None):
 
             session.add(job)
             session.flush()
-
+    operation.readsets = readset_list
     operation_id = operation.id
     job_ids = [job.id for job in operation.jobs]
     try:
@@ -1022,3 +1026,69 @@
     jobs = [session.scalars(select(Job).where(Job.id == job_id)).first() for job_id in job_ids]
 
     return [operation, jobs]
+
+
+def digest_unanalyzed(project_id: str, digest_data, session=None):
+    """
+    Getting unanalyzed samples or readsets
+    """
+    if not session:
+        session = database.get_session()
+
+    session = database.get_session()
+
+    if isinstance(project_id, str):
+        project_id = [project_id]
+
+    sample_name_flag = digest_data["sample_name"]
+    sample_id_flag = digest_data["sample_id"]
+    readset_name_flag = digest_data["readset_name"]
+    readset_id_flag = digest_data["readset_id"]
+    run_id = digest_data["run_id"]
+    run_name = digest_data["run_name"]
+    if run_name:
+        run_id = name_to_id("Run", run_name)[0]
+    experiment_sequencing_technology = digest_data["experiment_sequencing_technology"]
+    location_endpoint = digest_data["location_endpoint"]
+
+    if sample_name_flag:
+        stmt = select(Sample.name)
+        key = "sample_name"
+    elif sample_id_flag:
+        stmt = select(Sample.id)
+        key = "sample_id"
+    elif readset_name_flag:
+        stmt = select(Readset.name)
+        key = "readset_name"
+    elif readset_id_flag:
+        stmt = select(Readset.id)
+        key = "readset_id"
+
+    stmt = (
+        stmt.join(Sample.readsets)
+        .join(Readset.operations)
+        .where(Operation.name.ilike(f"%genpipes%"))
+        .join(Sample.patient)
+        .join(Patient.project)
+        .where(Project.id.in_(project_id))
+    )
+
+    if run_id:
+        stmt = (
+            stmt.where(Run.id == run_id)
+            .join(Readset.run)
+        )
+    if experiment_sequencing_technology:
+        stmt = (
+            stmt.where(Experiment.sequencing_technology == experiment_sequencing_technology)
+            .join(Readset.experiment)
+        )
+
+    # logger.debug(f"\n\n{stmt}\n\n")
+    output = {
+        "location_endpoint": location_endpoint,
+        key: session.scalars(stmt).unique().all()
+    }
+    # logger.debug(f"\n\n{session.scalars(stmt).unique().all()}\n\n")
+
+    return json.dumps(output)
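
digest_unanalyzed composes its query incrementally. A self-contained sketch of that generative select style, using a toy Sample model rather than the project's real one: each .where() or .join() call returns a new statement, which is why optional filters such as run_id and sequencing technology can be layered conditionally.

from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Sample(Base):
    __tablename__ = "sample"
    id = Column(Integer, primary_key=True)
    name = Column(String)

# select() statements are immutable: each .where() returns a new statement,
# so digest_unanalyzed can safely reassign stmt inside its if blocks.
stmt = select(Sample.name)
print(stmt)  # SELECT sample.name FROM sample

run_id = 42  # stand-in for the optional run filter
if run_id:
    stmt = stmt.where(Sample.id == run_id)
print(stmt)  # SELECT sample.name FROM sample WHERE sample.id = :id_1
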
23 changes: 23 additions & 0 deletions project_tracking/model.py
@@ -556,6 +556,29 @@ def config_data(cls, data):
         """
         pass
 
+    @classmethod
+    def from_attributes(cls, name=None, version=None, md5sum=None, data=None, session=None):
+        """
+        get operation_config if it exist, set it if it does not exist
+        """
+        if not session:
+            session = database.get_session()
+        operation_config = session.scalars(
+            select(cls)
+            .where(cls.name == name)
+            .where(cls.version == version)
+            .where(cls.md5sum == md5sum)
+            .where(cls.data == data)
+        ).first()
+        if not operation_config:
+            operation_config = cls(
+                name=name,
+                version=version,
+                md5sum=md5sum,
+                data=data
+            )
+        return operation_config
+
 
 class Job(BaseTable):
     """
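
The new from_attributes classmethod is a get-or-create lookup: it returns the matching OperationConfig row when one exists, otherwise a new, not-yet-persisted instance. A self-contained sketch of the same pattern with a trimmed-down stand-in model (the data column omitted, and placeholder name/version/md5sum values):

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class OperationConfig(Base):
    """Trimmed-down stand-in for the real model."""
    __tablename__ = "operation_config"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    version = Column(String)
    md5sum = Column(String)

    @classmethod
    def from_attributes(cls, name=None, version=None, md5sum=None, session=None):
        # Get-or-create: return the matching row, else a new unsaved instance.
        existing = session.scalars(
            select(cls)
            .where(cls.name == name)
            .where(cls.version == version)
            .where(cls.md5sum == md5sum)
        ).first()
        return existing or cls(name=name, version=version, md5sum=md5sum)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    first = OperationConfig.from_attributes(name="genpipes_ini", version="4.4", md5sum="abc", session=session)
    session.add(first)
    session.flush()
    again = OperationConfig.from_attributes(name="genpipes_ini", version="4.4", md5sum="abc", session=session)
    print(first is again)  # True: the existing row is reused, no duplicate
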
