Skip to content

Commit

Permalink
[Back merge] prod>STG (#4294)
Browse files Browse the repository at this point in the history
* [hotfix] 216567 deduplication key error when deduplicating records (#4271)

* 2216567_Deduplication_Key_Error_when_deduplicating_records

* black

* remove distinct

* fix dedup logic, add ut

* lint

---------

Co-authored-by: marekbiczysko <[email protected]>

* skip trivy failure

* fix document status when copy population (#4291)

---------

Co-authored-by: Marek Biczysko <[email protected]>
Co-authored-by: marekbiczysko <[email protected]>
Co-authored-by: Bartosz Woźniak <[email protected]>
  • Loading branch information
4 people authored Oct 7, 2024
1 parent 9a0f30a commit d684e27
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 51 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Run Trivy vulnerability scanner
continue-on-error: true # due to getting TOOMANYREQUESTS
uses: aquasecurity/trivy-action@master
with:
image-ref: '${{ vars.DOCKERHUB_ORGANIZATION }}/hope-support-images:core-${{ github.sha }}'
Expand Down
1 change: 1 addition & 0 deletions src/hct_mis_api/apps/program/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def copy_document_per_individual(
document.individual = individual_representation
document.program_id = individual_representation.program_id
document.rdi_merge_status = rdi_merge_status
document.status = Document.STATUS_PENDING
documents_list.append(document)
return documents_list

Expand Down
92 changes: 51 additions & 41 deletions src/hct_mis_api/apps/registration_datahub/tasks/deduplicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,20 +674,26 @@ def deduplicate(
new_documents.filter(Q(status=Document.STATUS_PENDING) & Q(type__is_identity_document=True) & program_q)
.select_related("individual", "type")
.select_for_update(of=("self",)) # no need to lock individuals
.order_by("pk")
.order_by("document_number", "created_at")
)
documents_numbers = [x.document_number for x in documents_to_dedup]
new_document_signatures = [self._generate_signature(d) for d in documents_to_dedup]
new_document_signatures_duplicated_in_batch = [
d for d in new_document_signatures if new_document_signatures.count(d) > 1
]

documents_numbers = []
new_document_signatures = []

# use this dict to skip ticket creation for the same Individual with the same doc number
ind_and_new_document_signatures_duplicated_in_batch_dict = defaultdict(list)
new_document_signatures_in_batch_per_individual_dict = defaultdict(list)
for d in documents_to_dedup:
ind_and_new_document_signatures_duplicated_in_batch_dict[str(d.individual_id)].append(
self._generate_signature(d)
new_document_signature = self._generate_signature(d)
documents_numbers.append(d.document_number)
new_document_signatures.append(new_document_signature)
new_document_signatures_in_batch_per_individual_dict[str(d.individual_id)].append(
new_document_signature
)

new_document_signatures_duplicated_in_batch = [
d for d in new_document_signatures if new_document_signatures.count(d) > 1
]

# added order_by because a test was failing randomly
all_matching_number_documents = (
Document.objects.select_related("individual", "individual__household", "individual__business_area")
Expand All @@ -708,58 +714,62 @@ def deduplicate(
)
)
.order_by("individual_id")
.distinct()
)
all_matching_number_documents_dict = {d.signature: d for d in all_matching_number_documents}
all_matching_number_documents_signatures = all_matching_number_documents_dict.keys()
already_processed_signatures = []

already_processed_signatures = set()
ticket_data_dict = {}
possible_duplicates_individuals_id_set = set()

for new_document in documents_to_dedup:
new_document_signature = self._generate_signature(new_document)
# use this dict to skip ticket creation for the same Individual with the same doc number
is_duplicated_document_number_for_individual: bool = (
ind_and_new_document_signatures_duplicated_in_batch_dict.get(
str(new_document.individual_id), []
).count(new_document_signature)
> 1
)

individual_document_duplicates_count = new_document_signatures_in_batch_per_individual_dict.get(
str(new_document.individual_id), []
).count(new_document_signature)

is_duplicated_document_number_for_individual: bool = individual_document_duplicates_count > 1

if new_document_signature in all_matching_number_documents_signatures:
new_document.status = Document.STATUS_NEED_INVESTIGATION
ticket_data = ticket_data_dict.get(
new_document_signature,
{
"original": all_matching_number_documents_dict[new_document_signature],
"possible_duplicates": [],
"possible_duplicates": set(),
},
)
ticket_data["possible_duplicates"].append(new_document)
ticket_data["possible_duplicates"].add(new_document)
ticket_data_dict[new_document_signature] = ticket_data
possible_duplicates_individuals_id_set.add(str(new_document.individual_id))
continue

if (
new_document_signature in new_document_signatures_duplicated_in_batch
and new_document_signature in already_processed_signatures
and not is_duplicated_document_number_for_individual
):
new_document.status = Document.STATUS_NEED_INVESTIGATION
ticket_data_dict[new_document_signature]["possible_duplicates"].append(new_document)
possible_duplicates_individuals_id_set.add(str(new_document.individual_id))
continue

new_document.status = Document.STATUS_VALID
already_processed_signatures.append(new_document_signature)
elif new_document_signatures_duplicated_in_batch.count(new_document_signature) > 1:
if is_duplicated_document_number_for_individual:
# do not create ticket for the same Individual with the same doc number
new_document.status = Document.STATUS_VALID
new_document_signatures_in_batch_per_individual_dict[str(new_document.individual_id)].remove(
new_document_signature
)
new_document_signatures_duplicated_in_batch.remove(new_document_signature)
elif new_document_signature not in already_processed_signatures:
# the first occurrence of a new document is considered valid; duplicates are added in subsequent iterations
new_document.status = Document.STATUS_VALID
ticket_data_dict[new_document_signature] = {
"original": new_document,
"possible_duplicates": set(),
}
already_processed_signatures.add(new_document_signature)
else:
new_document.status = Document.STATUS_NEED_INVESTIGATION
ticket_data_dict[new_document_signature]["possible_duplicates"].add(new_document)
possible_duplicates_individuals_id_set.add(str(new_document.individual_id))

if (
new_document_signature in new_document_signatures_duplicated_in_batch
and not is_duplicated_document_number_for_individual
):
ticket_data_dict[new_document_signature] = {
"original": new_document,
"possible_duplicates": [],
}
else:
# not a duplicate
new_document.status = Document.STATUS_VALID
already_processed_signatures.add(new_document_signature)

try:
Document.objects.bulk_update(documents_to_dedup, ("status", "updated_at"))
Expand All @@ -768,7 +778,7 @@ def deduplicate(
f"Hard Deduplication Documents bulk update error."
f"All matching documents in DB: {all_matching_number_documents_signatures}"
f"New documents to dedup: {new_document_signatures}"
f"new_document_signatures_duplicated_in_batch: {new_document_signatures_duplicated_in_batch}"
f"new_document_signatures_duplicated_in_batch: {set(new_document_signatures_duplicated_in_batch)}"
)
raise

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

from hct_mis_api.apps.core.models import BusinessArea
from hct_mis_api.apps.geo import models as geo_models
from hct_mis_api.apps.grievance.models import GrievanceTicket
from hct_mis_api.apps.grievance.models import (
GrievanceTicket,
TicketNeedsAdjudicationDetails,
)
from hct_mis_api.apps.household.fixtures import (
DocumentTypeFactory,
create_household_and_individuals,
Expand Down Expand Up @@ -437,23 +440,23 @@ def test_ticket_creation_for_the_same_ind_doc_numbers(self) -> None:
passport = Document.objects.create(
country=self.country, # the same country
type=self.dt,
document_number="123444444", # the same doc number
document_number="111", # the same doc number
individual=self.individuals[2], # the same Individual
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
)
tax_id = Document.objects.create(
country=self.country, # the same country
type=self.dt_tax_id,
document_number="123444444", # the same doc number
document_number="111", # the same doc number
individual=self.individuals[2], # the same Individual
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
)
d1 = Document.objects.create(
country=self.country,
type=self.dt,
document_number="123321321",
document_number="222",
individual=self.individuals[2],
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
Expand All @@ -462,7 +465,7 @@ def test_ticket_creation_for_the_same_ind_doc_numbers(self) -> None:
Document.objects.create(
country=self.country,
type=self.dt,
document_number="123321321",
document_number="222",
individual=self.individuals[1],
program=self.program,
status=Document.STATUS_VALID,
Expand All @@ -471,39 +474,39 @@ def test_ticket_creation_for_the_same_ind_doc_numbers(self) -> None:
d2 = Document.objects.create(
country=self.country,
type=self.dt,
document_number="222",
document_number="333",
individual=self.individuals[3],
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
)
d3 = Document.objects.create(
country=self.country,
type=self.dt_tax_id,
document_number="222",
document_number="333",
individual=self.individuals[4],
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
)
d4 = Document.objects.create(
country=self.country,
type=self.dt,
document_number="111",
document_number="444",
individual=self.individuals[0],
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
)
d5 = Document.objects.create(
country=self.country,
type=self.dt_tax_id,
document_number="111",
document_number="444",
individual=self.individuals[1],
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
)
d6 = Document.objects.create(
country=self.country,
type=DocumentTypeFactory(label="other_type", key="other_type"),
document_number="111",
document_number="444",
individual=self.individuals[2],
program=self.program,
rdi_merge_status=MergeStatusModel.MERGED,
Expand All @@ -522,3 +525,55 @@ def test_ticket_creation_for_the_same_ind_doc_numbers(self) -> None:

tax_id.refresh_from_db()
self.assertEqual(tax_id.status, Document.STATUS_VALID)

def test_ticket_creation_for_the_same_ind_and_across_other_inds_doc_numbers(self) -> None:
    """A doc number duplicated on one Individual AND present on another Individual.

    Same-Individual duplicates must NOT produce a ticket (they stay VALID);
    only the cross-Individual duplicate is flagged for adjudication.
    """

    def make_document(doc_type, number, individual):
        # All test documents share country/program and are created as MERGED.
        return Document.objects.create(
            country=self.country,
            type=doc_type,
            document_number=number,
            individual=individual,
            program=self.program,
            rdi_merge_status=MergeStatusModel.MERGED,
        )

    # Two document types with the same number on the same Individual ...
    passport = make_document(self.dt, "111", self.individuals[2])
    tax_id = make_document(self.dt_tax_id, "111", self.individuals[2])
    # ... plus an unrelated number on that Individual ...
    d1 = make_document(self.dt, "222", self.individuals[2])
    # ... and the shared number on a different Individual.
    d2 = make_document(self.dt, "111", self.individuals[3])

    self.assertEqual(GrievanceTicket.objects.all().count(), 0)
    HardDocumentDeduplication().deduplicate(
        self.get_documents_query([passport, tax_id, d1, d2]),
        self.registration_data_import,
    )

    # Exactly one needs-adjudication ticket, pairing two distinct Individuals.
    self.assertEqual(GrievanceTicket.objects.all().count(), 1)
    ticket_details = TicketNeedsAdjudicationDetails.objects.first()
    self.assertIsNotNone(ticket_details.golden_records_individual)
    self.assertEqual(ticket_details.possible_duplicates.all().count(), 1)
    self.assertNotEqual(
        ticket_details.golden_records_individual,
        ticket_details.possible_duplicates.first(),
    )

    # Same-Individual duplicates remain VALID; the cross-Individual copy
    # needs investigation.
    passport.refresh_from_db()
    self.assertEqual(passport.status, Document.STATUS_VALID)
    tax_id.refresh_from_db()
    self.assertEqual(tax_id.status, Document.STATUS_VALID)
    d2.refresh_from_db()
    self.assertEqual(d2.status, Document.STATUS_NEED_INVESTIGATION)
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ def test_create_pending_objects_from_objects(self) -> None:
pending_document.individual,
head_of_household_pending_individual,
)
self.assertEqual(
pending_document.status,
Document.STATUS_PENDING,
)

pending_identity = IndividualIdentity.pending_objects.first()
self.assertEqual(
Expand Down

0 comments on commit d684e27

Please sign in to comment.