-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
325 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from hope_country_workspace.config import env | ||
|
||
KOBO_KF_URL=env("KOBO_KF_URL") | ||
KOBO_KC_URL=env("KOBO_KC_URL") | ||
KOBO_MASTER_API_TOKEN=env("KOBO_MASTER_API_TOKEN") |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
import logging | ||
import time | ||
import typing | ||
from io import BytesIO | ||
from typing import Any, Dict, List, Optional, Tuple | ||
from urllib.parse import urlparse | ||
|
||
from django.conf import settings | ||
|
||
import requests | ||
from requests.adapters import HTTPAdapter | ||
from requests.exceptions import RetryError | ||
|
||
from .common import filter_by_owner | ||
# from hct_mis_api.apps.core.models import BusinessArea, XLSXKoboTemplate | ||
# from hct_mis_api.apps.utils.exceptions import log_and_raise | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class TokenNotProvided(Exception): | ||
pass | ||
|
||
|
||
class TokenInvalid(Exception): | ||
pass | ||
|
||
|
||
class KoboRequestsSession(requests.Session): | ||
AUTH_DOMAINS = [urlparse(settings.KOBO_KF_URL).hostname, urlparse(settings.KOBO_KC_URL).hostname] | ||
|
||
def should_strip_auth(self, old_url: str, new_url: str) -> bool: | ||
new_parsed = urlparse(new_url) | ||
if new_parsed.hostname in KoboRequestsSession.AUTH_DOMAINS: | ||
return False | ||
return super().should_strip_auth(old_url, new_url) # type: ignore # FIXME: Call to untyped function "should_strip_auth" in typed context | ||
|
||
|
||
class KoboAPI: | ||
def __init__(self, business_area_slug: Optional[str] = None): | ||
if business_area_slug is not None: | ||
self.business_area = BusinessArea.objects.get(slug=business_area_slug) | ||
self.KPI_URL = self.business_area.kobo_url or settings.KOBO_KF_URL | ||
else: | ||
self.business_area = None | ||
self.KPI_URL = settings.KOBO_KF_URL | ||
|
||
self._get_token() | ||
|
||
def _handle_paginated_results(self, url: str) -> List[Dict]: | ||
next_url = url | ||
results: List = [] | ||
|
||
# if there will be more than 30000 results, | ||
# we need to make additional queries | ||
while next_url: | ||
data = self._handle_request(next_url) | ||
next_url = data["next"] | ||
results.extend(data["results"]) | ||
return results | ||
|
||
def _get_url( | ||
self, | ||
endpoint: str, | ||
append_api: bool = True, | ||
add_limit: bool = True, | ||
additional_query_params: Optional[Any] = None, | ||
) -> str: | ||
endpoint.strip("/") | ||
if endpoint != "token" and append_api is True: | ||
endpoint = f"api/v2/{endpoint}" | ||
# According to the Kobo API documentation, | ||
# the maximum limit per page is 30000 | ||
query_params = f"format=json{'&limit=30000' if add_limit else ''}" | ||
if additional_query_params is not None: | ||
query_params += f"&{additional_query_params}" | ||
return f"{self.KPI_URL}/{endpoint}?{query_params}" | ||
|
||
def _get_token(self) -> None: | ||
self._client = KoboRequestsSession() | ||
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504], allowed_methods=False) | ||
self._client.mount(self.KPI_URL, HTTPAdapter(max_retries=retries)) | ||
|
||
if self.business_area is None: | ||
token = settings.KOBO_MASTER_API_TOKEN | ||
else: | ||
token = self.business_area.kobo_token | ||
|
||
if not token: | ||
msg = f"KOBO Token is not set for business area {self.business_area}" | ||
logger.error(msg) | ||
raise TokenNotProvided(msg) | ||
|
||
self._client.headers.update({"Authorization": f"token {token}"}) | ||
|
||
def _handle_request(self, url: str) -> Dict: | ||
response = self._client.get(url=url) | ||
try: | ||
response.raise_for_status() | ||
except requests.exceptions.HTTPError as e: | ||
logger.exception(e) | ||
raise | ||
return response.json() | ||
|
||
def _post_request( | ||
self, url: str, data: Optional[Dict] = None, files: Optional[typing.IO] = None | ||
) -> requests.Response: | ||
return self._client.post(url=url, data=data, files=files) | ||
|
||
def _patch_request( | ||
self, url: str, data: Optional[Dict] = None, files: Optional[typing.IO] = None | ||
) -> requests.Response: | ||
return self._client.patch(url=url, data=data, files=files) | ||
|
||
def create_template_from_file( | ||
self, bytes_io_file: Optional[typing.IO], xlsx_kobo_template_object: XLSXKoboTemplate, template_id: str = "" | ||
) -> Optional[Tuple[Dict, str]]: | ||
data = { | ||
"name": "Untitled", | ||
"asset_type": "template", | ||
"description": "", | ||
"sector": "", | ||
"country": "", | ||
"share-metadata": False, | ||
} | ||
if not template_id: | ||
asset_response = self._post_request(url=self._get_url("assets/", add_limit=False), data=data) | ||
try: | ||
asset_response.raise_for_status() | ||
except requests.exceptions.HTTPError as e: | ||
logger.exception(e) | ||
raise | ||
asset_response_dict = asset_response.json() | ||
asset_uid = asset_response_dict.get("uid") | ||
else: | ||
asset_uid = template_id | ||
file_import_data = { | ||
"assetUid": asset_uid, | ||
"destination": self._get_url(f"assets/{asset_uid}/", append_api=False, add_limit=False), | ||
} | ||
file_import_response = self._post_request( | ||
url=self._get_url("imports/", append_api=False, add_limit=False), | ||
data=file_import_data, | ||
files={"file": bytes_io_file}, # type: ignore # FIXME | ||
) | ||
file_import_response_dict = file_import_response.json() | ||
url = file_import_response_dict.get("url") | ||
|
||
attempts = 5 | ||
while attempts >= 0: | ||
response_dict = self._handle_request(url) | ||
import_status = response_dict.get("status") | ||
if import_status == "processing": | ||
xlsx_kobo_template_object.status = XLSXKoboTemplate.PROCESSING | ||
xlsx_kobo_template_object.save() | ||
attempts -= 1 | ||
time.sleep(1) | ||
else: | ||
return response_dict, asset_uid | ||
|
||
log_and_raise("Fetching import data took too long", error_type=RetryError) | ||
return None | ||
|
||
def get_all_projects_data(self) -> List: | ||
if not self.business_area: | ||
logger.error("Business area is not provided") | ||
raise ValueError("Business area is not provided") | ||
projects_url = self._get_url("assets") | ||
|
||
results = self._handle_paginated_results(projects_url) | ||
return filter_by_owner(results, self.business_area) | ||
|
||
def get_single_project_data(self, uid: str) -> Dict: | ||
projects_url = self._get_url(f"assets/{uid}") | ||
|
||
return self._handle_request(projects_url) | ||
|
||
def get_project_submissions(self, uid: str, only_active_submissions: bool) -> List: | ||
additional_query_params = None | ||
if only_active_submissions: | ||
additional_query_params = 'query={"_validation_status.uid":"validation_status_approved"}' | ||
submissions_url = self._get_url( | ||
f"assets/{uid}/data", | ||
additional_query_params=additional_query_params, | ||
) | ||
|
||
return self._handle_paginated_results(submissions_url) | ||
|
||
def get_attached_file(self, url: str) -> BytesIO: | ||
response = self._client.get(url=url) | ||
try: | ||
response.raise_for_status() | ||
except requests.exceptions.HTTPError as e: | ||
logger.exception(e) | ||
raise | ||
return BytesIO(response.content) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
from typing import Any, Dict, List, Optional | ||
|
||
from dateutil.parser import parse | ||
|
||
from hct_mis_api.apps.core.models import BusinessArea | ||
from hct_mis_api.apps.household.models import NON_BENEFICIARY, RELATIONSHIP_UNKNOWN | ||
|
||
KOBO_FORM_INDIVIDUALS_COLUMN_NAME = "individual_questions" | ||
|
||
|
||
def reduce_asset(asset: Dict, *args: Any, **kwargs: Any) -> Dict: | ||
""" | ||
Takes from asset only values that are needed by our frontend. | ||
{ | ||
"uid": "aY2dvQ64KudGV5UdSvJkB6", | ||
"name": "Test", | ||
"sector": "Humanitarian - Education", | ||
"country": "Afghanistan", | ||
"asset_type": "survey", | ||
"date_modified": "2020-05-20T10:43:58.781178Z", | ||
"deployment_active": False, | ||
"has_deployment": False, | ||
"xls_link": "https://kobo.humanitarianresponse.info/ | ||
api/v2/assets/aY2dvQ64KudGV5UdSvJkB6.xls", | ||
} | ||
""" | ||
download_link = "" | ||
for element in asset["downloads"]: | ||
if element["format"] == "xls": | ||
download_link = element["url"] | ||
|
||
settings = asset.get("settings") | ||
country = None | ||
sector = None | ||
|
||
if settings: | ||
if sector := settings.get("sector"): | ||
sector = sector.get("label") | ||
if country := settings.get("country"): | ||
country = next(iter(country)).get("label") if isinstance(country, list) else country.get("label") | ||
|
||
return { | ||
"id": asset["uid"], | ||
"name": asset["name"], | ||
"sector": sector, | ||
"country": country, | ||
"asset_type": asset["asset_type"], | ||
"date_modified": parse(asset["date_modified"]), | ||
"deployment_active": asset["deployment__active"], | ||
"has_deployment": asset["has_deployment"], | ||
"xls_link": download_link, | ||
} | ||
|
||
|
||
def get_field_name(field_name: str) -> str: | ||
if "/" in field_name: | ||
return field_name.split("/")[-1] | ||
else: | ||
return field_name | ||
|
||
|
||
def reduce_assets_list(assets: List, deployed: bool = True, *args: Any, **kwarg: Any) -> List: | ||
if deployed: | ||
return [reduce_asset(asset) for asset in assets if asset["has_deployment"] and asset["deployment__active"]] | ||
return [reduce_asset(asset) for asset in assets] | ||
|
||
|
||
def count_population(results: list, business_area: BusinessArea) -> tuple[int, int]: | ||
from hashlib import sha256 | ||
|
||
from hct_mis_api.apps.core.utils import rename_dict_keys | ||
from hct_mis_api.apps.registration_datahub.models import KoboImportedSubmission | ||
from hct_mis_api.apps.registration_datahub.tasks.utils import ( | ||
get_submission_metadata, | ||
) | ||
|
||
total_households_count = 0 | ||
total_individuals_count = 0 | ||
seen_hash_keys = [] | ||
for result in results: | ||
submission_meta_data = get_submission_metadata(result) | ||
|
||
if business_area.get_sys_option("ignore_amended_kobo_submissions"): | ||
submission_meta_data["amended"] = False | ||
|
||
submission_exists = KoboImportedSubmission.objects.filter(**submission_meta_data).exists() | ||
if submission_exists is False: | ||
total_households_count += 1 | ||
for individual_data in result[KOBO_FORM_INDIVIDUALS_COLUMN_NAME]: | ||
fields: Dict[str, Optional[str]] = { | ||
"given_name_i_c": None, | ||
"middle_name_i_c": None, | ||
"family_name_i_c": None, | ||
"full_name_i_c": None, | ||
"gender_i_c": None, | ||
"birth_date_i_c": None, | ||
"estimated_birth_date_i_c": None, | ||
"phone_no_i_c": None, | ||
"phone_no_alternative_i_c": None, | ||
} | ||
reduced_submission = rename_dict_keys(individual_data, get_field_name) | ||
for field_name in fields: | ||
fields[field_name] = str(reduced_submission.get(field_name)) | ||
hash_key = sha256(";".join(fields.values()).encode()).hexdigest() # type: ignore | ||
seen_hash_keys.append(hash_key) | ||
total_individuals_count += 1 | ||
if ( | ||
reduced_submission.get("relationship_i_c", RELATIONSHIP_UNKNOWN).upper() == NON_BENEFICIARY | ||
and seen_hash_keys.count(hash_key) > 1 | ||
): | ||
total_individuals_count -= 1 | ||
|
||
return total_households_count, total_individuals_count | ||
|
||
|
||
def filter_by_owner(data: List, business_area: BusinessArea) -> List: | ||
kobo_username = business_area.kobo_username | ||
if data: | ||
return [element for element in data if element["owner__username"] == kobo_username] | ||
return [] |