Skip to content

Commit

Permalink
Move WorkGroup increment
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeckman314 committed Jul 12, 2023
1 parent f6718e8 commit 5e9ba38
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/node/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger) error {
if err != nil {
return err
}
err = w.Run(ctx)
err = w.Run(ctx, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *Opti
if err != nil {
return err
}
return w.Run(ctx)
return w.Run(ctx, nil)
}

// NewWorker returns a new Funnel worker based on the given config.
Expand Down
2 changes: 1 addition & 1 deletion compute/local/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (b *Backend) Submit(task *tes.Task) error {
go func() { // TODO: This is the async goroutine that is calling Closed() on the worker.
// waitgroup to block closing the worker until the last event is written.
var wg sync.WaitGroup
wg.Add(1)
// wg.Add(1)

err = w.Run(ctx, &wg)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions database/mongodb/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"time"

"runtime/debug"

"github.com/globalsign/mgo"
"github.com/ohsu-comp-bio/funnel/compute/scheduler"
"github.com/ohsu-comp-bio/funnel/config"
Expand Down Expand Up @@ -116,7 +114,7 @@ func (db *MongoDB) Init() error {
// Close closes the database session.
func (db *MongoDB) Close() {
// TODO: Print stack info for calling function
debug.PrintStack()
// debug.PrintStack()
if db.active {
db.sess.Close()
}
Expand Down
15 changes: 12 additions & 3 deletions events/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,15 @@ func StreamLogTail(ctx context.Context, taskID string, attempt, index uint32, si
// whole buffer (log tail) every tick, so when the output event writer
// catches up, it will write the new, complete tail.


// output event writer routine
// fmt.Println("Calling wg.Add(1)")
if wg == nil {
wg = new(sync.WaitGroup)
}
wg.Add(1)
go func() {
if (wg != nil) {
defer wg.Done()
}
defer done(wg)

for e := range eventch {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down Expand Up @@ -294,6 +298,11 @@ func StreamLogTail(ctx context.Context, taskID string, attempt, index uint32, si
return &logTailWriter{stdoutch}, &logTailWriter{stderrch}
}

func done(wg *sync.WaitGroup) {
// fmt.Println("Calling wg.Done()")
wg.Done()
}

type logTailWriter struct {
ch chan<- []byte
}
Expand Down
2 changes: 1 addition & 1 deletion tests/core/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func TestMetadataEvent(t *testing.T) {
t.Error("error writing event", "error", err, "taskID", id)
}

err = w.Run(ctx)
err = w.Run(ctx, nil)
if err != nil {
t.Error("error running task", "error", err, "taskID", id)
}
Expand Down
14 changes: 7 additions & 7 deletions tests/core/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestLargeLogRate(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err := w.Run(ctx)
err := w.Run(ctx, nil)
if err != nil {
t.Log(err)
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestZeroLogRate(t *testing.T) {
EventWriter: m,
}

err := w.Run(context.Background())
err := w.Run(context.Background(), nil)
if err != nil {
t.Log(err)
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestZeroLogTailSize(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err := w.Run(ctx)
err := w.Run(ctx, nil)
if err != nil {
t.Log(err)
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestLogTailContent(t *testing.T) {
EventWriter: m,
}

err := w.Run(context.Background())
err := w.Run(context.Background(), nil)
if err != nil {
t.Error("unexpected worker.Run error", err)
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestDockerContainerMetadata(t *testing.T) {
EventWriter: m,
}

err := w.Run(context.Background())
err := w.Run(context.Background(), nil)
if err != nil {
t.Error("unexpected worker.Run error", err)
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestWorkerRunFileTaskReader(t *testing.T) {
}
worker.EventWriter = &events.MultiWriter{b, worker.EventWriter}

err = worker.Run(ctx)
err = worker.Run(ctx, nil)
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestWorkerRunBase64TaskReader(t *testing.T) {
}
worker.EventWriter = &events.MultiWriter{b, worker.EventWriter}

err = worker.Run(ctx)
err = worker.Run(ctx, nil)
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestManualBackend(t *testing.T) {
if err != nil {
return err
}
return w.Run(ctx)
return w.Run(ctx, nil)
}

n, err := scheduler.NewNodeProcess(ctx, srv.Conf, factory, log)
Expand Down
8 changes: 2 additions & 6 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type DefaultWorker struct {

// Run runs the Worker.
// Using a variadic WaitGroup pointer to allow for passing in a nil WaitGroup.
func (r *DefaultWorker) Run(pctx context.Context, wg ...*sync.WaitGroup) (runerr error) {
func (r *DefaultWorker) Run(pctx context.Context, wg *sync.WaitGroup) (runerr error) {

// The code here is verbose, but simple; mainly loops and simple error checking.
//
Expand Down Expand Up @@ -204,11 +204,7 @@ func (r *DefaultWorker) Run(pctx context.Context, wg ...*sync.WaitGroup) (runerr
}

if run.ok() || ignoreError {
if wg != nil && wg[0] != nil {
run.execerr = s.Run(ctx, wg[0])
} else {
run.execerr = s.Run(ctx, nil)
}
run.execerr = s.Run(ctx, wg)
}

ignoreError = d.GetIgnoreError()
Expand Down

0 comments on commit 5e9ba38

Please sign in to comment.