diff --git a/src/open_inwoner/cms/cases/views/cases.py b/src/open_inwoner/cms/cases/views/cases.py index 4c8f3d686d..d7f806bdab 100644 --- a/src/open_inwoner/cms/cases/views/cases.py +++ b/src/open_inwoner/cms/cases/views/cases.py @@ -1,9 +1,6 @@ -import concurrent.futures -import enum import logging -from dataclasses import dataclass +from typing import Sequence -from django.http import HttpRequest from django.urls import reverse from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ @@ -11,149 +8,19 @@ from furl import furl from view_breadcrumbs import BaseBreadcrumbMixin -from zgw_consumers.concurrent import parallel from open_inwoner.htmx.mixins import RequiresHtmxMixin -from open_inwoner.openzaak.api_models import OpenSubmission, Zaak -from open_inwoner.openzaak.cases import preprocess_data -from open_inwoner.openzaak.models import OpenZaakConfig, ZGWApiGroupConfig +from open_inwoner.openzaak.models import OpenZaakConfig from open_inwoner.openzaak.types import UniformCase -from open_inwoner.openzaak.utils import get_user_fetch_parameters from open_inwoner.utils.mixins import PaginationMixin from open_inwoner.utils.views import CommonPageMixin from .mixins import CaseAccessMixin, CaseLogMixin, OuterCaseAccessMixin +from .services import CaseFilterFormOption, CaseListService logger = logging.getLogger(__name__) -class CaseFilterFormOption(enum.Enum): - OPEN_SUBMISSION = _("Openstaande formulieren") - OPEN_CASE = _("Lopende aanvragen") - CLOSED_CASE = _("Afgeronde aanvragen") - - -@dataclass(frozen=True) -class ZaakWithApiGroup(UniformCase): - zaak: Zaak - api_group: ZGWApiGroupConfig - - @property - def identifier(self): - return self.zaak.url - - def process_data(self) -> dict: - return {**self.zaak.process_data(), "api_group": self.api_group} - - -@dataclass(frozen=True) -class SubmissionWithApiGroup: - submission: OpenSubmission - api_group: ZGWApiGroupConfig - - @property - def identifier(self): - return self.submission.url - - def process_data(self) -> dict: - return {**self.submission.process_data(), "api_group": self.api_group} - - -class CaseListService: - def __init__(self, request: HttpRequest): - self.request = request - - def get_cases_for_api_group(self, group: ZGWApiGroupConfig) -> list[UniformCase]: - raw_cases = group.zaken_client.fetch_cases( - **get_user_fetch_parameters(self.request, check_rsin=True) - ) - preprocessed_cases = preprocess_data(raw_cases, group) - return preprocessed_cases - - def get_submissions_for_api_group( - self, group: ZGWApiGroupConfig - ) -> list[UniformCase]: - return group.forms_client.fetch_open_submissions( - **get_user_fetch_parameters(self.request, check_rsin=False) - ) - - def get_cases(self) -> list[ZaakWithApiGroup]: - all_api_groups = list(ZGWApiGroupConfig.objects.all()) - - with parallel() as executor: - futures = [ - executor.submit(self.get_cases_for_api_group, group) - for group in all_api_groups - ] - - cases_with_api_group = [] - for task in concurrent.futures.as_completed(futures): - try: - group_for_task = all_api_groups[futures.index(task)] - for row in task.result(): - cases_with_api_group.append( - ZaakWithApiGroup(zaak=row, api_group=group_for_task) - ) - except BaseException: - logger.exception("Error fetching and pre-processing cases") - - # Ensure stable sorting for pagination and testing purposes - cases_with_api_group.sort(key=lambda c: all_api_groups.index(c.api_group)) - - return cases_with_api_group - - def get_submissions(self): - all_api_groups = list( - ZGWApiGroupConfig.objects.exclude(form_service__isnull=True) - ) - - with parallel() as executor: - futures = [ - executor.submit(self.get_submissions_for_api_group, group) - for group in all_api_groups - ] - - subs_with_api_group = [] - for task in concurrent.futures.as_completed(futures): - try: - group_for_task = all_api_groups[futures.index(task)] - for row in task.result(): - subs_with_api_group.append( - SubmissionWithApiGroup( - submission=row, api_group=group_for_task - ) - ) - except BaseException: - logger.exception("Error fetching and pre-processing cases") - - # Sort submissions by date modified - subs_with_api_group.sort( - key=lambda sub: sub.submission.datum_laatste_wijziging, reverse=True - ) - - return subs_with_api_group - - @staticmethod - def get_case_filter_status(zaak: Zaak) -> CaseFilterFormOption: - if zaak.einddatum: - return CaseFilterFormOption.CLOSED_CASE - - return CaseFilterFormOption.OPEN_CASE - - def get_case_status_frequencies(self) -> dict[CaseFilterFormOption, int]: - cases = self.get_cases() - submissions = self.get_submissions() - - case_statuses = [self.get_case_filter_status(case.zaak) for case in cases] - - # add static text for open submissions - case_statuses += [CaseFilterFormOption.OPEN_SUBMISSION for _ in submissions] - - return { - status: case_statuses.count(status) for status in list(CaseFilterFormOption) - } - - class OuterCaseListView( OuterCaseAccessMixin, CommonPageMixin, BaseBreadcrumbMixin, TemplateView ): @@ -203,8 +70,8 @@ def get_context_data(self, **kwargs): context["filter_form_enabled"] = config.zaken_filter_enabled # update ctx with open submissions and cases (possibly fitered) - open_submissions: list[UniformCase] = case_service.get_submissions() - preprocessed_cases: list[UniformCase] = case_service.get_cases() + open_submissions: Sequence[UniformCase] = case_service.get_submissions() + preprocessed_cases: Sequence[UniformCase] = case_service.get_cases() if config.zaken_filter_enabled: case_status_frequencies = case_service.get_case_status_frequencies() diff --git a/src/open_inwoner/cms/cases/views/services.py b/src/open_inwoner/cms/cases/views/services.py new file mode 100644 index 0000000000..c1aa3a7f8e --- /dev/null +++ b/src/open_inwoner/cms/cases/views/services.py @@ -0,0 +1,367 @@ +import concurrent.futures +import enum +import functools +import logging +from dataclasses import dataclass +from typing import Callable, TypedDict + +from django.http import HttpRequest +from django.utils.translation import gettext_lazy as _ + +from zgw_consumers.concurrent import parallel + +from open_inwoner.openzaak.api_models import OpenSubmission, Zaak +from open_inwoner.openzaak.clients import CatalogiClient, ZakenClient +from open_inwoner.openzaak.models import ( + ZaakTypeConfig, + ZaakTypeStatusTypeConfig, + ZGWApiGroupConfig, +) +from open_inwoner.openzaak.utils import get_user_fetch_parameters, is_zaak_visible + +logger = logging.getLogger(__name__) + + +class CaseFilterFormOption(enum.Enum): + OPEN_SUBMISSION = _("Openstaande formulieren") + OPEN_CASE = _("Lopende aanvragen") + CLOSED_CASE = _("Afgeronde aanvragen") + + +@dataclass(frozen=True) +class ZaakWithApiGroup: + zaak: Zaak + api_group: ZGWApiGroupConfig + + @property + def identification(self) -> str: + return self.zaak.url + + def process_data(self) -> dict: + return {**self.zaak.process_data(), "api_group": self.api_group} + + +@dataclass(frozen=True) +class SubmissionWithApiGroup: + submission: OpenSubmission + api_group: ZGWApiGroupConfig + + @property + def identification(self) -> str: + return self.submission.url + + def process_data(self) -> dict: + return {**self.submission.process_data(), "api_group": self.api_group} + + +class ThreadLimits(TypedDict): + zgw_api_groups: int + resolve_case_list: int + resolve_case_instance: int + + +class CaseListService: + request: HttpRequest + _thread_limits: ThreadLimits + _cumulative_case_fetching_timeout: int = 60 + _fetch_task_timeout: int = 6 + + def __init__(self, request: HttpRequest): + self.request = request + + # TODO: Ideally, this would be configured in light of: + # - a configured maximum number of threads and + # - the number of API groups configured + # + # However, distributing the available threads optimally in light of both + # constraints is not trivial, due to the total thread count being + # subject to cascading effects (each nested count is a multiple of its + # parent count). Hence, we provide some sane defaults for both the 1 and + # >1 case of API group count. Even with a small CPU count, these numbers + # should be fine, as the threads are primarily IO-bound. + if ZGWApiGroupConfig.objects.count() > 1: + self._thread_limits = { + "zgw_api_groups": 2, + "resolve_case_list": 1, + "resolve_case_instance": 3, + } + else: + self._thread_limits = { + "zgw_api_groups": 1, + "resolve_case_list": 2, + "resolve_case_instance": 3, + } + + logger.info("Configured thread limits as %s", self._thread_limits) + + def get_submissions_for_api_group( + self, group: ZGWApiGroupConfig + ) -> list[OpenSubmission]: + if not group.forms_client: + raise ValueError(f"{group} has no `forms_client`") + + return group.forms_client.fetch_open_submissions( + **get_user_fetch_parameters(self.request, check_rsin=False) + ) + + def get_submissions(self) -> list[SubmissionWithApiGroup]: + all_api_groups = list( + ZGWApiGroupConfig.objects.exclude(form_service__isnull=True) + ) + + with parallel() as executor: + futures = [ + executor.submit(self.get_submissions_for_api_group, group) + for group in all_api_groups + ] + + subs_with_api_group: list[SubmissionWithApiGroup] = [] + for task in concurrent.futures.as_completed(futures): + try: + group_for_task = all_api_groups[futures.index(task)] + for row in task.result(): + subs_with_api_group.append( + SubmissionWithApiGroup( + submission=row, api_group=group_for_task + ) + ) + except BaseException: + logger.exception("Error fetching and pre-processing cases") + + # Sort submissions by date modified + subs_with_api_group.sort( + key=lambda sub: sub.submission.datum_laatste_wijziging, reverse=True + ) + + return subs_with_api_group + + @staticmethod + def get_case_filter_status(zaak: Zaak) -> CaseFilterFormOption: + if zaak.einddatum: + return CaseFilterFormOption.CLOSED_CASE + + return CaseFilterFormOption.OPEN_CASE + + def get_case_status_frequencies(self) -> dict[CaseFilterFormOption, int]: + cases = self.get_cases() + submissions = self.get_submissions() + + case_statuses = [self.get_case_filter_status(case.zaak) for case in cases] + + # add static text for open submissions + case_statuses += [CaseFilterFormOption.OPEN_SUBMISSION for _ in submissions] + + return { + status: case_statuses.count(status) for status in list(CaseFilterFormOption) + } + + def get_cases(self) -> list[ZaakWithApiGroup]: + all_api_groups = list(ZGWApiGroupConfig.objects.all()) + + with parallel(max_workers=self._thread_limits["zgw_api_groups"]) as executor: + futures = [ + executor.submit(self._get_cases_for_api_group, group) + for group in all_api_groups + ] + + cases_with_api_group = [] + for task in concurrent.futures.as_completed( + futures, + timeout=self._cumulative_case_fetching_timeout, + ): + group_for_task = all_api_groups[futures.index(task)] + try: + for row in task.result(): + cases_with_api_group.append( + ZaakWithApiGroup(zaak=row, api_group=group_for_task) + ) + except BaseException: + logger.exception( + "Error while fetching and pre-processing cases for API group %s", + group_for_task, + ) + + # Ensure stable sorting for pagination and testing purposes + cases_with_api_group.sort(key=lambda c: all_api_groups.index(c.api_group)) + + return cases_with_api_group + + def _get_cases_for_api_group(self, group: ZGWApiGroupConfig): + raw_cases = group.zaken_client.fetch_cases( + **get_user_fetch_parameters(self.request) + ) + preprocessed_cases = self.resolve_cases(raw_cases, group) + return preprocessed_cases + + def resolve_cases( + self, + cases: list[Zaak], + group: ZGWApiGroupConfig, + ) -> list[Zaak]: + with parallel(max_workers=self._thread_limits["resolve_case_list"]) as executor: + futures = [ + executor.submit(self.resolve_case, case, group) for case in cases + ] + + concurrent.futures.wait(futures, timeout=self._fetch_task_timeout) + + cases = [case for case in cases if case.status and is_zaak_visible(case)] + cases.sort(key=lambda case: case.startdatum, reverse=True) + + return cases + + def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig): + logger.debug("Resolving case %s with group %s", case.identificatie, group) + + functions = [ + functools.partial( + self._resolve_resultaat_and_resultaat_type, + zaken_client=group.zaken_client, + catalogi_client=group.catalogi_client, + ), + functools.partial( + self._resolve_status_and_status_type, + zaken_client=group.zaken_client, + catalogi_client=group.catalogi_client, + ), + functools.partial( + self._resolve_zaak_type, + client=group.catalogi_client, + ), + ] + + # use contextmanager to ensure the `requests.Session` is reused + with group.catalogi_client, group.zaken_client: + with parallel( + max_workers=self._thread_limits["resolve_case_instance"] + ) as executor: + futures = [executor.submit(func, case) for func in functions] + + for task in concurrent.futures.as_completed( + futures, timeout=self._fetch_task_timeout + ): + if exc := task.exception(): + logger.error("Error in resolving case: %s", exc, stack_info=True) + + update_case = task.result() + if hasattr(update_case, "__call__"): + update_case(case) + + try: + zaaktype_config = ZaakTypeConfig.objects.filter_case_type( + case.zaaktype + ).get() + + case.zaaktype_config = zaaktype_config + + if zaaktype_config: + statustype_config = ZaakTypeStatusTypeConfig.objects.get( + zaaktype_config=zaaktype_config, + statustype_url=case.status.statustype.url, + ) + case.statustype_config = statustype_config + except ( + ZaakTypeConfig.DoesNotExist, + AttributeError, + ZaakTypeStatusTypeConfig.DoesNotExist, + ): + logger.exception( + "Unable to resolve zaaktype_config and statustype_config for case %s", + case.identificatie, + exc_info=True, + ) + + @staticmethod + def _resolve_zaak_type( + case: Zaak, *, client: CatalogiClient + ) -> Callable[[Zaak], None] | None: + """ + Resolve `case.zaaktype` (`str`) to a `ZaakType(ZGWModel)` object + + Note: the result of `fetch_single_case_type` is cached, hence a request + is only made for new case type urls + """ + if not isinstance(case.zaaktype, str): + logger.debug("Case %s already has a resolved zaaktype", case.identificatie) + return + + case_type = client.fetch_single_case_type(case.zaaktype) + if not case_type: + logger.error("Unable to resolve zaaktype for url: %s", case.zaaktype) + return + + def setter(target_case: Zaak): + target_case.zaaktype = case_type + + return setter + + @staticmethod + def _resolve_status_and_status_type( + case: Zaak, *, zaken_client: ZakenClient, catalogi_client: CatalogiClient + ) -> Callable[[Zaak], None] | None: + if not isinstance(case.status, str): + logger.error( + "`case.status` for case %s is not a str but %s", + case.identificatie, + type(case.status), + ) + return + + status = zaken_client.fetch_single_status(case.status) + if not status: + logger.error( + "Unable to resolve status %s for case %s", + case.status, + case.identificatie, + ) + return None + + status_type = catalogi_client.fetch_single_status_type(status.statustype) + if not status_type: + logger.error( + "Unable to resolve status_type %s for case %s", + status.statustype, + case.identificatie, + ) + return None + + def setter(target_case: Zaak): + target_case.status = status + target_case.status.statustype = status_type + + return setter + + @staticmethod + def _resolve_resultaat_and_resultaat_type( + case: Zaak, *, zaken_client: ZakenClient, catalogi_client: CatalogiClient + ) -> Callable[[Zaak], None] | None: + if case.resultaat is None: + return + + if not isinstance(case.resultaat, str): + logger.error( + "`case.resultaat` for case %s is not a str but %s", + case.identificatie, + type(case.resultaat), + ) + return + + resultaat = zaken_client.fetch_single_result(case.resultaat) + if not resultaat: + logger.error("Unable to fetch resultaat for %s", case) + return + + resultaattype = catalogi_client.fetch_single_resultaat_type( + resultaat.resultaattype + ) + if not resultaattype: + logger.error( + "Unable to resolve resultaattype for %s", resultaat.resultaattype + ) + return + + def setter(target_case: Zaak): + target_case.resultaat = resultaat + target_case.resultaat.resultaattype = resultaattype + + return setter diff --git a/src/open_inwoner/conf/base.py b/src/open_inwoner/conf/base.py index 5c10dab7f4..cd7f2f7675 100644 --- a/src/open_inwoner/conf/base.py +++ b/src/open_inwoner/conf/base.py @@ -1031,8 +1031,3 @@ "description": "string representing the (absolute) path to a file, including file extension", }, ] - -# -# Project specific settings -# -CASE_LIST_NUM_THREADS = 6 diff --git a/src/open_inwoner/openzaak/cases.py b/src/open_inwoner/openzaak/cases.py deleted file mode 100644 index 25045f708b..0000000000 --- a/src/open_inwoner/openzaak/cases.py +++ /dev/null @@ -1,127 +0,0 @@ -import concurrent.futures -import logging - -from django.conf import settings - -from zgw_consumers.concurrent import parallel - -from .api_models import Zaak -from .clients import CatalogiClient, ZakenClient -from .models import ZaakTypeConfig, ZaakTypeStatusTypeConfig, ZGWApiGroupConfig -from .utils import is_zaak_visible - -logger = logging.getLogger(__name__) - - -def resolve_zaak_type(case: Zaak, client: CatalogiClient) -> None: - """ - Resolve `case.zaaktype` (`str`) to a `ZaakType(ZGWModel)` object - - Note: the result of `fetch_single_case_type` is cached, hence a request - is only made for new case type urls - """ - case_type_url = case.zaaktype - if client: - case_type = client.fetch_single_case_type(case_type_url) - case.zaaktype = case_type - - -def resolve_status(case: Zaak, client: ZakenClient | None = None) -> None: - """ - Resolve `case.status` (`str`) to a `Status(ZGWModel)` object - """ - if client: - case.status = client.fetch_single_status(case.status) - - -def resolve_status_type(case: Zaak, client: CatalogiClient | None = None) -> None: - """ - Resolve `case.status.statustype` (`str`) to a `StatusType(ZGWModel)` object - """ - statustype_url = case.status.statustype - if client: - case.status.statustype = client.fetch_single_status_type(statustype_url) - - -def resolve_resultaat(case: Zaak, client: ZakenClient | None = None) -> None: - """ - Resolve `case.resultaat` (`str`) to a `Resultaat(ZGWModel)` object - """ - if case.resultaat: - case.resultaat = client.fetch_single_result(case.resultaat) - - -def resolve_resultaat_type(case: Zaak, client: CatalogiClient | None = None) -> None: - """ - Resolve `case.resultaat.resultaattype` (`str`) to a `ResultaatType(ZGWModel)` object - """ - if client and case.resultaat: - case.resultaat.resultaattype = client.fetch_single_resultaat_type( - case.resultaat.resultaattype - ) - - -def add_zaak_type_config(case: Zaak) -> None: - """ - Add `ZaakTypeConfig` corresponding to the zaaktype type url of the case - - Note: must be called after `resolve_zaak_type` since we're using the `uuid` and - `identificatie` from `case.zaaktype` - """ - try: - case.zaaktype_config = ZaakTypeConfig.objects.filter_case_type( - case.zaaktype - ).get() - except ZaakTypeConfig.DoesNotExist: - pass - - -def add_status_type_config(case: Zaak) -> None: - """ - Add `ZaakTypeStatusTypeConfig` corresponding to the status type url of the case - - Note: must be called after `resolve_status_type` since we're getting the - status type url from `case.status.statustype` - """ - try: - case.statustype_config = ZaakTypeStatusTypeConfig.objects.get( - zaaktype_config=case.zaaktype_config, - statustype_url=case.status.statustype.url, - ) - except (AttributeError, ZaakTypeStatusTypeConfig.DoesNotExist): - pass - - -def preprocess_data(cases: list[Zaak], group: ZGWApiGroupConfig) -> list[Zaak]: - """ - Resolve zaaktype and statustype, add status type config, filter for visibility - - Note: we need to iterate twice over `cases` because the `zaak_type` must be - resolved to a `ZaakType` object before we can filter by visibility - """ - - def preprocess_case(case: Zaak) -> None: - resolve_status(case, client=group.zaken_client) - resolve_status_type(case, client=group.catalogi_client) - resolve_resultaat(case, client=group.zaken_client) - resolve_resultaat_type(case, client=group.catalogi_client) - add_zaak_type_config(case) - add_status_type_config(case) - - # use contextmanager to ensure the `requests.Session` is reused - with group.catalogi_client, group.zaken_client: - with parallel(max_workers=settings.CASE_LIST_NUM_THREADS) as executor: - futures = [ - executor.submit(resolve_zaak_type, case, client=group.catalogi_client) - for case in cases - ] - concurrent.futures.wait(futures) - - cases = [case for case in cases if case.status and is_zaak_visible(case)] - - futures = [executor.submit(preprocess_case, case) for case in cases] - concurrent.futures.wait(futures) - - cases.sort(key=lambda case: case.startdatum, reverse=True) - - return cases diff --git a/src/open_inwoner/openzaak/clients.py b/src/open_inwoner/openzaak/clients.py index b727826c6b..34cf75626f 100644 --- a/src/open_inwoner/openzaak/clients.py +++ b/src/open_inwoner/openzaak/clients.py @@ -677,7 +677,7 @@ def fetch_open_submissions( vestigingsnummer: str | None = None, max_requests: int = 4, **kwargs, - ): + ) -> list[OpenSubmission]: if user_bsn and (user_kvk or vestigingsnummer): raise ValueError( "either `user_bsn` or `user_kvk` (optionally with `vestigingsnummer`) " diff --git a/src/open_inwoner/openzaak/models.py b/src/open_inwoner/openzaak/models.py index c44926f3b2..07edbaf54e 100644 --- a/src/open_inwoner/openzaak/models.py +++ b/src/open_inwoner/openzaak/models.py @@ -1,7 +1,7 @@ import logging import warnings from datetime import timedelta -from typing import Protocol +from typing import Protocol, cast from urllib.parse import urlparse from django.db import models, transaction @@ -199,7 +199,9 @@ def _build_client_from_attr(self, attr: str): @property def zaken_client(self): - return self._build_client_from_attr("zrc_service") + from .clients import ZakenClient + + return cast(ZakenClient, self._build_client_from_attr("zrc_service")) drc_service = models.ForeignKey( "zgw_consumers.Service", @@ -213,7 +215,9 @@ def zaken_client(self): @property def documenten_client(self): - return self._build_client_from_attr("drc_service") + from .clients import DocumentenClient + + return cast(DocumentenClient, self._build_client_from_attr("drc_service")) ztc_service = models.ForeignKey( "zgw_consumers.Service", @@ -227,7 +231,9 @@ def documenten_client(self): @property def catalogi_client(self): - return self._build_client_from_attr("ztc_service") + from .clients import CatalogiClient + + return cast(CatalogiClient, self._build_client_from_attr("ztc_service")) form_service = models.OneToOneField( "zgw_consumers.Service", @@ -241,8 +247,10 @@ def catalogi_client(self): @property def forms_client(self): + from .clients import FormClient + if self.form_service: - return self._build_client_from_attr("form_service") + return cast(FormClient, self._build_client_from_attr("form_service")) class Meta: verbose_name = _("ZGW API set") diff --git a/src/open_inwoner/openzaak/notifications.py b/src/open_inwoner/openzaak/notifications.py index 2a4bcca152..4b6ae822a5 100644 --- a/src/open_inwoner/openzaak/notifications.py +++ b/src/open_inwoner/openzaak/notifications.py @@ -17,7 +17,6 @@ ZaakInformatieObject, ZaakType, ) -from open_inwoner.openzaak.cases import resolve_status from open_inwoner.openzaak.clients import CatalogiClient, ZakenClient from open_inwoner.openzaak.documents import fetch_single_information_object_from_url from open_inwoner.openzaak.models import ( @@ -139,7 +138,6 @@ def send_case_update_email( status: Status | None = None, extra_context: dict = None, ): - """ send the actual mail """ @@ -529,7 +527,11 @@ def _handle_status_notification( if not (ztc := _check_zaaktype_config(notification, case, oz_config)): return - resolve_status(case, client=zaken_client) + status = zaken_client.fetch_single_status(case.status) + if not status: + logger.error("Unable to fetch status for %s", case.status) + + case.status = status if not (status_type_config := _check_statustype_config(notification, case, ztc)): return