Skip to content

Commit

Permalink
S3: API now also returns number of bytes processed
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers committed Dec 25, 2024
1 parent 5179377 commit 43ebd30
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 175 deletions.
9 changes: 9 additions & 0 deletions py_partiql_parser/_internal/clause_tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ class ClauseTokenizer:
def __init__(self, from_clause: str):
self.token_list = from_clause
self.token_pos = 0
self.tokens_parsed = 0

def get_tokens_parsed(self) -> int:
x = self.tokens_parsed
self.tokens_parsed = 0
return x

def current(self) -> Optional[str]:
"""
Expand All @@ -22,6 +28,7 @@ def next(self) -> Optional[str]:
"""
try:
crnt_token = self.token_list[self.token_pos]
self.tokens_parsed += 1
self.token_pos += 1
return crnt_token
except IndexError:
Expand All @@ -34,11 +41,13 @@ def peek(self) -> Optional[str]:
return None

def revert(self) -> None:
self.tokens_parsed -= 1
self.token_pos -= 1

def skip_white_space(self) -> None:
try:
while self.token_list[self.token_pos] in [" ", "\n"]:
self.tokens_parsed += 1
self.token_pos += 1
except IndexError:
pass
Expand Down
83 changes: 26 additions & 57 deletions py_partiql_parser/_internal/from_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Any, Dict

from .clause_tokenizer import ClauseTokenizer
from .json_parser import JsonParser
from .utils import CaseInsensitiveDict

from ..exceptions import ParserException
Expand Down Expand Up @@ -75,79 +74,49 @@ def __init__(self, from_clause: str):


class S3FromParser(FromParser):
def get_source_data(self, documents: Dict[str, str]) -> Any:
from_alias = list(self.clauses.keys())[0].lower()
def get_source_data(self, document: CaseInsensitiveDict[str, str]) -> Any:
from_query = list(self.clauses.values())[0].lower()
if "." in from_query:
return self._get_nested_source_data(documents)
return self._get_nested_source_data(document)

key_has_asterix = from_query.endswith("[*]")
from_query = from_query[0:-3] if key_has_asterix else from_query
from_alias = from_alias[0:-3] if from_alias.endswith("[*]") else from_alias
doc_is_list = documents[from_query].startswith("[") and documents[
from_query
].endswith("]")

source_data = list(JsonParser.parse(documents[from_query]))

if doc_is_list:
return {"_1": source_data[0]}
elif from_alias:
return [CaseInsensitiveDict({from_alias: doc}) for doc in source_data]
if isinstance(document, list):
return {"_1": document}
else:
return source_data
return document

def _get_nested_source_data(self, documents: Dict[str, Any]) -> Any:
def _get_nested_source_data(self, document: CaseInsensitiveDict[str, Any]) -> Any:
"""
Our FROM-clauses are nested, meaning we need to dig into the provided document to return the key that we need
--> FROM s3object.name as name
"""
root_doc = True
source_data = documents
iterate_over_docs = False
entire_key = list(self.clauses.values())[0].lower().split(".")
if entire_key[0].lower() in ["s3object[*]"]:
entire_key = entire_key[1:]
alias = list(self.clauses.keys())[0]
if alias.endswith("[*]"):
alias = alias[0:-3]
key_so_far = []
for key in entire_key:
key_so_far.append(key)
key_has_asterix = key.endswith("[*]") and key[0:-3] in source_data
new_key = key[0:-3] if key_has_asterix else key
if iterate_over_docs and isinstance(source_data, list): # type: ignore[unreachable]
# The previous key ended in [*]
# Iterate over all docs in the result, and only return the requested source key
if key_so_far == entire_key: # type: ignore[unreachable]
# If we have an alias, we have to use that instead of the original name
source_data = [{alias: doc.get(new_key, {})} for doc in source_data]
else:
source_data = [
doc.get_original(new_key) or CaseInsensitiveDict({})
for doc in source_data
]

if key in document:
document = document[key]
if isinstance(document, list):
# AWS behaviour when the root-document is a list
document = {"_1": document[0]}
elif key_so_far == entire_key:
if list(self.clauses.keys()) == list(self.clauses.values()):
# self.clauses contains the same from_clause if no alias is provided
# FROM s3object.a
pass
else:
# An alias has been provided, and the subsequent WHERE/SELECT clauses should use it
# FROM s3object as s WHERE s.x = '..'
document = CaseInsensitiveDict({alias: document})
else:
# The previous key was a regular key
# Assume that the result consists of a singular JSON document
if new_key in source_data:
doc_is_list = source_data[new_key].startswith("[") and source_data[
new_key
].endswith("]")
source_data = list(JsonParser.parse(source_data[new_key])) # type: ignore
if root_doc and doc_is_list:
# AWS behaviour when the root-document is a list
source_data = {"_1": source_data[0]} # type: ignore
elif key_so_far == entire_key:
if isinstance(source_data, list): # type: ignore[unreachable]
source_data = [{alias: doc} for doc in source_data] # type: ignore[unreachable]
else:
source_data = {alias: source_data}
else:
source_data = {}

