Skip to content

Commit

Permalink
Feat(loggr-syslog-agent): Add https-batch protocol for more efficient…
Browse files Browse the repository at this point in the history
… https drains (#491)

Closes #332
  • Loading branch information
nicklas-dohrn authored Oct 3, 2024
1 parent 0ef8d81 commit 75f9a92
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 102 deletions.
50 changes: 31 additions & 19 deletions src/pkg/egress/syslog/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -32,7 +33,6 @@ func NewHTTPSWriter(
) egress.WriteCloser {

client := httpClient(netConf, tlsConf)

return &HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
Expand All @@ -43,31 +43,43 @@ func NewHTTPSWriter(
}
}

// sendHttpRequest POSTs msg as a text/plain body to the writer's drain URL.
//
// msgCount is the amount added to the egress metric once the drain has
// answered with a 2xx status. An error from the client is passed through
// sanitizeError before being returned; a non-2xx status is returned as an
// error and leaves the metric untouched.
func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
	request := fasthttp.AcquireRequest()
	response := fasthttp.AcquireResponse()
	// Deferred calls run in reverse order: the response is released first,
	// then the request.
	defer fasthttp.ReleaseRequest(request)
	defer fasthttp.ReleaseResponse(response)

	request.SetRequestURI(w.url.String())
	request.Header.SetMethod("POST")
	request.Header.SetContentType("text/plain")
	request.SetBody(msg)

	if doErr := w.client.Do(request, response); doErr != nil {
		return w.sanitizeError(w.url, doErr)
	}

	if status := response.StatusCode(); status < 200 || status > 299 {
		return fmt.Errorf("syslog Writer: Post responded with %d status code", status)
	}

	w.egressMetric.Add(msgCount)
	return nil
}

func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
log.Printf("failed to parse syslog, dropping faulty message, err: %s", err)
return nil
}

for _, msg := range msgs {
req := fasthttp.AcquireRequest()
req.SetRequestURI(w.url.String())
req.Header.SetMethod("POST")
req.Header.SetContentType("text/plain")
req.SetBody(msg)

resp := fasthttp.AcquireResponse()

err := w.client.Do(req, resp)
err = w.sendHttpRequest(msg, 1)
if err != nil {
return w.sanitizeError(w.url, err)
}

if resp.StatusCode() < 200 || resp.StatusCode() > 299 {
return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode())
return err
}

w.egressMetric.Add(1)
}

return nil
Expand All @@ -92,7 +104,7 @@ func (*HTTPSWriter) Close() error {
return nil
}

