Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[py-tx] Implement NCMEC Upvote/Downvote in reference API #1648

Merged
merged 10 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import urllib.error

import requests
from requests.packages.urllib3.util.retry import Retry
from urllib3.util.retry import Retry
from threatexchange.exchanges.clients.utils.common import TimeoutHTTPAdapter


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum, unique
import logging
Expand All @@ -19,7 +20,7 @@
import urllib.parse

import requests
from requests.packages.urllib3.util.retry import Retry
from urllib3.util.retry import Retry
from threatexchange.exchanges.clients.utils.common import TimeoutHTTPAdapter


Expand Down Expand Up @@ -113,24 +114,129 @@ def str(self, key: str) -> str:

@dataclass
class StatusResult:
"""
Represents a NCMEC member.

While it does correspond to the /status endpoint, we should probably
rename it at this point.
"""

esp_id: int
esp_name: str

@classmethod
def from_xml(cls, xml: _XMLWrapper) -> "StatusResult":
return cls(esp_id=xml.int("id"), esp_name=xml.text)


@unique
class NCMECEntryType(Enum):
"""Type of entry, as marked by xml"""

image = "image"
video = "video"


@unique
class FingerprintType(Enum):
"""
The list of supported fingerprints, as of 10/2024.

This also corresponds to the feedback types for the upvote/downvote API

We are currently not parsing these in the returned entry to prevent
compatibility issues if NCMEC were to add more fingerprint types, and
returning them as simple strings.
"""

md5 = "MD5"
sha1 = "SHA1"
pdna = "PDNA"
pdq = "PDQ"
netclean = "NETCLEAN"
videntifier = "VIDENTIFIER"
tmk_pdqf = "TMK_PDQF"
ssvh_pdna = "SSVH_PDNA"
ssvh_safer_hash = "SSVH_SAFER_HASH"


@dataclass
class Feedback:
"""Feedback on a single fingerprint in an entry"""

# True = upvote | False = downvote
sentiment: bool
# The member giving the feedback
member: StatusResult
# For upvotes, the reason text
reason: str = ""

@classmethod
def get_from_entry_feedback(
cls, entry: _XMLWrapper
) -> t.Dict[str, t.List["Feedback"]]:
feedbacks_xml = entry.maybe("feedback")
if not feedbacks_xml:
return {}
ret: t.Dict[str, t.List[Feedback]] = {}

for sentimentTag in feedbacks_xml:
feedbacks = ret.setdefault(sentimentTag.str("type"), [])
if sentimentTag.tag == "affirmativeFeedback":
# Iterate over members
for m in sentimentTag.maybe("members"):
feedbacks.append(cls(True, StatusResult.from_xml(m)))
elif sentimentTag.tag == "negativeFeedback":
# It's impossible to tell from the public documentation how
# to parse this correctly because it's ambigous how it's
# formatted.
reason_block = list(sentimentTag.maybe("reasons"))
for i in range(0, len(reason_block), 2):
if i + 1 >= len(reason_block):
logging.warning("[ncmec] reason block has odd number of blocks")
continue
reason = reason_block[i]
members = reason_block[i + 1]
if reason.tag != "reason" or members.tag != "members":
logging.warning(
"[ncmec] reason block malformed: reason:%s remembers:%s",
reason.tag,
members.tag,
)
continue

reason_name = reason.str("name")
for m in members:
feedbacks.append(
cls(False, StatusResult.from_xml(m), reason_name)
)
else:
logging.warning(
"[ncmec] Ignoring unknown sentiment '%s'", sentimentTag.tag
)
continue
return ret


@dataclass
class NCMECEntryUpdate:
# The entry id
id: str
# The esp_id for the uploader
member_id: int
# The entry or content type (e.g. image/video)
entry_type: NCMECEntryType
# Whether or not this is a tombstone for a deleted entry
deleted: bool
# The string classification of the entry
classification: t.Optional[str]
# The hashes/fingerprints for this entry, Dict[type, value]. This is
# roughly equivalent to SignalType.get_name(): signal_value, but NCMEC
# chooses different names for these
fingerprints: t.Dict[str, str]
# The feedback (upvote/downvote) that other ESPs have given for this entry
# Keyed the same way as fingerprints
feedback: t.Dict[str, t.List[Feedback]]

@classmethod
def from_xml(cls, xml: _XMLWrapper) -> "NCMECEntryUpdate":
Expand All @@ -148,6 +254,7 @@ def from_xml(cls, xml: _XMLWrapper) -> "NCMECEntryUpdate":
fingerprints={
x.tag: x.text for x in xml.maybe("fingerprints") if x.has_text
},
feedback=Feedback.get_from_entry_feedback(xml),
)


Expand Down Expand Up @@ -215,11 +322,31 @@ def estimated_entries_in_range(self) -> int:
)


# TODO: once we know the shape of response, finish this class
@dataclass
class UpdateEntryResponse:
updates: t.List[NCMECEntryUpdate]

@classmethod
def from_xml(
cls, xml: _XMLWrapper, fallback_max_time: int
) -> "UpdateEntryResponse":
updates: t.List[NCMECEntryUpdate] = []

for content_xml in (xml.maybe("images"), xml.maybe("videos")):
if not content_xml or not len(content_xml):
continue
updates.extend(NCMECEntryUpdate.from_xml(c) for c in content_xml)

return cls(updates)


