bug-1889156: use variation of check_crashids_for_date instead of fetch_crashids

fetch_crashids uses Elasticsearch as the data source under the hood, but we need the data source to be S3 or GCS.
biancadanforth committed May 8, 2024
1 parent 7bdd323 commit ecddb5a
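For context: processed crashes live in the bucket under keys of the form v1/processed_crash/<crash_id> (see PROCESSED_CRASH_TEMPLATE below), so a day's crash ids can be collected by listing keys straight from S3 or GCS instead of querying Elasticsearch. A minimal sketch of that listing idea, assuming boto3 and a hypothetical bucket name, neither of which is part of this commit:

import boto3

# Hypothetical bucket name, for illustration only.
BUCKET = "crashstats-processed-crashes-example"


def list_processed_crash_ids(prefix=""):
    """Yield crash ids whose keys start with v1/processed_crash/<prefix>."""
    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=BUCKET, Prefix="v1/processed_crash/" + prefix)
    for page in pages:
        for item in page.get("Contents", []):
            # Keys look like v1/processed_crash/<crash_id>; keep the last segment.
            yield item["Key"].split("/")[-1]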
Showing 2 changed files with 72 additions and 47 deletions.
113 changes: 72 additions & 41 deletions bin/load_processed_crash_into_es.py
@@ -12,29 +12,89 @@

# Usage: ./bin/load_processed_crash_into_es.py [OPTIONS] CRASH_ID

from contextlib import redirect_stdout
import io
import click
import datetime
from more_itertools import chunked

from socorro import settings
from socorro.libclass import build_instance_from_settings
from socorro.scripts.fetch_crashids import main as fetch_crashids
from socorro.lib.libooid import date_from_ooid
from webapp.crashstats.supersearch.models import SuperSearchUnredacted


DEFAULT_HOST = "https://crash-stats.mozilla.org"

PROCESSED_CRASH_TEMPLATE = "v1/processed_crash/%s"
# TODO: change to "all" when done manual testing
NUM_CRASHIDS_TO_FETCH = 5
# Number of three-character crash id prefixes to check per chunk
CHUNK_SIZE = 4
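# For scale (illustrative): 16**3 == 4096 prefixes split into chunks of 4
# means 1024 chunks per run, processed sequentially below.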

def get_threechars():
    """Generate all combinations of 3 hex digits."""
    chars = "0123456789abcdef"
    for x in chars:
        for y in chars:
            for z in chars:
                yield x + y + z
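
# For illustration: get_threechars() yields 16**3 == 4096 prefixes in
# lexicographic order ("000", "001", ..., "00f", "010", ..., "fff"), one for
# every possible first three characters of a hex-encoded crash id.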

def check_elasticsearch(supersearch, crash_ids):
    """Check Elasticsearch and return the set of missing crash ids.
    Crash ids should all be on the same day.
    """
    crash_ids = [crash_ids] if isinstance(crash_ids, str) else crash_ids
    crash_date = date_from_ooid(crash_ids[0])

    # The datestamp in the crash id doesn't always match the processed date,
    # especially when the crash came in near the end of the day.
    start_date = (crash_date - datetime.timedelta(days=5)).strftime("%Y-%m-%d")
    end_date = (crash_date + datetime.timedelta(days=5)).strftime("%Y-%m-%d")

    params = {
        "uuid": crash_ids,
        "date": [">=%s" % start_date, "<=%s" % end_date],
        "_columns": ["uuid"],
        "_facets": [],
        "_facets_size": 0,
    }
    search_results = supersearch.get(**params)

    crash_ids_in_es = [hit["uuid"] for hit in search_results["hits"]]
    return set(crash_ids) - set(crash_ids_in_es)
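
# Worked example (values from the crash id used elsewhere in this diff): for
# "64f9a777-771d-4db3-82fa-b9c190240430", date_from_ooid reads the trailing
# "240430" as 2024-04-30, so the supersearch date window above spans
# 2024-04-25 through 2024-05-05 inclusive.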

def get_missing_crash_ids(crash_source, date):
    """Return crash ids for the date that are in storage but missing from ES."""
    supersearch = SuperSearchUnredacted()

    # Processed crash keys have no date component; crash ids encode their date
    # as the final six characters (yymmdd), so filter listed keys by that
    # suffix rather than by key prefix.
    datestamp = datetime.datetime.strptime(date, "%Y-%m-%d").strftime("%y%m%d")

    missing = []

    firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)
    for firstchars_chunk in firstchars_chunked:
        for firstchars in firstchars_chunk:
            # Grab all the crash ids under this three-character prefix
            processed_crash_key_prefix = PROCESSED_CRASH_TEMPLATE % firstchars

            page_iterator = crash_source.connection.list_objects_paginator(
                bucket=crash_source.bucket,
                prefix=processed_crash_key_prefix,
            )

            for page in page_iterator:
                crash_ids = [
                    item["Key"].split("/")[-1]
                    for item in page.get("Contents", [])
                    if item["Key"].endswith(datestamp)
                ]

                if not crash_ids:
                    continue

                # Check Elasticsearch in batches
                for crash_ids_batch in chunked(crash_ids, 100):
                    missing_in_es = check_elasticsearch(supersearch, crash_ids_batch)
                    missing.extend(missing_in_es)

    return list(set(missing))


@click.command()
@click.option(
    "--host",
    default=DEFAULT_HOST,
    type=str,
    help="Host for system to fetch crashids from.",
)
@click.option(
    "--date",
    default=None,
@@ -49,38 +109,23 @@
    type=str,
    help="A single crash ID to load into ES from the source. E.g. 64f9a777-771d-4db3-82fa-b9c190240430",
)
@click.option(
    "--skip-if-already-stored",
    default=True,
    type=bool,
    help="Don't reload the crash if it is already stored in Elasticsearch.",
)
@click.pass_context
def load_crash(ctx, host, date, crash_id, skip_if_already_stored):
    """Loads a processed crash into Elasticsearch by crash ID or date.
    Specify CRASH_ID or DATE.
    Note: --skip-if-already-stored only works when date is provided.
    """
    crash_ids = []

    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)

    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])

    if crash_id:
        crash_ids.append(crash_id)
    elif date:
        click.echo(f"Fetching crash IDs from {host!r} on {date!r}...")
        with redirect_stdout(io.StringIO()) as f:
            # TODO: Performance concerns about this being a really high number of crash IDs if using num="all"?
            ctx.invoke(
                fetch_crashids,
                ["--date", date, "--host", host, "--num", str(NUM_CRASHIDS_TO_FETCH), "--product", "all"],
            )
        crash_ids = f.getvalue().splitlines()
        crash_ids = get_missing_crash_ids(crash_source, date)

    else:
        raise click.BadParameter(
@@ -91,18 +136,6 @@ def load_crash(ctx, host, date, crash_id, skip_if_already_stored):

    for crash_id in crash_ids:
        try:
            date_datetime = datetime.datetime.strptime(date, "%Y-%m-%d")
            if (
                skip_if_already_stored
                # We could only have a crash ID, which is insufficient to check
                and date
                and crash_dest.document_exists(date_datetime, crash_id)
            ):
                click.echo(
                    f"Already loaded processed crash {crash_id!r} in Elasticsearch. Skipping."
                )
                continue

            processed_crash = crash_source.get_processed_crash(crash_id)
            crash_dest.save_processed_crash(None, processed_crash)

@@ -114,9 +147,7 @@ def load_crash(ctx, host, date, crash_id, skip_if_already_stored):
            click.echo(
                f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
            )
            # TODO: Should this be a continue? In most cases, we'll run this
            # with a date with a list of crash ids.
            ctx.exit(1)
            continue


if __name__ == "__main__":
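For reference, hypothetical invocations of the script after this commit; the option name for the crash id is collapsed in the diff above, so --crash-id is an assumption based on the crash_id parameter:

# Backfill every crash id found in storage for the date but missing from Elasticsearch
./bin/load_processed_crash_into_es.py --date 2024-04-30

# Load a single crash by id (option name assumed)
./bin/load_processed_crash_into_es.py --crash-id 64f9a777-771d-4db3-82fa-b9c190240430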
6 changes: 0 additions & 6 deletions socorro/external/es/crashstorage.py
@@ -650,9 +650,3 @@ def _submit_crash_to_elasticsearch(
                exc_info=True,
            )
            raise

    def document_exists(self, date, crash_id):
        index = self.get_index_for_date(date)
        # TODO: Are there different `doc_type`s that we use? Is `_all` fine?
        # https://elasticsearch-py.readthedocs.io/en/1.9.0/api.html#elasticsearch.Elasticsearch.exists
        return self.client.connection().exists(index=index, doc_type="_all", id=crash_id)
