diff --git a/ctx.go b/ctx.go index 607a678ff8..23c409b5ae 100644 --- a/ctx.go +++ b/ctx.go @@ -5,6 +5,7 @@ package fiber import ( + "bufio" "bytes" "context" "crypto/tls" @@ -1669,6 +1670,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error { return nil } +// SendStreamWriter sets response body stream writer +func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error { + c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter)) + + return nil +} + // Set sets the response's HTTP header field to the specified key, value. func (c *DefaultCtx) Set(key, val string) { c.fasthttp.Response.Header.Set(key, val) diff --git a/ctx_interface_gen.go b/ctx_interface_gen.go index 62f2d368ad..194cdc4e1f 100644 --- a/ctx_interface_gen.go +++ b/ctx_interface_gen.go @@ -3,6 +3,7 @@ package fiber import ( + "bufio" "context" "crypto/tls" "io" @@ -282,6 +283,8 @@ type Ctx interface { SendString(body string) error // SendStream sets response body stream and optional body size. SendStream(stream io.Reader, size ...int) error + // SendStreamWriter sets response body stream writer + SendStreamWriter(streamWriter func(*bufio.Writer)) error // Set sets the response's HTTP header field to the specified key, value. Set(key, val string) setCanonical(key, val string) diff --git a/ctx_test.go b/ctx_test.go index a94e4cb42b..da061b9e8e 100644 --- a/ctx_test.go +++ b/ctx_test.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "testing" "text/template" "time" @@ -4343,6 +4344,73 @@ func Test_Ctx_SendStream(t *testing.T) { require.Equal(t, "Hello bufio", string(c.Response().Body())) } +// go test -run Test_Ctx_SendStreamWriter +func Test_Ctx_SendStreamWriter(t *testing.T) { + t.Parallel() + app := New() + c := app.AcquireCtx(&fasthttp.RequestCtx{}) + + err := c.SendStreamWriter(func(w *bufio.Writer) { + w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error + }) + require.NoError(t, err) + require.Equal(t, "Don't crash please", string(c.Response().Body())) + + err = c.SendStreamWriter(func(w *bufio.Writer) { + for lineNum := 1; lineNum <= 5; lineNum++ { + fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error + if err := w.Flush(); err != nil { + t.Errorf("unexpected error: %s", err) + return + } + } + }) + require.NoError(t, err) + require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body())) + + err = c.SendStreamWriter(func(_ *bufio.Writer) {}) + require.NoError(t, err) + require.Empty(t, c.Response().Body()) +} + +// go test -run Test_Ctx_SendStreamWriter_Interrupted +func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) { + t.Parallel() + app := New() + c := app.AcquireCtx(&fasthttp.RequestCtx{}) + + var mutex sync.Mutex + startChan := make(chan bool) + interruptStreamWriter := func() { + <-startChan + time.Sleep(5 * time.Millisecond) + mutex.Lock() + c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error + mutex.Unlock() + } + err := c.SendStreamWriter(func(w *bufio.Writer) { + go interruptStreamWriter() + + startChan <- true + for lineNum := 1; lineNum <= 5; lineNum++ { + mutex.Lock() + fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error + mutex.Unlock() + + if err := w.Flush(); err != nil { + if lineNum < 3 { + t.Errorf("unexpected error: %s", err) + } + return + } + + time.Sleep(1500 * time.Microsecond) + } + }) + require.NoError(t, err) + require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body())) +} + // go test -run Test_Ctx_Set func Test_Ctx_Set(t *testing.T) { t.Parallel() diff --git a/docs/api/ctx.md b/docs/api/ctx.md index 00c2422cd9..81a7ef178a 100644 --- a/docs/api/ctx.md +++ b/docs/api/ctx.md @@ -1871,6 +1871,47 @@ app.Get("/", func(c fiber.Ctx) error { }) ``` +## SendStreamWriter + +Sets the response body stream writer. + +:::note +The argument `streamWriter` represents a function that populates +the response body using a buffered stream writer. +::: + +```go title="Signature" +func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error +``` + +```go title="Example" +app.Get("/", func (c fiber.Ctx) error { + return c.SendStreamWriter(func(w *bufio.Writer) { + fmt.Fprintf(w, "Hello, World!\n") + }) + // => "Hello, World!" +}) +``` + +:::info +To flush data before the function returns, you can call `w.Flush()` +on the provided writer. Otherwise, the buffered stream flushes after +`streamWriter` returns. +::: + +```go title="Example" +app.Get("/wait", func(c fiber.Ctx) error { + return c.SendStreamWriter(func(w *bufio.Writer) { + fmt.Fprintf(w, "Waiting for 10 seconds\n") + if err := w.Flush(); err != nil { + log.Print("User quit early") + } + time.Sleep(10 * time.Second) + fmt.Fprintf(w, "Done!\n") + }) +}) +``` + ## Set Sets the response’s HTTP header field to the specified `key`, `value`.