diff --git a/src/ngohub/core.py b/src/ngohub/core.py index f2c31cd..fff51b1 100644 --- a/src/ngohub/core.py +++ b/src/ngohub/core.py @@ -1,10 +1,20 @@ from abc import ABC, abstractmethod -from copy import deepcopy -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union +from urllib.parse import urlencode from ngohub.exceptions import HubHTTPException, MissingUserException +from ngohub.models.checks import CheckOrganizationUserApplication +from ngohub.models.organization import Application, Organization, OrganizationApplication +from ngohub.models.public import Version +from ngohub.models.user import User from ngohub.network import HTTPClient, HTTPClientResponse -from ngohub.normalization.organization import normalize_organization_data +from ngohub.normalization.organization import ( + normalize_application_list, + normalize_organization_applications, + normalize_organization_data, +) +from ngohub.normalization.public import normalize_version +from ngohub.normalization.user import normalize_user class BaseHub(ABC): @@ -23,144 +33,109 @@ def __init__(self, api_base_url: str) -> None: self.client: HTTPClient = HTTPClient(self.api_base_url) - def get_raw_organization_profile(self, ngo_token: str) -> Dict[str, Any]: - response: HTTPClientResponse = self.client.api_get("/organization-profile/", token=ngo_token) - - return response.to_dict() - - def get_raw_organization(self, admin_token: str, organization_id: int) -> Dict[str, Any]: - response: HTTPClientResponse = self.client.api_get(f"/organization/{organization_id}/", token=admin_token) - - return response.to_dict() - - -class NGOHub(NGOHubRaw): - def __init__(self, api_base_url: str) -> None: - super().__init__(api_base_url) - - def is_healthy(self) -> bool: + def get_raw_health(self) -> str: response: HTTPClientResponse = self.client.api_get("/health/") - response_is_ok: bool = response.to_str() == "OK" - - return response_is_ok + return response.to_str() - def get_version(self) -> Dict[str, str]: + def get_raw_version(self) -> Dict[str, str]: response: HTTPClientResponse = self.client.api_get("/version/") - response_dict: Dict = response.to_dict() - version_revision: Dict[str, str] = { - "version": response_dict["version"], - "revision": response_dict["revision"], - } - - return version_revision + return response.to_dict() - def get_file_url(self, path: str) -> str: - response: HTTPClientResponse = self.client.api_get(f"/file?path={path}") + def get_raw_file_url(self, path: str) -> str: + url_params: Dict[str, str] = {"path": path} + response: HTTPClientResponse = self.client.api_get(f"/file?{urlencode(url_params)}") return response.to_str() - def _get_nomenclature(self, nomenclature: str) -> Any: + def _get_raw_nomenclature(self, nomenclature: str) -> Any: response: HTTPClientResponse = self.client.api_get(f"/nomenclatures/{nomenclature}") return response.to_dict() - def get_cities_nomenclatures( + def get_raw_cities_nomenclatures( self, search: str = None, county_id: int = None, city_id: int = None ) -> List[Dict[str, Any]]: mandatory_params: List[Any] = [search, county_id] if all(param is None for param in mandatory_params): raise ValueError("Please provide at least one of the following: county_id, search") - search_query: List[str] = [] + url_params: Dict[str, Any] = {} if search: - search_query.append(f"search={search}") + url_params["search"] = search if county_id: - search_query.append(f"countyId={county_id}") + url_params["countyId"] = county_id if city_id: - search_query.append(f"cityId={city_id}") + url_params["cityId"] = city_id - return self._get_nomenclature(f"cities?{'&'.join(search_query)}") + return self._get_raw_nomenclature(f"cities?{urlencode(url_params)}") - def get_counties_nomenclatures(self) -> List[Dict[str, Any]]: - return self._get_nomenclature("counties") + def get_raw_counties_nomenclatures(self) -> List[Dict[str, Any]]: + return self._get_raw_nomenclature("counties") - def get_domains_nomenclatures(self): - return self._get_nomenclature("domains") + def get_raw_domains_nomenclatures(self): + return self._get_raw_nomenclature("domains") - def get_regions_nomenclatures(self): - return self._get_nomenclature("regions") + def get_raw_regions_nomenclatures(self): + return self._get_raw_nomenclature("regions") - def get_federations_nomenclatures(self): - return self._get_nomenclature("federations") + def get_raw_federations_nomenclatures(self): + return self._get_raw_nomenclature("federations") - def get_coalitions_nomenclatures(self): - return self._get_nomenclature("coalitions") + def get_raw_coalitions_nomenclatures(self): + return self._get_raw_nomenclature("coalitions") - def get_faculties_nomenclatures(self): - return self._get_nomenclature("faculties") + def get_raw_faculties_nomenclatures(self): + return self._get_raw_nomenclature("faculties") - def get_skills_nomenclatures(self): - return self._get_nomenclature("skills") + def get_raw_skills_nomenclatures(self): + return self._get_raw_nomenclature("skills") - def get_practice_domains_nomenclatures(self): - return self._get_nomenclature("practice-domains") + def get_raw_practice_domains_nomenclatures(self): + return self._get_raw_nomenclature("practice-domains") - def get_service_domains_nomenclatures(self): - return self._get_nomenclature("service-domains") + def get_raw_service_domains_nomenclatures(self): + return self._get_raw_nomenclature("service-domains") - def get_beneficiaries_nomenclatures(self): - return self._get_nomenclature("beneficiaries") + def get_raw_beneficiaries_nomenclatures(self): + return self._get_raw_nomenclature("beneficiaries") - def get_issuers_nomenclatures(self): - return self._get_nomenclature("issuers") + def get_raw_issuers_nomenclatures(self): + return self._get_raw_nomenclature("issuers") - # User related methods - def get_profile(self, user_token: str) -> Dict[str, Any]: + def get_raw_profile(self, user_token: str) -> Dict[str, Any]: response: HTTPClientResponse = self.client.api_get("/profile/", token=user_token) return response.to_dict() - # Organization related methods - def get_organization_profile(self, ngo_token: str) -> Dict[str, Any]: - response: Dict[str, Any] = self.get_raw_organization_profile(ngo_token) + def get_raw_organization_profile(self, ngo_token: str) -> Dict[str, Any]: + response: HTTPClientResponse = self.client.api_get("/organization-profile/", token=ngo_token) - return normalize_organization_data(response) + return response.to_dict() - def get_user_organization_applications(self, ngo_token: str) -> List[Dict[str, Any]]: - response: HTTPClientResponse = self.client.api_get("/organizations/application/", token=ngo_token) + def get_raw_application_list(self, admin_token: str) -> List[Dict[str, Any]]: + response: HTTPClientResponse = self.client.api_get("/application/list/", token=admin_token) return list(response.to_dict()) - def check_user_organization_has_application(self, ngo_token: str, login_link: str) -> Dict[str, Any]: - organization_applications: List[Dict[str, Any]] = self.get_user_organization_applications(ngo_token) - - for app in organization_applications: - if app["loginLink"].startswith(login_link) and app["status"] == "active" and app["ongStatus"] == "active": - return app - - return {} - - # Admin related methods - def get_application_list(self, admin_token: str) -> List[Dict[str, Any]]: - response: HTTPClientResponse = self.client.api_get("/application/list/", token=admin_token) + def get_raw_user_organization_applications(self, ngo_token: str) -> List[Dict[str, Any]]: + response: HTTPClientResponse = self.client.api_get("/organizations/application/", token=ngo_token) return list(response.to_dict()) - def get_organization(self, admin_token: str, organization_id: int) -> Dict[str, Any]: - response: Dict[str, Any] = self.get_raw_organization(admin_token=admin_token, organization_id=organization_id) + def get_raw_organization(self, admin_token: str, organization_id: int) -> Dict[str, Any]: + response: HTTPClientResponse = self.client.api_get(f"/organization/{organization_id}/", token=admin_token) - return normalize_organization_data(response) + return response.to_dict() - def get_organization_applications(self, admin_token: str, organization_id: int) -> List[Dict[str, Any]]: + def get_raw_organization_applications(self, admin_token, organization_id) -> List[Dict[str, Any]]: response: HTTPClientResponse = self.client.api_get( f"/application/organization/{organization_id}/", token=admin_token ) - return list(response.to_dict()) - def get_user(self, admin_token: str, user_id: int) -> Dict[str, Any]: + def get_user_raw(self, admin_token: str, user_id: int) -> Dict[str, Any]: try: response: HTTPClientResponse = self.client.api_get(f"/user/{user_id}/", token=admin_token) except HubHTTPException as e: @@ -171,7 +146,7 @@ def get_user(self, admin_token: str, user_id: int) -> Dict[str, Any]: return response.to_dict() - def get_users( + def get_raw_users( self, admin_token: str, organization_id: int, @@ -185,53 +160,117 @@ def get_users( status: str = None, available_apps_ids: List[int] = None, ) -> Dict[str, Any]: - request_url: str = f"/user?organization_id={organization_id}&limit={limit}&page={page}" + base_url: str = "/user" + url_params: dict = {"organization_id": organization_id, "limit": limit, "page": page} + if search: - request_url += f"&search={search}" + url_params["search"] = search + if order_by: - request_url += f"&orderBy={order_by}" + url_params["orderBy"] = order_by if order_direction and order_direction.upper() in ["ASC", "DESC"]: - request_url += f"&orderDirection={order_direction.upper()}" + url_params["orderDirection"] = order_direction.upper() + if start: - request_url += f"&start={start}" + url_params["start"] = start if end: - request_url += f"&end={end}" + url_params["end"] = end + if status and status.lower() in ["active", "pending", "restricted"]: - request_url += f"&status={status}" + url_params["status"] = status.lower() + if available_apps_ids: - for app_id in available_apps_ids: - request_url += f"&availableAppsIds={app_id}" + url_params["availableAppsIDs"] = [str(app_id) for app_id in available_apps_ids] + request_url: str = f"{base_url}?{urlencode(url_params)}" response: HTTPClientResponse = self.client.api_get(request_url, token=admin_token) return response.to_dict() + +class NGOHub(NGOHubRaw): + def __init__(self, api_base_url: str) -> None: + super().__init__(api_base_url) + + def is_healthy(self) -> bool: + return bool(self.get_raw_health() == "OK") + + def get_version(self) -> Version: + return normalize_version(self.get_raw_version()) + + def get_file_url(self, path: str) -> str: + return self.get_raw_file_url(path) + + # Organization related methods + def get_organization_profile(self, ngo_token: str) -> Organization: + response: Dict[str, Any] = self.get_raw_organization_profile(ngo_token) + + return normalize_organization_data(response) + + def get_user_organization_applications(self, ngo_token: str) -> List[OrganizationApplication]: + response: List[Dict[str, Any]] = self.get_raw_user_organization_applications(ngo_token) + + return normalize_organization_applications(response) + + def check_user_organization_has_application( + self, ngo_token: str, login_link: str + ) -> Union[OrganizationApplication, None]: + organization_applications: List[OrganizationApplication] = self.get_user_organization_applications(ngo_token) + + for app in organization_applications: + if app.login_link.startswith(login_link) and app.status == "active" and app.ngo_status == "active": + return app + + return None + + # Admin related methods + def get_application_list(self, admin_token: str) -> List[Application]: + response: List[Dict[str, Any]] = self.get_raw_application_list(admin_token) + + return normalize_application_list(response) + + def get_organization(self, admin_token: str, organization_id: int) -> Organization: + response: Dict[str, Any] = self.get_raw_organization(admin_token=admin_token, organization_id=organization_id) + + return normalize_organization_data(response) + + def get_organization_applications(self, admin_token: str, organization_id: int) -> List[OrganizationApplication]: + response = self.get_raw_organization_applications(admin_token, organization_id) + + return normalize_organization_applications(response) + + def get_user(self, admin_token: str, user_id: int) -> User: + response: Dict[str, Any] = self.get_user_raw(admin_token, user_id) + + return normalize_user(response) + def check_organization_has_application( self, admin_token: str, organization_id: int, login_link: str - ) -> Dict[str, Any]: - organization_applications: List[Dict[str, Any]] = self.get_organization_applications( + ) -> Optional[OrganizationApplication]: + organization_applications: List[OrganizationApplication] = self.get_organization_applications( admin_token, organization_id ) for application in organization_applications: if ( - application["loginLink"].startswith(login_link) - and application["status"] == "active" - and application["ongStatus"] == "active" + application.login_link.startswith(login_link) + and application.status == "active" + and application.ngo_status == "active" ): return application - return {} + return None - def _check_user_has_application(self, admin_token, organization_id, user_id, response) -> Dict[str, Any]: + def _check_user_has_application( + self, admin_token: str, organization_id: int, user_id: int, response: CheckOrganizationUserApplication + ) -> CheckOrganizationUserApplication: continue_searching: bool = True page: int = 1 - response: Dict[str, Any] = deepcopy(response) - searched_application_id = response["application"]["id"] + searched_application_id = response.application.id while continue_searching: - organization_users: Dict[str, Union[Dict, List]] = self.get_users( + organization_users: Dict[str, Union[Dict, List]] = self.get_raw_users( admin_token=admin_token, organization_id=organization_id, page=page ) @@ -245,14 +284,14 @@ def _check_user_has_application(self, admin_token, organization_id, user_id, res for user in response_users: if user["id"] == int(user_id): - response["user"] = user + response.user = user user_applications: List[Dict[str, Any]] = user["availableAppsIDs"] if searched_application_id in user_applications: - response["access"] = True + response.has_access = True return response - response["access"] = False + response.has_access = False return response return response @@ -263,30 +302,30 @@ def check_organization_user_has_application( organization_id: int, user_id: int, login_link: str, - ) -> Dict[str, Any]: + ) -> CheckOrganizationUserApplication: - response: Dict[str, Any] = {"user": None, "application": None, "access": None} + response: CheckOrganizationUserApplication = CheckOrganizationUserApplication() - organization_application: Dict = self.check_organization_has_application( + organization_application: OrganizationApplication = self.check_organization_has_application( admin_token, organization_id, login_link ) if not organization_application: return response - response["application"] = organization_application + response.application = organization_application try: - user_information: Dict = self.get_user(admin_token, user_id) + user: User = self.get_user(admin_token, user_id) except MissingUserException: return response - if not user_information or user_information.get("organizationId") != int(organization_id): + if not user or user.organization_id != int(organization_id): return response - response["user"] = user_information + response.user = user - if user_information.get("role") == "admin": - response["access"] = True + if user.role == "admin": + response.has_access = True return response diff --git a/src/ngohub/models/__init__.py b/src/ngohub/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ngohub/models/checks.py b/src/ngohub/models/checks.py new file mode 100644 index 0000000..8a2ac8d --- /dev/null +++ b/src/ngohub/models/checks.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass +from typing import Dict, Union + +from ngohub.models.organization import OrganizationApplication + + +@dataclass +class CheckOrganizationUserApplication: + user: Union[Dict, None] + application: Union[OrganizationApplication, None] + has_access: Union[bool, None] + + def __init__(self) -> None: + self.user = None + self.application = None + self.has_access = None diff --git a/src/ngohub/models/core.py b/src/ngohub/models/core.py new file mode 100644 index 0000000..87cca31 --- /dev/null +++ b/src/ngohub/models/core.py @@ -0,0 +1,5 @@ +from dataclasses import dataclass + + +@dataclass +class BaseDataclass: ... diff --git a/src/ngohub/models/locations.py b/src/ngohub/models/locations.py new file mode 100644 index 0000000..ca1031a --- /dev/null +++ b/src/ngohub/models/locations.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class City: + id: int + name: str + county_id: int + created_on: datetime + + +@dataclass +class County: + id: int + name: str + abbreviation: str + region_id: int + created_on: datetime diff --git a/src/ngohub/models/nomenclatures.py b/src/ngohub/models/nomenclatures.py new file mode 100644 index 0000000..65a2997 --- /dev/null +++ b/src/ngohub/models/nomenclatures.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class Domain: + id: int + name: str + created_on: datetime + updated_on: datetime diff --git a/src/ngohub/models/organization.py b/src/ngohub/models/organization.py new file mode 100644 index 0000000..45db29b --- /dev/null +++ b/src/ngohub/models/organization.py @@ -0,0 +1,197 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import List + +from ngohub.models.core import BaseDataclass +from ngohub.models.locations import City, County +from ngohub.models.nomenclatures import Domain + + +@dataclass +class OrganizationContact(BaseDataclass): + email: str + phone: str + full_name: str + + +@dataclass +class OrganizationLegalReprezentative(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + full_name: str + email: str + phone: str + role: str + + +@dataclass +class OrganizationDirector(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + full_name: str + email: str + phone: str + role: str + + +@dataclass +class OrganizationReportFile(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + year: int + status: str + + +@dataclass +class OrganizationReports(OrganizationReportFile): + number_of_volunteers: int + number_of_contractors: int + report: str + + +@dataclass +class OrganizationPartners(OrganizationReportFile): + number_of_partners: int + path: str + + +@dataclass +class OrganizationInvestors(OrganizationReportFile): + number_of_investors: int + path: str + + +@dataclass +class OrganizationGeneral(BaseDataclass): + id: str + created_on: datetime + updated_on: datetime + name: str + alias: str + type: str + email: str + phone: str + year_created: str + cui: str + association_registry_number: str + association_registry_part: str + association_registry_section: str + association_registry_issuer_id: str + national_registry_number: str + raf_number: str + short_description: str + description: str + address: str + logo: str + website: str + facebook: str + instagram: str + twitter: str + linkedin: str + tiktok: str + donation_website: str + redirect_link: str + donation_sms: str + donation_keyword: str + contact: OrganizationContact + organization_address: str + city: City + county: County + organization_city: str + organization_county: str + association_registry_issuer: str + + +@dataclass +class OrganizationActivity(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + area: str + is_part_of_federation: bool + is_part_of_coalition: bool + is_part_of_international_organization: bool + international_organization_name: str + is_social_service_viable: bool + offers_grants: bool + is_public_interest_organization: bool + has_branches: bool + federations: List[str] + coalitions: List[str] + domains: List[Domain] + cities: List[str] + branches: List[str] + regions: List[str] + + +@dataclass +class OrganizationLegal(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + others: str + organization_statute: str + non_political_affiliation_file: str + balance_sheet_file: str + legal_reprezentative: OrganizationLegalReprezentative + directors: List[OrganizationDirector] + + +@dataclass +class OrganizationFinancial(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + type: str + number_of_employees: int + year: int + total: int + status: str + report_status: str + synced_anaf: bool + data: str + + +@dataclass +class OrganizationReport(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + reports: List[OrganizationReports] + partners: List[OrganizationPartners] + investors: List[OrganizationInvestors] + + +@dataclass +class Organization(BaseDataclass): + id: int + created_on: datetime + updated_on: datetime + status: str + general_data: OrganizationGeneral + activity_data: OrganizationActivity + legal_data: OrganizationLegal + financial_data: List[OrganizationFinancial] + report_data: OrganizationReport + + +@dataclass +class Application(BaseDataclass): + id: int + name: str + + +@dataclass +class OrganizationApplication(Application): + logo: str + short_description: str + login_link: str + website: str + status: str + ngo_status: str + created_on: datetime + type: str + application_label: str diff --git a/src/ngohub/models/public.py b/src/ngohub/models/public.py new file mode 100644 index 0000000..13ed2ff --- /dev/null +++ b/src/ngohub/models/public.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass + +from ngohub.models.core import BaseDataclass + + +@dataclass +class Version(BaseDataclass): + version: str + revision: str diff --git a/src/ngohub/models/user.py b/src/ngohub/models/user.py new file mode 100644 index 0000000..380ed15 --- /dev/null +++ b/src/ngohub/models/user.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class User: + id: int + created_on: datetime + updated_on: datetime + cognito_id: str + name: str + email: str + phone: str + role: str + status: str + organization_id: int diff --git a/src/ngohub/normalization/helpers.py b/src/ngohub/normalization/helpers.py deleted file mode 100644 index 5afbf64..0000000 --- a/src/ngohub/normalization/helpers.py +++ /dev/null @@ -1,22 +0,0 @@ -from datetime import datetime -from typing import Any, Dict - - -def extract_key(data: Dict[str, Any], key: str, *back_up_keys: str) -> Any: - try: - return data[key] - except KeyError: - if not back_up_keys: - return None - - for back_up in back_up_keys: - try: - return data[back_up] - except KeyError: - continue - - return None - - -def convert_date(date: str) -> datetime: - return datetime.fromisoformat(date) diff --git a/src/ngohub/normalization/organization.py b/src/ngohub/normalization/organization.py index b6e9559..340b48c 100644 --- a/src/ngohub/normalization/organization.py +++ b/src/ngohub/normalization/organization.py @@ -1,238 +1,137 @@ from typing import Any, Dict, List -from ngohub.normalization.helpers import convert_date, extract_key - - -def _normalize_organization_general(org_general_data: Dict) -> Dict: - normal_data: Dict = { - "id": extract_key(org_general_data, "id"), - "created_on": convert_date(extract_key(org_general_data, "createdOn")), - "updated_on": convert_date(extract_key(org_general_data, "updatedOn")), - "name": extract_key(org_general_data, "name"), - "alias": extract_key(org_general_data, "alias"), - "type": extract_key(org_general_data, "type"), - "email": extract_key(org_general_data, "email"), - "phone": extract_key(org_general_data, "phone"), - "year_created": extract_key(org_general_data, "yearCreated"), - "cui": extract_key(org_general_data, "cui"), - "association_registry_number": extract_key(org_general_data, "associationRegistryNumber"), - "association_registry_part": extract_key(org_general_data, "associationRegistryPart"), - "association_registry_section": extract_key(org_general_data, "associationRegistrySection"), - "association_registry_issuer_id": extract_key(org_general_data, "associationRegistryIssuerId"), - "national_registry_number": extract_key(org_general_data, "nationalRegistryNumber"), - "raf_number": extract_key(org_general_data, "rafNumber"), - "short_description": extract_key(org_general_data, "shortDescription"), - "description": extract_key(org_general_data, "description"), - "address": extract_key(org_general_data, "address"), - "logo": extract_key(org_general_data, "logo"), - "website": extract_key(org_general_data, "website"), - "facebook": extract_key(org_general_data, "facebook"), - "instagram": extract_key(org_general_data, "instagram"), - "twitter": extract_key(org_general_data, "twitter"), - "linkedin": extract_key(org_general_data, "linkedin"), - "tiktok": extract_key(org_general_data, "tiktok"), - "donation_website": extract_key(org_general_data, "donationWebsite"), - "redirect_link": extract_key(org_general_data, "redirectLink"), - "donation_sms": extract_key(org_general_data, "donationSMS"), - "donation_keyword": extract_key(org_general_data, "donationKeyword"), - "contact": { - "email": extract_key(extract_key(org_general_data, "contact"), "email"), - "phone": extract_key(extract_key(org_general_data, "contact"), "phone"), - "full_name": extract_key(extract_key(org_general_data, "contact"), "fullName"), - }, - "organization_address": extract_key(org_general_data, "organizationAddress"), - "city": { - "id": extract_key(extract_key(org_general_data, "city"), "id"), - "name": extract_key(extract_key(org_general_data, "city"), "name"), - "county_id": extract_key(extract_key(org_general_data, "city"), "countyId"), - "created_on": convert_date(extract_key(extract_key(org_general_data, "city"), "createdOn")), - }, - "county": { - "id": extract_key(extract_key(org_general_data, "county"), "id"), - "name": extract_key(extract_key(org_general_data, "county"), "name"), - "abbreviation": extract_key(extract_key(org_general_data, "county"), "abbreviation"), - "region_id": extract_key(extract_key(org_general_data, "county"), "regionId"), - "created_on": convert_date(extract_key(extract_key(org_general_data, "county"), "createdOn")), - }, - "organization_city": extract_key(org_general_data, "organizationCity"), - "organization_county": extract_key(org_general_data, "organizationCounty"), - "association_registry_issuer": extract_key(org_general_data, "associationRegistryIssuer"), - } +from ngohub.models.locations import City, County +from ngohub.models.nomenclatures import Domain +from ngohub.models.organization import ( + Application, + Organization, + OrganizationActivity, + OrganizationApplication, + OrganizationContact, + OrganizationDirector, + OrganizationFinancial, + OrganizationGeneral, + OrganizationInvestors, + OrganizationLegal, + OrganizationLegalReprezentative, + OrganizationPartners, + OrganizationReport, + OrganizationReports, +) +from ngohub.normalization.processors import ( + camel_to_snake_case_dictionary, + camel_to_snake_case_dictionary_list, + convert_date, + extract_key, +) + + +def _normalize_organization_general(org_general_data: Dict) -> OrganizationGeneral: + org_general_data["contact"] = OrganizationContact(**org_general_data["contact"]) + org_general_data["city"] = City(**org_general_data["city"]) + org_general_data["county"] = County(**org_general_data["county"]) + + org_general = OrganizationGeneral(**org_general_data) + + return org_general + + +def _normalize_organization_activity(org_activity_data: Dict) -> OrganizationActivity: + org_domains: List[Domain] = [] + for domain in org_activity_data["domains"]: + org_domains.append(Domain(**domain)) + + org_activity_data["domains"] = org_domains + normal_data = OrganizationActivity(**org_activity_data) return normal_data -def _normalize_organization_activity(org_activity_data: Dict) -> Dict: - org_domains: List[Dict[str, Any]] = [] - for domain in extract_key(org_activity_data, "domains"): - org_domains.append( - { - "id": extract_key(domain, "id"), - "name": extract_key(domain, "name"), - "created_on": convert_date(extract_key(domain, "createdOn")), - "updated_on": convert_date(extract_key(domain, "updatedOn")), - } - ) - - is_public_interest_org = extract_key( - org_activity_data, - "isPublicIntrestOrganization", - "isPublicInterestOrganization", - ) - normal_data: Dict = { - "id": extract_key(org_activity_data, "id"), - "created_on": convert_date(extract_key(org_activity_data, "createdOn")), - "updated_on": convert_date(extract_key(org_activity_data, "updatedOn")), - "area": extract_key(org_activity_data, "area"), - "is_part_of_federation": extract_key(org_activity_data, "isPartOfFederation"), - "is_part_of_coalition": extract_key(org_activity_data, "isPartOfCoalition"), - "is_part_of_international_organization": extract_key(org_activity_data, "isPartOfInternationalOrganization"), - "international_organization_name": extract_key(org_activity_data, "internationalOrganizationName"), - "is_social_service_viable": extract_key(org_activity_data, "isSocialServiceViable"), - "offers_grants": extract_key(org_activity_data, "offersGrants"), - "is_public_interest_organization": is_public_interest_org, - "has_branches": extract_key(org_activity_data, "hasBranches"), - "federations": extract_key(org_activity_data, "federations"), - "coalitions": extract_key(org_activity_data, "coalitions"), - "domains": org_domains, - "cities": extract_key(org_activity_data, "cities"), - "branches": extract_key(org_activity_data, "branches"), - "regions": extract_key(org_activity_data, "regions"), - } - return normal_data +def _normalize_organization_legal(org_legal_data: Dict) -> OrganizationLegal: + org_directors: List[OrganizationDirector] = [] + for director in extract_key(org_legal_data, "directors"): + org_directors.append(OrganizationDirector(**director)) + org_legal_data["directors"] = org_directors + org_legal_data["legal_reprezentative"] = OrganizationLegalReprezentative(**org_legal_data["legal_reprezentative"]) -def _normalize_organization_legal(org_legal_data: Dict) -> Dict: - org_directors: List[Dict[str, Any]] = [] - for director in extract_key(org_legal_data, "directors"): - org_directors.append( - { - "id": extract_key(director, "id"), - "created_on": convert_date(extract_key(director, "createdOn")), - "updated_on": convert_date(extract_key(director, "updatedOn")), - "full_name": extract_key(director, "fullName"), - "email": extract_key(director, "email"), - "phone": extract_key(director, "phone"), - "role": extract_key(director, "role"), - } - ) - - normal_data: Dict = { - "id": extract_key(org_legal_data, "id"), - "created_on": convert_date(extract_key(org_legal_data, "createdOn")), - "updated_on": convert_date(extract_key(org_legal_data, "updatedOn")), - "others": extract_key(org_legal_data, "others"), - "organization_statute": extract_key(org_legal_data, "organizationStatute"), - "non_political_affiliation_file": extract_key(org_legal_data, "nonPoliticalAffiliationFile"), - "balance_sheet_file": extract_key(org_legal_data, "balanceSheetFile"), - "legal_reprezentative": { - "id": extract_key(extract_key(org_legal_data, "legalReprezentative"), "id"), - "created_on": convert_date(extract_key(extract_key(org_legal_data, "legalReprezentative"), "createdOn")), - "updated_on": convert_date(extract_key(extract_key(org_legal_data, "legalReprezentative"), "updatedOn")), - "full_name": extract_key(extract_key(org_legal_data, "legalReprezentative"), "fullName"), - "email": extract_key(extract_key(org_legal_data, "legalReprezentative"), "email"), - "phone": extract_key(extract_key(org_legal_data, "legalReprezentative"), "phone"), - "role": extract_key(extract_key(org_legal_data, "legalReprezentative"), "role"), - }, - "directors": org_directors, - } + normal_data = OrganizationLegal(**org_legal_data) return normal_data -def _normalize_organization_financial(org_financial_data: Dict) -> Dict: - synced_anaf = extract_key(org_financial_data, "synched_anaf", "synced_anaf", "syncedAnaf") - normal_data: Dict = { - "id": extract_key(org_financial_data, "id"), - "created_on": extract_key(org_financial_data, "createdOn"), - "updated_on": extract_key(org_financial_data, "updatedOn"), - "type": extract_key(org_financial_data, "type"), - "number_of_employees": extract_key(org_financial_data, "numberOfEmployees"), - "year": extract_key(org_financial_data, "year"), - "total": extract_key(org_financial_data, "total"), - "status": extract_key(org_financial_data, "status"), - "report_status": extract_key(org_financial_data, "reportStatus"), - "synced_anaf": synced_anaf, - "data": extract_key(org_financial_data, "data"), - } +def _normalize_organization_financial(org_financial_data_set: List[Dict]) -> List[OrganizationFinancial]: + normal_data = [OrganizationFinancial(**org_financial_data) for org_financial_data in org_financial_data_set] return normal_data -def _normalize_organization_report(org_report_data: Dict) -> Dict: - org_reports: List[Dict[str, Any]] = [] - org_partners: List[Dict[str, Any]] = [] - org_investors: List[Dict[str, Any]] = [] +def _normalize_organization_report(org_report_data: Dict) -> OrganizationReport: + org_reports: List[OrganizationReports] = [] + org_partners: List[OrganizationPartners] = [] + org_investors: List[OrganizationInvestors] = [] for report in extract_key(org_report_data, "reports"): - org_reports.append( - { - "id": extract_key(report, "id"), - "created_on": convert_date(extract_key(report, "createdOn")), - "updated_on": convert_date(extract_key(report, "updatedOn")), - "report": extract_key(report, "report"), - "numberOfVolunteers": extract_key(report, "numberOfVolunteers"), - "numberOfContractors": extract_key(report, "numberOfContractors"), - "year": extract_key(report, "year"), - "status": extract_key(report, "status"), - } - ) + org_reports.append(OrganizationReports(**report)) for partner in extract_key(org_report_data, "partners"): - org_partners.append( - { - "id": extract_key(partner, "id"), - "created_on": convert_date(extract_key(partner, "createdOn")), - "updated_on": convert_date(extract_key(partner, "updatedOn")), - "year": extract_key(partner, "year"), - "number_of_partners": extract_key(partner, "numberOfPartners"), - "status": extract_key(partner, "status"), - "path": extract_key(partner, "path"), - } - ) + org_partners.append(OrganizationPartners(**partner)) for investor in extract_key(org_report_data, "investors"): - org_investors.append( - { - "id": extract_key(investor, "id"), - "created_on": convert_date(extract_key(investor, "createdOn")), - "updated_on": convert_date(extract_key(investor, "updatedOn")), - "year": extract_key(investor, "year"), - "number_of_investors": extract_key(investor, "numberOfInvestors"), - "status": extract_key(investor, "status"), - "path": extract_key(investor, "path"), - } - ) - - normal_data: Dict = { - "id": extract_key(org_report_data, "id"), - "created_on": extract_key(org_report_data, "createdOn"), - "updated_on": extract_key(org_report_data, "updatedOn"), - "reports": org_reports, - "partners": org_partners, - "investors": org_investors, - } + org_investors.append(OrganizationInvestors(**investor)) + + org_report_data["reports"] = org_reports + org_report_data["partners"] = org_partners + org_report_data["investors"] = org_investors + + normal_data = OrganizationReport(**org_report_data) return normal_data -def normalize_organization_data(org_data: Dict) -> Dict: - general = _normalize_organization_general(extract_key(org_data, "organizationGeneral")) - activity = _normalize_organization_activity(extract_key(org_data, "organizationActivity")) - legal = _normalize_organization_legal(extract_key(org_data, "organizationLegal")) - financial = _normalize_organization_financial(extract_key(org_data, "organizationFinancial")) - report = _normalize_organization_report(extract_key(org_data, "organizationReport")) - - normal_data: Dict = { - "id": extract_key(org_data, "id"), - "created_on": convert_date(extract_key(org_data, "createdOn")), - "updated_on": convert_date(extract_key(org_data, "updatedOn")), - "status": extract_key(org_data, "status"), - "general": general, - "activity": activity, - "legal": legal, - "financial": financial, - "report": report, - } +def normalize_organization_data(org_data: Dict) -> Organization: + snake_case: Dict[str, any] = camel_to_snake_case_dictionary( + org_data, + overrides={ + "isPublicIntrestOrganization": "is_public_interest_organization", + "synched_anaf": "synced_anaf", + "organizationGeneral": "general_data", + "organizationActivity": "activity_data", + "organizationLegal": "legal_data", + "organizationFinancial": "financial_data", + "organizationReport": "report_data", + }, + cast_values={"created_on": convert_date, "updated_on": convert_date}, + ) + + snake_case["general_data"] = _normalize_organization_general(snake_case["general_data"]) + snake_case["activity_data"] = _normalize_organization_activity(snake_case["activity_data"]) + snake_case["legal_data"] = _normalize_organization_legal(snake_case["legal_data"]) + snake_case["financial_data"] = _normalize_organization_financial(snake_case["financial_data"]) + snake_case["report_data"] = _normalize_organization_report(snake_case["report_data"]) + + normal_data = Organization(**snake_case) + + return normal_data + + +def normalize_organization_applications(org_data_set: List[Dict[str, Any]]) -> List[OrganizationApplication]: + snake_case_response: List[Dict[str, any]] = camel_to_snake_case_dictionary_list( + org_data_set, + overrides={"ongStatus": "ngo_status"}, + cast_values={"created_on": convert_date}, + ) + + normal_data: List[OrganizationApplication] = [ + OrganizationApplication(**org_data) for org_data in snake_case_response + ] + + return normal_data + + +def normalize_application_list(application_data_set: List[Dict]) -> List[Application]: + snake_case_response: List[Dict[str, any]] = camel_to_snake_case_dictionary_list(application_data_set) + + normal_data: List[Application] = [Application(**app_data) for app_data in snake_case_response] return normal_data diff --git a/src/ngohub/normalization/processors.py b/src/ngohub/normalization/processors.py new file mode 100644 index 0000000..f4a979c --- /dev/null +++ b/src/ngohub/normalization/processors.py @@ -0,0 +1,89 @@ +import re +from datetime import datetime +from re import Pattern +from typing import Any, Dict, List, Optional + +camel_case_to_snake_case_pattern: Optional[Pattern[str]] = None + + +def _compile_camel_to_snake_case_pattern() -> Pattern[str]: + global camel_case_to_snake_case_pattern + + if not camel_case_to_snake_case_pattern: + camel_case_to_snake_case_pattern = re.compile( + r""" + (?<=[a-z]) # preceded by lowercase + (?=[A-Z]) # followed by uppercase + | # OR + (?<=[A-Z]) # preceded by uppercase + (?=[A-Z][a-z]) # followed by uppercase, then lowercase + """, + re.X, + ) + + return camel_case_to_snake_case_pattern + + +def camel_to_snake_case(input_string: str) -> str: + pattern: Pattern[str] = _compile_camel_to_snake_case_pattern() + + return pattern.sub("_", input_string).lower() + + +def camel_to_snake_case_dictionary( + input_dict: Dict[str, Any], + overrides: Dict[str, Any] = None, + cast_values: Dict[str, Any] = None, +) -> Dict[str, Any]: + output_dict: Dict[str, Any] = {} + + for old_key, value in input_dict.items(): + if overrides and old_key in overrides: + new_key = overrides[old_key] + else: + new_key = camel_to_snake_case(old_key) + + if new_key in output_dict: + raise ValueError(f"Cannot normalize {old_key} to {new_key} because { new_key} already exists") + + if cast_values and new_key in cast_values: + value = cast_values[new_key](value) + elif isinstance(value, dict): + value = camel_to_snake_case_dictionary(value, overrides, cast_values) + elif isinstance(value, list) and value and isinstance(value[0], dict): + value = camel_to_snake_case_dictionary_list(value, overrides, cast_values) + + output_dict[new_key] = value + + return output_dict + + +def camel_to_snake_case_dictionary_list( + input_list: List[Dict[str, Any]], + overrides: Dict[str, Any] = None, + cast_values: Dict[str, Any] = None, +) -> List[Dict[str, Any]]: + return [camel_to_snake_case_dictionary(item, overrides, cast_values) for item in input_list] + + +def convert_date(date: str) -> Optional[datetime]: + if not date: + return None + + return datetime.fromisoformat(date) + + +def extract_key(data: Dict[str, Any], key: str, *back_up_keys: str) -> Any: + try: + return data[key] + except KeyError: + if not back_up_keys: + return None + + for back_up in back_up_keys: + try: + return data[back_up] + except KeyError: + continue + + return None diff --git a/src/ngohub/normalization/public.py b/src/ngohub/normalization/public.py new file mode 100644 index 0000000..73b3d32 --- /dev/null +++ b/src/ngohub/normalization/public.py @@ -0,0 +1,9 @@ +from typing import Dict + +from ngohub.models.public import Version + + +def normalize_version(version_data: Dict[str, str]) -> Version: + normal_data = Version(**version_data) + + return normal_data diff --git a/src/ngohub/normalization/user.py b/src/ngohub/normalization/user.py new file mode 100644 index 0000000..0e4eea1 --- /dev/null +++ b/src/ngohub/normalization/user.py @@ -0,0 +1,20 @@ +from typing import Any, Dict + +from ngohub.models.user import User + + +def normalize_user(user_data: Dict[str, Any]) -> User: + normal_data = User( + id=user_data["id"], + created_on=user_data["createdOn"], + updated_on=user_data["updatedOn"], + cognito_id=user_data["cognitoId"], + name=user_data["name"], + email=user_data["email"], + phone=user_data["phone"], + role=user_data["role"], + status=user_data["status"], + organization_id=user_data["organizationId"], + ) + + return normal_data diff --git a/tests/conftest.py b/tests/conftest.py index 950d832..fa470c6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,14 +13,14 @@ def pytest_configure(): pytest.app_login_link = os.environ["APP_LOGIN_LINK"] - pytest.ngo_id = os.environ["NGO_ID"] - pytest.ngo_admin_id = os.environ["NGO_ADMIN_ID"] - pytest.ngo_user_id = os.environ["NGO_USER_ID"] - pytest.ngo_user_id_no_app = os.environ["NGO_USER_ID_NO_APP"] - - pytest.ngo_other_id = os.environ["NGO_OTHER_ID"] - pytest.ngo_other_admin_id = os.environ["NGO_OTHER_ADMIN_ID"] - pytest.ngo_other_user_id = os.environ["NGO_OTHER_USER_ID"] + pytest.ngo_id = int(os.environ["NGO_ID"]) + pytest.ngo_admin_id = int(os.environ["NGO_ADMIN_ID"]) + pytest.ngo_user_id = int(os.environ["NGO_USER_ID"]) + pytest.ngo_user_id_no_app = int(os.environ["NGO_USER_ID_NO_APP"]) + + pytest.ngo_other_id = int(os.environ["NGO_OTHER_ID"]) + pytest.ngo_other_admin_id = int(os.environ["NGO_OTHER_ADMIN_ID"]) + pytest.ngo_other_user_id = int(os.environ["NGO_OTHER_USER_ID"]) pytest.admin_username = os.environ["NGOHUB_API_ACCOUNT"] pytest.ngo_admin_username = os.environ["NGOHUB_NGO_ACCOUNT"] diff --git a/tests/test_end_to_end/schemas.py b/tests/test_end_to_end/schemas.py index c8ef1ec..4499fa7 100644 --- a/tests/test_end_to_end/schemas.py +++ b/tests/test_end_to_end/schemas.py @@ -336,3 +336,19 @@ ), } ) + + +USER_SCHEMA = Schema( + { + "cognitoId": str, + "createdOn": Regex(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z"), + "updatedOn": Regex(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z"), + "email": str, + "id": int, + "name": str, + "organizationId": int, + "phone": str, + "role": str, + "status": str, + } +) diff --git a/tests/test_end_to_end/test_nomenclatures.py b/tests/test_end_to_end/test_nomenclatures.py index a6ee203..1e7726d 100644 --- a/tests/test_end_to_end/test_nomenclatures.py +++ b/tests/test_end_to_end/test_nomenclatures.py @@ -20,19 +20,19 @@ def test_cities_nomenclatures_errors_if_no_params(): hub = NGOHub(pytest.ngohub_api_url) with pytest.raises(ValueError): - hub.get_cities_nomenclatures() + hub.get_raw_cities_nomenclatures() def test_cities_nomenclatures_returns_empty_response(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_cities_nomenclatures(search="UNKNOWN") + response = hub.get_raw_cities_nomenclatures(search="UNKNOWN") assert response == [] def test_cities_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_cities_nomenclatures(county_id=1, city_id=1) + response = hub.get_raw_cities_nomenclatures(county_id=1, city_id=1) assert len(response) == 1 assert NOMENCLATURE_CITIES_SCHEMA.validate(response) @@ -40,76 +40,76 @@ def test_cities_nomenclatures_schema(): def test_counties_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_counties_nomenclatures() + response = hub.get_raw_counties_nomenclatures() assert NOMENCLATURE_COUNTIES_SCHEMA.validate(response) def test_domains_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_domains_nomenclatures() + response = hub.get_raw_domains_nomenclatures() assert NOMENCLATURE_DOMAINS_SCHEMA.validate(response) def test_regions_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_regions_nomenclatures() + response = hub.get_raw_regions_nomenclatures() assert NOMENCLATURE_REGIONS_SCHEMA.validate(response) def test_federations_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_federations_nomenclatures() + response = hub.get_raw_federations_nomenclatures() assert NOMENCLATURE_FEDERATIONS_SCHEMA.validate(response) def test_coalitions_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_coalitions_nomenclatures() + response = hub.get_raw_coalitions_nomenclatures() assert NOMENCLATURE_COALITIONS_SCHEMA.validate(response) def test_faculties_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_faculties_nomenclatures() + response = hub.get_raw_faculties_nomenclatures() assert NOMENCLATURE_FACULTIES_SCHEMA.validate(response) def test_skills_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_skills_nomenclatures() + response = hub.get_raw_skills_nomenclatures() assert NOMENCLATURE_SKILLS_SCHEMA.validate(response) def test_practice_domains_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_practice_domains_nomenclatures() + response = hub.get_raw_practice_domains_nomenclatures() assert NOMENCLATURE_PRACTICE_DOMAINS_SCHEMA.validate(response) def test_service_domains_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_service_domains_nomenclatures() + response = hub.get_raw_service_domains_nomenclatures() assert NOMENCLATURE_SERVICE_DOMAINS_SCHEMA.validate(response) def test_beneficiaries_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_beneficiaries_nomenclatures() + response = hub.get_raw_beneficiaries_nomenclatures() assert NOMENCLATURE_BENEFIARIES_SCHEMA.validate(response) def test_issuers_nomenclatures_schema(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_issuers_nomenclatures() + response = hub.get_raw_issuers_nomenclatures() assert NOMENCLATURE_ISSUERS_SCHEMA.validate(response) diff --git a/tests/test_end_to_end/test_organization.py b/tests/test_end_to_end/test_organization.py index fb79bdd..fdc49de 100644 --- a/tests/test_end_to_end/test_organization.py +++ b/tests/test_end_to_end/test_organization.py @@ -2,6 +2,7 @@ from ngohub import NGOHub from ngohub.exceptions import HubHTTPException +from ngohub.models.organization import OrganizationApplication from tests.test_end_to_end.schemas import ( APPLICATION_LIST_SCHEMA, ORGANIZATION_APPLICATIONS_SCHEMA, @@ -21,7 +22,7 @@ def test_organization_profile_returns_401(): hub = NGOHub(pytest.ngohub_api_url) with pytest.raises(HubHTTPException): - hub.get_organization_profile(ngo_token="invalid_token") + hub.get_raw_organization_profile(ngo_token="invalid_token") def test_organization(): @@ -33,14 +34,14 @@ def test_organization(): def test_application_list(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_application_list(admin_token=pytest.admin_token) + response = hub.get_raw_application_list(admin_token=pytest.admin_token) assert APPLICATION_LIST_SCHEMA.validate(response) def test_organization_applications(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_organization_applications(admin_token=pytest.admin_token, organization_id=pytest.ngo_id) + response = hub.get_raw_organization_applications(admin_token=pytest.admin_token, organization_id=pytest.ngo_id) assert ORGANIZATION_APPLICATIONS_SCHEMA.validate(response) @@ -54,9 +55,9 @@ def test_check_missing_organization_returns_empty(): login_link=pytest.app_login_link, ) - assert response["user"] is None - assert response["application"] is None - assert response["access"] is None + assert response.user is None + assert response.application is None + assert response.has_access is None def test_check_missing_user_returns_empty(): @@ -68,9 +69,9 @@ def test_check_missing_user_returns_empty(): login_link=pytest.app_login_link, ) - assert response["user"] is None - assert response["application"] is not None - assert response["access"] is None + assert response.user is None + assert response.application is not None + assert response.has_access is None def test_check_organization_wrong_admin_does_not_have_application(): @@ -82,9 +83,9 @@ def test_check_organization_wrong_admin_does_not_have_application(): login_link=pytest.app_login_link, ) - assert response["user"] is None - assert response["application"] is not None - assert response["access"] is None + assert response.user is None + assert response.application is not None + assert response.has_access is None def test_check_organization_admin_has_application(): @@ -96,9 +97,9 @@ def test_check_organization_admin_has_application(): login_link=pytest.app_login_link, ) - assert response["user"] is not None - assert response["application"] is not None - assert response["access"] is True + assert response.user is not None + assert response.application is not None + assert response.has_access is True def test_check_organization_user_without_app_doesnt_have_application(): @@ -110,9 +111,9 @@ def test_check_organization_user_without_app_doesnt_have_application(): login_link=pytest.app_login_link, ) - assert response["user"] is not None - assert response["application"] is not None - assert response["access"] is False + assert response.user is not None + assert response.application is not None + assert response.has_access is False def test_check_organization_user_with_app_has_application(): @@ -124,9 +125,9 @@ def test_check_organization_user_with_app_has_application(): login_link=pytest.app_login_link, ) - assert response["user"] is not None - assert response["application"] is not None - assert response["access"] is True + assert response.user is not None + assert response.application is not None + assert response.has_access is True def test_check_user_organization_doesnt_have_missing_application(): @@ -136,7 +137,7 @@ def test_check_user_organization_doesnt_have_missing_application(): login_link="https://random-a3qlpo8tqsyxa0utl.com", ) - assert response == {} + assert response is None def test_check_user_organization_has_application(): @@ -146,5 +147,5 @@ def test_check_user_organization_has_application(): login_link=pytest.app_login_link, ) - assert response != {} - assert response["loginLink"].startswith(pytest.app_login_link) + assert isinstance(response, OrganizationApplication) + assert response.login_link.startswith(pytest.app_login_link) diff --git a/tests/test_end_to_end/test_status.py b/tests/test_end_to_end/test_status.py index 72982b6..283678d 100644 --- a/tests/test_end_to_end/test_status.py +++ b/tests/test_end_to_end/test_status.py @@ -4,16 +4,31 @@ from tests.test_end_to_end.schemas import VERSION_REVISION_SCHEMA -def test_health_returns_ok(): +def test_raw_health_returns_ok(): hub = NGOHub(pytest.ngohub_api_url) - assert hub.is_healthy() + + assert hub.get_raw_health() == "OK" + + +def test_health_returns_true(): + hub = NGOHub(pytest.ngohub_api_url) + + assert hub.is_healthy() is True + + +def test_raw_version_returns_version_revision_dict(): + hub = NGOHub(pytest.ngohub_api_url) + response = hub.get_raw_version() + + assert VERSION_REVISION_SCHEMA.validate(response) def test_version_returns_version_revision(): hub = NGOHub(pytest.ngohub_api_url) response = hub.get_version() - assert VERSION_REVISION_SCHEMA.validate(response) + assert response.version is not None + assert response.revision is not None def test_file_returns_path(): diff --git a/tests/test_end_to_end/test_user.py b/tests/test_end_to_end/test_user.py index 7b5d175..f510b93 100644 --- a/tests/test_end_to_end/test_user.py +++ b/tests/test_end_to_end/test_user.py @@ -1,12 +1,12 @@ import pytest from ngohub import NGOHub -from tests.test_end_to_end.schemas import PROFILE_SCHEMA +from tests.test_end_to_end.schemas import PROFILE_SCHEMA, USER_SCHEMA def test_user_returns_profile(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_profile(user_token=pytest.ngo_admin_token) + response = hub.get_raw_profile(user_token=pytest.ngo_admin_token) assert PROFILE_SCHEMA.validate(response) assert response["email"] == pytest.ngo_admin_username @@ -14,7 +14,16 @@ def test_user_returns_profile(): def test_admin_returns_profile(): hub = NGOHub(pytest.ngohub_api_url) - response = hub.get_profile(user_token=pytest.admin_token) + response = hub.get_raw_profile(user_token=pytest.admin_token) assert PROFILE_SCHEMA.validate(response) assert response["email"] == pytest.admin_username + + +def test_get_user_returns_user(): + hub = NGOHub(pytest.ngohub_api_url) + response = hub.get_user_raw(admin_token=pytest.admin_token, user_id=pytest.ngo_admin_id) + + assert USER_SCHEMA.validate(response) + assert response["email"] == pytest.ngo_admin_username + assert response["id"] == pytest.ngo_admin_id diff --git a/tests/test_normalization/__init__.py b/tests/test_normalization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_normalization/test_case_conversion.py b/tests/test_normalization/test_case_conversion.py new file mode 100644 index 0000000..b99fda6 --- /dev/null +++ b/tests/test_normalization/test_case_conversion.py @@ -0,0 +1,63 @@ +from ngohub.normalization.processors import camel_to_snake_case_dictionary, camel_to_snake_case_dictionary_list + + +def test_camel_to_snake_case_dictionary(): + test_dictionary = { + "camelCase": "value", + "anotherCamelCase": "value", + "anIntegerCamelCase": 22, + "aCamelCaseIsoDate": "2021-01-01T00:00:00Z", + "a_snake_case_key": "value", + } + + expected_result = { + "camel_case": "value", + "another_camel_case": "value", + "an_integer_camel_case": 22, + "a_camel_case_iso_date": "2021-01-01T00:00:00Z", + "a_snake_case_key": "value", + } + + actual_result = camel_to_snake_case_dictionary(test_dictionary) + + assert actual_result == expected_result + + +def test_camel_to_snake_case_dictionary_list(): + test_list = [ + { + "camelCase": "value", + "anotherCamelCase": "value", + "anIntegerCamelCase": 22, + "aCamelCaseIsoDate": "2021-01-01T00:00:00Z", + "a_snake_case_key": "value", + }, + { + "camelCase": "value", + "anotherCamelCase": "value", + "anIntegerCamelCase": 22, + "aCamelCaseIsoDate": "2021-01-01T00:00:00Z", + "a_snake_case_key": "value", + }, + ] + + expected_result = [ + { + "camel_case": "value", + "another_camel_case": "value", + "an_integer_camel_case": 22, + "a_camel_case_iso_date": "2021-01-01T00:00:00Z", + "a_snake_case_key": "value", + }, + { + "camel_case": "value", + "another_camel_case": "value", + "an_integer_camel_case": 22, + "a_camel_case_iso_date": "2021-01-01T00:00:00Z", + "a_snake_case_key": "value", + }, + ] + + actual_result = camel_to_snake_case_dictionary_list(test_list) + + assert actual_result == expected_result