From 13824e0e4ab0d97fbf6f6544de80eedfb52ed8dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9line=20MS?= Date: Wed, 17 Jul 2024 15:14:23 +0200 Subject: [PATCH] Certify criteria selected during the past 6 months. Certify stored selected criteria each day at noon The Particulier API returns many 503 without any useful explanation. Enhance tests to reflect real world API returns --- clevercloud/cron.json | 1 + config/settings/dev.py | 3 +- ...ertify_selected_administrative_criteria.py | 156 ++++++++++++++++++ itou/utils/apis/api_particulier.py | 76 ++++++--- tests/utils/apis/test_api_particulier.py | 139 ++++++++++++++-- 5 files changed, 332 insertions(+), 43 deletions(-) create mode 100755 itou/users/management/commands/certify_selected_administrative_criteria.py diff --git a/clevercloud/cron.json b/clevercloud/cron.json index d088f0bfe9e..48457a47151 100644 --- a/clevercloud/cron.json +++ b/clevercloud/cron.json @@ -17,6 +17,7 @@ "0 */6 * * * $ROOT/clevercloud/run_management_command.sh sync_s3_files", "1 0 * * * $ROOT/clevercloud/run_management_command.sh update_prescriber_organization_with_api_entreprise --verbosity 2", + "15 0 * * * $ROOT/clevercloud/run_management_command.sh certify_selected_administrative_criteria --wet-run", "30 0 * * * $ROOT/clevercloud/run_management_command.sh collect_analytics_data --save", "45 0 * * * $ROOT/clevercloud/run_management_command.sh import_advisor_information shared_bucket/imports-gps/export_gps.xlsx --wet-run", "30 1 * * * $ROOT/clevercloud/run_management_command.sh new_users_to_brevo --wet-run", diff --git a/config/settings/dev.py b/config/settings/dev.py index 6350989c3b5..c255348b66a 100644 --- a/config/settings/dev.py +++ b/config/settings/dev.py @@ -85,4 +85,5 @@ # Don't use json formatter in dev del LOGGING["handlers"]["console"]["formatter"] # noqa: F405 -API_PARTICULIER_BASE_URL = "https://staging.particulier.api.gouv.fr/api/" +API_PARTICULIER_BASE_URL = os.getenv("API_PARTICULIER_BASE_URL", "https://staging.particulier.api.gouv.fr/api/") +API_PARTICULIER_TOKEN = os.getenv("API_PARTICULIER_TOKEN") diff --git a/itou/users/management/commands/certify_selected_administrative_criteria.py b/itou/users/management/commands/certify_selected_administrative_criteria.py new file mode 100755 index 00000000000..9754adbba8d --- /dev/null +++ b/itou/users/management/commands/certify_selected_administrative_criteria.py @@ -0,0 +1,156 @@ +import concurrent +import datetime +import logging +from math import ceil + +from django.db.models import Exists, OuterRef, Q +from django.utils.timezone import make_aware + +from itou.eligibility.models.geiq import GEIQAdministrativeCriteria, GEIQSelectedAdministrativeCriteria +from itou.eligibility.models.iae import AdministrativeCriteria, SelectedAdministrativeCriteria +from itou.users.models import User +from itou.utils.command import BaseCommand +from itou.utils.iterators import chunks + + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + """ + Certify selected administrative criteria from eligiility diagnosis made during the last 6 months + by calling the Particulier API.""" + + def add_arguments(self, parser): + parser.add_argument("--limit", dest="limit", action="store", type=int) + parser.add_argument("--synchronously", dest="synchronously", action="store_true") + parser.add_argument("--verbose", dest="verbose", action="store_true") + parser.add_argument("--wet-run", dest="wet_run", action="store_true") + + def call_api_and_store_result( + self, + SelectedAdministrativeCriteriaModel, + AdministrativeCriteriaModel, + limit=None, + synchronously=False, + verbose=False, + wet_run=False, + ): + if not verbose: + # Don't log HTTP requests detail. + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + + total_criteria = 0 + total_criteria_with_certification = 0 + found_beneficiaries = set() + found_not_beneficiaries = set() + not_found_users = set() # 404 + server_errors = 0 # 429, 503, 504 + + criteria = AdministrativeCriteriaModel.objects.certifiable() + # TODO: add test. + period = (make_aware(datetime.datetime(2024, 1, 1)), make_aware(datetime.datetime(2024, 10, 1))) + criteria_pks = ( + SelectedAdministrativeCriteriaModel.objects.filter( + administrative_criteria__in=criteria, + eligibility_diagnosis__created_at__range=period, + eligibility_diagnosis__job_seeker__jobseeker_profile__birth_country__isnull=False, + eligibility_diagnosis__job_seeker__jobseeker_profile__birthdate__isnull=False, + eligibility_diagnosis__job_seeker__first_name__isnull=False, + eligibility_diagnosis__job_seeker__last_name__isnull=False, + eligibility_diagnosis__job_seeker__title__isnull=False, + ) + .exclude(Q(certified__isnull=False) | Q(data_returned_by_api__error__contains="not_found")) # exclude 404 + .order_by("pk") + .values_list("pk", flat=True) + ) + if limit: + criteria_pks = criteria_pks[:limit] + + total_criteria += len(criteria_pks) + if total_criteria == 0: + logger.info("No criteria to certify. Stop now and enjoy your day! ") + return + + users_count = User.objects.filter( + Exists( + SelectedAdministrativeCriteria.objects.filter( + eligibility_diagnosis__job_seeker_id=OuterRef("pk"), + id__in=criteria_pks, + ) + ) + ).count() + logger.info( + f"Candidats à certifier pour le modèle {SelectedAdministrativeCriteriaModel.__name__}: {users_count}" + ) + + chunks_total = ceil(total_criteria / 1000) + chunks_count = 0 + for criteria_pk_subgroup in chunks(criteria_pks, 1000): + criteria = SelectedAdministrativeCriteriaModel.objects.filter(pk__in=criteria_pk_subgroup).select_related( + "administrative_criteria", + "eligibility_diagnosis__job_seeker", + "eligibility_diagnosis__job_seeker__jobseeker_profile", + "eligibility_diagnosis__job_seeker__jobseeker_profile__birth_place", + "eligibility_diagnosis__job_seeker__jobseeker_profile__birth_country", + ) + + # Tenacity's retry feature does not seem to work with ThreadPoolExecutor. + # httpx.RequestError, which should retry, does not. + # So leave the possibility to certify more criteria even if it lasts longer. + if synchronously: + for criterion in criteria: + criterion.certify(save=False) + else: + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + batch_futures = [executor.submit(criterion.certify, False) for criterion in criteria] + concurrent.futures.wait(batch_futures, timeout=3600) + + for criterion in criteria: + data_returned_by_api = criterion.data_returned_by_api + # 429, 503 and 504 + # See api_particulier.py + if type(criterion.data_returned_by_api) is not dict: + data_returned_by_api = {"error": criterion.data_returned_by_api} + + if data_returned_by_api.get("status"): + total_criteria_with_certification += 1 + if data_returned_by_api["status"] == "beneficiaire": + found_beneficiaries.add(criterion.eligibility_diagnosis.job_seeker.pk) + else: + found_not_beneficiaries.add(criterion.eligibility_diagnosis.job_seeker.pk) + + if data_returned_by_api.get("error"): + if data_returned_by_api["error"] == "not_found": + not_found_users.add(criterion.eligibility_diagnosis.job_seeker.pk) + else: + server_errors += 1 + + if wet_run: + SelectedAdministrativeCriteriaModel.objects.bulk_update( + criteria, + fields=[ + "data_returned_by_api", + "certified", + "certification_period", + "certified_at", + ], + ) + + chunks_count += 1 + logger.info(f"########### {chunks_count/chunks_total*100:.2f}%") + + logger.info(f"Total criteria to be certified: {total_criteria}") + logger.info(f"Total criteria with certification: {total_criteria_with_certification}") + logger.info(f"Not beneficiaries: {len(found_not_beneficiaries)}") + logger.info(f"Beneficiaries: {len(found_beneficiaries)}") + logger.info(f"Not found: {len(not_found_users)}") + logger.info(f"Server errors: {server_errors}") + users_found = total_criteria_with_certification / total_criteria * 100 + logger.info(f"That's {users_found:.2f}% users found.") + + def handle(self, limit, synchronously, verbose, wet_run, *args, **kwargs): + options = {"wet_run": wet_run, "limit": limit, "synchronously": synchronously, "verbose": verbose} + self.call_api_and_store_result(GEIQSelectedAdministrativeCriteria, GEIQAdministrativeCriteria, **options) + self.call_api_and_store_result(SelectedAdministrativeCriteria, AdministrativeCriteria, **options) diff --git a/itou/utils/apis/api_particulier.py b/itou/utils/apis/api_particulier.py index 72a8c3d6bd7..89696929eba 100644 --- a/itou/utils/apis/api_particulier.py +++ b/itou/utils/apis/api_particulier.py @@ -11,6 +11,15 @@ logger = logging.getLogger("APIParticulierClient") +class ShouldRetryException(httpx.HTTPStatusError): + """ + This exception can be used to ask Tenacity to retry + while attaching a response and a request to it. + """ + + pass + + class APIParticulierClient: def __init__(self, job_seeker=None): self.client = httpx.Client( @@ -57,48 +66,63 @@ def _build_params_from(cls, job_seeker): @tenacity.retry( wait=tenacity.wait_fixed(2), stop=tenacity.stop_after_attempt(4), - retry=tenacity.retry_if_exception_type(httpx.RequestError), + retry=tenacity.retry_if_exception_type(ShouldRetryException), ) def _request(self, endpoint, params=None): params = self._build_params_from(job_seeker=self.job_seeker) response = self.client.get(endpoint, params=params) - if response.status_code == 504: - reason = response.json().get("reason") - logger.error(f"{response.url=} {reason=}") - raise httpx.RequestError(message=reason) - elif response.status_code == 503: + error_message = None + if response.status_code == 404: + return response.json() + # Too Many Requests + elif response.status_code == 429: + errors = response.json().get("errors") + if errors: + error_message = errors[0] + raise ShouldRetryException(message=error_message, request=response.request, response=response) + # Bad params. + # Same as 503 except we don't retry. + elif response.status_code in (400, 401): errors = response.json()["errors"] - reason = errors[0].get("title") - for error in errors: - logger.error(f"{response.url=} {error['title']}") - raise httpx.RequestError(message=reason) + raise httpx.HTTPStatusError(message=error_message, request=response.request, response=response) + # Service unavailable + elif response.status_code == 503: + error_message = response.json().get("error") + if error_message: + error_message = response.json().get("reason") + else: + errors = response.json().get("errors") + error_message = errors[0].get("title") + raise ShouldRetryException(message=error_message, request=response.request, response=response) + # Server error + elif response.status_code == 504: + error_message = response.json().get("reason") + raise ShouldRetryException(message=error_message, request=response.request, response=response) else: response.raise_for_status() return response.json() def revenu_solidarite_active(self): - data = { - "start_at": "", - "end_at": "", - "is_certified": "", - "raw_response": "", - } + data = {"start_at": "", "end_at": "", "is_certified": "", "raw_response": ""} + error_message = None try: - data = self._request("/v2/revenu-solidarite-active") + response = self._request("/v2/revenu-solidarite-active") except httpx.HTTPStatusError as exc: # not 5XX. - logger.info(f"Beneficiary not found. {self.job_seeker.public_id=}") - data["raw_response"] = exc.response.json() - except tenacity.RetryError as retry_err: # 503 or 504 + error_message = f"{exc.response.status_code}: {exc.response.json()} {exc.response.url=}" + except tenacity.RetryError as retry_err: # 429, 503 or 504 exc = retry_err.last_attempt._exception - data["raw_response"] = str(exc) + error_message = f"{exc.response.status_code}: {exc.response.json()} {exc.response.url=}" except KeyError as exc: - data["raw_response"] = str(exc) + error_message = f"KeyError: {exc=}" else: data = { - "start_at": self.format_date(data["dateDebut"]), - "end_at": self.format_date(data["dateFin"]), - "is_certified": data["status"] == "beneficiaire", - "raw_response": data, + "start_at": self.format_date(response["dateDebut"]), + "end_at": self.format_date(response["dateFin"]), + "is_certified": response["status"] == "beneficiaire", + "raw_response": response, } finally: + if error_message: + data["raw_response"] = error_message + logger.warning(error_message) return data diff --git a/tests/utils/apis/test_api_particulier.py b/tests/utils/apis/test_api_particulier.py index bf51777c9ed..8633a0da417 100644 --- a/tests/utils/apis/test_api_particulier.py +++ b/tests/utils/apis/test_api_particulier.py @@ -67,7 +67,7 @@ def test_not_found(respx_mock): job_seeker = JobSeekerFactory(born_in_france=True) client = APIParticulierClient(job_seeker=job_seeker) response = client.revenu_solidarite_active() - assert response["raw_response"] == rsa_not_found_mocker() + assert response["raw_response"].startswith(f"404: {rsa_not_found_mocker()}") assert response["is_certified"] == "" assert response["start_at"] == "" assert response["end_at"] == "" @@ -76,20 +76,21 @@ def test_not_found(respx_mock): def test_service_unavailable(settings, respx_mock, mocker, caplog): mocker.patch("tenacity.nap.time.sleep") reason = "Erreur inconnue du fournisseur de données" + json = { + "errors": [ + { + "code": "37999", + "title": reason, + "detail": "La réponse retournée par le fournisseur de données est invalide et inconnue de " + "notre service. L'équipe technique a été notifiée de cette erreur pour investigation.", + "source": None, + "meta": {"provider": "CNAV"}, + } + ] + } respx_mock.get(RSA_ENDPOINT).respond( 503, - json={ - "errors": [ - { - "code": "37999", - "title": reason, - "detail": "La réponse retournée par le fournisseur de données est invalide et inconnue de notre" - "service. L'équipe technique a été notifiée de cette erreur pour investigation.", - "source": "null", - "meta": {"provider": "CNAV"}, - } - ] - }, + json=json, ) job_seeker = JobSeekerFactory(born_in_france=True) client = APIParticulierClient(job_seeker=job_seeker) @@ -97,7 +98,94 @@ def test_service_unavailable(settings, respx_mock, mocker, caplog): assert reason in caplog.text assert RSA_ENDPOINT in caplog.text - assert response["raw_response"] == reason + assert response["raw_response"].startswith(f"503: {json}") + assert response["is_certified"] == "" + assert response["start_at"] == "" + assert response["end_at"] == "" + + +def test_provider_unknown(settings, respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep") + reason = ( + "La réponse retournée par le fournisseur de données est invalide et inconnue de notre service. L'équipe " + "technique a été notifiée de cette erreur pour investigation." + ) + json = { + "error": "provider_unknown_error", + "reason": reason, + "message": reason, + } + respx_mock.get(RSA_ENDPOINT).respond( + 503, + json=json, + ) + job_seeker = JobSeekerFactory(born_in_france=True) + client = APIParticulierClient(job_seeker=job_seeker) + response = client.revenu_solidarite_active() + + assert reason in caplog.text + assert RSA_ENDPOINT in caplog.text + assert response["raw_response"].startswith(f"503: {json}") + assert response["is_certified"] == "" + assert response["start_at"] == "" + assert response["end_at"] == "" + + +def test_bad_params(settings, respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep") + reason = "Entité non traitable" + json = { + "errors": [ + { + "code": "00364", + "title": reason, + "detail": "Le sexe n'est pas correctement formaté (m ou f)", + "source": None, + "meta": {}, + } + ] + } + respx_mock.get(RSA_ENDPOINT).respond( + 400, + json=json, + ) + job_seeker = JobSeekerFactory(born_in_france=True) + client = APIParticulierClient(job_seeker=job_seeker) + response = client.revenu_solidarite_active() + + assert reason in caplog.text + assert RSA_ENDPOINT in caplog.text + assert response["raw_response"].startswith(f"400: {json}") + assert response["is_certified"] == "" + assert response["start_at"] == "" + assert response["end_at"] == "" + + +def test_forbidden(settings, respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep") + reason = "Accès non autorisé" + json = { + "errors": [ + { + "code": "50002", + "title": reason, + "detail": "Le jeton d'accès n'a pas été trouvé ou est expiré.", + "source": None, + "meta": {}, + } + ] + } + respx_mock.get(RSA_ENDPOINT).respond( + 401, + json=json, + ) + job_seeker = JobSeekerFactory(born_in_france=True) + client = APIParticulierClient(job_seeker=job_seeker) + response = client.revenu_solidarite_active() + + assert reason in caplog.text + assert RSA_ENDPOINT in caplog.text + assert response["raw_response"].startswith(f"401: {json}") assert response["is_certified"] == "" assert response["start_at"] == "" assert response["end_at"] == "" @@ -106,7 +194,26 @@ def test_service_unavailable(settings, respx_mock, mocker, caplog): def test_gateway_timeout(respx_mock, mocker, caplog): mocker.patch("tenacity.nap.time.sleep", mocker.MagicMock()) reason = "The read operation timed out" - respx_mock.get(RSA_ENDPOINT).respond(504, json={"error": "null", "reason": reason, "message": "null"}) + json = {"error": None, "reason": reason, "message": "null"} + respx_mock.get(RSA_ENDPOINT).respond(504, json=json) + + job_seeker = JobSeekerFactory(born_in_france=True) + client = APIParticulierClient(job_seeker=job_seeker) + response = client.revenu_solidarite_active() + + assert reason in caplog.text + assert RSA_ENDPOINT in caplog.text + assert response["raw_response"].startswith(f"504: {json}") + assert response["is_certified"] == "" + assert response["start_at"] == "" + assert response["end_at"] == "" + + +def test_too_many_requests(respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep", mocker.MagicMock()) + reason = "Vous avez effectué trop de requêtes" + json = {"errors": ["Vous avez effectué trop de requêtes"]} + respx_mock.get(RSA_ENDPOINT).respond(429, json=json) job_seeker = JobSeekerFactory(born_in_france=True) client = APIParticulierClient(job_seeker=job_seeker) @@ -114,7 +221,7 @@ def test_gateway_timeout(respx_mock, mocker, caplog): assert reason in caplog.text assert RSA_ENDPOINT in caplog.text - assert response["raw_response"] == reason + assert response["raw_response"].startswith(f"429: {json}") assert response["is_certified"] == "" assert response["start_at"] == "" assert response["end_at"] == ""