Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detecting errors in data unstaging #5345

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ class BashWrapperBuilder {
binding.launch_cmd = getLaunchCommand(interpreter,env)
binding.stage_cmd = getStageCommand()
binding.unstage_cmd = getUnstageCommand()
binding.unstage_controls_cmd = getUnstageControlsCommand()
binding.unstage_controls = changeDir || shouldUnstageOutputs() ? getUnstageControls() : null

if( changeDir || shouldUnstageOutputs() ) {
Expand Down Expand Up @@ -752,6 +753,9 @@ class BashWrapperBuilder {

/**
 * @return the name of the shell function bound to {@code unstage_cmd} in the wrapper script
 */
protected String getUnstageCommand() {
    return 'nxf_unstage'
}

/**
 * @return the name of the shell function bound to {@code unstage_controls_cmd} in the wrapper script
 */
protected String getUnstageControlsCommand() {
    return 'nxf_unstage_std_files'
}


protected String getUnstageControls() {
def result = copyFileToWorkDir(TaskRun.CMD_OUTFILE) + ' || true' + ENDL
result += copyFileToWorkDir(TaskRun.CMD_ERRFILE) + ' || true' + ENDL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {
return """\
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
${stageOutCommand('$name', targetDir, mode)} || true
${stageOutCommand('$name', targetDir, mode)}
done
unset IFS""".stripIndent(true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
## Resolve the exit status to report, in priority order:
##   1. the task script status recorded in nxf_main_ret
##   2. the unstage status recorded in nxf_unstage_ret
##   3. the status of whatever command triggered this trap (e.g. the after script, if defined)
## $? has to be read before any other command overwrites it.
local last_err=$?
exit_status=${nxf_main_ret:=0}
## nxf_unstage_ret is only assigned when output unstaging actually ran, so it needs a default here.
## ':-0' supplies that default; ':0' is a substring offset, supplies nothing, and fails with
## "unbound variable" if nounset is still active at this point (it is only switched off further down).
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:-0} -ne 0 ]] && exit_status=${nxf_unstage_ret}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status {{exit_file}}
set +u
{{cleanup_cmd}}
Expand All @@ -123,11 +129,14 @@ nxf_stage() {

nxf_unstage() {
true
{{unstage_controls}}
[[ ${nxf_main_ret:=0} != 0 ]] && return
{{unstage_outputs}}
}

## Runs the commands rendered from the unstage_controls binding.
## Kept apart from nxf_unstage so nxf_main can call it whether or not output unstaging ran.
## NOTE(review): `##` lines appear to be dropped when this template is rendered - confirm.
nxf_unstage_std_files() {
## no-op that keeps the function body non-empty when the binding below renders blank
true
{{unstage_controls}}
}

nxf_main() {
trap on_exit EXIT
trap on_term TERM INT USR2
Expand All @@ -150,11 +159,17 @@ nxf_main() {
export NXF_TASK_WORKDIR="$PWD"
{{stage_cmd}}

## Deactivate fast failure to allow uploading stdout and stderr files later
set +e
(set -o pipefail; (nxf_launch | tee {{stdout_file}}) 3>&1 1>&2 2>&3 | tee {{stderr_file}}) &
pid=$!
wait $pid || nxf_main_ret=$?
{{unstage_cmd}}
if [[ ${nxf_main_ret:=0} == 0 ]]; then
## Data unstaging redirecting stdout and stderr with append mode
(set -e -o pipefail; ({{unstage_cmd}} | tee -a {{stdout_file}}) 3>&1 1>&2 2>&3 | tee -a {{stderr_file}})
nxf_unstage_ret=$?
fi
{{unstage_controls_cmd}}
{{after_script}}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ class BashWrapperBuilderTest extends Specification {
binding.unstage_outputs == '''\
IFS=$'\\n'
for name in $(eval "ls -1d test.bam test.bai" | sort | uniq); do
nxf_fs_copy "$name" /work/dir || true
nxf_fs_copy "$name" /work/dir
done
unset IFS
'''.stripIndent().rightTrim()
Expand All @@ -576,7 +576,7 @@ class BashWrapperBuilderTest extends Specification {
binding.unstage_outputs == '''\
IFS=$'\\n'
for name in $(eval "ls -1d test.bam test.bai" | sort | uniq); do
nxf_fs_move "$name" /another/dir || true
nxf_fs_move "$name" /another/dir
done
unset IFS
'''.stripIndent().rightTrim()
Expand Down Expand Up @@ -1311,12 +1311,14 @@ class BashWrapperBuilderTest extends Specification {
then:
builder.getStageCommand() == 'nxf_stage'
builder.getUnstageCommand() == 'nxf_unstage'
builder.getUnstageControlsCommand() == 'nxf_unstage_std_files'

when:
def binding = builder.makeBinding()
then:
binding.stage_cmd == 'nxf_stage'
binding.unstage_cmd == 'nxf_unstage'
binding.unstage_controls_cmd == 'nxf_unstage_std_files'

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class SimpleFileCopyStrategyTest extends Specification {
script == '''
IFS=$'\\n'
for name in $(eval "ls -1d simple.txt my/path/file.bam" | sort | uniq); do
nxf_fs_copy "$name" /target/work\\ dir || true
nxf_fs_copy "$name" /target/work\\ dir
done
unset IFS
'''
Expand All @@ -293,7 +293,7 @@ class SimpleFileCopyStrategyTest extends Specification {
script == '''
IFS=$'\\n'
for name in $(eval "ls -1d simple.txt my/path/file.bam" | sort | uniq); do
nxf_fs_move "$name" /target/store || true
nxf_fs_move "$name" /target/store
done
unset IFS
'''
Expand All @@ -315,7 +315,7 @@ class SimpleFileCopyStrategyTest extends Specification {
script == '''
IFS=$'\\n'
for name in $(eval "ls -1d simple.txt my/path/file.bam" | sort | uniq); do
nxf_fs_rsync "$name" /target/work\\'s || true
nxf_fs_rsync "$name" /target/work\\'s
done
unset IFS
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
local last_err=$?
exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:-0} -ne 0 ]] && exit_status=${nxf_unstage_ret}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status > {{folder}}/.exitcode
set +u
exit $exit_status
Expand All @@ -291,7 +294,10 @@ nxf_stage() {

nxf_unstage() {
true
[[ ${nxf_main_ret:=0} != 0 ]] && return
}

nxf_unstage_std_files() {
true
}

nxf_main() {
Expand All @@ -313,7 +319,11 @@ nxf_main() {
(set -o pipefail; (nxf_launch | tee .command.out) 3>&1 1>&2 2>&3 | tee .command.err) &
pid=$!
wait $pid || nxf_main_ret=$?
nxf_unstage
if [[ ${nxf_main_ret:=0} == 0 ]]; then
(set -e -o pipefail; (nxf_unstage | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
nxf_unstage_ret=$?
fi
nxf_unstage_std_files
}

$NXF_ENTRY
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
local last_err=$?
exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:-0} -ne 0 ]] && exit_status=${nxf_unstage_ret}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status > {{folder}}/.exitcode
set +u
exit $exit_status
Expand All @@ -102,7 +105,10 @@ nxf_stage() {

nxf_unstage() {
true
[[ ${nxf_main_ret:=0} != 0 ]] && return
}

nxf_unstage_std_files() {
true
}

nxf_main() {
Expand Down Expand Up @@ -130,7 +136,11 @@ nxf_main() {
(set -o pipefail; (nxf_launch | tee .command.out) 3>&1 1>&2 2>&3 | tee .command.err) &
pid=$!
wait $pid || nxf_main_ret=$?
nxf_unstage
if [[ ${nxf_main_ret:=0} == 0 ]]; then
(set -e -o pipefail; (nxf_unstage | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
nxf_unstage_ret=$?
fi
nxf_unstage_std_files
}

$NXF_ENTRY
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class BashWrapperBuilderWithS3Test extends Specification {
binding.unstage_outputs == '''\
IFS=$'\\n'
for name in $(eval "ls -1d test.bam test.bai bla\\ nk.txt" | sort | uniq); do
nxf_s3_upload $name s3://some/buck\\ et || true
nxf_s3_upload $name s3://some/buck\\ et
done
unset IFS
'''.stripIndent().rightTrim()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BashWrapperBuilderWithAzTest extends Specification {
binding.unstage_outputs == """\
IFS=\$'\\n'
for name in \$(eval "ls -1d test.bam test.bai" | sort | uniq); do
nxf_az_upload \$name '${AzHelper.toHttpUrl(target)}' || true
nxf_az_upload \$name '${AzHelper.toHttpUrl(target)}'
done
unset IFS
""".stripIndent().rightTrim()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ class GoogleLifeSciencesHelper {
final remoteTaskDir = getRemoteTaskDir(workDir)
def result = 'set -x; '
result += "trap 'err=\$?; exec 1>&2; gsutil -m -q cp -R $localTaskDir/${TaskRun.CMD_LOG} ${remoteTaskDir}/${TaskRun.CMD_LOG} || true; [[ \$err -gt 0 || \$GOOGLE_LAST_EXIT_STATUS -gt 0 || \$NXF_DEBUG -gt 0 ]] && { ls -lah $localTaskDir || true; gsutil -m -q cp -R /google/ ${remoteTaskDir}; } || rm -rf $localTaskDir; exit \$err' EXIT; "
result += "{ cd $localTaskDir; bash ${TaskRun.CMD_RUN} nxf_unstage; } >> $localTaskDir/${TaskRun.CMD_LOG} 2>&1"
result += "{ cd $localTaskDir; bash ${TaskRun.CMD_RUN} nxf_unstage; bash ${TaskRun.CMD_RUN} nxf_unstage_std_files;} >> $localTaskDir/${TaskRun.CMD_LOG} 2>&1"
return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ class GoogleLifeSciencesScriptLauncher extends BashWrapperBuilder {
/**
 * Stage command hook for this launcher.
 *
 * @return always {@code null}, so the wrapper's {@code stage_cmd} binding is null
 */
protected String getStageCommand() {
    return null
}

@Override
protected String getUnstageCommand() { null }
/**
 * Unstage command hook for this launcher.
 *
 * @return the shell no-op {@code true}; the rendered wrapper places this value at the head
 *         of a pipeline ({@code true | tee -a ...}), so it has to be a runnable command
 */
protected String getUnstageCommand() {
    return 'true'
}

/**
 * Control-files unstage hook for this launcher.
 *
 * @return always {@code null}, so the wrapper's {@code unstage_controls_cmd} binding is null
 */
@Override
protected String getUnstageControlsCommand() {
    return null
}

@Override
String touchFile(Path file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class GoogleLifeSciencesHelperTest extends GoogleSpecification {
def unstage = helper.getUnstagingScript(dir)
then:
unstage ==
'set -x; trap \'err=$?; exec 1>&2; gsutil -m -q cp -R /work/dir/.command.log gs://my-bucket/work/dir/.command.log || true; [[ $err -gt 0 || $GOOGLE_LAST_EXIT_STATUS -gt 0 || $NXF_DEBUG -gt 0 ]] && { ls -lah /work/dir || true; gsutil -m -q cp -R /google/ gs://my-bucket/work/dir; } || rm -rf /work/dir; exit $err\' EXIT; { cd /work/dir; bash .command.run nxf_unstage; } >> /work/dir/.command.log 2>&1'
'set -x; trap \'err=$?; exec 1>&2; gsutil -m -q cp -R /work/dir/.command.log gs://my-bucket/work/dir/.command.log || true; [[ $err -gt 0 || $GOOGLE_LAST_EXIT_STATUS -gt 0 || $NXF_DEBUG -gt 0 ]] && { ls -lah /work/dir || true; gsutil -m -q cp -R /google/ gs://my-bucket/work/dir; } || rm -rf /work/dir; exit $err\' EXIT; { cd /work/dir; bash .command.run nxf_unstage; bash .command.run nxf_unstage_std_files;} >> /work/dir/.command.log 2>&1'
}

@Unroll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class GoogleLifeSciencesScriptLauncherTest extends GoogleSpecification {
then:
binding.touch_file == null
binding.stage_cmd == null
binding.unstage_cmd == null
binding.unstage_cmd == 'true'
binding.unstage_controls_cmd == null
binding.task_env == '''\
chmod +x /work/xx/yy/nextflow-bin/* || true
export PATH=/work/xx/yy/nextflow-bin:$PATH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
local last_err=$?
exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:-0} -ne 0 ]] && exit_status=${nxf_unstage_ret}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status > {{folder}}/.exitcode
set +u
exit $exit_status
Expand All @@ -194,10 +197,13 @@ nxf_stage() {

nxf_unstage() {
true
}

nxf_unstage_std_files() {
true
gsutil -m -q cp -R .command.out gs://bucket/work/dir/.command.out || true
gsutil -m -q cp -R .command.err gs://bucket/work/dir/.command.err || true
gsutil -m -q cp -R .exitcode gs://bucket/work/dir/.exitcode || true
[[ ${nxf_main_ret:=0} != 0 ]] && return
}

nxf_main() {
Expand All @@ -217,6 +223,10 @@ nxf_main() {
(set -o pipefail; (nxf_launch | tee .command.out) 3>&1 1>&2 2>&3 | tee .command.err) &
pid=$!
wait $pid || nxf_main_ret=$?
if [[ ${nxf_main_ret:=0} == 0 ]]; then
(set -e -o pipefail; (true | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
nxf_unstage_ret=$?
fi
}

$NXF_ENTRY