From 173badd9e35dd119c67f54105d2e71955763a990 Mon Sep 17 00:00:00 2001 From: avneet15 Date: Tue, 19 Dec 2023 11:02:33 -0800 Subject: [PATCH] Change app status to rollback on job start timeout (#298) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Once a new Flink job is submitted, it’s possible that the job vertices are not in the running state by the configured timeout period of 3 minutes. There exists a bug in the flinkk8scontroller where the app will get stuck in this phase, instead of being rolled back and marked as DeployFailed. Slack [thread](https://lyft.slack.com/archives/CPN3NG47M/p1701461663432759?thread_ts=1701458116.814489&cid=CPN3NG47M) --- .../flinkapplication/flink_state_machine.go | 4 +++- .../flinkapplication/flink_state_machine_test.go | 14 ++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 1b935907..5424d63d 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -784,7 +784,9 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta if err != nil { logger.Info(ctx, "Job monitoring failed with error: %v", err) s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobMonitoringFailed", err.Error()) - return statusUnchanged, err + s.flinkController.UpdateLatestJobID(ctx, app, "") + s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob) + return statusChanged, err } if jobStarted { return updateJobAndReturn(ctx, s, app, hash) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 54c893a5..23eceff6 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -759,13 +759,16 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object k8sclient.Object) error { if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } else if statusUpdateCount == 3 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) } statusUpdateCount++ return nil @@ -925,13 +928,16 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object k8sclient.Object) error { if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } else if statusUpdateCount == 3 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } statusUpdateCount++ return nil