func httpClient(netConf NetworkTimeoutConfig, tlsConf *tls.Config) *fasthttp.Client {
func httpClient(_ NetworkTimeoutConfig, tlsConf *tls.Config) *fasthttp.Client {
return &fasthttp.Client{
MaxConnsPerHost: 5,
MaxIdleConnDuration: 90 * time.Second,
Expand Down
98 changes: 98 additions & 0 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package syslog

import (
"bytes"
"crypto/tls"
"log"
"time"

"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
metrics "code.cloudfoundry.org/go-metric-registry"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
)

// BATCHSIZE is the batch size threshold in bytes (256 KiB) that
// NewHTTPSBatchWriter assigns to HTTPSBatchWriter.batchSize.
const BATCHSIZE = 256 * 1024

// HTTPSBatchWriter embeds HTTPSWriter and adds the state used to collect
// converted syslog messages into batches before they are posted.
type HTTPSBatchWriter struct {
HTTPSWriter
// msgs carries converted messages from Write to the startSender goroutine.
msgs chan []byte
// batchSize is the byte threshold startSender checks to trigger a send
// before the timer fires.
batchSize int
// sendInterval is the duration startSender's timer is armed with.
sendInterval time.Duration
// egrMsgCount is set to 0 by NewHTTPSBatchWriter; no other use is visible
// in this file.
egrMsgCount float64
}

// NewHTTPSBatchWriter builds an HTTPSBatchWriter for binding, starts its
// background sender goroutine, and returns it as an egress.WriteCloser.
//
// Note that binding.URL.Scheme is overwritten with "https" in place, so the
// caller's binding observes the change as well.
func NewHTTPSBatchWriter(
	binding *URLBinding,
	netConf NetworkTimeoutConfig,
	tlsConf *tls.Config,
	egressMetric metrics.Counter,
	c *Converter,
) egress.WriteCloser {
	client := httpClient(netConf, tlsConf)

	// Requests are posted over plain https, whatever scheme the binding
	// arrived with.
	binding.URL.Scheme = "https"

	bw := &HTTPSBatchWriter{
		HTTPSWriter: HTTPSWriter{
			url:             binding.URL,
			appID:           binding.AppID,
			hostname:        binding.Hostname,
			client:          client,
			egressMetric:    egressMetric,
			syslogConverter: c,
		},
		msgs:         make(chan []byte),
		batchSize:    BATCHSIZE,
		sendInterval: time.Second,
		// egrMsgCount is left at its zero value.
	}

	go bw.startSender()
	return bw
}

// Write converts env to RFC 5424 syslog messages and hands each one to the
// sender goroutine over the msgs channel.
//
// A conversion failure is logged and the envelope is dropped; Write returns
// nil in that case too, so this method never reports an error.
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
	converted, convErr := w.syslogConverter.ToRFC5424(env, w.hostname)
	if convErr != nil {
		log.Printf("Failed to parse syslog, dropping faulty message, err: %s", convErr)
		return nil
	}

	// There is no correct way of implementing error based retries in the
	// current architecture. Retries for https-batching will be implemented
	// at a later point in time.
	for i := range converted {
		w.msgs <- converted[i]
	}

	return nil
}

// startSender is the batching loop run in its own goroutine. It appends every
// message received on w.msgs to an in-memory batch and posts the batch via
// sendHttpRequest when either
//   - the accumulated batch has reached w.batchSize bytes, or
//   - w.sendInterval has elapsed since the last flush.
//
// Send errors are deliberately ignored (no retry support yet). The loop never
// returns.
func (w *HTTPSBatchWriter) startSender() {
	t := time.NewTimer(w.sendInterval)

	var msgBatch bytes.Buffer
	var msgCount float64

	// reset empties the batch and re-arms the timer for a full interval.
	reset := func() {
		msgBatch.Reset()
		msgCount = 0
		// Stop and drain before Reset so a tick that fired while a request
		// was in flight cannot trigger an immediate, premature flush.
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(w.sendInterval)
	}

	// flush posts the current batch, if any, and starts a new batch window.
	flush := func() {
		if msgBatch.Len() > 0 {
			w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
		}
		reset()
	}

	for {
		select {
		case msg := <-w.msgs:
			if _, err := msgBatch.Write(msg); err != nil {
				log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", msgBatch.Len(), err)
				reset()
				continue
			}
			msgCount++
			// Compare the accumulated batch size, not the size of the single
			// message just written (which is all Buffer.Write reports).
			if msgBatch.Len() >= w.batchSize {
				flush()
			}
		case <-t.C:
			// Always re-arm the timer, even when there was nothing to send;
			// a time.Timer fires only once, so skipping the reset would
			// disable time-based flushing after the first idle interval.
			flush()
		}
	}
}
135 changes: 135 additions & 0 deletions src/pkg/egress/syslog/https_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package syslog_test

import (
"bytes"
"crypto/tls"
"io"
"net/http"
"net/http/httptest"
"time"

"code.cloudfoundry.org/go-loggregator/v10/rfc5424"
"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// string_to_1024_chars is filler text appended to log messages in the batch
// tests to make them larger. It is doubled once in the Describe body below.
// NOTE(review): the name suggests a 1024-character payload; the actual length
// is not checked anywhere here — confirm if the exact size matters.
var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf"

// Specs for the HTTPS batch writer: each spec writes envelopes through a
// writer created by syslog.NewHTTPSBatchWriter and inspects what the mock
// drain received. The specs rely on wall-clock sleeps, so they are sensitive
// to timing.
var _ = Describe("HTTPS_batch", func() {
var (
netConf syslog.NetworkTimeoutConfig
skipSSLTLSConfig = &tls.Config{
InsecureSkipVerify: true, //nolint:gosec
}
c = syslog.NewConverter()
drain *SpyDrain
b *syslog.URLBinding
writer egress.WriteCloser
)
// Runs once, when the Describe body is evaluated: doubles the filler string.
string_to_1024_chars += string_to_1024_chars

// Each spec gets a fresh mock drain answering with status 200 and a new
// batch writer bound to that drain's URL.
BeforeEach(func() {
drain = newBatchMockDrain(200)
b = buildURLBinding(
drain.URL,
"test-app-id",
"test-hostname",
)
writer = syslog.NewHTTPSBatchWriter(
b,
netConf,
skipSSLTLSConfig,
&metricsHelpers.SpyMetric{},
c,
)
})

// Writes two small envelopes, waits 1050 ms, then expects both messages at
// the drain, in order, with the expected syslog fields.
It("testing simple appending of one log", func() {
env1 := buildLogEnvelope("APP", "1", "message 1", loggregator_v2.Log_OUT)
Expect(writer.Write(env1)).To(Succeed())
env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT)
Expect(writer.Write(env2)).To(Succeed())
time.Sleep(1050 * time.Millisecond)

Expect(drain.getMessagesSize()).Should(Equal(2))
expected := &rfc5424.Message{
AppName: "test-app-id",
Hostname: "test-hostname",
Priority: rfc5424.Priority(14),
ProcessID: "[APP/1]",
Message: []byte("message 1\n"),
}
Expect(drain.messages[0].AppName).To(Equal(expected.AppName))
Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname))
Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority))
Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID))
Expect(drain.messages[0].Message).To(Equal(expected.Message))
expected = &rfc5424.Message{
AppName: "test-app-id",
Hostname: "test-hostname",
Priority: rfc5424.Priority(14),
ProcessID: "[APP/2]",
Message: []byte("message 2\n"),
}
Expect(drain.messages[1].AppName).To(Equal(expected.AppName))
Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname))
Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority))
Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID))
Expect(drain.messages[1].Message).To(Equal(expected.Message))
})

