From 28dfe3598167127943241e64050c8ebb24f745ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9line=20MS?= Date: Wed, 17 Jul 2024 15:14:23 +0200 Subject: [PATCH] Certify criteria selected during the past 6 months. Certify stored selected criteria each day at noon The Particulier API returns many 503 without any useful explanation. Enhance tests to reflect real world API returns --- clevercloud/cron.json | 1 + config/settings/dev.py | 3 +- ...ertify_selected_administrative_criteria.py | 145 ++++++++++++++++++ itou/utils/apis/api_particulier.py | 51 ++++-- tests/utils/apis/test_api_particulier.py | 138 +++++++++++++++-- 5 files changed, 307 insertions(+), 31 deletions(-) create mode 100755 itou/users/management/commands/certify_selected_administrative_criteria.py diff --git a/clevercloud/cron.json b/clevercloud/cron.json index d088f0bfe9e..48457a47151 100644 --- a/clevercloud/cron.json +++ b/clevercloud/cron.json @@ -17,6 +17,7 @@ "0 */6 * * * $ROOT/clevercloud/run_management_command.sh sync_s3_files", "1 0 * * * $ROOT/clevercloud/run_management_command.sh update_prescriber_organization_with_api_entreprise --verbosity 2", + "15 0 * * * $ROOT/clevercloud/run_management_command.sh certify_selected_administrative_criteria --wet-run", "30 0 * * * $ROOT/clevercloud/run_management_command.sh collect_analytics_data --save", "45 0 * * * $ROOT/clevercloud/run_management_command.sh import_advisor_information shared_bucket/imports-gps/export_gps.xlsx --wet-run", "30 1 * * * $ROOT/clevercloud/run_management_command.sh new_users_to_brevo --wet-run", diff --git a/config/settings/dev.py b/config/settings/dev.py index 6350989c3b5..c255348b66a 100644 --- a/config/settings/dev.py +++ b/config/settings/dev.py @@ -85,4 +85,5 @@ # Don't use json formatter in dev del LOGGING["handlers"]["console"]["formatter"] # noqa: F405 -API_PARTICULIER_BASE_URL = "https://staging.particulier.api.gouv.fr/api/" +API_PARTICULIER_BASE_URL = os.getenv("API_PARTICULIER_BASE_URL", "https://staging.particulier.api.gouv.fr/api/") +API_PARTICULIER_TOKEN = os.getenv("API_PARTICULIER_TOKEN") diff --git a/itou/users/management/commands/certify_selected_administrative_criteria.py b/itou/users/management/commands/certify_selected_administrative_criteria.py new file mode 100755 index 00000000000..2578e676a47 --- /dev/null +++ b/itou/users/management/commands/certify_selected_administrative_criteria.py @@ -0,0 +1,145 @@ +import datetime +import logging +from math import ceil + +from django.db.models import Exists, OuterRef, Q +from django.utils.timezone import make_aware + +from itou.eligibility.models.geiq import GEIQAdministrativeCriteria, GEIQSelectedAdministrativeCriteria +from itou.eligibility.models.iae import AdministrativeCriteria, SelectedAdministrativeCriteria +from itou.users.models import User +from itou.utils.apis import api_particulier +from itou.utils.command import BaseCommand +from itou.utils.iterators import chunks + + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + """ + Certify selected administrative criteria from eligiility diagnosis made during the last 6 months + by calling the Particulier API.""" + + def add_arguments(self, parser): + parser.add_argument("--limit", dest="limit", action="store", type=int) + parser.add_argument("--verbose", dest="verbose", action="store_true") + parser.add_argument("--wet-run", dest="wet_run", action="store_true") + + def call_api_and_store_result( + self, + SelectedAdministrativeCriteriaModel, + AdministrativeCriteriaModel, + limit=None, + verbose=False, + wet_run=False, + ): + if not verbose: + # Don't log HTTP requests detail. + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + + total_criteria = 0 + total_criteria_with_certification = 0 + found_beneficiaries = set() + found_not_beneficiaries = set() + not_found_users = set() # 404 + server_errors = 0 # 429, 503, 504 + + criteria = AdministrativeCriteriaModel.objects.certifiable() + # TODO: add test. + period = (make_aware(datetime.datetime(2024, 1, 1)), make_aware(datetime.datetime(2024, 10, 1))) + criteria_pks = ( + SelectedAdministrativeCriteriaModel.objects.filter( + administrative_criteria__in=criteria, + eligibility_diagnosis__created_at__range=period, + eligibility_diagnosis__job_seeker__jobseeker_profile__birth_country__isnull=False, + eligibility_diagnosis__job_seeker__jobseeker_profile__birthdate__isnull=False, + eligibility_diagnosis__job_seeker__first_name__isnull=False, + eligibility_diagnosis__job_seeker__last_name__isnull=False, + eligibility_diagnosis__job_seeker__title__isnull=False, + ) + .exclude(Q(certified__isnull=False) | Q(data_returned_by_api__error__contains="not_found")) # exclude 404 + .order_by("pk") + .values_list("pk", flat=True) + ) + if limit: + criteria_pks = criteria_pks[:limit] + + total_criteria += len(criteria_pks) + if total_criteria == 0: + logger.info("No criteria to certify. Stop now and enjoy your day! ") + return + + users_count = User.objects.filter( + Exists( + SelectedAdministrativeCriteria.objects.filter( + eligibility_diagnosis__job_seeker_id=OuterRef("pk"), + id__in=criteria_pks, + ) + ) + ).count() + logger.info( + f"Candidats à certifier pour le modèle {SelectedAdministrativeCriteriaModel.__name__}: {users_count}" + ) + + chunks_total = ceil(total_criteria / 1000) + chunks_count = 0 + for criteria_pk_subgroup in chunks(criteria_pks, 1000): + criteria = SelectedAdministrativeCriteriaModel.objects.filter(pk__in=criteria_pk_subgroup).select_related( + "administrative_criteria", + "eligibility_diagnosis__job_seeker", + "eligibility_diagnosis__job_seeker__jobseeker_profile", + "eligibility_diagnosis__job_seeker__jobseeker_profile__birth_place", + "eligibility_diagnosis__job_seeker__jobseeker_profile__birth_country", + ) + + with api_particulier.client() as client: + for criterion in criteria: + criterion.certify(client, save=False) + data_returned_by_api = criterion.data_returned_by_api + # 429, 503 and 504 + # See api_particulier.py + if type(data_returned_by_api) is not dict: + data_returned_by_api = {"error": criterion.data_returned_by_api} + + if data_returned_by_api.get("status"): + total_criteria_with_certification += 1 + if data_returned_by_api["status"] == "beneficiaire": + found_beneficiaries.add(criterion.eligibility_diagnosis.job_seeker.pk) + else: + found_not_beneficiaries.add(criterion.eligibility_diagnosis.job_seeker.pk) + + if data_returned_by_api.get("error"): + if data_returned_by_api["error"] == "not_found": + not_found_users.add(criterion.eligibility_diagnosis.job_seeker.pk) + else: + server_errors += 1 + + if wet_run: + SelectedAdministrativeCriteriaModel.objects.bulk_update( + criteria, + fields=[ + "data_returned_by_api", + "certified", + "certification_period", + "certified_at", + ], + ) + + chunks_count += 1 + logger.info(f"########### {chunks_count/chunks_total*100:.2f}%") + + logger.info(f"Total criteria to be certified: {total_criteria}") + logger.info(f"Total criteria with certification: {total_criteria_with_certification}") + logger.info(f"Not beneficiaries: {len(found_not_beneficiaries)}") + logger.info(f"Beneficiaries: {len(found_beneficiaries)}") + logger.info(f"Not found: {len(not_found_users)}") + logger.info(f"Server errors: {server_errors}") + users_found = total_criteria_with_certification / total_criteria * 100 + logger.info(f"That's {users_found:.2f}% users found.") + + def handle(self, limit, verbose, wet_run, *args, **kwargs): + options = {"wet_run": wet_run, "limit": limit, "verbose": verbose} + self.call_api_and_store_result(GEIQSelectedAdministrativeCriteria, GEIQAdministrativeCriteria, **options) + self.call_api_and_store_result(SelectedAdministrativeCriteria, AdministrativeCriteria, **options) diff --git a/itou/utils/apis/api_particulier.py b/itou/utils/apis/api_particulier.py index eb355e27a37..1f9672f1e39 100644 --- a/itou/utils/apis/api_particulier.py +++ b/itou/utils/apis/api_particulier.py @@ -11,6 +11,13 @@ logger = logging.getLogger("APIParticulierClient") +class ShouldRetryException(httpx.HTTPStatusError): + """ + This exception can be used to ask Tenacity to retry + while attaching a response and a request to it. + """ + + def client(): return httpx.Client( base_url=settings.API_PARTICULIER_BASE_URL, @@ -57,21 +64,40 @@ def _build_params_from(job_seeker): @tenacity.retry( wait=tenacity.wait_fixed(2), stop=tenacity.stop_after_attempt(4), - retry=tenacity.retry_if_exception_type(httpx.RequestError), + retry=tenacity.retry_if_exception_type(ShouldRetryException), ) def _request(client, endpoint, job_seeker): params = _build_params_from(job_seeker=job_seeker) response = client.get(endpoint, params=params) - if response.status_code == 504: - reason = response.json().get("reason") - logger.error(f"{response.url=} {reason=}") - raise httpx.RequestError(message=reason) - elif response.status_code == 503: + error_message = None + # Too Many Requests + if response.status_code == 429: + errors = response.json().get("errors") + if errors: + error_message = errors[0] + logger.error(error_message) + raise ShouldRetryException(message=error_message, request=response.request, response=response) + # Bad params. + # Same as 503 except we don't retry. + elif response.status_code in (400, 401): errors = response.json()["errors"] - reason = errors[0].get("title") - for error in errors: - logger.error(f"{response.url=} {error['title']}") - raise httpx.RequestError(message=reason) + logger.error(error_message) + raise httpx.HTTPStatusError(message=error_message, request=response.request, response=response) + # Service unavailable + elif response.status_code == 503: + error_message = response.json().get("error") + if error_message: + error_message = response.json().get("reason") + else: + errors = response.json().get("errors") + error_message = errors[0].get("title") + logger.error(error_message) + raise ShouldRetryException(message=error_message, request=response.request, response=response) + # Server error + elif response.status_code == 504: + error_message = response.json().get("reason") + logger.error(error_message) + raise ShouldRetryException(message=error_message, request=response.request, response=response) else: response.raise_for_status() return response.json() @@ -87,11 +113,10 @@ def revenu_solidarite_active(client, job_seeker): try: data = _request(client, "/v2/revenu-solidarite-active", job_seeker) except httpx.HTTPStatusError as exc: # not 5XX. - logger.info(f"Beneficiary not found. {job_seeker.public_id=}") data["raw_response"] = exc.response.json() - except tenacity.RetryError as retry_err: # 503 or 504 + except tenacity.RetryError as retry_err: # 429, 503 or 504 exc = retry_err.last_attempt._exception - data["raw_response"] = str(exc) + data["raw_response"] = exc.response.json() except KeyError as exc: data["raw_response"] = str(exc) else: diff --git a/tests/utils/apis/test_api_particulier.py b/tests/utils/apis/test_api_particulier.py index f64341105ca..dc7afad02dd 100644 --- a/tests/utils/apis/test_api_particulier.py +++ b/tests/utils/apis/test_api_particulier.py @@ -63,11 +63,12 @@ def test_certify_brsa__missing_information(respx_mock, caplog): def test_not_found(respx_mock): - respx_mock.get(RSA_ENDPOINT).respond(404, json=rsa_not_found_mocker()) + json = rsa_not_found_mocker() + respx_mock.get(RSA_ENDPOINT).respond(404, json=json) job_seeker = JobSeekerFactory(born_in_france=True) with api_particulier.client() as client: response = api_particulier.revenu_solidarite_active(client, job_seeker) - assert response["raw_response"] == rsa_not_found_mocker() + assert response["raw_response"] == json assert response["is_certified"] is None assert response["start_at"] is None assert response["end_at"] is None @@ -76,20 +77,21 @@ def test_not_found(respx_mock): def test_service_unavailable(settings, respx_mock, mocker, caplog): mocker.patch("tenacity.nap.time.sleep") reason = "Erreur inconnue du fournisseur de données" + json = { + "errors": [ + { + "code": "37999", + "title": reason, + "detail": "La réponse retournée par le fournisseur de données est invalide et inconnue de " + "notre service. L'équipe technique a été notifiée de cette erreur pour investigation.", + "source": None, + "meta": {"provider": "CNAV"}, + } + ] + } respx_mock.get(RSA_ENDPOINT).respond( 503, - json={ - "errors": [ - { - "code": "37999", - "title": reason, - "detail": "La réponse retournée par le fournisseur de données est invalide et inconnue de notre" - "service. L'équipe technique a été notifiée de cette erreur pour investigation.", - "source": "null", - "meta": {"provider": "CNAV"}, - } - ] - }, + json=json, ) job_seeker = JobSeekerFactory(born_in_france=True) with api_particulier.client() as client: @@ -97,7 +99,90 @@ def test_service_unavailable(settings, respx_mock, mocker, caplog): assert reason in caplog.text assert RSA_ENDPOINT in caplog.text - assert response["raw_response"] == reason + assert response["raw_response"] == json + assert response["is_certified"] is None + assert response["start_at"] is None + assert response["end_at"] is None + + +def test_provider_unknown(settings, respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep") + reason = ( + "La réponse retournée par le fournisseur de données est invalide et inconnue de notre service. L'équipe " + "technique a été notifiée de cette erreur pour investigation." + ) + json = { + "error": "provider_unknown_error", + "reason": reason, + "message": reason, + } + respx_mock.get(RSA_ENDPOINT).respond( + 503, + json=json, + ) + job_seeker = JobSeekerFactory(born_in_france=True) + with api_particulier.client() as client: + response = api_particulier.revenu_solidarite_active(client, job_seeker) + + assert reason in caplog.text + assert RSA_ENDPOINT in caplog.text + assert response["raw_response"] == json + assert response["is_certified"] is None + assert response["start_at"] is None + assert response["end_at"] is None + + +def test_bad_params(settings, respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep") + reason = "Entité non traitable" + json = { + "errors": [ + { + "code": "00364", + "title": reason, + "detail": "Le sexe n'est pas correctement formaté (m ou f)", + "source": None, + "meta": {}, + } + ] + } + respx_mock.get(RSA_ENDPOINT).respond( + 400, + json=json, + ) + job_seeker = JobSeekerFactory(born_in_france=True) + with api_particulier.client() as client: + response = api_particulier.revenu_solidarite_active(client, job_seeker) + + assert response["raw_response"] == json + assert response["is_certified"] is None + assert response["start_at"] is None + assert response["end_at"] is None + + +def test_forbidden(settings, respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep") + reason = "Accès non autorisé" + json = { + "errors": [ + { + "code": "50002", + "title": reason, + "detail": "Le jeton d'accès n'a pas été trouvé ou est expiré.", + "source": None, + "meta": {}, + } + ] + } + respx_mock.get(RSA_ENDPOINT).respond( + 401, + json=json, + ) + job_seeker = JobSeekerFactory(born_in_france=True) + with api_particulier.client() as client: + response = api_particulier.revenu_solidarite_active(client, job_seeker) + + assert response["raw_response"] == json assert response["is_certified"] is None assert response["start_at"] is None assert response["end_at"] is None @@ -106,7 +191,26 @@ def test_service_unavailable(settings, respx_mock, mocker, caplog): def test_gateway_timeout(respx_mock, mocker, caplog): mocker.patch("tenacity.nap.time.sleep", mocker.MagicMock()) reason = "The read operation timed out" - respx_mock.get(RSA_ENDPOINT).respond(504, json={"error": "null", "reason": reason, "message": "null"}) + json = {"error": None, "reason": reason, "message": "null"} + respx_mock.get(RSA_ENDPOINT).respond(504, json=json) + + job_seeker = JobSeekerFactory(born_in_france=True) + with api_particulier.client() as client: + response = api_particulier.revenu_solidarite_active(client, job_seeker) + + assert reason in caplog.text + assert RSA_ENDPOINT in caplog.text + assert response["raw_response"] == json + assert response["is_certified"] is None + assert response["start_at"] is None + assert response["end_at"] is None + + +def test_too_many_requests(respx_mock, mocker, caplog): + mocker.patch("tenacity.nap.time.sleep", mocker.MagicMock()) + reason = "Vous avez effectué trop de requêtes" + json = {"errors": ["Vous avez effectué trop de requêtes"]} + respx_mock.get(RSA_ENDPOINT).respond(429, json=json) job_seeker = JobSeekerFactory(born_in_france=True) with api_particulier.client() as client: @@ -114,7 +218,7 @@ def test_gateway_timeout(respx_mock, mocker, caplog): assert reason in caplog.text assert RSA_ENDPOINT in caplog.text - assert response["raw_response"] == reason + assert response["raw_response"] == json assert response["is_certified"] is None assert response["start_at"] is None assert response["end_at"] is None