From 469f0cc834eec89a996352d223823f5f324971b1 Mon Sep 17 00:00:00 2001 From: Yuh Shin Ong Date: Fri, 15 Dec 2023 13:06:46 -0800 Subject: [PATCH] Parser: Move MT objects into its own file/module Summary: The objects are growing in mariana_trench_parser. These objects (e.g. `Method`, `CallInfo`, etc.) follow the MT json schema rather closely (not exactly, but close enough). Moving them into their own mariana_trench_parser_objects.py module. They will be referred to as `mariana_trench.` in the parser. Reviewed By: anwesht Differential Revision: D52179907 fbshipit-source-id: ae0c9d80caefc048536c6be34ad78dd0538265e1 --- sapp/pipeline/mariana_trench_parser.py | 438 +++--------------- .../pipeline/mariana_trench_parser_objects.py | 354 ++++++++++++++ 2 files changed, 407 insertions(+), 385 deletions(-) create mode 100644 sapp/pipeline/mariana_trench_parser_objects.py diff --git a/sapp/pipeline/mariana_trench_parser.py b/sapp/pipeline/mariana_trench_parser.py index 9450ee55..91b07ba4 100644 --- a/sapp/pipeline/mariana_trench_parser.py +++ b/sapp/pipeline/mariana_trench_parser.py @@ -5,10 +5,8 @@ import json import logging -import re import sys -from collections import defaultdict from typing import ( Any, Dict, @@ -26,6 +24,7 @@ from .. import pipeline as sapp from ..analysis_output import AnalysisOutput, Metadata +from . 
import mariana_trench_parser_objects as mariana_trench from .base_parser import BaseParser if sys.version_info >= (3, 8): @@ -35,328 +34,6 @@ log: logging.Logger = logging.getLogger() -UNKNOWN_PATH: str = "unknown" -UNKNOWN_LINE: int = -1 - - -class Method(NamedTuple): - name: str - - @staticmethod - def from_json(method: Union[str, Dict[str, Any]]) -> "Method": - if isinstance(method, str): - return Method(method) - - canonical_name = method["name"] - - parameter_type_overrides = method.get("parameter_type_overrides") - if parameter_type_overrides: - parameter_type_overrides = ( - f"{override['parameter']}: {override['type']}" - for override in parameter_type_overrides - ) - canonical_name += "[%s]" % ", ".join(parameter_type_overrides) - - return Method(canonical_name) - - -class Port(NamedTuple): - value: str - - def is_leaf(self) -> bool: - return ( - self.value in ("source", "sink") - or self.value.startswith("anchor:") - or self.value.startswith("producer:") - ) - - @staticmethod - def to_crtex(port: str) -> str: - """Converts 'argument(n)' to 'formal(n)'. Other CRTEX tools use 'formal' - to denote argument positions.""" - return re.sub(r"argument\((-?\d+)\)", r"formal(\1)", port) - - @staticmethod - def from_json(port: str, leaf_kind: str) -> "Port": - elements = port.split(".") - - if len(elements) == 0: - raise sapp.ParseError(f"Invalid port: `{port}`.") - - elements[0] = elements[0].lower() - if elements[0] == "leaf": - elements[0] = leaf_kind - elif elements[0] == "return": - elements[0] = "result" - elif elements[0] == "anchor": - # Anchor port is of the form Anchor.<MT port, e.g. Argument(0)> - # SAPP/CRTEX expects: "anchor:formal(0)" - canonical_port = Port.from_json( - ".".join(elements[1:]), "unreachable_leaf_kind_anchor" - ) - return Port(f"{elements[0]}:{Port.to_crtex(canonical_port.value)}") - elif elements[0] == "producer" and len(elements) >= 3: - # Producer port is of the form Producer.<producer_id>.<MT port>. - # SAPP/CRTEX expects: "producer:<producer_id>:<SAPP port>". 
- root = elements[0] - producer_id = elements[1] - canonical_port = Port.from_json( - ".".join(elements[2:]), "unreachable_leaf_kind_producer" - ) - return Port(f"{root}:{producer_id}:{Port.to_crtex(canonical_port.value)}") - - return Port(".".join(elements)) - - -class Position(NamedTuple): - path: str - line: int - start: int - end: int - - @staticmethod - def default() -> "Position": - return Position(UNKNOWN_PATH, UNKNOWN_LINE, 0, 0) - - @staticmethod - def from_json(position: Dict[str, Any], method: Optional[Method]) -> "Position": - path = position.get("path", UNKNOWN_PATH) - line = position.get("line", UNKNOWN_LINE) - start = position.get("start", 0) + 1 - end = max(position.get("end", 0) + 1, start) - if path == UNKNOWN_PATH and method: - path = method.name.split(";")[0] - path = path.split("$")[0] - path = path[1:] - return Position(path, line, start, end) - - def to_sapp(self) -> sapp.SourceLocation: - return sapp.SourceLocation( - line_no=self.line, - begin_column=self.start, - end_column=self.end, - ) - - -class Origin(NamedTuple): - callee_name: Method - callee_port: Port - - @staticmethod - def from_json(leaf_json: Dict[str, Any], leaf_kind: str) -> "Origin": - """ - Depending on the origin kind, the json keys will vary: - - Method origin (most common): { "method" : ... , "port" : ... } - Field origin: { "field" : ... } - No port for field origins. Always assumed to be "Leaf". - Crtex origin : { "canonical_name" : ... , "port" : ... } - """ - callee = leaf_json.get( - "method", leaf_json.get("field", leaf_json.get("canonical_name")) - ) - if not callee: - raise sapp.ParseError(f"No callee found in origin {leaf_json}.") - callee_name = Method.from_json(callee) - - # The origin represents a call to a leaf/terminal trace. Its port should - # indicate that, so that downstream trace reachability computation knows - # when it has reached the end. See trace_graph.is_leaf_port(). Non-CRTEX - # ports should always be <leaf_kind> regardless of the JSON (e.g. 
method - # origins could indicate that the sink comes from "argument(1)"", but it - # needs to be "sink" in sapp). - callee_port = Port.from_json("leaf", leaf_kind) - if "canonical_name" in leaf_json: - # All CRTEX ports are considered leaf ports. - callee_port = Port.from_json(leaf_json["port"], leaf_kind) - - if not callee_port.is_leaf(): - raise sapp.ParseError(f"Encountered non-leaf port in origin {leaf_json}") - - return Origin(callee_name, callee_port) - - -class CallInfo(NamedTuple): - """Mirrors the CallInfo object in the analysis""" - - call_kind: str - method: Optional[Method] - port: Port - position: Position - - @staticmethod - def from_json( - taint_json: Dict[str, Any], leaf_kind: str, caller_position: Position - ) -> "CallInfo": - call_kind = taint_json["call_kind"] - - callee = taint_json.get("resolves_to") - method = Method.from_json(callee) if callee else None - port = Port.from_json(taint_json.get("port", "leaf"), leaf_kind) - - position_json = taint_json.get("position") - position = ( - caller_position - if not position_json - else Position.from_json(position_json, method) - ) - return CallInfo(call_kind, method, port, position) - - def is_declaration(self) -> bool: - """Can can be a declaration for a source/sink (call_kind == Declaration) - or a propagation (call_kind == PropagationWithTrace:Declaration)""" - return "Declaration" in self.call_kind - - def is_origin(self) -> bool: - return "Origin" in self.call_kind - - def is_propagation_without_trace(self) -> bool: - return "Propagation" == self.call_kind - - -class LocalPositions(NamedTuple): - positions: List[Position] - - @staticmethod - def from_json(positions: List[Dict[str, Any]], method: Method) -> "LocalPositions": - return LocalPositions( - [Position.from_json(position, method) for position in positions] - ) - - @staticmethod - def from_taint_json( - taint: Dict[str, Any], caller_method: Method - ) -> "LocalPositions": - """The `taint` json should be of the following form: - { - 
"call": {...}, --> Optional field in `taint` - "kinds": [ - { "kind": "Source", "local_positions": [ { } ] }, - ... - ] - } - """ - return LocalPositions.from_json( - taint.get("local_positions", []), - caller_method, - ) - - def to_sapp(self) -> List[sapp.SourceLocation]: - return [position.to_sapp() for position in sorted(self.positions)] - - -class Features(NamedTuple): - features: Set[str] - - @staticmethod - def from_json(features: Dict[str, Any]) -> "Features": - may_features = set(features.get("may_features", [])) - always_features = { - f"always-{feature}" for feature in features.get("always_features", []) - } - return Features(may_features | always_features) - - @staticmethod - def from_taint_json(taint: Dict[str, Any]) -> "Features": - """Similar to `LocalPositions.from_taint_json`.""" - # User-declared features are stored in "local_user_features" and should - # be reported as local features in order to show up in the trace frame - # on the UI. - user_features = Features.from_json(taint.get("local_user_features", {})) - local_features = Features.from_json(taint.get("local_features", {})) - return Features(user_features.features | local_features.features) - - def to_sapp(self) -> List[str]: - return sorted(self.features) - - def to_sapp_as_parsetracefeature(self) -> List[sapp.ParseTraceFeature]: - return [ - sapp.ParseTraceFeature(feature, []) for feature in sorted(self.features) - ] - - -class ExtraTrace(NamedTuple): - kind: str - callee: CallInfo - - @staticmethod - def from_json( - extra_trace: Dict[str, Any], caller_position: Position - ) -> "ExtraTrace": - return ExtraTrace( - kind=extra_trace["kind"], - callee=CallInfo.from_json( - extra_trace["call_info"], "sink", caller_position - ), - ) - - def to_sapp(self) -> sapp.ParseTraceAnnotation: - subtraces = ( - [ - sapp.ParseTraceAnnotationSubtrace( - callee=self.callee.method.name, - port=self.callee.port.value, - position=self.callee.position.to_sapp(), - ) - ] - if self.callee.method - else [] - ) - 
- return sapp.ParseTraceAnnotation( - location=self.callee.position.to_sapp(), - kind="tito_transform", - msg=f"Propagation through {self.kind}", - leaf_kind=self.kind, - leaf_depth=0, - type_interval=None, - link=None, - trace_key=None, - titos=[], - subtraces=subtraces, - ) - - -class Kind(NamedTuple): - name: str - distance: int - origins: List[Origin] - extra_traces: List[ExtraTrace] - callee_interval: Optional[Tuple[int, int]] - preserves_type_context: bool - - @staticmethod - def from_json( - kind: Dict[str, Any], leaf_kind: str, caller_position: Position - ) -> "Kind": - origins = [] - for origin in kind.get("origins", []): - origins.append(Origin.from_json(origin, leaf_kind)) - extra_traces = [] - for extra_trace in kind.get("extra_traces", []): - extra_traces.append(ExtraTrace.from_json(extra_trace, caller_position)) - interval = kind.get("callee_interval") - return Kind( - name=kind["kind"], - distance=kind.get("distance", 0), - origins=origins, - extra_traces=extra_traces, - callee_interval=(interval[0], interval[1]) if interval else None, - preserves_type_context=kind.get("preserves_type_context", False), - ) - - @staticmethod - def partition_by_interval( - kinds: List["Kind"], - ) -> Dict[Optional["ConditionTypeInterval"], List["Kind"]]: - kinds_by_interval = defaultdict(list) - for kind in kinds: - if kind.callee_interval is None: - kinds_by_interval[None].append(kind) - else: - interval = ConditionTypeInterval.from_kind(kind) - kinds_by_interval[interval].append(kind) - return kinds_by_interval class ConditionLeaf(NamedTuple): @@ -364,7 +41,7 @@ class ConditionLeaf(NamedTuple): distance: int @staticmethod - def from_kind(kind: Kind) -> "ConditionLeaf": + def from_kind(kind: mariana_trench.Kind) -> "ConditionLeaf": return ConditionLeaf(kind=kind.name, distance=kind.distance) def to_sapp(self) -> Tuple[str, int]: @@ -374,12 +51,12 @@ def to_sapp(self) -> Tuple[str, int]: class ConditionCall(NamedTuple): """Represents a caller/callee in a 
[pre|post]Condition""" - method: Method - port: Port - position: Position + method: mariana_trench.Method + port: mariana_trench.Port + position: mariana_trench.Position @staticmethod - def from_call_info(call_info: CallInfo) -> "ConditionCall": + def from_call_info(call_info: mariana_trench.CallInfo) -> "ConditionCall": if call_info.method is None: raise sapp.ParseError( f"Cannot construct a ConditionCall without a valid method {call_info}" @@ -387,7 +64,9 @@ def from_call_info(call_info: CallInfo) -> "ConditionCall": return ConditionCall(call_info.method, call_info.port, call_info.position) @staticmethod - def from_origin(origin: Origin, call_info: CallInfo) -> "ConditionCall": + def from_origin( + origin: mariana_trench.Origin, call_info: mariana_trench.CallInfo + ) -> "ConditionCall": return ConditionCall( method=origin.callee_name, port=origin.callee_port, @@ -395,37 +74,14 @@ def from_origin(origin: Origin, call_info: CallInfo) -> "ConditionCall": ) -class ConditionTypeInterval(NamedTuple): - start: int - finish: int - preserves_type_context: bool - - @staticmethod - def from_kind(kind: Kind) -> "ConditionTypeInterval": - if kind.callee_interval is None: - raise sapp.ParseError(f"Callee interval expected in {kind}") - return ConditionTypeInterval( - start=kind.callee_interval[0], - finish=kind.callee_interval[1], - preserves_type_context=kind.preserves_type_context, - ) - - def to_sapp(self) -> sapp.ParseTypeInterval: - return sapp.ParseTypeInterval( - start=self.start, - finish=self.finish, - preserves_type_context=self.preserves_type_context, - ) - - class Condition(NamedTuple): caller: ConditionCall callee: ConditionCall leaves: List[ConditionLeaf] - local_positions: LocalPositions - features: Features - extra_traces: Set[ExtraTrace] - type_interval: Optional[ConditionTypeInterval] + local_positions: mariana_trench.LocalPositions + features: mariana_trench.Features + extra_traces: Set[mariana_trench.ExtraTrace] + type_interval: 
Optional[mariana_trench.TypeInterval] def convert_to_sapp( self, kind: Literal[sapp.ParseType.PRECONDITION, sapp.ParseType.POSTCONDITION] @@ -469,10 +125,10 @@ def to_sapp(self) -> sapp.ParseConditionTuple: class IssueCondition(NamedTuple): callee: ConditionCall leaves: List[ConditionLeaf] - local_positions: LocalPositions - features: Features - extra_traces: Set[ExtraTrace] - type_interval: Optional[ConditionTypeInterval] + local_positions: mariana_trench.LocalPositions + features: mariana_trench.Features + extra_traces: Set[mariana_trench.ExtraTrace] + type_interval: Optional[mariana_trench.TypeInterval] def to_sapp(self) -> sapp.ParseIssueConditionTuple: return sapp.ParseIssueConditionTuple( @@ -490,7 +146,7 @@ def to_sapp(self) -> sapp.ParseIssueConditionTuple: class Leaf(NamedTuple): - method: Method + method: mariana_trench.Method kind: str distance: int @@ -501,16 +157,16 @@ def to_sapp(self) -> sapp.ParseIssueLeaf: class Issue(NamedTuple): code: int message: str - callable: Method + callable: mariana_trench.Method callee_signature: str sink_index: int - callable_position: Position - issue_position: Position + callable_position: mariana_trench.Position + issue_position: mariana_trench.Position preconditions: List[IssueCondition] postconditions: List[IssueCondition] initial_sources: Set[Leaf] final_sinks: Set[Leaf] - features: Features + features: mariana_trench.Features def to_sapp(self, parser: "Parser") -> sapp.ParseIssueTuple: return sapp.ParseIssueTuple( @@ -655,10 +311,14 @@ def _parse_issues(self, model: Dict[str, Any]) -> Iterable[sapp.ParseIssueTuple] for issue in model.get("issues", []): code = issue["rule"] rule = self._rules[code] - callable = Method.from_json(model["method"]) - callable_position = Position.from_json(model["position"], callable) - issue_position = Position.from_json(issue["position"], callable) - features = Features.from_json(issue) + callable = mariana_trench.Method.from_json(model["method"]) + callable_position = 
mariana_trench.Position.from_json( + model["position"], callable + ) + issue_position = mariana_trench.Position.from_json( + issue["position"], callable + ) + features = mariana_trench.Features.from_json(issue) (preconditions, final_sinks) = self._parse_issue_conditions( issue, callable, callable_position, "sink" @@ -685,8 +345,8 @@ def _parse_issues(self, model: Dict[str, Any]) -> Iterable[sapp.ParseIssueTuple] def _parse_issue_conditions( self, issue: Dict[str, Any], - callable: Method, - callable_position: Position, + callable: mariana_trench.Method, + callable_position: mariana_trench.Position, leaf_kind: str, ) -> Tuple[List[IssueCondition], Set[Leaf]]: condition_taints = issue[f"{leaf_kind}s"] @@ -695,15 +355,19 @@ def _parse_issue_conditions( issue_leaves = set() for condition_taint in condition_taints: - local_positions = LocalPositions.from_taint_json(condition_taint, callable) - features = Features.from_taint_json(condition_taint) - call_info = CallInfo.from_json( + local_positions = mariana_trench.LocalPositions.from_taint_json( + condition_taint, callable + ) + features = mariana_trench.Features.from_taint_json(condition_taint) + call_info = mariana_trench.CallInfo.from_json( condition_taint["call_info"], leaf_kind, callable_position ) - kinds_by_interval = Kind.partition_by_interval( + kinds_by_interval = mariana_trench.Kind.partition_by_interval( [ - Kind.from_json(kind_json, leaf_kind, callable_position) + mariana_trench.Kind.from_json( + kind_json, leaf_kind, callable_position + ) for kind_json in condition_taint["kinds"] ] ) @@ -812,18 +476,20 @@ def _parse_condition( leaf_kind: str, condition_class: Type[ConditionType], ) -> Iterable[ConditionType]: - caller_method = Method.from_json(model["method"]) - caller_position = Position.from_json(model["position"], caller_method) + caller_method = mariana_trench.Method.from_json(model["method"]) + caller_position = mariana_trench.Position.from_json( + model["position"], caller_method + ) for leaf_model in 
model.get(condition_model_key, []): caller = ConditionCall( method=caller_method, - port=Port.from_json(leaf_model[port_key], leaf_kind), + port=mariana_trench.Port.from_json(leaf_model[port_key], leaf_kind), position=caller_position, ) for leaf_taint in leaf_model[leaf_model_key]: call_info_json = leaf_taint["call_info"] - call_info = CallInfo.from_json( + call_info = mariana_trench.CallInfo.from_json( call_info_json, leaf_kind, caller_position ) if ( @@ -834,15 +500,17 @@ def _parse_condition( # Propagations (without traces) can also be ignored. continue - local_positions = LocalPositions.from_taint_json( + local_positions = mariana_trench.LocalPositions.from_taint_json( leaf_taint, caller_method ) - local_features = Features.from_taint_json(leaf_taint) + local_features = mariana_trench.Features.from_taint_json(leaf_taint) kinds_json = leaf_taint["kinds"] - kinds_by_interval = Kind.partition_by_interval( + kinds_by_interval = mariana_trench.Kind.partition_by_interval( [ - Kind.from_json(kind_json, leaf_kind, caller_position) + mariana_trench.Kind.from_json( + kind_json, leaf_kind, caller_position + ) for kind_json in kinds_json ] ) diff --git a/sapp/pipeline/mariana_trench_parser_objects.py b/sapp/pipeline/mariana_trench_parser_objects.py new file mode 100644 index 00000000..9b3683ce --- /dev/null +++ b/sapp/pipeline/mariana_trench_parser_objects.py @@ -0,0 +1,354 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + + +import re + +from collections import defaultdict +from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union + +from .. 
import pipeline as sapp + + +UNKNOWN_PATH: str = "unknown" +UNKNOWN_LINE: int = -1 + + +class Method(NamedTuple): + name: str + + @staticmethod + def from_json(method: Union[str, Dict[str, Any]]) -> "Method": + if isinstance(method, str): + return Method(method) + + canonical_name = method["name"] + + parameter_type_overrides = method.get("parameter_type_overrides") + if parameter_type_overrides: + parameter_type_overrides = ( + f"{override['parameter']}: {override['type']}" + for override in parameter_type_overrides + ) + canonical_name += "[%s]" % ", ".join(parameter_type_overrides) + + return Method(canonical_name) + + +class Port(NamedTuple): + value: str + + def is_leaf(self) -> bool: + return ( + self.value in ("source", "sink") + or self.value.startswith("anchor:") + or self.value.startswith("producer:") + ) + + @staticmethod + def to_crtex(port: str) -> str: + """Converts 'argument(n)' to 'formal(n)'. Other CRTEX tools use 'formal' + to denote argument positions.""" + return re.sub(r"argument\((-?\d+)\)", r"formal(\1)", port) + + @staticmethod + def from_json(port: str, leaf_kind: str) -> "Port": + elements = port.split(".") + + if len(elements) == 0: + raise sapp.ParseError(f"Invalid port: `{port}`.") + + elements[0] = elements[0].lower() + if elements[0] == "leaf": + elements[0] = leaf_kind + elif elements[0] == "return": + elements[0] = "result" + elif elements[0] == "anchor": + # Anchor port is of the form Anchor.<MT port, e.g. Argument(0)> + # SAPP/CRTEX expects: "anchor:formal(0)" + canonical_port = Port.from_json( + ".".join(elements[1:]), "unreachable_leaf_kind_anchor" + ) + return Port(f"{elements[0]}:{Port.to_crtex(canonical_port.value)}") + elif elements[0] == "producer" and len(elements) >= 3: + # Producer port is of the form Producer.<producer_id>.<MT port>. + # SAPP/CRTEX expects: "producer:<producer_id>:<SAPP port>". 
+ root = elements[0] + producer_id = elements[1] + canonical_port = Port.from_json( + ".".join(elements[2:]), "unreachable_leaf_kind_producer" + ) + return Port(f"{root}:{producer_id}:{Port.to_crtex(canonical_port.value)}") + + return Port(".".join(elements)) + + +class Position(NamedTuple): + path: str + line: int + start: int + end: int + + @staticmethod + def default() -> "Position": + return Position(UNKNOWN_PATH, UNKNOWN_LINE, 0, 0) + + @staticmethod + def from_json(position: Dict[str, Any], method: Optional[Method]) -> "Position": + path = position.get("path", UNKNOWN_PATH) + line = position.get("line", UNKNOWN_LINE) + start = position.get("start", 0) + 1 + end = max(position.get("end", 0) + 1, start) + if path == UNKNOWN_PATH and method: + path = method.name.split(";")[0] + path = path.split("$")[0] + path = path[1:] + return Position(path, line, start, end) + + def to_sapp(self) -> sapp.SourceLocation: + return sapp.SourceLocation( + line_no=self.line, + begin_column=self.start, + end_column=self.end, + ) + + +class Origin(NamedTuple): + callee_name: Method + callee_port: Port + + @staticmethod + def from_json(leaf_json: Dict[str, Any], leaf_kind: str) -> "Origin": + """ + Depending on the origin kind, the json keys will vary: + + Method origin (most common): { "method" : ... , "port" : ... } + Field origin: { "field" : ... } + No port for field origins. Always assumed to be "Leaf". + Crtex origin : { "canonical_name" : ... , "port" : ... } + """ + callee = leaf_json.get( + "method", leaf_json.get("field", leaf_json.get("canonical_name")) + ) + if not callee: + raise sapp.ParseError(f"No callee found in origin {leaf_json}.") + callee_name = Method.from_json(callee) + + # The origin represents a call to a leaf/terminal trace. Its port should + # indicate that, so that downstream trace reachability computation knows + # when it has reached the end. See trace_graph.is_leaf_port(). Non-CRTEX + # ports should always be <leaf_kind> regardless of the JSON (e.g. 
method + # origins could indicate that the sink comes from "argument(1)"", but it + # needs to be "sink" in sapp). + callee_port = Port.from_json("leaf", leaf_kind) + if "canonical_name" in leaf_json: + # All CRTEX ports are considered leaf ports. + callee_port = Port.from_json(leaf_json["port"], leaf_kind) + + if not callee_port.is_leaf(): + raise sapp.ParseError(f"Encountered non-leaf port in origin {leaf_json}") + + return Origin(callee_name, callee_port) + + +class CallInfo(NamedTuple): + """Mirrors the CallInfo object in the analysis""" + + call_kind: str + method: Optional[Method] + port: Port + position: Position + + @staticmethod + def from_json( + taint_json: Dict[str, Any], leaf_kind: str, caller_position: Position + ) -> "CallInfo": + call_kind = taint_json["call_kind"] + + callee = taint_json.get("resolves_to") + method = Method.from_json(callee) if callee else None + port = Port.from_json(taint_json.get("port", "leaf"), leaf_kind) + + position_json = taint_json.get("position") + position = ( + caller_position + if not position_json + else Position.from_json(position_json, method) + ) + return CallInfo(call_kind, method, port, position) + + def is_declaration(self) -> bool: + """Can can be a declaration for a source/sink (call_kind == Declaration) + or a propagation (call_kind == PropagationWithTrace:Declaration)""" + return "Declaration" in self.call_kind + + def is_origin(self) -> bool: + return "Origin" in self.call_kind + + def is_propagation_without_trace(self) -> bool: + return "Propagation" == self.call_kind + + +class LocalPositions(NamedTuple): + positions: List[Position] + + @staticmethod + def from_json(positions: List[Dict[str, Any]], method: Method) -> "LocalPositions": + return LocalPositions( + [Position.from_json(position, method) for position in positions] + ) + + @staticmethod + def from_taint_json( + taint: Dict[str, Any], caller_method: Method + ) -> "LocalPositions": + """The `taint` json should be of the following form: + { + 
"call": {...}, --> Optional field in `taint` + "kinds": [ + { "kind": "Source", "local_positions": [ { } ] }, + ... + ] + } + """ + return LocalPositions.from_json( + taint.get("local_positions", []), + caller_method, + ) + + def to_sapp(self) -> List[sapp.SourceLocation]: + return [position.to_sapp() for position in sorted(self.positions)] + + +class Features(NamedTuple): + features: Set[str] + + @staticmethod + def from_json(features: Dict[str, Any]) -> "Features": + may_features = set(features.get("may_features", [])) + always_features = { + f"always-{feature}" for feature in features.get("always_features", []) + } + return Features(may_features | always_features) + + @staticmethod + def from_taint_json(taint: Dict[str, Any]) -> "Features": + """Similar to `LocalPositions.from_taint_json`.""" + # User-declared features are stored in "local_user_features" and should + # be reported as local features in order to show up in the trace frame + # on the UI. + user_features = Features.from_json(taint.get("local_user_features", {})) + local_features = Features.from_json(taint.get("local_features", {})) + return Features(user_features.features | local_features.features) + + def to_sapp(self) -> List[str]: + return sorted(self.features) + + def to_sapp_as_parsetracefeature(self) -> List[sapp.ParseTraceFeature]: + return [ + sapp.ParseTraceFeature(feature, []) for feature in sorted(self.features) + ] + + +class ExtraTrace(NamedTuple): + kind: str + callee: CallInfo + + @staticmethod + def from_json( + extra_trace: Dict[str, Any], caller_position: Position + ) -> "ExtraTrace": + return ExtraTrace( + kind=extra_trace["kind"], + callee=CallInfo.from_json( + extra_trace["call_info"], "sink", caller_position + ), + ) + + def to_sapp(self) -> sapp.ParseTraceAnnotation: + subtraces = ( + [ + sapp.ParseTraceAnnotationSubtrace( + callee=self.callee.method.name, + port=self.callee.port.value, + position=self.callee.position.to_sapp(), + ) + ] + if self.callee.method + else [] + ) + 
+ return sapp.ParseTraceAnnotation( + location=self.callee.position.to_sapp(), + kind="tito_transform", + msg=f"Propagation through {self.kind}", + leaf_kind=self.kind, + leaf_depth=0, + type_interval=None, + link=None, + trace_key=None, + titos=[], + subtraces=subtraces, + ) + + +class TypeInterval(NamedTuple): + callee_interval: Tuple[int, int] + preserves_type_context: bool + + @staticmethod + def from_json(kind: Dict[str, Any], leaf_kind: str) -> Optional["TypeInterval"]: + """Parses class interval information from the kind JSON""" + interval = kind.get("callee_interval") + if interval is None: + return None + + # If "callee_interval" exists, "preserves_type_context" must exist too. + return TypeInterval( + callee_interval=(interval[0], interval[1]), + preserves_type_context=kind["preserves_type_context"], + ) + + def to_sapp(self) -> sapp.ParseTypeInterval: + return sapp.ParseTypeInterval( + start=self.callee_interval[0], + finish=self.callee_interval[1], + preserves_type_context=self.preserves_type_context, + ) + + +class Kind(NamedTuple): + name: str + distance: int + origins: List[Origin] + extra_traces: List[ExtraTrace] + type_interval: Optional[TypeInterval] + + @staticmethod + def from_json( + kind: Dict[str, Any], leaf_kind: str, caller_position: Position + ) -> "Kind": + origins = [] + for origin in kind.get("origins", []): + origins.append(Origin.from_json(origin, leaf_kind)) + extra_traces = [] + for extra_trace in kind.get("extra_traces", []): + extra_traces.append(ExtraTrace.from_json(extra_trace, caller_position)) + return Kind( + name=kind["kind"], + distance=kind.get("distance", 0), + origins=origins, + extra_traces=extra_traces, + type_interval=TypeInterval.from_json(kind, leaf_kind), + ) + + @staticmethod + def partition_by_interval( + kinds: List["Kind"], + ) -> Dict[Optional[TypeInterval], List["Kind"]]: + kinds_by_interval = defaultdict(list) + for kind in kinds: + kinds_by_interval[kind.type_interval].append(kind) + return kinds_by_interval