Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AB#207787 Information Exposure #4049

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/hct_mis_api/apps/household/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from hct_mis_api.apps.household.models import Household
from hct_mis_api.apps.payment.models import PaymentRecord
from hct_mis_api.apps.targeting.models import HouseholdSelection, TargetPopulation
from hct_mis_api.apps.utils.models import MergeStatusModel

if TYPE_CHECKING:
from hct_mis_api.apps.household.models import Individual


def get_household_status(household: Household) -> Tuple[str, datetime]:
if isinstance(household, Household):
if household.rdi_merge_status == MergeStatusModel.MERGED:
payment_records = PaymentRecord.objects.filter(household=household)
if payment_records.exists():
return "paid", payment_records.first().updated_at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,19 @@
from hct_mis_api.apps.household.fixtures import (
DocumentFactory,
DocumentTypeFactory,
PendingDocumentFactory,
PendingHouseholdFactory,
PendingIndividualFactory,
create_household,
)
from hct_mis_api.apps.household.models import (
HEAD,
IDENTIFICATION_TYPE_TAX_ID,
ROLE_NO_ROLE,
PendingIndividualRoleInHousehold,
)
from hct_mis_api.apps.payment.fixtures import PaymentRecordFactory
from hct_mis_api.apps.registration_datahub.fixtures import (
ImportedDocumentFactory,
ImportedDocumentTypeFactory,
ImportedHouseholdFactory,
ImportedIndividualFactory,
RegistrationDataImportDatahubFactory,
)
from hct_mis_api.apps.registration_datahub.models import (
ImportedIndividualRoleInHousehold,
)
from hct_mis_api.apps.registration_data.fixtures import RegistrationDataImportFactory
from hct_mis_api.apps.targeting.fixtures import TargetPopulationFactory
from hct_mis_api.apps.targeting.models import HouseholdSelection, TargetPopulation

Expand Down Expand Up @@ -73,19 +68,19 @@ def test_filtering_business_area_code_with_tax_id(self) -> None:
self.assertEqual(response_nok.status_code, 404)

