Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔥 Feature (v3): Add buffered streaming support #3131

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package fiber

import (
"bufio"
"bytes"
"context"
"crypto/tls"
Expand Down Expand Up @@ -1669,6 +1670,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error {
return nil
}

// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter))

return nil
}

// Set sets the response's HTTP header field to the specified key, value.
func (c *DefaultCtx) Set(key, val string) {
c.fasthttp.Response.Header.Set(key, val)
Expand Down
3 changes: 3 additions & 0 deletions ctx_interface_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -4343,6 +4344,73 @@ func Test_Ctx_SendStream(t *testing.T) {
require.Equal(t, "Hello bufio", string(c.Response().Body()))
}

// go test -run Test_Ctx_SendStreamWriter
func Test_Ctx_SendStreamWriter(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

err := c.SendStreamWriter(func(w *bufio.Writer) {
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check the error returned by w.WriteString

Even in test code, it's good practice to handle errors returned by write operations to ensure any unexpected issues are caught.

Apply this diff to check the error:

-       w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
+       if _, err := w.WriteString("Don't crash please"); err != nil {
+           t.Errorf("unexpected error: %s", err)
+       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
if _, err := w.WriteString("Don't crash please"); err != nil {
t.Errorf("unexpected error: %s", err)
}

})
require.NoError(t, err)
require.Equal(t, "Don't crash please", string(c.Response().Body()))

err = c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check the error returned by fmt.Fprintf

To ensure that any write errors are handled, consider checking the error returned by fmt.Fprintf.

Apply this diff to check the error:

-           fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
+           if _, err := fmt.Fprintf(w, "Line %d\n", lineNum); err != nil {
+               t.Errorf("unexpected error: %s", err)
+               return
+           }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
if _, err := fmt.Fprintf(w, "Line %d\n", lineNum); err != nil {
t.Errorf("unexpected error: %s", err)
return
}

if err := w.Flush(); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body()))

err = c.SendStreamWriter(func(_ *bufio.Writer) {})
require.NoError(t, err)
require.Empty(t, c.Response().Body())
}

// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

var mutex sync.Mutex
startChan := make(chan bool)
interruptStreamWriter := func() {
<-startChan
time.Sleep(5 * time.Millisecond)
mutex.Lock()
c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check the error returned by c.Response().CloseBodyStream()

Even when errors are expected, explicitly handling them improves code robustness and clarity.

Apply this diff to check the error:

-       c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
+       if err := c.Response().CloseBodyStream(); err != nil {
+           t.Errorf("failed to close body stream: %s", err)
+       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
if err := c.Response().CloseBodyStream(); err != nil {
t.Errorf("failed to close body stream: %s", err)
}

mutex.Unlock()
}
err := c.SendStreamWriter(func(w *bufio.Writer) {
go interruptStreamWriter()

startChan <- true
for lineNum := 1; lineNum <= 5; lineNum++ {
mutex.Lock()
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Check the error returned by fmt.Fprintf

As with other write operations, it's advisable to handle the error returned by fmt.Fprintf to catch any potential issues.

Apply this diff to check the error:

-           fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
+           if _, err := fmt.Fprintf(w, "Line %d\n", lineNum); err != nil {
+               if lineNum < 3 {
+                   t.Errorf("unexpected error: %s", err)
+               }
+               return
+           }

Committable suggestion was skipped due to low confidence.

mutex.Unlock()

if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}

time.Sleep(1500 * time.Microsecond)
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
}

// go test -run Test_Ctx_Set
func Test_Ctx_Set(t *testing.T) {
t.Parallel()
Expand Down
41 changes: 41 additions & 0 deletions docs/api/ctx.md
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,47 @@ app.Get("/", func(c fiber.Ctx) error {
})
```

## SendStreamWriter

Sets the response body stream writer.

:::note
The argument `streamWriter` represents a function that populates
the response body using a buffered stream writer.
:::

```go title="Signature"
func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error
```

```go title="Example"
app.Get("/", func (c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Hello, World!\n")
})
// => "Hello, World!"
})
```

:::info
To flush data before the function returns, you can call `w.Flush()`
on the provided writer. Otherwise, the buffered stream flushes after
`streamWriter` returns.
:::

```go title="Example"
app.Get("/wait", func(c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Waiting for 10 seconds\n")
if err := w.Flush(); err != nil {
log.Print("User quit early")
}
time.Sleep(10 * time.Second)
fmt.Fprintf(w, "Done!\n")
})
})
```

## Set

Sets the response’s HTTP header field to the specified `key`, `value`.
Expand Down
Loading