diff --git a/service/history/api/consistency_checker.go b/service/history/api/consistency_checker.go index 95d8574b7cd..43a900798a4 100644 --- a/service/history/api/consistency_checker.go +++ b/service/history/api/consistency_checker.go @@ -117,7 +117,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease( if reqClock != nil { currentClock := c.shardContext.CurrentVectorClock() if vclock.Comparable(reqClock, currentClock) { - return c.getWorkflowContextValidatedByClock( + return c.getWorkflowLeaseValidatedByClock( ctx, reqClock, currentClock, @@ -134,7 +134,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease( // shard ownership asserted boolean keep tracks whether AssertOwnership is already caller shardOwnershipAsserted := false if len(workflowKey.RunID) != 0 { - return c.getWorkflowContextValidatedByCheck( + return c.getWorkflowLeaseValidatedByCheck( ctx, &shardOwnershipAsserted, consistencyPredicate, @@ -152,7 +152,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease( ) } -func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock( +func (c *WorkflowConsistencyCheckerImpl) getWorkflowLeaseValidatedByClock( ctx context.Context, reqClock *clockspb.VectorClock, currentClock *clockspb.VectorClock, @@ -194,7 +194,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock( return NewWorkflowLease(wfContext, release, mutableState), nil } -func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck( +func (c *WorkflowConsistencyCheckerImpl) getWorkflowLeaseValidatedByCheck( ctx context.Context, shardOwnershipAsserted *bool, consistencyPredicate MutableStateConsistencyPredicate, @@ -269,7 +269,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( if err != nil { return nil, err } - wfContext, err := c.getWorkflowContextValidatedByCheck( + workflowLease, err := c.getWorkflowLeaseValidatedByCheck( ctx, shardOwnershipAsserted, consistencyPredicate, @@ -279,8 +279,8 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( if err != nil { return nil, err } - if wfContext.GetMutableState().IsWorkflowExecutionRunning() { - return wfContext, nil + if workflowLease.GetMutableState().IsWorkflowExecutionRunning() { + return workflowLease, nil } currentRunID, err := c.getCurrentRunID( @@ -291,14 +291,14 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext( lockPriority, ) if err != nil { - wfContext.GetReleaseFn()(err) + workflowLease.GetReleaseFn()(err) return nil, err } - if currentRunID == wfContext.GetContext().GetWorkflowKey().RunID { - return wfContext, nil + if currentRunID == workflowLease.GetContext().GetWorkflowKey().RunID { + return workflowLease, nil } - wfContext.GetReleaseFn()(nil) + workflowLease.GetReleaseFn()(nil) return nil, consts.ErrLocateCurrentWorkflowExecution } diff --git a/service/history/api/consistency_checker_test.go b/service/history/api/consistency_checker_test.go index 7431060cf21..98616339680 100644 --- a/service/history/api/consistency_checker_test.go +++ b/service/history/api/consistency_checker_test.go @@ -125,7 +125,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState, nil) - workflowContext, err := s.checker.getWorkflowContextValidatedByCheck( + workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck( ctx, &shardOwnershipAsserted, BypassMutableStateConsistencyPredicate, @@ -133,7 +133,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck workflow.LockPriorityHigh, ) s.NoError(err) - s.Equal(mutableState, workflowContext.GetMutableState()) + s.Equal(mutableState, workflowLease.GetMutableState()) s.False(released) } @@ -163,7 +163,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState2, nil), ) - workflowContext, err := s.checker.getWorkflowContextValidatedByCheck( + workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck( ctx, &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, @@ -171,7 +171,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck workflow.LockPriorityHigh, ) s.NoError(err) - s.Equal(mutableState2, workflowContext.GetMutableState()) + s.Equal(mutableState2, workflowLease.GetMutableState()) s.False(released) } @@ -197,7 +197,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck s.shardContext.EXPECT().AssertOwnership(ctx).Return(nil) - workflowContext, err := s.checker.getWorkflowContextValidatedByCheck( + workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck( ctx, &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, @@ -205,7 +205,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck workflow.LockPriorityHigh, ) s.IsType(&serviceerror.NotFound{}, err) - s.Nil(workflowContext) + s.Nil(workflowLease) s.True(released) } @@ -231,7 +231,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck s.shardContext.EXPECT().AssertOwnership(ctx).Return(&persistence.ShardOwnershipLostError{}) - workflowContext, err := s.checker.getWorkflowContextValidatedByCheck( + workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck( ctx, &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, @@ -239,7 +239,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck workflow.LockPriorityHigh, ) s.IsType(&persistence.ShardOwnershipLostError{}, err) - s.Nil(workflowContext) + s.Nil(workflowLease) s.True(released) } @@ -263,7 +263,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck ).Return(wfContext, releaseFn, nil) wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(nil, serviceerror.NewUnavailable("")) - workflowContext, err := s.checker.getWorkflowContextValidatedByCheck( + workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck( ctx, &shardOwnershipAsserted, FailMutableStateConsistencyPredicate, @@ -271,7 +271,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck workflow.LockPriorityHigh, ) s.IsType(&serviceerror.Unavailable{}, err) - s.Nil(workflowContext) + s.Nil(workflowLease) s.True(released) } diff --git a/service/history/api/verifychildworkflowcompletionrecorded/api.go b/service/history/api/verifychildworkflowcompletionrecorded/api.go index 1ec1ea36e87..61df63b0a09 100644 --- a/service/history/api/verifychildworkflowcompletionrecorded/api.go +++ b/service/history/api/verifychildworkflowcompletionrecorded/api.go @@ -49,7 +49,7 @@ func Invoke( return nil, err } - workflowContext, err := workflowConsistencyChecker.GetWorkflowLease( + workflowLease, err := workflowConsistencyChecker.GetWorkflowLease( ctx, request.Clock, // it's ok we have stale state when doing verification, @@ -67,9 +67,9 @@ func Invoke( if err != nil { return nil, err } - defer func() { workflowContext.GetReleaseFn()(retError) }() + defer func() { workflowLease.GetReleaseFn()(retError) }() - mutableState := workflowContext.GetMutableState() + mutableState := workflowLease.GetMutableState() if !mutableState.IsWorkflowExecutionRunning() && mutableState.GetExecutionState().State != enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE { // parent has already completed and can't be blocked after failover. diff --git a/service/history/api/workflow_context.go b/service/history/api/workflow_lease.go similarity index 100% rename from service/history/api/workflow_context.go rename to service/history/api/workflow_lease.go diff --git a/service/history/workflow_task_handler_callbacks.go b/service/history/workflow_task_handler_callbacks.go index 75d6bfa206c..4461b024a96 100644 --- a/service/history/workflow_task_handler_callbacks.go +++ b/service/history/workflow_task_handler_callbacks.go @@ -398,7 +398,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return nil, consts.ErrDeserializingToken } - workflowContext, err := handler.workflowConsistencyChecker.GetWorkflowLease( + workflowLease, err := handler.workflowConsistencyChecker.GetWorkflowLease( ctx, token.Clock, func(mutableState workflow.MutableState) bool { @@ -421,8 +421,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( if err != nil { return nil, err } - weContext := workflowContext.GetContext() - ms := workflowContext.GetMutableState() + weContext := workflowLease.GetContext() + ms := workflowLease.GetMutableState() currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId()) if !ms.IsWorkflowExecutionRunning() || @@ -433,11 +433,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( currentWorkflowTask.Attempt != token.Attempt || (token.Version != common.EmptyVersion && token.Version != currentWorkflowTask.Version) { // we have not alter mutable state yet, so release with it with nil to avoid clear MS. - workflowContext.GetReleaseFn()(nil) + workflowLease.GetReleaseFn()(nil) return nil, serviceerror.NewNotFound("Workflow task not found.") } - defer func() { workflowContext.GetReleaseFn()(retError) }() + defer func() { workflowLease.GetReleaseFn()(retError) }() var effects effect.Buffer defer func() { @@ -834,13 +834,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( if wtHeartbeatTimedOut { // at this point, update is successful, but we still return an error to client so that the worker will give up this workflow // release workflow lock with nil error to prevent mutable state from being cleared and reloaded - workflowContext.GetReleaseFn()(nil) + workflowLease.GetReleaseFn()(nil) return nil, serviceerror.NewNotFound("workflow task heartbeat timeout") } if wtFailedCause != nil { // release workflow lock with nil error to prevent mutable state from being cleared and reloaded - workflowContext.GetReleaseFn()(nil) + workflowLease.GetReleaseFn()(nil) return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) }