Skip to content

Commit

Permalink
Improve job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed Sep 11, 2023
1 parent ee4040b commit f7fff08
Show file tree
Hide file tree
Showing 17 changed files with 730 additions and 1,153 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ spark-warehouse
*.flattened-pom.xml

seatunnel-examples
/lib/*
/lib/*
version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ source {
parallelism = ${fake_parallelism}
username = ${username}
password = ${password}
partition= ${partition111}
schema = {
fields {
name = "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ public void testSubmitJob() {
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender

logger.zeta.name=org.apache.seatunnel.engine
logger.zeta.level=WARN
logger.zeta.level=INFO

appender.consoleStdout.name = consoleStdoutAppender
appender.consoleStdout.type = CONSOLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testContentFormatUtil() throws InterruptedException {
new JobStatusData(
4352352414135L + i,
"Testfdsafew" + i,
JobStatus.CANCELLING,
JobStatus.CANCELING,
System.currentTimeMillis(),
System.currentTimeMillis()));
Thread.sleep(2L);
Expand All @@ -53,7 +53,7 @@ public void testContentFormatUtil() throws InterruptedException {
new JobStatusData(
4352352414135L + i,
"fdsafsddfasfsdafasdf" + i,
JobStatus.RECONCILING,
JobStatus.UNKNOWABLE,
System.currentTimeMillis(),
null));
Thread.sleep(2L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,29 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.scheduler;
package org.apache.seatunnel.engine.common.exception;

import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.core.HazelcastException;

import lombok.NonNull;
public class TaskGroupDeployException extends HazelcastException
implements ClientExceptionFactory.ExceptionFactory {
public TaskGroupDeployException() {}

import java.util.concurrent.CompletableFuture;
public TaskGroupDeployException(String message) {
super(message);
}

public interface JobScheduler {
CompletableFuture<Void> reSchedulerPipeline(@NonNull SubPlan subPlan);
public TaskGroupDeployException(String message, Throwable cause) {
super(message, cause);
}

void startScheduling();
public TaskGroupDeployException(Throwable cause) {
super(cause);
}

@Override
public Throwable createException(String s, Throwable throwable) {
return new TaskGroupDeployException(s, throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public enum JobStatus {
/** Job is newly created, no task has started to run. */
CREATED(EndState.NOT_END),

/** Job is begin schedule but some task not deploy complete. */
/** Job will scheduler every pipeline */
SCHEDULED(EndState.NOT_END),

/** Some tasks are scheduled or running, some may be pending, some may be finished. */
/** Job is running and begine to scheduler pipeline running. */
RUNNING(EndState.NOT_END),

/** The job has failed and is currently waiting for the cleanup to complete. */
Expand All @@ -42,26 +42,14 @@ public enum JobStatus {
FAILED(EndState.GLOBALLY),

/** Job is being cancelled. */
CANCELLING(EndState.NOT_END),
CANCELING(EndState.NOT_END),

/** Job has been cancelled. */
CANCELED(EndState.GLOBALLY),

/** All of the job's tasks have successfully finished. */
FINISHED(EndState.GLOBALLY),

/** The job is currently undergoing a reset and total restart. */
RESTARTING(EndState.NOT_END),

/**
* The job has been suspended which means that it has been stopped but not been removed from a
* potential HA job store.
*/
SUSPENDED(EndState.LOCALLY),

/** The job is currently reconciling and waits for task execution report to recover state. */
RECONCILING(EndState.NOT_END),

/** Cannot find the JobID or the job status has already been cleared. */
UNKNOWABLE(EndState.GLOBALLY);

Expand All @@ -79,30 +67,6 @@ private enum EndState {
this.endState = endState;
}

/**
* Checks whether this state is <i>globally terminal</i>. A globally terminal job is complete
* and cannot fail any more and will not be restarted or recovered by another standby master
* node.
*
* <p>When a globally terminal state has been reached, all recovery data for the job is dropped
* from the high-availability services.
*
* @return True, if this job status is globally terminal, false otherwise.
*/
public boolean isGloballyEndState() {
return endState == EndState.GLOBALLY;
}

/**
* Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the state of
* a job's execution graph within an executing JobManager. If the execution graph is locally
* terminal, the JobManager will not continue executing or recovering the job.
*
* <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
* which is typically entered when the executing JobManager loses its leader status.
*
* @return True, if this job status is terminal, false otherwise.
*/
public boolean isEndState() {
return endState != EndState.NOT_END;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public enum PipelineStatus {

CANCELED,

FAILED,
FAILING,

RECONCILING,
FAILED,

/** Restoring last possible valid state of the pipeline if it has it. */
INITIALIZING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,92 +275,37 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
metricsImap,
engineConfig);

// If Job Status is CANCELLING , set needRestore to false
try {
jobMaster.init(
runningJobInfoIMap.get(jobId).getInitializationTimestamp(),
true,
!JobStatus.CANCELLING.equals(jobStatus));
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), true);
} catch (Exception e) {
throw new SeaTunnelEngineException(String.format("Job id %s init failed", jobId), e);
}

