feat: better worker logging (#230)
The worker was logging on every HTTP request, and because querying from
SQLite files makes _many_ requests, the logs were too noisy to be helpful.

To address this, the httpx log level in the worker is raised from INFO to
WARNING, and manual log messages have been added at key points in the
process.
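A minimal sketch of the pattern (configure_logging and convert_dataset are illustrative names, not functions in app_worker.py; the real worker attaches an ASIMFormatter handler rather than calling basicConfig, but the 'httpx' logger name and the messages mirror the diff below):

import logging

logger = logging.getLogger(__name__)

def configure_logging():
    # Keep the worker's own logger at INFO so the manual messages below are emitted
    logging.basicConfig(level=logging.INFO)
    # httpx logs a line per HTTP request; raising it to WARNING silences that noise
    logging.getLogger('httpx').setLevel(logging.WARNING)

def convert_dataset(dataset_id, version):
    # A handful of manual messages at key points replaces the per-request logging
    logger.info('Converting %s %s to other formats', dataset_id, version)
    ...
    logger.info('Compressing %s %s CSVs', dataset_id, version)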
michalc authored Jun 13, 2024
2 parents a0a5e70 + 3d3f1ad commit 2ac87de
32 changes: 32 additions & 0 deletions app_worker.py
@@ -71,6 +71,8 @@ def convert_json_to_csvs(dataset_id, version):
def save_csv(path, chunks):
table = path.replace('_', '-') # GDS API guidelines prefer dash to underscore
s3_key = f'{dataset_id}/{version}/tables/{table}/data.csv'
logger.info('Converting %s %s JSON "table" %s to CSV in %s',
dataset_id, version, table, s3_key)
aws_multipart_upload(signed_s3_request, s3_key, chunks)

with signed_s3_request('GET', s3_key=f'{dataset_id}/{version}/data.json') as response:
@@ -196,13 +198,15 @@ def with_is_first(iterable):
# Convert SQLite to an ODS file
s3_key = f'{dataset_id}/{version}/data.ods'
ods_sheets = to_sheets(query, get_table_sqls(query), fix_ods_types)
logger.info('Converting %s %s SQLite to ODS in %s', dataset_id, version, s3_key)
aws_multipart_upload(signed_s3_request, s3_key, stream_write_ods(ods_sheets))
except ZipOverflowError:
logger.exception('ODS of entire SQLite would be too large for LibreOffice')

# Convert SQLite to JSON
s3_key = f'{dataset_id}/{version}/data.json'
json_sheets = to_sheets(query, get_table_sqls(query), lambda r: r)
logger.info('Converting %s %s SQLite to JSON in %s', dataset_id, version, s3_key)
aws_multipart_upload(signed_s3_request, s3_key, stream_write_json(json_sheets))

for table_name, data_sql in get_table_sqls(query):
@@ -211,6 +215,8 @@ def with_is_first(iterable):
# Save as CSV, with rows ordered by primary key columns
with query(data_sql) as (cols, rows):
s3_key = f'{dataset_id}/{version}/tables/{table_id}/data.csv'
logger.info('Converting %s %s SQLite table %s to CSV in %s', dataset_id,
version, table_name, s3_key)
aws_multipart_upload(signed_s3_request, s3_key,
csv_data(cols, rows, with_header=True))

@@ -219,6 +225,8 @@ def with_is_first(iterable):
with query(data_sql) as (cols, rows):
s3_key = f'{dataset_id}/{version}/tables/{table_id}/data.ods'
ods_sheets = ((table_name, cols, fix_ods_types(rows)),)
logger.info('Converting %s %s SQLite table %s to ODS in %s', dataset_id,
version, table_name, s3_key)
aws_multipart_upload(signed_s3_request, s3_key,
stream_write_ods(ods_sheets))
except ZipOverflowError:
@@ -253,6 +261,9 @@ def get_num_statements_with_rows(query_multi, script):
for name, script in reports:
report_id = name.replace('_', '-')

logger.info('Converting %s %s SQLite report %s to ODS in %s',
dataset_id, version, report_id, s3_key)

# Is this a multi-statement query?
num_statements = get_num_statements_with_rows(query_multi, script)

@@ -263,6 +274,8 @@ def get_num_statements_with_rows(query_multi, script):
for i, (cols, rows) in enumerate(with_non_zero_rows(query_multi(script)))
for line in csv_data(cols, rows, with_header=i == 0)
)
logger.info('Converting %s %s SQLite report %s to combined CSV in %s',
dataset_id, version, report_id, s3_key)
aws_multipart_upload(signed_s3_request, s3_key, csv_lines)

# ... and as ODS with the results of each statement as a separate sheet
@@ -273,6 +286,8 @@ def get_num_statements_with_rows(query_multi, script):
for i, (cols, rows) in enumerate(
with_non_zero_rows(query_multi(script)))
)
logger.info('Converting %s %s SQLite report %s to combined ODS in %s',
dataset_id, version, report_id, s3_key)
aws_multipart_upload(signed_s3_request, s3_key, stream_write_ods(sheets))
except ZipOverflowError:
logger.exception(
@@ -309,35 +324,45 @@ def yield_compressed_bytes(_uncompressed_bytes):
status_sqlite, headers_sqlite = aws_head(signed_s3_request, sqlite_s3_key)

if status_json == 200 and status_sqlite == 404:
logger.debug('Source format of %s %s is JSON', dataset_id, version)
source_s3_key = json_s3_key
headers = headers_json
convert_func = convert_json_to_csvs
elif status_sqlite == 200:
logger.debug('Source format of %s %s is SQLite', dataset_id, version)
source_s3_key = sqlite_s3_key
headers = headers_sqlite
convert_func = convert_sqlite_to_csvs
else:
logger.warning('Unknown source format of %s %s - skipping', dataset_id, version)
continue

# Skip if we have already converted the source
etag = headers['etag'].strip('"')
etag_key = f'{source_s3_key}__CSV_VERSION_{CSV_VERSION}__{etag}'
status, _ = aws_head(signed_s3_request, etag_key)
if status == 200:
logger.debug('Have already converted %s %s to other formats', dataset_id, version)
continue

# Convert the source to CSVs
logger.info('Converting %s %s to other formats', dataset_id, version)

try:
convert_func(dataset_id, version)
except Exception:
logger.exception('Exception writing CSVs %s %s', dataset_id, version)
continue

# Compress the CSVs
logger.info('Compressing %s %s CSVs', dataset_id, version)

prefixes = (f'{dataset_id}/{version}/tables/', f'{dataset_id}/{version}/reports/')
for prefix in prefixes:
for table in aws_list_folders(signed_s3_request, prefix=prefix):
csv_s3_key = f'{prefix}{table}/data.csv'
logger.info('Compressing %s %s CSV table/report %s to %s',
dataset_id, version, table, csv_s3_key)
with signed_s3_request('GET', s3_key=csv_s3_key) as response:
if response.status != 200:
return
@@ -347,6 +372,7 @@ def yield_compressed_bytes(_uncompressed_bytes):
with signed_s3_request('GET', s3_key=source_s3_key) as response:
if response.status != 200:
continue
logger.info('Compressing %s %s source file', dataset_id, version)
save_compressed(source_s3_key + '.gz', response.stream(65536))

# Re-create the CSVs if the data has since changed...
@@ -358,6 +384,7 @@ def yield_compressed_bytes(_uncompressed_bytes):
continue

# ... and don't re-create the CSVs if it has not since changed
logger.info('Putting %s %s etag key at %s', dataset_id, version, etag_key)
with signed_s3_request('PUT', s3_key=etag_key) as response:
put_response_body = response.read()
if response.status != 200:
@@ -374,6 +401,11 @@ def main():
handler.setFormatter(ASIMFormatter())
logger.addHandler(handler)

# Converting SQLite files makes many HTTP requests (in the hundreds of thousands for
# the UK Tariff) and the INFO log level makes the logs too noisy to be very useful
httpxLogger = logging.getLogger('httpx')
httpxLogger.setLevel(logging.WARNING)

if os.environ.get('SENTRY_DSN'):
sentry_sdk.init( # pylint: disable=abstract-class-instantiated
dsn=os.environ['SENTRY_DSN'],