@unique
class NCMECEndpoint(Enum):
status = "status"
entries = "entries"
members = "members"
feedback = "feedback"


class NCMECEnvironment(Enum):
Expand Down Expand Up @@ -266,10 +393,13 @@ def __init__(
self.username = username
self.password = password
self._base_url = environment.value
self._my_esp: t.Optional[StatusResult] = None
# type -> name -> guid
self._feedback_reason_map: t.Dict[FingerprintType, t.Dict[str, str]] = {}

def _get_session(self) -> requests.Session:
"""
Custom requests sesson
Custom requests session

Ideally, should be used within a context manager:
```
Expand All @@ -295,14 +425,18 @@ def _get_session(self) -> requests.Session:
)
return session

def _get(self, endpoint: NCMECEndpoint, *, next_: str = "", **params) -> ET.Element:
def _get(
self, endpoint: NCMECEndpoint, *, path: str = "", next_: str = "", **params
) -> ET.Element:
"""
Perform an HTTP GET request, and return the XML response payload.

Same timeouts and retry strategy as `_get_session` above.
"""

url = "/".join((self._base_url, self.VERSION, endpoint.value))
if path:
url = "/".join((url, path))
if next_:
url = self._base_url + next_
params = {}
Expand All @@ -328,25 +462,71 @@ def _post(self, endpoint: NCMECEndpoint, *, data=None) -> t.Any:
No timeout or retry strategy.
"""

url = "/".join((self._base_url, endpoint.value))
url = "/".join((self._base_url, self.VERSION, endpoint.value))
with self._get_session() as session:
response = session.post(url, data=data)
response.raise_for_status()
return response

def _put(
self,
endpoint: NCMECEndpoint,
*,
member_id: t.Optional[int] = None,
entry_id: t.Optional[str] = None,
feedback_type: t.Optional[FingerprintType] = None,
data=None,
) -> t.Any:
"""
Perform an HTTP PUT request, and return the XML response payload.

No timeout or retry strategy.
"""

url = "/".join((self._base_url, self.VERSION, endpoint.value))
if feedback_type and member_id and entry_id:
url = "/".join(
(
self._base_url,
endpoint.value,
str(member_id),
entry_id,
feedback_type.value,
NCMECEndpoint.feedback.value,
)
)
with self._get_session() as session:
response = session.put(url, data=data)
response.raise_for_status()
return response

def status(self) -> StatusResult:
"""Query the status endpoint, which tells you who you are."""
response = self._get(NCMECEndpoint.status)
member = _XMLWrapper(response)["member"]
return StatusResult(member.int("id"), member.text)
ret = StatusResult.from_xml(_XMLWrapper(response)["member"])
self._my_esp = deepcopy(ret)
return ret

def members(self) -> t.List[StatusResult]:
"""Query the members endpoint, which gives you a list of esps"""
response = self._get(NCMECEndpoint.members)
return [
StatusResult(member.int("id"), member.text)
for member in _XMLWrapper(response)
]
return [StatusResult.from_xml(member) for member in _XMLWrapper(response)]

def feedback_reasons(self, fingerprint_type: FingerprintType) -> t.Dict[str, str]:
"""
Get the possible negative feedback reasons for this type

According to NCMEC documentation, the GUIDs
"""
xml = _XMLWrapper(
self._get(NCMECEndpoint.feedback, path=f"{fingerprint_type.value}/reasons")
)
ret = {
reason.str("guid"): reason.str("name")
for reason in xml["availableFeedbackReasons"]
}
self._feedback_reason_map[fingerprint_type] = deepcopy(ret)
return ret

def get_entries(
self,
Expand Down Expand Up @@ -401,6 +581,54 @@ def get_entries_iter(
has_more = bool(next_)
yield result

def submit_feedback(
self,
entry_id: str,
fingerprint_type: FingerprintType,
affirmative: bool,
negative_reason_guid: t.Optional[str] = None,
) -> None:

# need member_id to submit feedback
my_esp = self._my_esp
if my_esp is None:
my_esp = self.status()

# Prepare the XML payload
root = ET.Element("feedbackSubmission")
root.set("xmlns", "https://hashsharing.ncmec.org/hashsharing/v2")
vote = ET.SubElement(root, "affirmative" if affirmative else "negative")

if not affirmative:
if not negative_reason_guid:
# We need a reason ID, but there may be only one choice
# so we can just use that one
if fingerprint_type not in self._feedback_reason_map:
self.feedback_reasons(fingerprint_type)
feedback_options = self._feedback_reason_map[fingerprint_type]
if not feedback_options:
raise Exception(
"No feedback options for this type? Try reaching out to NCMEC"
)
if len(feedback_options) == 1:
# Only one choice
negative_reason_guid = next(iter(feedback_options.keys()))
if not negative_reason_guid:
raise Exception(
f"Need to pick a feedback reason. Options: {feedback_options}"
)
reasons = ET.SubElement(vote, "reasonIds")
guid = ET.SubElement(reasons, "guid")
guid.text = negative_reason_guid

self._put(
NCMECEndpoint.entries,
member_id=my_esp.esp_id,
entry_id=entry_id,
feedback_type=fingerprint_type,
data=ET.tostring(root),
)


def _date_format(timestamp: int) -> str:
"""ISO 8601 format yyyy-MM-dd'T'HH:mm:ss.SSSZ"""
Expand Down
Loading