Skip to content

Commit

Permalink
[#2764] Resolve cases in case list concurrently
Browse files Browse the repository at this point in the history
Resolving cases involves several sequential network requests,
which creates a lot of latency when loading a lost of cases.
This commit runs the resolving concurrently, and also centralizes
the logic in the CaseListService.
  • Loading branch information
swrichards committed Sep 25, 2024
1 parent a7333e7 commit a9d8902
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 156 deletions.
268 changes: 244 additions & 24 deletions src/open_inwoner/cms/cases/views/cases.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import concurrent.futures
import functools
import logging
import os
from dataclasses import dataclass
from typing import Callable

from django.http import HttpRequest
from django.urls import reverse
Expand All @@ -14,11 +17,16 @@

from open_inwoner.htmx.mixins import RequiresHtmxMixin
from open_inwoner.openzaak.api_models import Zaak
from open_inwoner.openzaak.cases import preprocess_data
from open_inwoner.openzaak.clients import CatalogiClient, ZakenClient
from open_inwoner.openzaak.formapi import fetch_open_submissions
from open_inwoner.openzaak.models import OpenZaakConfig, ZGWApiGroupConfig
from open_inwoner.openzaak.models import (
OpenZaakConfig,
ZaakTypeConfig,
ZaakTypeStatusTypeConfig,
ZGWApiGroupConfig,
)
from open_inwoner.openzaak.types import UniformCase
from open_inwoner.openzaak.utils import get_user_fetch_parameters
from open_inwoner.openzaak.utils import get_user_fetch_parameters, is_zaak_visible
from open_inwoner.utils.mixins import PaginationMixin
from open_inwoner.utils.views import CommonPageMixin

Expand All @@ -45,57 +53,269 @@ def process_data(self) -> dict:


class CaseListService:
request: HttpRequest

_max_threads_get_cases_for_api_group: int
_max_threads_resolve_case_list: int
_max_threads_resolve_case_attributes: int

_cumulative_case_fetching_timeout: int = 60
_fetch_task_timeout: int = 6

def __init__(self, request: HttpRequest):
self.request = request

def get_cases_for_api_group(self, group: ZGWApiGroupConfig):
raw_cases = group.zaken_client.fetch_cases(
**get_user_fetch_parameters(self.request)
# Note that the the total number of threads is the product of all these
# thread counts. Hence, it is important to be cautious when increasing
# `_max_threads_get_cases_for_api_group`, as this will quickly
# increase the total number of threads used. It is also important to
# keep in mind that increasing `_max_threads_resolve_case_attributes``
# might have diminishing returns, as this involves fetching e.g. status
# and zaaktypes, which are cached by the client and may overlap between
# different cases. The additional concurrency ought to be balanced
# against the prospect of the cache being filled early.
cpu_count = os.cpu_count() or 0
self._max_threads_get_cases_for_api_group = 2
self._max_threads_resolve_case_list = max(2, cpu_count)
self._max_threads_resolve_case_attributes = max(2, cpu_count)
logger.info(
"Configured thread count limits from cpu_count=%d as "
"_max_threads_get_cases_for_api_group=%d _max_threads_resolve_case_list=%d "
"_max_threads_resolve_case_attributes=%d",
cpu_count,
self._max_threads_get_cases_for_api_group,
self._max_threads_resolve_case_list,
self._max_threads_resolve_case_attributes,
)
preprocessed_cases = preprocess_data(raw_cases, group)
return preprocessed_cases

def get_submissions(self):
subs = fetch_open_submissions(self.request.user.bsn)
subs.sort(key=lambda sub: sub.datum_laatste_wijziging, reverse=True)

return subs

def get_case_status_frequencies(self):
cases = self.get_cases()
submissions = self.get_submissions()

case_statuses = [case.zaak.status_text for case in cases]

# add static text for open submissions
case_statuses += [SUBMISSION_STATUS_OPEN for submission in submissions]

return {status: case_statuses.count(status) for status in case_statuses}

def get_cases(self) -> list[ZaakWithApiGroup]:
all_api_groups = list(ZGWApiGroupConfig.objects.all())

with parallel() as executor:
with parallel(
max_workers=self._max_threads_get_cases_for_api_group
) as executor:
futures = [
executor.submit(self.get_cases_for_api_group, group)
executor.submit(self._get_cases_for_api_group, group)
for group in all_api_groups
]

cases_with_api_group = []
for task in concurrent.futures.as_completed(futures):
for task in concurrent.futures.as_completed(
futures,
timeout=self._cumulative_case_fetching_timeout,
):
group_for_task = all_api_groups[futures.index(task)]
try:
group_for_task = all_api_groups[futures.index(task)]
for row in task.result():
cases_with_api_group.append(
ZaakWithApiGroup(zaak=row, api_group=group_for_task)
)
except BaseException:
logger.exception("Error fetching and pre-processing cases")
logger.exception(
"Error while fetching and pre-processing cases for API group %s",
group_for_task,
)

# Ensure stable sorting for pagination and testing purposes
cases_with_api_group.sort(key=lambda c: all_api_groups.index(c.api_group))

return cases_with_api_group

def get_submissions(self):
subs = fetch_open_submissions(self.request.user.bsn)
subs.sort(key=lambda sub: sub.datum_laatste_wijziging, reverse=True)
def _get_cases_for_api_group(self, group: ZGWApiGroupConfig):
raw_cases = group.zaken_client.fetch_cases(
**get_user_fetch_parameters(self.request)
)
preprocessed_cases = self.resolve_cases(raw_cases, group)
return preprocessed_cases

return subs
def resolve_cases(
self,
cases: list[Zaak],
group: ZGWApiGroupConfig,
) -> list[Zaak]:
with parallel(max_workers=self._max_threads_resolve_case_list) as executor:
futures = [
executor.submit(self.resolve_case, case, group) for case in cases
]

def get_case_status_frequencies(self):
cases = self.get_cases()
submissions = self.get_submissions()
concurrent.futures.wait(futures, timeout=self._fetch_task_timeout)

cases = [case for case in cases if case.status and is_zaak_visible(case)]
cases.sort(key=lambda case: case.startdatum, reverse=True)

return cases

def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig):
logger.debug("Resolving case %s with group %s", case.identificatie, group)

functions = [
functools.partial(
self._resolve_resultaat_and_resultaat_type,
zaken_client=group.zaken_client,
catalogi_client=group.catalogi_client,
),
functools.partial(
self._resolve_status_and_status_type,
zaken_client=group.zaken_client,
catalogi_client=group.catalogi_client,
),
functools.partial(
self._resolve_zaak_type_and_configs,
client=group.catalogi_client,
),
]

case_statuses = [case.zaak.status_text for case in cases]
# use contextmanager to ensure the `requests.Session` is reused
with group.catalogi_client, group.zaken_client:
with parallel(
max_workers=self._max_threads_resolve_case_attributes
) as executor:
futures = [executor.submit(func, case) for func in functions]

for task in concurrent.futures.as_completed(
futures, timeout=self._fetch_task_timeout
):
if exc := task.exception():
logger.error("Error in resolving case: %s", exc, stack_info=True)

update_case = task.result()
if hasattr(update_case, "__call__"):
update_case(case)

try:
zaaktype_config = ZaakTypeConfig.objects.filter_case_type(
case.zaaktype
).get()

case.zaaktype_config = zaaktype_config

if zaaktype_config:
statustype_config = ZaakTypeStatusTypeConfig.objects.get(
zaaktype_config=zaaktype_config,
statustype_url=case.status.statustype.url,
)
case.statustype_config = statustype_config
except (
ZaakTypeConfig.DoesNotExist,
AttributeError,
ZaakTypeStatusTypeConfig.DoesNotExist,
):
logger.exception(
"Unable to resolve zaaktype_config and statustype_config for case %s",
case.identificatie,
)

# add static text for open submissions
case_statuses += [SUBMISSION_STATUS_OPEN for submission in submissions]
@staticmethod
def _resolve_zaak_type_and_configs(
case: Zaak, *, client: CatalogiClient
) -> Callable[[Zaak], None] | None:
"""
Resolve `case.zaaktype` (`str`) to a `ZaakType(ZGWModel)` object
Note: the result of `fetch_single_case_type` is cached, hence a request
is only made for new case type urls
"""
if not isinstance(case.zaaktype, str):
logger.debug("Case %s already has a resolved zaaktype", case.identificatie)
return

case_type = client.fetch_single_case_type(case.zaaktype)
if not case_type:
logger.error("Unable to resolve zaaktype for url: %s", case.zaaktype)
return

def setter(target_case: Zaak):
target_case.zaaktype = case_type

return setter

@staticmethod
def _resolve_status_and_status_type(
case: Zaak, *, zaken_client: ZakenClient, catalogi_client: CatalogiClient
) -> Callable[[Zaak], None] | None:
if not isinstance(case.status, str):
logger.error(
"`case.status` for case %s is not a str but %s",
case.identificatie,
type(case.status),
)
return

status = zaken_client.fetch_single_status(case.status)
if not status:
logger.error(
"Unable to resolve status %s for case %s",
case.status,
case.identificatie,
)
return None

status_type = catalogi_client.fetch_single_status_type(status.statustype)
if not status_type:
logger.error(
"Unable to resolve status_type %s for case %s",
status.statustype,
case.identificatie,
)
return None

def setter(target_case: Zaak):
target_case.status = status
target_case.status.statustype = status_type

return setter

@staticmethod
def _resolve_resultaat_and_resultaat_type(
case: Zaak, *, zaken_client: ZakenClient, catalogi_client: CatalogiClient
) -> Callable[[Zaak], None] | None:
if case.resultaat is None:
return

if not isinstance(case.resultaat, str):
logger.error(
"`case.resultaat` for case %s is not a str but %s",
case.identificatie,
type(case.resultaat),
)
return

return {status: case_statuses.count(status) for status in case_statuses}
resultaat = zaken_client.fetch_single_result(case.resultaat)
if not resultaat:
logger.error("Unable to fetch resultaat for %s", case)
return

resultaattype = catalogi_client.fetch_single_resultaat_type(
resultaat.resultaattype
)
if not resultaattype:
logger.error(
"Unable to resolve resultaattype for %s", resultaat.resultaattype
)
return

def setter(target_case: Zaak):
target_case.resultaat = resultaat
target_case.resultaat.resultaattype = resultaattype

return setter


class OuterCaseListView(
Expand Down
5 changes: 0 additions & 5 deletions src/open_inwoner/conf/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,8 +1031,3 @@
"description": "string representing the (absolute) path to a file, including file extension",
},
]

#
# Project specific settings
#
CASE_LIST_NUM_THREADS = 6
Loading

0 comments on commit a9d8902

Please sign in to comment.