// Writes ten envelopes 99 ms apart (about 990 ms in total), expects nothing
// at the drain yet, then expects all ten after a further 100 ms.
It("test batch dispatching with all logs in a given timeframe", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
for i := 0; i < 10; i++ {
Expect(writer.Write(env1)).To(Succeed())
time.Sleep(99 * time.Millisecond)
}
Expect(drain.getMessagesSize()).Should(Equal(0))
time.Sleep(100 * time.Millisecond)
Expect(drain.getMessagesSize()).Should(Equal(10))
})

// Same write pattern as the previous spec, but without the intermediate
// "nothing received yet" check; only the final count of ten is asserted.
It("probabilistic test for race condition", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
for i := 0; i < 10; i++ {
Expect(writer.Write(env1)).To(Succeed())
time.Sleep(99 * time.Millisecond)
}
time.Sleep(100 * time.Millisecond)
Expect(drain.getMessagesSize()).Should(Equal(10))
})
})

// newBatchMockDrain starts a TLS test server that acts as a batch syslog
// drain and returns the SpyDrain recording what it receives.
//
// Each request body is split after every newline; every non-empty piece is
// parsed as an RFC 5424 message and appended to the drain together with the
// request headers (one header entry per message). The handler then replies
// with the given status code.
func newBatchMockDrain(status int) *SpyDrain {
	drain := &SpyDrain{}
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		body, err := io.ReadAll(r.Body)
		Expect(err).ToNot(HaveOccurred())

		for _, raw := range bytes.SplitAfter(body, []byte("\n")) {
			// SplitAfter yields a trailing empty piece when the body ends
			// with a newline; skip it.
			if len(raw) == 0 {
				continue
			}
			message := &rfc5424.Message{}
			err = message.UnmarshalBinary(raw)
			Expect(err).ToNot(HaveOccurred())
			drain.appendMessage(message)
			drain.appendHeader(r.Header)
		}
		w.WriteHeader(status)
	})
	server := httptest.NewTLSServer(handler)
	drain.Server = server
	return drain
}
Loading

0 comments on commit 75f9a92

Please sign in to comment.