Skip to content

Commit

Permalink
Merge pull request #668 from oda-hub/get_query_new_status-logic
Browse files Browse the repository at this point in the history
get_query_new_status function Job class and improve logic assigning `query_new_status`
  • Loading branch information
burnout87 authored Jun 19, 2024
2 parents 7fb9989 + a26eb70 commit e4b7784
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 18 deletions.
16 changes: 16 additions & 0 deletions cdci_data_analysis/analysis/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ def get_call_back_url(self):

return url

def get_query_new_status(self):
if self.status == 'done':
query_new_status = 'done'
elif self.status == 'failed':
query_new_status = 'failed'
elif self.status == 'progress':
query_new_status = 'progress'
else:
job_monitor = self.updated_dataserver_monitor()
if job_monitor['status'] == 'progress':
query_new_status = 'progress'
else:
query_new_status = 'submitted'
self.set_submitted()

return query_new_status

class OsaJob(Job):
def __init__(self,
Expand Down
20 changes: 2 additions & 18 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2172,15 +2172,7 @@ def run_query(self, off_line=False, disp_conf=None):
debug_message=e.debug_message)

if query_out.status_dictionary['status'] == 0:
if job.status == 'done':
query_new_status = 'done'
elif job.status == 'failed':
query_new_status = 'failed'
elif job.status == 'progress':
query_new_status = 'progress'
else:
query_new_status = 'submitted'
job.set_submitted()
query_new_status = job.get_query_new_status()

if email_helper.is_email_to_send_run_query(self.logger,
query_new_status,
Expand Down Expand Up @@ -2270,15 +2262,7 @@ def run_query(self, off_line=False, disp_conf=None):
self.logger.info('-----------------> job status after query: %s', job.status)

if query_out.status_dictionary['status'] == 0:
if job.status == 'done':
query_new_status = 'done'
elif job.status == 'failed':
query_new_status = 'failed'
elif job.status == 'progress':
query_new_status = 'progress'
else:
query_new_status = 'submitted'
job.set_submitted()
query_new_status = job.get_query_new_status()

products_url = self.generate_products_url(self.app.config.get('conf').products_url, self.par_dic)
email_api_code = DispatcherAPI.set_api_code(self.par_dic,
Expand Down
68 changes: 68 additions & 0 deletions tests/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,74 @@ def test_resubmission_job_id(dispatcher_live_fixture_no_resubmit_timeout, status
assert jdata['exit_status']['job_status'] == 'ready'


@pytest.mark.not_safe_parallel
def test_resubmission_after_callback(dispatcher_live_fixture_no_resubmit_timeout):
server = dispatcher_live_fixture_no_resubmit_timeout
DispatcherJobState.remove_scratch_folders()
DataServerQuery.set_status("submitted")
logger.info("constructed server: %s", server)

# let's generate a valid token
token_payload = {
**default_token_payload,
}
encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256')

# these parameters define request content
base_dict_param = dict(
instrument="empty-async",
product_type="dummy-log-submit",
query_type="Real",
)

dict_param = dict(
query_status="new",
token=encoded_token,
**base_dict_param
)

c = requests.get(os.path.join(server, "run_analysis"),
dict_param
)

assert c.status_code == 200
jdata = c.json()
print(json.dumps(jdata, sort_keys=True, indent=4))
dispatcher_job_state = DispatcherJobState.from_run_analysis_response(c.json())
time_request = jdata['time_request']
jdata = c.json()
assert jdata['exit_status']['job_status'] == "submitted"
assert DataServerQuery.get_status() == "submitted"

c = requests.get(os.path.join(server, "call_back"),
params=dict(
job_id=dispatcher_job_state.job_id,
session_id=dispatcher_job_state.session_id,
instrument_name="empty-async",
action="progress",
node_id='node_progress',
message='progressing',
token=encoded_token,
time_original_request=time_request
))
assert c.status_code == 200
jdata = dispatcher_job_state.load_job_state_record('node_progress', 'progressing')
assert jdata['status'] == "progress"
assert jdata['full_report_dict']['action'] == "progress"

# resubmit the job after the timeout expired
time.sleep(10.5)
dict_param['job_id'] = dispatcher_job_state.job_id
dict_param['query_status'] = "progress"
c = requests.get(os.path.join(server, "run_analysis"),
dict_param
)

assert c.status_code == 200
jdata = c.json()
assert jdata['exit_status']['job_status'] == "progress"
assert jdata['query_status'] == "progress"

@pytest.mark.not_safe_parallel
def test_failed_resubmission(dispatcher_live_fixture_no_resubmit_timeout):
server = dispatcher_live_fixture_no_resubmit_timeout
Expand Down

0 comments on commit e4b7784

Please sign in to comment.