Skip to content
This repository has been archived by the owner on Oct 2, 2020. It is now read-only.

get rid of sync.Map for better go version compatibility #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: go
go:
- 1.9
- 1.8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should stick to 1.9 unless there's a huge demand for supporting 1.8.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the tradeoff between supporting more versions and losing the 1.9 specific features, but I agree we can punt on this PR and evaluate the tradeoff when demand for 1.8 (or below) really comes.

env:
global:
- TEST_TIMEOUT_SCALE=40
Expand Down
19 changes: 11 additions & 8 deletions participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ type participant struct {
keyBuilder *KeyBuilder
zkClient *uzk.Client
// Mirrors org.apache.helix.participant.HelixStateMachineEngine
// stateModelName->stateModelProcessor
stateModelProcessors sync.Map
stateModelProcessorsMu sync.Mutex
stateModelProcessors map[string]*StateModelProcessor // state model name to processor
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's avoid inline comment

stateModelProcessorLocks map[string]*sync.Mutex
stateModel StateModel
sync.Mutex
dataAccessor *DataAccessor
dataAccessor *DataAccessor

// fatalErrChan would notify user when a fatal error occurs
fatalErrChan chan error
Expand Down Expand Up @@ -122,6 +121,7 @@ func NewParticipant(
port: port,
keyBuilder: keyBuilder,
zkClient: zkClient,
stateModelProcessors: make(map[string]*StateModelProcessor),
stateModelProcessorLocks: make(map[string]*sync.Mutex),
dataAccessor: newDataAccessor(zkClient, keyBuilder),
stateModel: NewStateModel(),
Expand Down Expand Up @@ -159,7 +159,9 @@ func (p *participant) IsConnected() bool {

// RegisterStateModel associates state trasition functions with the participant
func (p *participant) RegisterStateModel(stateModelName string, processor *StateModelProcessor) {
p.stateModelProcessors.Store(stateModelName, processor)
p.stateModelProcessorsMu.Lock()
p.stateModelProcessors[stateModelName] = processor
p.stateModelProcessorsMu.Unlock()
p.stateModelProcessorLocks[stateModelName] = &sync.Mutex{}
}

Expand Down Expand Up @@ -435,7 +437,7 @@ func (p *participant) handleMsg(msg *model.Message) error {
mu, ok := p.stateModelProcessorLocks[msg.GetStateModelDef()]
if !ok {
p.logger.Error("failed to find state model in stateModelProcessorLocks",
zap.String("StateModelDefinition", msg.GetStateModelDef()),
zap.String("stateModelDefinition", msg.GetStateModelDef()),
zap.Any("stateModelProcessorLocks", p.stateModelProcessorLocks))
return errMsgMissingStateModelDef
}
Expand Down Expand Up @@ -556,8 +558,9 @@ func (p *participant) handleStateTransition(msg *model.Message) error {
// set the msg execution time
msg.SetExecuteStartTime(time.Now())

if val, ok := p.stateModelProcessors.Load(msg.GetStateModelDef()); ok {
processor := val.(*StateModelProcessor)
p.stateModelProcessorsMu.Lock()
defer p.stateModelProcessorsMu.Unlock()
if processor, ok := p.stateModelProcessors[msg.GetStateModelDef()]; ok {
if toStateHandler, ok := processor.Transitions[fromState]; ok {
if handler, ok := toStateHandler[toState]; ok {
// TODO: deal with handler error
Expand Down