Merge pull request datalad#353 from candleindark/ds-collection-info
Provide stats about return collection
yarikoptic authored Apr 25, 2024
2 parents c66a8ff + 49e2e61 commit dc9c54e
Showing 15 changed files with 773 additions and 80 deletions.
15 changes: 8 additions & 7 deletions datalad_registry/blueprints/api/dataset_urls/__init__.py
@@ -9,7 +9,7 @@
from flask_openapi3 import APIBlueprint, Tag
from lark.exceptions import GrammarError, UnexpectedInput
from psycopg2.errors import UniqueViolation
from sqlalchemy import ColumnElement, and_
from sqlalchemy import ColumnElement, and_, select
from sqlalchemy.exc import IntegrityError

from datalad_registry.models import RepoUrl, db
@@ -32,6 +32,7 @@
PathParams,
QueryParams,
)
from .tools import get_collection_stats
from .. import (
API_URL_PREFIX,
COMMON_API_RESPONSES,
@@ -92,7 +93,7 @@ def declare_dataset_url(body: DatasetURLSubmitModel):
url_as_str = str(body.url)

repo_url_row = db.session.execute(
db.select(RepoUrl).filter_by(url=url_as_str)
select(RepoUrl).filter_by(url=url_as_str)
).one_or_none()
if repo_url_row is None:
# == The URL requested to be created does not exist in the database ==
@@ -116,7 +117,7 @@ def declare_dataset_url(body: DatasetURLSubmitModel):
# of the URL in the database
db.session.rollback()
repo_url_added_by_another = (
db.session.execute(db.select(RepoUrl).filter_by(url=url_as_str))
db.session.execute(select(RepoUrl).filter_by(url=url_as_str))
.scalars()
.one_or_none()
)
@@ -285,11 +286,11 @@ def cache_path_trans(cache_path: Path) -> str:
ep = ".dataset_urls" # Endpoint of `dataset_urls`
base_qry = loads(query.json(exclude={"page"}, exclude_none=True))

# Select statement shared by the pagination below and the collection stats
base_select_stmt = select(RepoUrl).filter(and_(True, *constraints))

max_per_page = 100  # Hard upper bound on the requester-provided `per_page`
pagination = db.paginate(
db.select(RepoUrl)
.filter(and_(True, *constraints))
.order_by(
base_select_stmt.order_by(
getattr(
_ORDER_KEY_TO_SQLA_ATTR[query.order_by], query.order_dir.value
)().nulls_last()
@@ -341,7 +342,6 @@ def cache_path_trans(cache_path: Path) -> str:
assert pagination.total is not None

page = DatasetURLPage(
total=pagination.total,
cur_pg_num=cur_pg_num,
prev_pg=(
url_for(ep, **base_qry, page=pagination.prev_num)
@@ -356,6 +356,7 @@ def cache_path_trans(cache_path: Path) -> str:
first_pg=url_for(ep, **base_qry, page=1),
last_pg=url_for(ep, **base_qry, page=1 if total_pages == 0 else total_pages),
dataset_urls=ds_urls,
collection_stats=get_collection_stats(base_select_stmt),
)

return json_resp_from_str(page.json(exclude_none=True))
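With this change, the paginated response carries a `collection_stats` object describing the entire filtered collection, not just the current page. A minimal client-side sketch (the host and the /api/v2/dataset-urls path are assumptions about the deployed route):

import requests

# Fetch the first page; `collection_stats` covers the whole collection
resp = requests.get("http://localhost:5000/api/v2/dataset-urls", params={"page": 1})
resp.raise_for_status()
summary = resp.json()["collection_stats"]["summary"]
print(summary["ds_count"], summary["unique_ds_count"])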
72 changes: 69 additions & 3 deletions datalad_registry/blueprints/api/dataset_urls/models.py
@@ -9,6 +9,7 @@
BaseModel,
Field,
FileUrl,
NonNegativeInt,
PositiveInt,
StrictInt,
StrictStr,
@@ -256,14 +257,74 @@ class Config:
by_alias = False


class AnnexDsCollectionStats(BaseModel):
"""
Model with the base components of annex dataset collection statistics
"""

ds_count: NonNegativeInt = Field(description="The number of datasets")
annexed_files_size: Optional[NonNegativeInt] = Field(
None, description="The total size of the annexed files"
)
annexed_file_count: Optional[NonNegativeInt] = Field(
None, description="The total number of annexed files"
)


class DataladDsCollectionStats(BaseModel):
"""
Model for DataLad dataset collection statistics
"""

unique_ds_stats: AnnexDsCollectionStats = Field(
description="Statistics for unique datasets"
)
stats: AnnexDsCollectionStats = Field(
description="Statistics for all datasets, as individual repos, "
"without any deduplication"
)


class NonAnnexDsCollectionStats(BaseModel):
"""
Model for non-annex dataset collection statistics
"""

ds_count: NonNegativeInt = Field(
description="The number of datasets, as individual repos, "
"without any deduplication"
)


class StatsSummary(BaseModel):
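"""
Model for the summary component of the collection statistics
"""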
unique_ds_count: NonNegativeInt = Field(description="The number of unique datasets")
ds_count: NonNegativeInt = Field(
description="The number of datasets, as individual repos, "
"without any deduplication"
)


class CollectionStats(BaseModel):
datalad_ds_stats: DataladDsCollectionStats = Field(
description="Statistics for DataLad datasets"
)
pure_annex_ds_stats: AnnexDsCollectionStats = Field(
description="Statistics for pure annex datasets, as individual repos, "
"without any deduplication"
)
non_annex_ds_stats: NonAnnexDsCollectionStats = Field(
description="Statistics for non-annex datasets, as individual repos, "
"without any deduplication"
)

summary: StatsSummary = Field(description="Summary statistics")


class DatasetURLPage(BaseModel):
"""
Model for representing a page of dataset URLs in a response
"""

total: StrictInt = Field(
description="The total number of dataset URLs across all pages"
)
cur_pg_num: StrictInt = Field(description="The number of the current page")
prev_pg: Optional[StrictStr] = Field(
None, description="The link to the previous page"
Expand All @@ -275,3 +336,8 @@ class DatasetURLPage(BaseModel):
dataset_urls: list[DatasetURLRespModel] = Field(
description="The list of dataset URLs in the current page"
)
collection_stats: CollectionStats = Field(
description="Statistics about the collection of dataset URLs, "
"not just the URLs in the current page but the entire collection "
"returned"
)
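For illustration, a minimal sketch instantiating the new models with made-up values and serializing them the way the endpoint serializes a page:

from datalad_registry.blueprints.api.dataset_urls.models import (
    AnnexDsCollectionStats,
    CollectionStats,
    DataladDsCollectionStats,
    NonAnnexDsCollectionStats,
    StatsSummary,
)

stats = CollectionStats(
    datalad_ds_stats=DataladDsCollectionStats(
        unique_ds_stats=AnnexDsCollectionStats(
            ds_count=10, annexed_files_size=2048, annexed_file_count=64
        ),
        stats=AnnexDsCollectionStats(
            ds_count=12, annexed_files_size=4096, annexed_file_count=128
        ),
    ),
    pure_annex_ds_stats=AnnexDsCollectionStats(ds_count=3),
    non_annex_ds_stats=NonAnnexDsCollectionStats(ds_count=5),
    # Mirrors the helper's convention: unique count from the deduplicated
    # DataLad stats, total count over all repos
    summary=StatsSummary(unique_ds_count=10, ds_count=20),
)
print(stats.json(exclude_none=True, indent=2))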
238 changes: 238 additions & 0 deletions datalad_registry/blueprints/api/dataset_urls/tools.py
@@ -0,0 +1,238 @@
from sqlalchemy import (
Select,
Subquery,
TableClause,
and_,
column,
func,
not_,
or_,
select,
table,
text,
)

from datalad_registry.models import RepoUrl, db

from .models import (
AnnexDsCollectionStats,
CollectionStats,
DataladDsCollectionStats,
NonAnnexDsCollectionStats,
StatsSummary,
)


def cache_result_to_tmp_tb(select_stmt: Select, tb_name: str) -> TableClause:
"""
Execute the given select statement and cache the result to a temporary table
with the given name
:param select_stmt: The given select statement to execute
:param tb_name: The string to use as the name of the temporary table
:return: An object representing the temporary table
Note: The execution of this function requires the Flask app's context
"""
# `literal_binds` inlines the bound parameter values so that the compiled
# SELECT can be embedded verbatim in the CREATE TEMPORARY TABLE ... AS statement
create_tmp_tb_sql = f"""
CREATE TEMPORARY TABLE {tb_name} AS
{select_stmt.compile(bind=db.engine, compile_kwargs={'literal_binds': True})};
"""
db.session.execute(text(create_tmp_tb_sql))

return table(
tb_name,
*(column(name, c.type) for name, c in select_stmt.selected_columns.items()),
)


def _get_annex_ds_collection_stats(q: Subquery) -> AnnexDsCollectionStats:
"""
Get the stats of a collection of datasets that consists only of annex datasets
:param q: The query that specifies the collection of datasets under consideration
:return: The object representing the stats
Note: The execution of this function requires the Flask app's context
"""

ds_count, annexed_files_size, annexed_file_count = db.session.execute(
select(
func.count().label("ds_count"),
func.sum(q.c.annexed_files_in_wt_size).label("annexed_files_size"),
func.sum(q.c.annexed_files_in_wt_count).label("annexed_file_count"),
).select_from(q)
).one()

return AnnexDsCollectionStats(
ds_count=ds_count,
annexed_files_size=annexed_files_size,
annexed_file_count=annexed_file_count,
)


def get_unique_dl_ds_collection_stats(base_q: Subquery) -> AnnexDsCollectionStats:
"""
Get the stats of the subset of the collection of datasets that consists only
of DataLad datasets, treating datasets with the same `ds_id` as the same
dataset
:param base_q: The base query that specifies the collection of datasets
under consideration
:return: The object representing the stats
Note: The execution of this function requires the Flask app's context
"""

# For each `ds_id`, find the largest annexed-files size among its repos
grp_by_id_q = (
select(
base_q.c.ds_id,
func.max(base_q.c.annexed_files_in_wt_size).label(
"max_annexed_files_in_wt_size"
),
)
.group_by(base_q.c.ds_id)
.subquery("grp_by_id_q")
)

# Per `ds_id`, keep the repo row(s) at that largest size (all of the
# `ds_id`'s repos when the sizes are all NULL) and take the largest
# annexed file count among them
grp_by_id_and_a_f_size_q = (
select(
RepoUrl.ds_id,
RepoUrl.annexed_files_in_wt_size,
func.max(RepoUrl.annexed_files_in_wt_count).label(
"annexed_files_in_wt_count"
),
)
.join(
grp_by_id_q,
and_(
RepoUrl.ds_id == grp_by_id_q.c.ds_id,
or_(
grp_by_id_q.c.max_annexed_files_in_wt_size.is_(None),
RepoUrl.annexed_files_in_wt_size
== grp_by_id_q.c.max_annexed_files_in_wt_size,
),
),
)
.group_by(RepoUrl.ds_id, RepoUrl.annexed_files_in_wt_size)
.subquery("grp_by_id_and_a_f_size_q")
)

return _get_annex_ds_collection_stats(grp_by_id_and_a_f_size_q)


def get_dl_ds_collection_stats_with_dups(base_q: Subquery) -> AnnexDsCollectionStats:
"""
Get the stats of the subset of the collection of datasets that consists only
of DataLad datasets, treating each individual repo as a dataset regardless of
the value of `ds_id`
:param base_q: The base query that specifies the collection of datasets
under consideration
:return: The object representing the stats
Note: The execution of this function requires the Flask app's context
"""

# Subquery selecting all the DataLad datasets
dl_ds_q = select(base_q).filter(base_q.c.ds_id.is_not(None)).subquery("dl_ds_q")

return _get_annex_ds_collection_stats(dl_ds_q)


def get_dl_ds_collection_stats(base_q: Subquery) -> DataladDsCollectionStats:
"""
Get the stats of the subset of the collection of datasets that consists only
of DataLad datasets
:param base_q: The base query that specifies the collection of datasets
under consideration
:return: The object representing the stats
Note: The execution of this function requires the Flask app's context
"""

return DataladDsCollectionStats(
unique_ds_stats=get_unique_dl_ds_collection_stats(base_q),
stats=get_dl_ds_collection_stats_with_dups(base_q),
)


def get_pure_annex_ds_collection_stats(base_q: Subquery) -> AnnexDsCollectionStats:
"""
Get the stats of the subset of the collection of datasets that consists only
of pure annex datasets, i.e. annex datasets that are not DataLad datasets
:param base_q: The base query that specifies the collection of datasets
under consideration
:return: The object representing the stats
Note: The execution of this function requires the Flask app's context
"""
# Subquery selecting all the pure annex datasets
pure_annex_ds_q = (
select(base_q)
.filter(and_(base_q.c.branches.has_key("git-annex"), base_q.c.ds_id.is_(None)))
.subquery("pure_annex_ds_q")
)

return _get_annex_ds_collection_stats(pure_annex_ds_q)


def get_non_annex_ds_collection_stats(base_q: Subquery) -> NonAnnexDsCollectionStats:
"""
Get the stats of the subset of the collection of datasets that consists only
of non-annex datasets
:param base_q: The base query that specifies the collection of datasets
under consideration
:return: The object representing the stats
Note: The execution of this function requires the Flask app's context
"""
# Subquery selecting all the non-annex datasets
non_annex_ds_q = (
select(base_q)
.filter(not_(base_q.c.branches.has_key("git-annex")))
.subquery("non_annex_ds_q")
)

return NonAnnexDsCollectionStats(
ds_count=db.session.execute(
select(func.count().label("ds_count")).select_from(non_annex_ds_q)
).scalar_one()
)


def get_collection_stats(select_stmt: Select) -> CollectionStats:
"""
Get the statistics of the collection of dataset URLs specified by the given select
statement
:param select_stmt: The given select statement
:return: The statistics of the collection of dataset URLs
Note: The execution of this function requires the Flask app's context
"""

# Cache the result of the select statement to a temporary table
tmp_tb = cache_result_to_tmp_tb(select_stmt, "tmp_tb")

# Build the base subquery from the cached temporary table
base_q = select(tmp_tb).subquery("base_q")

datalad_ds_stats = get_dl_ds_collection_stats(base_q)

# Total number of datasets, as individual repos, without any deduplication
ds_count = db.session.execute(
select(func.count().label("ds_count")).select_from(base_q)
).scalar_one()

return CollectionStats(
datalad_ds_stats=datalad_ds_stats,
pure_annex_ds_stats=get_pure_annex_ds_collection_stats(base_q),
non_annex_ds_stats=get_non_annex_ds_collection_stats(base_q),
summary=StatsSummary(
unique_ds_count=datalad_ds_stats.unique_ds_stats.ds_count, ds_count=ds_count
),
)
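A minimal usage sketch of the new helper; `app` stands in for the configured Flask application object (an assumption of this sketch):

from sqlalchemy import select

from datalad_registry.blueprints.api.dataset_urls.tools import get_collection_stats
from datalad_registry.models import RepoUrl

with app.app_context():  # the helper needs an active Flask app context
    # Stats over all registered dataset URLs; pass a filtered SELECT to narrow the collection
    stats = get_collection_stats(select(RepoUrl))
    print(stats.summary.ds_count, stats.summary.unique_ds_count)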