From a9d8902fc325305a5588bdc0347f4e850118e60c Mon Sep 17 00:00:00 2001 From: Sidney Richards Date: Thu, 19 Sep 2024 14:35:35 +0200 Subject: [PATCH] [#2764] Resolve cases in case list concurrently Resolving cases involves several sequential network requests, which creates a lot of latency when loading a list of cases. This commit runs the resolving concurrently, and also centralizes the logic in the CaseListService. --- src/open_inwoner/cms/cases/views/cases.py | 268 ++++++++++++++++++++-- src/open_inwoner/conf/base.py | 5 - src/open_inwoner/openzaak/cases.py | 127 ---------- 3 files changed, 244 insertions(+), 156 deletions(-) delete mode 100644 src/open_inwoner/openzaak/cases.py diff --git a/src/open_inwoner/cms/cases/views/cases.py b/src/open_inwoner/cms/cases/views/cases.py index 4dce8399ac..61d1864a4d 100644 --- a/src/open_inwoner/cms/cases/views/cases.py +++ b/src/open_inwoner/cms/cases/views/cases.py @@ -1,6 +1,9 @@ import concurrent.futures +import functools import logging +import os from dataclasses import dataclass +from typing import Callable from django.http import HttpRequest from django.urls import reverse @@ -14,11 +17,16 @@ from open_inwoner.htmx.mixins import RequiresHtmxMixin from open_inwoner.openzaak.api_models import Zaak -from open_inwoner.openzaak.cases import preprocess_data +from open_inwoner.openzaak.clients import CatalogiClient, ZakenClient from open_inwoner.openzaak.formapi import fetch_open_submissions -from open_inwoner.openzaak.models import OpenZaakConfig, ZGWApiGroupConfig +from open_inwoner.openzaak.models import ( + OpenZaakConfig, + ZaakTypeConfig, + ZaakTypeStatusTypeConfig, + ZGWApiGroupConfig, +) from open_inwoner.openzaak.types import UniformCase -from open_inwoner.openzaak.utils import get_user_fetch_parameters +from open_inwoner.openzaak.utils import get_user_fetch_parameters, is_zaak_visible from open_inwoner.utils.mixins import PaginationMixin from open_inwoner.utils.views import CommonPageMixin @@ -45,57 +53,269 @@ def 
process_data(self) -> dict: class CaseListService: + request: HttpRequest + + _max_threads_get_cases_for_api_group: int + _max_threads_resolve_case_list: int + _max_threads_resolve_case_attributes: int + + _cumulative_case_fetching_timeout: int = 60 + _fetch_task_timeout: int = 6 + def __init__(self, request: HttpRequest): self.request = request - def get_cases_for_api_group(self, group: ZGWApiGroupConfig): - raw_cases = group.zaken_client.fetch_cases( - **get_user_fetch_parameters(self.request) + # Note that the total number of threads is the product of all these + # thread counts. Hence, it is important to be cautious when increasing + # `_max_threads_get_cases_for_api_group`, as this will quickly + # increase the total number of threads used. It is also important to + # keep in mind that increasing `_max_threads_resolve_case_attributes` + # might have diminishing returns, as this involves fetching e.g. status + # and zaaktypes, which are cached by the client and may overlap between + # different cases. The additional concurrency ought to be balanced + # against the prospect of the cache being filled early. 
+ cpu_count = os.cpu_count() or 0 + self._max_threads_get_cases_for_api_group = 2 + self._max_threads_resolve_case_list = max(2, cpu_count) + self._max_threads_resolve_case_attributes = max(2, cpu_count) + logger.info( + "Configured thread count limits from cpu_count=%d as " + "_max_threads_get_cases_for_api_group=%d _max_threads_resolve_case_list=%d " + "_max_threads_resolve_case_attributes=%d", + cpu_count, + self._max_threads_get_cases_for_api_group, + self._max_threads_resolve_case_list, + self._max_threads_resolve_case_attributes, ) - preprocessed_cases = preprocess_data(raw_cases, group) - return preprocessed_cases + + def get_submissions(self): + subs = fetch_open_submissions(self.request.user.bsn) + subs.sort(key=lambda sub: sub.datum_laatste_wijziging, reverse=True) + + return subs + + def get_case_status_frequencies(self): + cases = self.get_cases() + submissions = self.get_submissions() + + case_statuses = [case.zaak.status_text for case in cases] + + # add static text for open submissions + case_statuses += [SUBMISSION_STATUS_OPEN for submission in submissions] + + return {status: case_statuses.count(status) for status in case_statuses} def get_cases(self) -> list[ZaakWithApiGroup]: all_api_groups = list(ZGWApiGroupConfig.objects.all()) - with parallel() as executor: + with parallel( + max_workers=self._max_threads_get_cases_for_api_group + ) as executor: futures = [ - executor.submit(self.get_cases_for_api_group, group) + executor.submit(self._get_cases_for_api_group, group) for group in all_api_groups ] cases_with_api_group = [] - for task in concurrent.futures.as_completed(futures): + for task in concurrent.futures.as_completed( + futures, + timeout=self._cumulative_case_fetching_timeout, + ): + group_for_task = all_api_groups[futures.index(task)] try: - group_for_task = all_api_groups[futures.index(task)] for row in task.result(): cases_with_api_group.append( ZaakWithApiGroup(zaak=row, api_group=group_for_task) ) except BaseException: - 
logger.exception("Error fetching and pre-processing cases") + logger.exception( + "Error while fetching and pre-processing cases for API group %s", + group_for_task, + ) # Ensure stable sorting for pagination and testing purposes cases_with_api_group.sort(key=lambda c: all_api_groups.index(c.api_group)) return cases_with_api_group - def get_submissions(self): - subs = fetch_open_submissions(self.request.user.bsn) - subs.sort(key=lambda sub: sub.datum_laatste_wijziging, reverse=True) + def _get_cases_for_api_group(self, group: ZGWApiGroupConfig): + raw_cases = group.zaken_client.fetch_cases( + **get_user_fetch_parameters(self.request) + ) + preprocessed_cases = self.resolve_cases(raw_cases, group) + return preprocessed_cases - return subs + def resolve_cases( + self, + cases: list[Zaak], + group: ZGWApiGroupConfig, + ) -> list[Zaak]: + with parallel(max_workers=self._max_threads_resolve_case_list) as executor: + futures = [ + executor.submit(self.resolve_case, case, group) for case in cases + ] - def get_case_status_frequencies(self): - cases = self.get_cases() - submissions = self.get_submissions() + concurrent.futures.wait(futures, timeout=self._fetch_task_timeout) + + cases = [case for case in cases if case.status and is_zaak_visible(case)] + cases.sort(key=lambda case: case.startdatum, reverse=True) + + return cases + + def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig): + logger.debug("Resolving case %s with group %s", case.identificatie, group) + + functions = [ + functools.partial( + self._resolve_resultaat_and_resultaat_type, + zaken_client=group.zaken_client, + catalogi_client=group.catalogi_client, + ), + functools.partial( + self._resolve_status_and_status_type, + zaken_client=group.zaken_client, + catalogi_client=group.catalogi_client, + ), + functools.partial( + self._resolve_zaak_type_and_configs, + client=group.catalogi_client, + ), + ] - case_statuses = [case.zaak.status_text for case in cases] + # use contextmanager to ensure the 
`requests.Session` is reused + with group.catalogi_client, group.zaken_client: + with parallel( + max_workers=self._max_threads_resolve_case_attributes + ) as executor: + futures = [executor.submit(func, case) for func in functions] + + for task in concurrent.futures.as_completed( + futures, timeout=self._fetch_task_timeout + ): + if exc := task.exception(): + logger.error("Error in resolving case: %s", exc, stack_info=True) + + update_case = task.result() + if hasattr(update_case, "__call__"): + update_case(case) + + try: + zaaktype_config = ZaakTypeConfig.objects.filter_case_type( + case.zaaktype + ).get() + + case.zaaktype_config = zaaktype_config + + if zaaktype_config: + statustype_config = ZaakTypeStatusTypeConfig.objects.get( + zaaktype_config=zaaktype_config, + statustype_url=case.status.statustype.url, + ) + case.statustype_config = statustype_config + except ( + ZaakTypeConfig.DoesNotExist, + AttributeError, + ZaakTypeStatusTypeConfig.DoesNotExist, + ): + logger.exception( + "Unable to resolve zaaktype_config and statustype_config for case %s", + case.identificatie, + ) - # add static text for open submissions - case_statuses += [SUBMISSION_STATUS_OPEN for submission in submissions] + @staticmethod + def _resolve_zaak_type_and_configs( + case: Zaak, *, client: CatalogiClient + ) -> Callable[[Zaak], None] | None: + """ + Resolve `case.zaaktype` (`str`) to a `ZaakType(ZGWModel)` object + + Note: the result of `fetch_single_case_type` is cached, hence a request + is only made for new case type urls + """ + if not isinstance(case.zaaktype, str): + logger.debug("Case %s already has a resolved zaaktype", case.identificatie) + return + + case_type = client.fetch_single_case_type(case.zaaktype) + if not case_type: + logger.error("Unable to resolve zaaktype for url: %s", case.zaaktype) + return + + def setter(target_case: Zaak): + target_case.zaaktype = case_type + + return setter + + @staticmethod + def _resolve_status_and_status_type( + case: Zaak, *, 
zaken_client: ZakenClient, catalogi_client: CatalogiClient + ) -> Callable[[Zaak], None] | None: + if not isinstance(case.status, str): + logger.error( + "`case.status` for case %s is not a str but %s", + case.identificatie, + type(case.status), + ) + return + + status = zaken_client.fetch_single_status(case.status) + if not status: + logger.error( + "Unable to resolve status %s for case %s", + case.status, + case.identificatie, + ) + return None + + status_type = catalogi_client.fetch_single_status_type(status.statustype) + if not status_type: + logger.error( + "Unable to resolve status_type %s for case %s", + status.statustype, + case.identificatie, + ) + return None + + def setter(target_case: Zaak): + target_case.status = status + target_case.status.statustype = status_type + + return setter + + @staticmethod + def _resolve_resultaat_and_resultaat_type( + case: Zaak, *, zaken_client: ZakenClient, catalogi_client: CatalogiClient + ) -> Callable[[Zaak], None] | None: + if case.resultaat is None: + return + + if not isinstance(case.resultaat, str): + logger.error( + "`case.resultaat` for case %s is not a str but %s", + case.identificatie, + type(case.resultaat), + ) + return - return {status: case_statuses.count(status) for status in case_statuses} + resultaat = zaken_client.fetch_single_result(case.resultaat) + if not resultaat: + logger.error("Unable to fetch resultaat for %s", case) + return + + resultaattype = catalogi_client.fetch_single_resultaat_type( + resultaat.resultaattype + ) + if not resultaattype: + logger.error( + "Unable to resolve resultaattype for %s", resultaat.resultaattype + ) + return + + def setter(target_case: Zaak): + target_case.resultaat = resultaat + target_case.resultaat.resultaattype = resultaattype + + return setter class OuterCaseListView( diff --git a/src/open_inwoner/conf/base.py b/src/open_inwoner/conf/base.py index 5c10dab7f4..cd7f2f7675 100644 --- a/src/open_inwoner/conf/base.py +++ b/src/open_inwoner/conf/base.py @@ -1031,8 
+1031,3 @@ "description": "string representing the (absolute) path to a file, including file extension", }, ] - -# -# Project specific settings -# -CASE_LIST_NUM_THREADS = 6 diff --git a/src/open_inwoner/openzaak/cases.py b/src/open_inwoner/openzaak/cases.py deleted file mode 100644 index 25045f708b..0000000000 --- a/src/open_inwoner/openzaak/cases.py +++ /dev/null @@ -1,127 +0,0 @@ -import concurrent.futures -import logging - -from django.conf import settings - -from zgw_consumers.concurrent import parallel - -from .api_models import Zaak -from .clients import CatalogiClient, ZakenClient -from .models import ZaakTypeConfig, ZaakTypeStatusTypeConfig, ZGWApiGroupConfig -from .utils import is_zaak_visible - -logger = logging.getLogger(__name__) - - -def resolve_zaak_type(case: Zaak, client: CatalogiClient) -> None: - """ - Resolve `case.zaaktype` (`str`) to a `ZaakType(ZGWModel)` object - - Note: the result of `fetch_single_case_type` is cached, hence a request - is only made for new case type urls - """ - case_type_url = case.zaaktype - if client: - case_type = client.fetch_single_case_type(case_type_url) - case.zaaktype = case_type - - -def resolve_status(case: Zaak, client: ZakenClient | None = None) -> None: - """ - Resolve `case.status` (`str`) to a `Status(ZGWModel)` object - """ - if client: - case.status = client.fetch_single_status(case.status) - - -def resolve_status_type(case: Zaak, client: CatalogiClient | None = None) -> None: - """ - Resolve `case.status.statustype` (`str`) to a `StatusType(ZGWModel)` object - """ - statustype_url = case.status.statustype - if client: - case.status.statustype = client.fetch_single_status_type(statustype_url) - - -def resolve_resultaat(case: Zaak, client: ZakenClient | None = None) -> None: - """ - Resolve `case.resultaat` (`str`) to a `Resultaat(ZGWModel)` object - """ - if case.resultaat: - case.resultaat = client.fetch_single_result(case.resultaat) - - -def resolve_resultaat_type(case: Zaak, client: CatalogiClient | 
None = None) -> None: - """ - Resolve `case.resultaat.resultaattype` (`str`) to a `ResultaatType(ZGWModel)` object - """ - if client and case.resultaat: - case.resultaat.resultaattype = client.fetch_single_resultaat_type( - case.resultaat.resultaattype - ) - - -def add_zaak_type_config(case: Zaak) -> None: - """ - Add `ZaakTypeConfig` corresponding to the zaaktype type url of the case - - Note: must be called after `resolve_zaak_type` since we're using the `uuid` and - `identificatie` from `case.zaaktype` - """ - try: - case.zaaktype_config = ZaakTypeConfig.objects.filter_case_type( - case.zaaktype - ).get() - except ZaakTypeConfig.DoesNotExist: - pass - - -def add_status_type_config(case: Zaak) -> None: - """ - Add `ZaakTypeStatusTypeConfig` corresponding to the status type url of the case - - Note: must be called after `resolve_status_type` since we're getting the - status type url from `case.status.statustype` - """ - try: - case.statustype_config = ZaakTypeStatusTypeConfig.objects.get( - zaaktype_config=case.zaaktype_config, - statustype_url=case.status.statustype.url, - ) - except (AttributeError, ZaakTypeStatusTypeConfig.DoesNotExist): - pass - - -def preprocess_data(cases: list[Zaak], group: ZGWApiGroupConfig) -> list[Zaak]: - """ - Resolve zaaktype and statustype, add status type config, filter for visibility - - Note: we need to iterate twice over `cases` because the `zaak_type` must be - resolved to a `ZaakType` object before we can filter by visibility - """ - - def preprocess_case(case: Zaak) -> None: - resolve_status(case, client=group.zaken_client) - resolve_status_type(case, client=group.catalogi_client) - resolve_resultaat(case, client=group.zaken_client) - resolve_resultaat_type(case, client=group.catalogi_client) - add_zaak_type_config(case) - add_status_type_config(case) - - # use contextmanager to ensure the `requests.Session` is reused - with group.catalogi_client, group.zaken_client: - with parallel(max_workers=settings.CASE_LIST_NUM_THREADS) 
as executor: - futures = [ - executor.submit(resolve_zaak_type, case, client=group.catalogi_client) - for case in cases - ] - concurrent.futures.wait(futures) - - cases = [case for case in cases if case.status and is_zaak_visible(case)] - - futures = [executor.submit(preprocess_case, case) for case in cases] - concurrent.futures.wait(futures) - - cases.sort(key=lambda case: case.startdatum, reverse=True) - - return cases