Skip to content

Commit

Permalink
render progress via click
Browse files Browse the repository at this point in the history
  • Loading branch information
petrjasek committed Mar 11, 2024
1 parent 2cba856 commit 343093f
Showing 1 changed file with 46 additions and 13 deletions.
59 changes: 46 additions & 13 deletions eve_elastic/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bson import ObjectId
from elasticsearch.helpers import bulk, reindex # noqa: F401

from click import progressbar
from uuid import uuid4
from flask import request, abort, json, current_app as app
from eve.utils import config
Expand Down Expand Up @@ -1074,7 +1075,13 @@ def reindex(self, resource, *, requests_per_second=1000): # noqa: F811
try:
es.indices.get(index=alias)
print("Old index is not using an alias.")
_background_reindex(es, alias, new_index)
_background_reindex(
es,
alias,
new_index,
requests_per_second=requests_per_second,
refresh=True,
)
es.indices.update_aliases(
body={
"actions": [
Expand Down Expand Up @@ -1158,14 +1165,19 @@ def reindex(self, resource, *, requests_per_second=1000): # noqa: F811
)

# do it as fast as possible
_background_reindex(es, tmp_index, new_index)
_background_reindex(es, tmp_index, new_index, refresh=True)

print("REMOVE TMP INDEX", tmp_index)
es.indices.delete(index=tmp_index)


def _background_reindex(
es: Elasticsearch, old_index: str, new_index: str, *, requests_per_second=None
es: Elasticsearch,
old_index: str,
new_index: str,
*,
requests_per_second=None,
refresh=False,
):
resp = es.reindex(
body={
Expand All @@ -1174,22 +1186,43 @@ def _background_reindex(
},
requests_per_second=requests_per_second,
wait_for_completion=False,
refresh=True,
refresh=refresh,
)
task_id = resp["task"]
print(f"REINDEXING {old_index} to {new_index} (task {task_id})")

# first get total number of items
while True:
time.sleep(2.0)
task_info = es.tasks.get(task_id=task_id)
if task_info["completed"]:
total = task_info["response"]["total"]
took = int(task_info["response"]["took"] / 1000) # ms to s
print()
print(f"DONE reindexing {total} items, took {took}s")
time.sleep(1.0)
task = es.tasks.get(task_id=task_id)
if task["completed"]:
print_task_done(task)
return
if task["task"]["status"]["total"]:
break
else:
print(".", end="")

# now it can render progress
last_created = 0
with progressbar(length=task["task"]["status"]["total"], label="Reindexing") as bar:
while True:
time.sleep(2.0)
task = es.tasks.get(task_id=task_id)
if (
task["task"]["status"]["created"]
and task["task"]["status"]["created"] > last_created
):
bar.update(task["task"]["status"]["created"] - last_created)
last_created = task["task"]["status"]["created"]
if task["completed"]:
bar.finish()
break

print_task_done(task)


def print_task_done(task):
took = int(task["response"]["took"] / 1000)
print(f"DONE in {took}s")


def build_elastic_query(doc):
Expand Down

0 comments on commit 343093f

Please sign in to comment.