Skip to content

Commit

Permalink
Rename WorkflowContext to WorkflowLease #2 (temporalio#5418)
Browse files Browse the repository at this point in the history
## What changed?

Renamed remaining api.WorkflowContext to api.WorkflowLease. (including
the file name)

## Why?

Missed a couple earlier in
temporalio#5386

## How did you test it?

Compiler

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Feb 14, 2024
1 parent 4b47ecd commit 0c2cb7a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 31 deletions.
22 changes: 11 additions & 11 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease(
if reqClock != nil {
currentClock := c.shardContext.CurrentVectorClock()
if vclock.Comparable(reqClock, currentClock) {
return c.getWorkflowContextValidatedByClock(
return c.getWorkflowLeaseValidatedByClock(
ctx,
reqClock,
currentClock,
Expand All @@ -134,7 +134,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease(
// shard ownership asserted boolean keep tracks whether AssertOwnership is already caller
shardOwnershipAsserted := false
if len(workflowKey.RunID) != 0 {
return c.getWorkflowContextValidatedByCheck(
return c.getWorkflowLeaseValidatedByCheck(
ctx,
&shardOwnershipAsserted,
consistencyPredicate,
Expand All @@ -152,7 +152,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease(
)
}

func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
func (c *WorkflowConsistencyCheckerImpl) getWorkflowLeaseValidatedByClock(
ctx context.Context,
reqClock *clockspb.VectorClock,
currentClock *clockspb.VectorClock,
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
return NewWorkflowLease(wfContext, release, mutableState), nil
}

func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
func (c *WorkflowConsistencyCheckerImpl) getWorkflowLeaseValidatedByCheck(
ctx context.Context,
shardOwnershipAsserted *bool,
consistencyPredicate MutableStateConsistencyPredicate,
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
if err != nil {
return nil, err
}
wfContext, err := c.getWorkflowContextValidatedByCheck(
workflowLease, err := c.getWorkflowLeaseValidatedByCheck(
ctx,
shardOwnershipAsserted,
consistencyPredicate,
Expand All @@ -279,8 +279,8 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
if err != nil {
return nil, err
}
if wfContext.GetMutableState().IsWorkflowExecutionRunning() {
return wfContext, nil
if workflowLease.GetMutableState().IsWorkflowExecutionRunning() {
return workflowLease, nil
}

currentRunID, err := c.getCurrentRunID(
Expand All @@ -291,14 +291,14 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
lockPriority,
)
if err != nil {
wfContext.GetReleaseFn()(err)
workflowLease.GetReleaseFn()(err)
return nil, err
}
if currentRunID == wfContext.GetContext().GetWorkflowKey().RunID {
return wfContext, nil
if currentRunID == workflowLease.GetContext().GetWorkflowKey().RunID {
return workflowLease, nil
}

wfContext.GetReleaseFn()(nil)
workflowLease.GetReleaseFn()(nil)
return nil, consts.ErrLocateCurrentWorkflowExecution
}

Expand Down
20 changes: 10 additions & 10 deletions service/history/api/consistency_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState, nil)

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck(
ctx,
&shardOwnershipAsserted,
BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.NoError(err)
s.Equal(mutableState, workflowContext.GetMutableState())
s.Equal(mutableState, workflowLease.GetMutableState())
s.False(released)
}

Expand Down Expand Up @@ -163,15 +163,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState2, nil),
)

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck(
ctx,
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.NoError(err)
s.Equal(mutableState2, workflowContext.GetMutableState())
s.Equal(mutableState2, workflowLease.GetMutableState())
s.False(released)
}

Expand All @@ -197,15 +197,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.shardContext.EXPECT().AssertOwnership(ctx).Return(nil)

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck(
ctx,
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&serviceerror.NotFound{}, err)
s.Nil(workflowContext)
s.Nil(workflowLease)
s.True(released)
}

Expand All @@ -231,15 +231,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.shardContext.EXPECT().AssertOwnership(ctx).Return(&persistence.ShardOwnershipLostError{})

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck(
ctx,
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&persistence.ShardOwnershipLostError{}, err)
s.Nil(workflowContext)
s.Nil(workflowLease)
s.True(released)
}

Expand All @@ -263,15 +263,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(nil, serviceerror.NewUnavailable(""))

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
workflowLease, err := s.checker.getWorkflowLeaseValidatedByCheck(
ctx,
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&serviceerror.Unavailable{}, err)
s.Nil(workflowContext)
s.Nil(workflowLease)
s.True(released)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Invoke(
return nil, err
}

workflowContext, err := workflowConsistencyChecker.GetWorkflowLease(
workflowLease, err := workflowConsistencyChecker.GetWorkflowLease(
ctx,
request.Clock,
// it's ok we have stale state when doing verification,
Expand All @@ -67,9 +67,9 @@ func Invoke(
if err != nil {
return nil, err
}
defer func() { workflowContext.GetReleaseFn()(retError) }()
defer func() { workflowLease.GetReleaseFn()(retError) }()

mutableState := workflowContext.GetMutableState()
mutableState := workflowLease.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() &&
mutableState.GetExecutionState().State != enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
// parent has already completed and can't be blocked after failover.
Expand Down
File renamed without changes.
14 changes: 7 additions & 7 deletions service/history/workflow_task_handler_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
return nil, consts.ErrDeserializingToken
}

workflowContext, err := handler.workflowConsistencyChecker.GetWorkflowLease(
workflowLease, err := handler.workflowConsistencyChecker.GetWorkflowLease(
ctx,
token.Clock,
func(mutableState workflow.MutableState) bool {
Expand All @@ -421,8 +421,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
if err != nil {
return nil, err
}
weContext := workflowContext.GetContext()
ms := workflowContext.GetMutableState()
weContext := workflowLease.GetContext()
ms := workflowLease.GetMutableState()

currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId())
if !ms.IsWorkflowExecutionRunning() ||
Expand All @@ -433,11 +433,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
currentWorkflowTask.Attempt != token.Attempt ||
(token.Version != common.EmptyVersion && token.Version != currentWorkflowTask.Version) {
// we have not alter mutable state yet, so release with it with nil to avoid clear MS.
workflowContext.GetReleaseFn()(nil)
workflowLease.GetReleaseFn()(nil)
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

defer func() { workflowContext.GetReleaseFn()(retError) }()
defer func() { workflowLease.GetReleaseFn()(retError) }()

var effects effect.Buffer
defer func() {
Expand Down Expand Up @@ -834,13 +834,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
if wtHeartbeatTimedOut {
// at this point, update is successful, but we still return an error to client so that the worker will give up this workflow
// release workflow lock with nil error to prevent mutable state from being cleared and reloaded
workflowContext.GetReleaseFn()(nil)
workflowLease.GetReleaseFn()(nil)
return nil, serviceerror.NewNotFound("workflow task heartbeat timeout")
}

if wtFailedCause != nil {
// release workflow lock with nil error to prevent mutable state from being cleared and reloaded
workflowContext.GetReleaseFn()(nil)
workflowLease.GetReleaseFn()(nil)
return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message())
}

Expand Down

0 comments on commit 0c2cb7a

Please sign in to comment.