iterate_over_docs = key_has_asterix
root_doc = False

return source_data
document = {}

return document


class DynamoDBFromParser(FromParser):
Expand Down
13 changes: 12 additions & 1 deletion py_partiql_parser/_internal/json_parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from json import JSONEncoder
from typing import Any, List, Iterator, Optional
from typing import Any, List, Iterator, Optional, Tuple

from .clause_tokenizer import ClauseTokenizer
from .utils import CaseInsensitiveDict, Variable
Expand All @@ -25,6 +25,17 @@ def parse(original: str) -> Iterator[Any]: # type: ignore[misc]
if result is not None:
yield result

@staticmethod
def parse_with_tokens(original: str) -> Tuple[Iterator[Any], int]: # type: ignore[misc]
"""
Parse JSON string. Returns a tuple of (json_doc, nr_of_bytes_processed)
"""
tokenizer = ClauseTokenizer(original)
while tokenizer.current() is not None:
result = JsonParser._get_next_document(original, tokenizer)
if result is not None:
yield result, tokenizer.get_tokens_parsed()

@staticmethod
def _get_next_document( # type: ignore[misc]
original: str,
Expand Down
51 changes: 28 additions & 23 deletions py_partiql_parser/_internal/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from .delete_parser import DeleteParser
from .from_parser import DynamoDBFromParser, S3FromParser, FromParser
from .insert_parser import InsertParser
from .select_parser import SelectParser
from .json_parser import JsonParser
from .select_parser import DynamoDBSelectParser, S3SelectClauseParser
from .update_parser import UpdateParser
from .where_parser import DynamoDBWhereParser, S3WhereParser, WhereParser
from .utils import is_dict, QueryMetadata, CaseInsensitiveDict
from .utils import QueryMetadata, CaseInsensitiveDict


TYPE_RESPONSE = Tuple[
Expand All @@ -19,38 +20,42 @@


class S3SelectParser:
def __init__(self, source_data: Dict[str, str]):
# Source data is in the format: {source: json}
# Where 'json' is one or more json documents separated by a newline
def __init__(self, source_data: str):
# Source data is one or more json documents
self.documents = source_data
self.table_prefix = "s3object"
self.bytes_scanned = 0

def parse(self, query: str) -> List[Dict[str, Any]]:
query = query.replace("\n", " ")
clauses = re.split("SELECT | FROM | WHERE ", query, flags=re.IGNORECASE)
# First clause is whatever comes in front of SELECT - which should be nothing
_ = clauses[0]
# FROM
from_parser = S3FromParser(from_clause=clauses[2])

source_data = from_parser.get_source_data(self.documents)
if is_dict(source_data):
source_data = [source_data]

# WHERE
if len(clauses) > 3:
where_clause = clauses[3]
source_data = S3WhereParser(source_data).parse(where_clause)

# SELECT
select_clause = clauses[1]
from_parser = S3FromParser(from_clause=clauses[2])
table_prefix = self.table_prefix
for alias_key, alias_value in from_parser.clauses.items():
if table_prefix == alias_value:
if table_prefix == alias_value or f"{table_prefix}[*]" == alias_value:
table_prefix = alias_key
return SelectParser(table_prefix).parse(
select_clause, from_parser.clauses, source_data
)

results = []

for doc, tokens_parsed in JsonParser.parse_with_tokens(self.documents):
doc = from_parser.get_source_data(doc)
self.bytes_scanned += tokens_parsed

if len(clauses) > 3:
where_clause = clauses[3]
if not S3WhereParser.applies(doc, table_prefix, where_clause):
continue

select_clause = clauses[1]

S3SelectClauseParser(table_prefix).parse(
select_clause, from_parser.clauses, doc, results
)

return results


class DynamoDBStatementParser:
Expand Down Expand Up @@ -116,7 +121,7 @@ def _parse_select(

# SELECT
select_clause = clauses[1]
queried_data = SelectParser().parse(
queried_data = DynamoDBSelectParser().parse(
select_clause, from_parser.clauses, source_data
)
updates: Dict[
Expand Down
Loading

0 comments on commit 43ebd30

Please sign in to comment.