Skip to content

Commit

Permalink
Certify criteria selected during the past 6 months.
Browse files Browse the repository at this point in the history
Certify stored selected criteria each day at noon

The Particulier API returns many 503 without any useful
explanation.

Enhance tests to reflect real world API returns
  • Loading branch information
celine-m-s authored and francoisfreitag committed Oct 22, 2024
1 parent 61131e5 commit 13824e0
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 43 deletions.
1 change: 1 addition & 0 deletions clevercloud/cron.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"0 */6 * * * $ROOT/clevercloud/run_management_command.sh sync_s3_files",

"1 0 * * * $ROOT/clevercloud/run_management_command.sh update_prescriber_organization_with_api_entreprise --verbosity 2",
"15 0 * * * $ROOT/clevercloud/run_management_command.sh certify_selected_administrative_criteria --wet-run",
"30 0 * * * $ROOT/clevercloud/run_management_command.sh collect_analytics_data --save",
"45 0 * * * $ROOT/clevercloud/run_management_command.sh import_advisor_information shared_bucket/imports-gps/export_gps.xlsx --wet-run",
"30 1 * * * $ROOT/clevercloud/run_management_command.sh new_users_to_brevo --wet-run",
Expand Down
3 changes: 2 additions & 1 deletion config/settings/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@
# Don't use json formatter in dev
del LOGGING["handlers"]["console"]["formatter"] # noqa: F405

API_PARTICULIER_BASE_URL = "https://staging.particulier.api.gouv.fr/api/"
API_PARTICULIER_BASE_URL = os.getenv("API_PARTICULIER_BASE_URL", "https://staging.particulier.api.gouv.fr/api/")
API_PARTICULIER_TOKEN = os.getenv("API_PARTICULIER_TOKEN")
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import concurrent
import datetime
import logging
from math import ceil

from django.db.models import Exists, OuterRef, Q
from django.utils.timezone import make_aware

from itou.eligibility.models.geiq import GEIQAdministrativeCriteria, GEIQSelectedAdministrativeCriteria
from itou.eligibility.models.iae import AdministrativeCriteria, SelectedAdministrativeCriteria
from itou.users.models import User
from itou.utils.command import BaseCommand
from itou.utils.iterators import chunks


logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""
Certify selected administrative criteria from eligiility diagnosis made during the last 6 months
by calling the Particulier API."""

def add_arguments(self, parser):
parser.add_argument("--limit", dest="limit", action="store", type=int)
parser.add_argument("--synchronously", dest="synchronously", action="store_true")
parser.add_argument("--verbose", dest="verbose", action="store_true")
parser.add_argument("--wet-run", dest="wet_run", action="store_true")

def call_api_and_store_result(
self,
SelectedAdministrativeCriteriaModel,
AdministrativeCriteriaModel,
limit=None,
synchronously=False,
verbose=False,
wet_run=False,
):
if not verbose:
# Don't log HTTP requests detail.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

total_criteria = 0
total_criteria_with_certification = 0
found_beneficiaries = set()
found_not_beneficiaries = set()
not_found_users = set() # 404
server_errors = 0 # 429, 503, 504

criteria = AdministrativeCriteriaModel.objects.certifiable()
# TODO: add test.
period = (make_aware(datetime.datetime(2024, 1, 1)), make_aware(datetime.datetime(2024, 10, 1)))
criteria_pks = (
SelectedAdministrativeCriteriaModel.objects.filter(
administrative_criteria__in=criteria,
eligibility_diagnosis__created_at__range=period,
eligibility_diagnosis__job_seeker__jobseeker_profile__birth_country__isnull=False,
eligibility_diagnosis__job_seeker__jobseeker_profile__birthdate__isnull=False,
eligibility_diagnosis__job_seeker__first_name__isnull=False,
eligibility_diagnosis__job_seeker__last_name__isnull=False,
eligibility_diagnosis__job_seeker__title__isnull=False,
)
.exclude(Q(certified__isnull=False) | Q(data_returned_by_api__error__contains="not_found")) # exclude 404
.order_by("pk")
.values_list("pk", flat=True)
)
if limit:
criteria_pks = criteria_pks[:limit]

total_criteria += len(criteria_pks)
if total_criteria == 0:
logger.info("No criteria to certify. Stop now and enjoy your day! ")
return

users_count = User.objects.filter(
Exists(
SelectedAdministrativeCriteria.objects.filter(
eligibility_diagnosis__job_seeker_id=OuterRef("pk"),
id__in=criteria_pks,
)
)
).count()
logger.info(
f"Candidats à certifier pour le modèle {SelectedAdministrativeCriteriaModel.__name__}: {users_count}"
)

chunks_total = ceil(total_criteria / 1000)
chunks_count = 0
for criteria_pk_subgroup in chunks(criteria_pks, 1000):
criteria = SelectedAdministrativeCriteriaModel.objects.filter(pk__in=criteria_pk_subgroup).select_related(
"administrative_criteria",
"eligibility_diagnosis__job_seeker",
"eligibility_diagnosis__job_seeker__jobseeker_profile",
"eligibility_diagnosis__job_seeker__jobseeker_profile__birth_place",
"eligibility_diagnosis__job_seeker__jobseeker_profile__birth_country",
)

# Tenacity's retry feature does not seem to work with ThreadPoolExecutor.
# httpx.RequestError, which should retry, does not.
# So leave the possibility to certify more criteria even if it lasts longer.
if synchronously:
for criterion in criteria:
criterion.certify(save=False)
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
batch_futures = [executor.submit(criterion.certify, False) for criterion in criteria]
concurrent.futures.wait(batch_futures, timeout=3600)

for criterion in criteria:
data_returned_by_api = criterion.data_returned_by_api
# 429, 503 and 504
# See api_particulier.py
if type(criterion.data_returned_by_api) is not dict:
data_returned_by_api = {"error": criterion.data_returned_by_api}

if data_returned_by_api.get("status"):
total_criteria_with_certification += 1
if data_returned_by_api["status"] == "beneficiaire":
found_beneficiaries.add(criterion.eligibility_diagnosis.job_seeker.pk)
else:
found_not_beneficiaries.add(criterion.eligibility_diagnosis.job_seeker.pk)

if data_returned_by_api.get("error"):
if data_returned_by_api["error"] == "not_found":
not_found_users.add(criterion.eligibility_diagnosis.job_seeker.pk)
else:
server_errors += 1

if wet_run:
SelectedAdministrativeCriteriaModel.objects.bulk_update(
criteria,
fields=[
"data_returned_by_api",
"certified",
"certification_period",
"certified_at",
],
)

chunks_count += 1
logger.info(f"########### {chunks_count/chunks_total*100:.2f}%")

logger.info(f"Total criteria to be certified: {total_criteria}")
logger.info(f"Total criteria with certification: {total_criteria_with_certification}")
logger.info(f"Not beneficiaries: {len(found_not_beneficiaries)}")
logger.info(f"Beneficiaries: {len(found_beneficiaries)}")
logger.info(f"Not found: {len(not_found_users)}")
logger.info(f"Server errors: {server_errors}")
users_found = total_criteria_with_certification / total_criteria * 100
logger.info(f"That's {users_found:.2f}% users found.")

def handle(self, limit, synchronously, verbose, wet_run, *args, **kwargs):
options = {"wet_run": wet_run, "limit": limit, "synchronously": synchronously, "verbose": verbose}
self.call_api_and_store_result(GEIQSelectedAdministrativeCriteria, GEIQAdministrativeCriteria, **options)
self.call_api_and_store_result(SelectedAdministrativeCriteria, AdministrativeCriteria, **options)
76 changes: 50 additions & 26 deletions itou/utils/apis/api_particulier.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
logger = logging.getLogger("APIParticulierClient")


class ShouldRetryException(httpx.HTTPStatusError):
"""
This exception can be used to ask Tenacity to retry
while attaching a response and a request to it.
"""

pass


class APIParticulierClient:
def __init__(self, job_seeker=None):
self.client = httpx.Client(
Expand Down Expand Up @@ -57,48 +66,63 @@ def _build_params_from(cls, job_seeker):
@tenacity.retry(
wait=tenacity.wait_fixed(2),
stop=tenacity.stop_after_attempt(4),
retry=tenacity.retry_if_exception_type(httpx.RequestError),
retry=tenacity.retry_if_exception_type(ShouldRetryException),
)
def _request(self, endpoint, params=None):
params = self._build_params_from(job_seeker=self.job_seeker)
response = self.client.get(endpoint, params=params)
if response.status_code == 504:
reason = response.json().get("reason")
logger.error(f"{response.url=} {reason=}")
raise httpx.RequestError(message=reason)
elif response.status_code == 503:
error_message = None
if response.status_code == 404:
return response.json()
# Too Many Requests
elif response.status_code == 429:
errors = response.json().get("errors")
if errors:
error_message = errors[0]
raise ShouldRetryException(message=error_message, request=response.request, response=response)
# Bad params.
# Same as 503 except we don't retry.
elif response.status_code in (400, 401):
errors = response.json()["errors"]
reason = errors[0].get("title")
for error in errors:
logger.error(f"{response.url=} {error['title']}")
raise httpx.RequestError(message=reason)
raise httpx.HTTPStatusError(message=error_message, request=response.request, response=response)
# Service unavailable
elif response.status_code == 503:
error_message = response.json().get("error")
if error_message:
error_message = response.json().get("reason")
else:
errors = response.json().get("errors")
error_message = errors[0].get("title")
raise ShouldRetryException(message=error_message, request=response.request, response=response)
# Server error
elif response.status_code == 504:
error_message = response.json().get("reason")
raise ShouldRetryException(message=error_message, request=response.request, response=response)
else:
response.raise_for_status()
return response.json()

def revenu_solidarite_active(self):
data = {
"start_at": "",
"end_at": "",
"is_certified": "",
"raw_response": "",
}
data = {"start_at": "", "end_at": "", "is_certified": "", "raw_response": ""}
error_message = None
try:
data = self._request("/v2/revenu-solidarite-active")
response = self._request("/v2/revenu-solidarite-active")
except httpx.HTTPStatusError as exc: # not 5XX.
logger.info(f"Beneficiary not found. {self.job_seeker.public_id=}")
data["raw_response"] = exc.response.json()
except tenacity.RetryError as retry_err: # 503 or 504
error_message = f"{exc.response.status_code}: {exc.response.json()} {exc.response.url=}"
except tenacity.RetryError as retry_err: # 429, 503 or 504
exc = retry_err.last_attempt._exception
data["raw_response"] = str(exc)
error_message = f"{exc.response.status_code}: {exc.response.json()} {exc.response.url=}"
except KeyError as exc:
data["raw_response"] = str(exc)
error_message = f"KeyError: {exc=}"
else:
data = {
"start_at": self.format_date(data["dateDebut"]),
"end_at": self.format_date(data["dateFin"]),
"is_certified": data["status"] == "beneficiaire",
"raw_response": data,
"start_at": self.format_date(response["dateDebut"]),
"end_at": self.format_date(response["dateFin"]),
"is_certified": response["status"] == "beneficiaire",
"raw_response": response,
}
finally:
if error_message:
data["raw_response"] = error_message
logger.warning(error_message)
return data
Loading

0 comments on commit 13824e0

Please sign in to comment.