Skip to content

Commit

Permalink
parallel container startup with deferred values
Browse files Browse the repository at this point in the history
This commit tries to be as unobtrusive as possible, attaching new behavior to existing types where possible rather than building out new infrastructure.

constructorNode returns a deferred value when called. On the first call, it asks paramList to start building an arg slice, which may also be deferred.

Once the arg slice is resolved, constructorNode schedules its constructor function to be called. Once it's called, it resolves its own deferral.

Multiple paramSingles can observe the same constructorNode before it's ready. If there's an error, they may all see the same error, which is a change in behavior.

There are two schedulers: synchronous and parallel. The synchronous scheduler returns things in the same order as before. The parallel may not (and the tests that rely on shuffle order will fail). The scheduler needs to be flushed after deferred values are created. The synchronous scheduler does nothing on when flushing, but the parallel scheduler runs a pool of goroutines to resolve constructors.

Calls to dig functions always happen on the same goroutine as Scope.Invoke(). Calls to constructor functions can happen on pooled goroutines.

The choice of scheduler is up to the Scope. Whether constructor functions are safe to call in parallel seems most logically to be a property of the scope, and the scope is passed down the constructor/param call chain.
  • Loading branch information
xandris committed Jan 27, 2022
1 parent c0dbff4 commit 3790411
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 142 deletions.
56 changes: 27 additions & 29 deletions constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,36 @@ func (n *constructorNode) Call(c containerStore) *deferred {
}

var args []reflect.Value
d := n.paramList.BuildList(c, false /* decorating */, &args)

d.observe(func(err error) {
if err != nil {
n.calling = false
n.deferred.resolve(errArgumentsFailed{
Func: n.location,
Reason: err,
})
return
}

var results []reflect.Value
var results []reflect.Value

c.scheduler().schedule(func() {
n.paramList.BuildList(c, false /* decorating */, &args).catch(func(err error) error {
return errArgumentsFailed{
Func: n.location,
Reason: err,
}
}).then(func() *deferred {
return c.scheduler().schedule(func() {
results = c.invoker()(reflect.ValueOf(n.ctor), args)
}).observe(func(_ error) {
n.calling = false
receiver := newStagingContainerWriter()
if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil {
n.deferred.resolve(errConstructorFailed{Func: n.location, Reason: err})
return
}

// Commit the result to the original container that this constructor
// was supplied to. The provided container is only used for a view of
// the rest of the graph to instantiate the dependencies of this
// container.
receiver.Commit(n.s)
n.called = true
n.deferred.resolve(nil)
})
}).then(func() *deferred {
receiver := newStagingContainerWriter()
if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil {
return failedDeferred(errConstructorFailed{Func: n.location, Reason: err})
}

// Commit the result to the original container that this constructor
// was supplied to. The provided container is only used for a view of
// the rest of the graph to instantiate the dependencies of this
// container.
receiver.Commit(n.s)
n.calling = false
n.called = true
n.deferred.resolve(nil)
return &alreadyResolved
}).catch(func(err error) error {
n.calling = false
n.deferred.resolve(err)
return nil
})

return &n.deferred
Expand Down
70 changes: 52 additions & 18 deletions decorate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

type decorator interface {
Call(c containerStore) error
Call(c containerStore) *deferred
ID() dot.CtorID
}

Expand All @@ -42,12 +42,18 @@ type decoratorNode struct {
// Location where this function was defined.
location *digreflect.Func

// Whether this node is already building its paramList and calling the constructor
calling bool

// Whether the decorator owned by this node was already called.
called bool

// Parameters of the decorator.
params paramList

// The result of calling the constructor
deferred deferred

// Results of the decorator.
results resultList

Expand Down Expand Up @@ -86,32 +92,60 @@ func newDecoratorNode(dcor interface{}, s *Scope) (*decoratorNode, error) {
return n, nil
}

func (n *decoratorNode) Call(s containerStore) error {
if n.called {
return nil
// Call calls this decorator if it hasn't already been called and injects any values produced by it into the container
// passed to newConstructorNode.
//
// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has
// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the
// call again if it failed last time.
//
// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its
// zero value; don't store the returned pointer. (It will still call each observer only once.)
func (n *decoratorNode) Call(s containerStore) *deferred {
if n.calling || n.called {
return &n.deferred
}

n.calling = true
n.deferred = deferred{}

if err := shallowCheckDependencies(s, n.params); err != nil {
return errMissingDependencies{
n.deferred.resolve(errMissingDependencies{
Func: n.location,
Reason: err,
}
})
}

args, err := n.params.BuildList(n.s, true /* decorating */)
if err != nil {
return errArgumentsFailed{
Func: n.location,
Reason: err,
var args []reflect.Value
d := n.params.BuildList(s, true /* decorating */, &args)

d.observe(func(err error) {
if err != nil {
n.calling = false
n.deferred.resolve(errArgumentsFailed{
Func: n.location,
Reason: err,
})
return
}
}

results := reflect.ValueOf(n.dcor).Call(args)
if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil {
return err
}
n.called = true
return nil
var results []reflect.Value

s.scheduler().schedule(func() {
results = s.invoker()(reflect.ValueOf(n.dcor), args)
}).observe(func(_ error) {
n.calling = false
if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil {
n.deferred.resolve(err)
return
}

n.called = true
n.deferred.resolve(nil)
})
})

return &n.deferred
}

func (n *decoratorNode) ID() dot.CtorID { return n.id }
Expand Down
21 changes: 16 additions & 5 deletions deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,27 @@ func (d *deferred) resolve(err error) {
d.observers = nil
}

// then returns a new deferred that is either resolved with the same error as this deferred, or any error returned from
// the supplied function. The supplied function is only called if this deferred is resolved without error.
func (d *deferred) then(res func() error) *deferred {
// then returns a new deferred that is either resolved with the same error as this deferred or the eventual result of
// the deferred returned by res.
func (d *deferred) then(res func() *deferred) *deferred {
// Shortcut: if we're settled...
if d.settled {
if d.err == nil {
// ...successfully, then return the other deferred
return res()
} else {
// ...with an error, then return us
return d
}
}

d2 := new(deferred)
d.observe(func(err error) {
if err != nil {
d2.resolve(err)
return
} else {
res().observe(d2.resolve)
}
d2.resolve(res())
})
return d2
}
Expand Down
2 changes: 1 addition & 1 deletion invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *Scope) Invoke(function interface{}, opts ...InvokeOption) error {

var args []reflect.Value

d := pl.BuildList(s, &args, false /* decorating */)
d := pl.BuildList(s, false /* decorating */, &args)
d.observe(func(err2 error) {
err = err2
})
Expand Down
Loading

0 comments on commit 3790411

Please sign in to comment.