Skip to content

Commit

Permalink
[#2795] More fine-grained error handling in concurrent case fetching
Browse files Browse the repository at this point in the history
These changes are necessary because we do not want a failure to resolve
a single case to abort the thread for the whole API group. If one case
fails, we should still try and return the cases that properly resolve.
  • Loading branch information
swrichards committed Oct 3, 2024
1 parent f1745d2 commit 31ca881
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions src/open_inwoner/cms/cases/views/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,31 +186,41 @@ def get_cases(self) -> list[ZaakWithApiGroup]:

return cases_with_api_group

def _get_cases_for_api_group(self, group: ZGWApiGroupConfig):
def _get_cases_for_api_group(self, group: ZGWApiGroupConfig) -> list[Zaak]:
raw_cases = group.zaken_client.fetch_cases(
**get_user_fetch_parameters(self.request)
)
preprocessed_cases = self.resolve_cases(raw_cases, group)
return preprocessed_cases
resolved_cases = self.resolve_cases(raw_cases, group)

filtered_cases = [
case for case in resolved_cases if case.status and is_zaak_visible(case)
]
filtered_cases.sort(key=lambda case: case.startdatum, reverse=True)
return filtered_cases

def resolve_cases(
self,
cases: list[Zaak],
group: ZGWApiGroupConfig,
) -> list[Zaak]:
resolved_cases = []
with parallel(max_workers=self._thread_limits["resolve_case_list"]) as executor:
futures = [
executor.submit(self.resolve_case, case, group) for case in cases
]

concurrent.futures.wait(futures, timeout=self._fetch_task_timeout)

cases = [case for case in cases if case.status and is_zaak_visible(case)]
cases.sort(key=lambda case: case.startdatum, reverse=True)
for task in concurrent.futures.as_completed(
futures, timeout=self._fetch_task_timeout
):
try:
resolved_case = task.result()
resolved_cases.append(resolved_case)
except BaseException:
logger.exception("Error while resolving case")

return cases
return resolved_cases

def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig):
def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig) -> Zaak:
logger.debug("Resolving case %s with group %s", case.identificatie, group)

functions = [
Expand Down Expand Up @@ -240,12 +250,12 @@ def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig):
for task in concurrent.futures.as_completed(
futures, timeout=self._fetch_task_timeout
):
if exc := task.exception():
logger.error("Error in resolving case: %s", exc, stack_info=True)

update_case = task.result()
if hasattr(update_case, "__call__"):
update_case(case)
try:
update_case = task.result()
if hasattr(update_case, "__call__"):
update_case(case)
except BaseException:
logger.exception("Error in resolving case", stack_info=True)

try:
zaaktype_config = ZaakTypeConfig.objects.filter_case_type(
Expand All @@ -271,6 +281,8 @@ def resolve_case(self, case: Zaak, group: ZGWApiGroupConfig):
exc_info=True,
)

return case

@staticmethod
def _resolve_zaak_type(
case: Zaak, *, client: CatalogiClient
Expand Down

0 comments on commit 31ca881

Please sign in to comment.