String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
if (jobStatus.isEndState()) {
logger.info(
String.format(
"The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info",
jobFullName, jobStatus));
jobMaster.cleanJob();
return;
}

if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
CompletableFuture.runAsync(
() -> {
logger.info(
String.format(
"The restore %s is state %s, cancel job and submit it again.",
jobFullName, jobStatus));
jobMaster.cancelJob();
jobMaster.getJobMasterCompleteFuture().join();
submitJob(jobId, jobInfo.getJobImmutableInformation()).join();
},
executorService);

return;
}

runningJobMasterMap.put(jobId, jobMaster);
jobMaster.markRestore();

if (JobStatus.CANCELLING.equals(jobStatus)) {
logger.info(
String.format(
"The restore %s is in %s state, cancel the job",
jobFullName, jobStatus));
CompletableFuture.runAsync(
() -> {
try {
jobMaster.cancelJob();
jobMaster.run();
} finally {
// voidCompletableFuture will be cancelled when zeta master node
// shutdown to simulate master failure,
// don't update runningJobMasterMap is this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
}
},
executorService);
return;
}

if (JobStatus.RUNNING.equals(jobStatus)) {
logger.info(
String.format(
"The restore %s is in %s state, restore pipeline and take over this job running",
jobFullName, jobStatus));
CompletableFuture.runAsync(
() -> {
try {
jobMaster
.getPhysicalPlan()
.getPipelineList()
.forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
// voidCompletableFuture will be cancelled when zeta master node
// shutdown to simulate master failure,
// don't update runningJobMasterMap is this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
logger.info(
String.format(
"The restore %s is in %s state, restore pipeline and take over this job running",
jobFullName, jobStatus));
CompletableFuture.runAsync(
() -> {
try {
jobMaster
.getPhysicalPlan()
.getPipelineList()
.forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
// voidCompletableFuture will be cancelled when zeta master node
// shutdown to simulate master failure,
// don't update runningJobMasterMap is this case.
if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
runningJobMasterMap.remove(jobId);
}
},
executorService);
}
}
},
executorService);
}

private void checkNewActiveMaster() {
Expand Down Expand Up @@ -390,10 +335,11 @@ private void checkNewActiveMaster() {
}
}

private void clearCoordinatorService() {
public synchronized void clearCoordinatorService() {
// interrupt all JobMaster
runningJobMasterMap.values().forEach(JobMaster::interrupt);
executorService.shutdownNow();
runningJobMasterMap.clear();

try {
executorService.awaitTermination(20, TimeUnit.SECONDS);
Expand Down Expand Up @@ -457,9 +403,7 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
runningJobMasterMap.put(jobId, jobMaster);
jobMaster.init(
runningJobInfoIMap.get(jobId).getInitializationTimestamp(),
false,
true);
runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
// We specify that when init is complete, the submitJob is complete
jobSubmitFuture.complete(null);
} catch (Throwable e) {
Expand Down Expand Up @@ -690,7 +634,7 @@ private void makeTasksFailed(
|| executionState.equals(ExecutionState.RUNNING)
|| executionState.equals(ExecutionState.CANCELING))) {
TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
physicalVertex.updateTaskExecutionState(
physicalVertex.updateStateByExecutionService(
new TaskExecutionState(
taskGroupLocation,
ExecutionState.FAILED,
Expand Down Expand Up @@ -759,9 +703,6 @@ public void printJobDetailInfo() {
case CREATED:
createdJobCount.addAndGet(1);
break;
case SCHEDULED:
scheduledJobCount.addAndGet(1);
break;
case RUNNING:
runningJobCount.addAndGet(1);
break;
Expand All @@ -771,7 +712,7 @@ public void printJobDetailInfo() {
case FAILED:
failedJobCount.addAndGet(1);
break;
case CANCELLING:
case CANCELING:
cancellingJobCount.addAndGet(1);
break;
case CANCELED:
Expand All @@ -780,15 +721,6 @@ public void printJobDetailInfo() {
case FINISHED:
finishedJobCount.addAndGet(1);
break;
case RESTARTING:
restartingJobCount.addAndGet(1);
break;
case SUSPENDED:
suspendedJobCount.addAndGet(1);
break;
case RECONCILING:
reconcilingJobCount.addAndGet(1);
break;
default:
}
}
Expand Down
Loading

0 comments on commit f7fff08

Please sign in to comment.