def test_filtering_business_area_code_with_registration_id(self) -> None:
rdi_datahub = RegistrationDataImportDatahubFactory(business_area_slug=self.business_area.slug)
imported_household = ImportedHouseholdFactory(registration_data_import=rdi_datahub)
imported_individual = ImportedIndividualFactory(household=imported_household, relationship=HEAD)
imported_household.head_of_household = imported_individual
imported_household.detail_id = "HOPE-2022530111222"
imported_household.save()
ImportedIndividualRoleInHousehold.objects.create(
individual=imported_individual,
rdi = RegistrationDataImportFactory(business_area=self.business_area)
pending_household = PendingHouseholdFactory(registration_data_import=rdi)
pending_individual = PendingIndividualFactory(household=pending_household, relationship=HEAD)
pending_household.head_of_household = pending_individual
pending_household.detail_id = "HOPE-2022530111222"
pending_household.save()
PendingIndividualRoleInHousehold.objects.create(
individual=pending_individual,
role=ROLE_NO_ROLE,
household=imported_household,
household=pending_household,
)

registration_id = imported_household.detail_id
registration_id = pending_household.detail_id

response_ok = self.api_client.get(
f"/api/hh-status?registration_id={registration_id}&business_area_code={self.business_area.code}"
Expand All @@ -103,28 +98,26 @@ def test_getting_non_existent_individual(self) -> None:
self.assertEqual(response.json()["status"], "not found")

def test_getting_individual_with_status_imported(self) -> None:
imported_household = ImportedHouseholdFactory()
imported_individual = ImportedIndividualFactory(household=imported_household, relationship=HEAD)
imported_household.head_of_household = imported_individual
imported_household.save()
ImportedIndividualRoleInHousehold.objects.create(
individual=imported_individual,
pending_household = PendingHouseholdFactory()
pending_individual = PendingIndividualFactory(household=pending_household, relationship=HEAD)
pending_household.head_of_household = pending_individual
pending_household.save()
PendingIndividualRoleInHousehold.objects.create(
individual=pending_individual,
role=ROLE_NO_ROLE,
household=imported_household,
household=pending_household,
)

imported_document_type = ImportedDocumentTypeFactory(
key=IDENTIFICATION_TYPE_TO_KEY_MAPPING[IDENTIFICATION_TYPE_TAX_ID]
)
imported_document = ImportedDocumentFactory(individual=imported_individual, type=imported_document_type)
tax_id = imported_document.document_number
document_type = DocumentTypeFactory(key=IDENTIFICATION_TYPE_TO_KEY_MAPPING[IDENTIFICATION_TYPE_TAX_ID])
pending_document = PendingDocumentFactory(individual=pending_individual, type=document_type)
tax_id = pending_document.document_number

response = self.api_client.get(f"/api/hh-status?tax_id={tax_id}")
self.assertEqual(response.status_code, 200)
data = response.json()
info = data["info"]
self.assertEqual(info["status"], "imported")
self.assertEqual(info["date"], _time(imported_household.updated_at))
self.assertEqual(info["date"], _time(pending_household.updated_at))

individual = info["individual"]
self.assertIsNotNone(individual)
Expand Down Expand Up @@ -209,23 +202,23 @@ def test_getting_non_existent_household(self) -> None:
self.assertEqual(response.json()["status"], "not found")

def test_getting_household_with_status_imported(self) -> None:
imported_household = ImportedHouseholdFactory()
imported_individual = ImportedIndividualFactory(household=imported_household, relationship=HEAD)
imported_household.head_of_household = imported_individual
imported_household.detail_id = "HOPE-2022530111222"
imported_household.save()
ImportedIndividualRoleInHousehold.objects.create(
individual=imported_individual,
pending_household = PendingHouseholdFactory()
pending_individual = PendingIndividualFactory(household=pending_household, relationship=HEAD)
pending_household.head_of_household = pending_individual
pending_household.detail_id = "HOPE-2022530111222"
pending_household.save()
PendingIndividualRoleInHousehold.objects.create(
individual=pending_individual,
role=ROLE_NO_ROLE,
household=imported_household,
household=pending_household,
)

registration_id = imported_household.detail_id
registration_id = pending_household.detail_id

response = self.api_client.get(f"/api/hh-status?registration_id={registration_id}")
self.assertEqual(response.status_code, 200)
data = response.json()
info = data["info"]
self.assertEqual(info["status"], "imported")
self.assertEqual(info["date"], _time(imported_household.updated_at))
self.assertEqual(info["date"], _time(pending_household.updated_at))
self.assertTrue("individual" not in info)
44 changes: 24 additions & 20 deletions backend/hct_mis_api/apps/household/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, Optional
import logging
from typing import Dict, Optional, Union

from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
Expand All @@ -12,19 +13,21 @@
IDENTIFICATION_TYPE_TAX_ID,
Document,
Household,
Individual,
PendingDocument,
PendingHousehold,
PendingIndividual,
)
from hct_mis_api.apps.household.serializers import (
serialize_by_household,
serialize_by_individual,
)
from hct_mis_api.apps.registration_datahub.models import (
ImportedDocument,
ImportedHousehold,
)
from hct_mis_api.apps.utils.profiling import profiling

logger = logging.getLogger(__name__)


def get_individual(tax_id: str, business_area_code: Optional[str]) -> Document:
def get_individual(tax_id: str, business_area_code: Optional[str]) -> Union[Individual, PendingIndividual]:
documents = (
Document.objects.all()
if not business_area_code
Expand All @@ -35,21 +38,21 @@ def get_individual(tax_id: str, business_area_code: Optional[str]) -> Document:
if documents.count() == 1:
return documents.first().individual

imported_documents = (
ImportedDocument.objects.all()
pending_documents = (
PendingDocument.objects.all()
if not business_area_code
else ImportedDocument.objects.filter(
else PendingDocument.objects.filter(
individual__household__registration_data_import__business_area__code=business_area_code
)
).filter(type__key=IDENTIFICATION_TYPE_TO_KEY_MAPPING[IDENTIFICATION_TYPE_TAX_ID], document_number=tax_id)
if imported_documents.count() > 1:
raise Exception(f"Multiple imported documents ({imported_documents.count()}) with given tax_id found")
if imported_documents.count() == 1:
return imported_documents.first().individual
if pending_documents.count() > 1:
raise Exception(f"Multiple imported documents ({pending_documents.count()}) with given tax_id found")
if pending_documents.count() == 1:
return pending_documents.first().individual
raise Exception("Document with given tax_id not found")


def get_household(registration_id: str, business_area_code: Optional[str]) -> ImportedHousehold:
def get_household(registration_id: str, business_area_code: Optional[str]) -> Union[PendingHousehold, Household]:
kobo_asset_value = _prepare_kobo_asset_id_value(registration_id)
households = (
Household.objects.all()
Expand All @@ -62,17 +65,17 @@ def get_household(registration_id: str, business_area_code: Optional[str]) -> Im
return households.first() # type: ignore

if business_area_code is None:
imported_households_by_business_area = ImportedHousehold.objects.all()
pending_households_by_business_area = PendingHousehold.objects.all()
else:
business_areas = BusinessArea.objects.filter(code=business_area_code)
if not business_areas:
raise Exception(f"Business area with code {business_area_code} not found")
business_area = business_areas.first() # code is unique, so no need to worry here
imported_households_by_business_area = ImportedHousehold.objects.filter(
registration_data_import__business_area_slug=business_area.slug
pending_households_by_business_area = PendingHousehold.objects.filter(
registration_data_import__business_area__slug=business_area.slug
)

imported_households = imported_households_by_business_area.filter(detail_id__endswith=kobo_asset_value)
imported_households = pending_households_by_business_area.filter(detail_id__endswith=kobo_asset_value)
if imported_households.count() > 1:
raise Exception(
f"Multiple imported households ({imported_households.count()}) with given registration_id found"
Expand Down Expand Up @@ -112,7 +115,8 @@ def get(self, request: Request) -> Response:

try:
data = get_household_or_individual(tax_id, registration_id, business_area_code)
except Exception as exception:
return Response({"status": "not found", "error_message": str(exception)}, status=404)
except Exception as e: # pragma: no cover
logger.exception(e)
return Response({"status": "not found", "error_message": "Household not Found"}, status=404)

johniak marked this conversation as resolved.
Show resolved Hide resolved
return Response(data, status=200)
Loading