From 75f9a92364683adf394820f4ef62f3f07d4978da Mon Sep 17 00:00:00 2001 From: Nicklas Dohrn <20398358+nicklas-dohrn@users.noreply.github.com> Date: Thu, 3 Oct 2024 20:45:32 +0200 Subject: [PATCH] Feat(loggr-syslog-agent): Add https-batch protocol for more efficient https drains (#491) Closes #332 --- src/pkg/egress/syslog/https.go | 50 ++++--- src/pkg/egress/syslog/https_batch.go | 98 +++++++++++++ src/pkg/egress/syslog/https_batch_test.go | 135 ++++++++++++++++++ src/pkg/egress/syslog/https_test.go | 104 +++----------- src/pkg/egress/syslog/writer_factory.go | 8 ++ src/pkg/egress/syslog/writer_factory_test.go | 21 ++- .../bindings/filtered_binding_fetcher.go | 2 +- 7 files changed, 316 insertions(+), 102 deletions(-) create mode 100644 src/pkg/egress/syslog/https_batch.go create mode 100644 src/pkg/egress/syslog/https_batch_test.go diff --git a/src/pkg/egress/syslog/https.go b/src/pkg/egress/syslog/https.go index bff210dae..581fb7776 100644 --- a/src/pkg/egress/syslog/https.go +++ b/src/pkg/egress/syslog/https.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "log" "net/url" "strings" "time" @@ -32,7 +33,6 @@ func NewHTTPSWriter( ) egress.WriteCloser { client := httpClient(netConf, tlsConf) - return &HTTPSWriter{ url: binding.URL, appID: binding.AppID, @@ -43,31 +43,43 @@ func NewHTTPSWriter( } } +func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error { + req := fasthttp.AcquireRequest() + req.SetRequestURI(w.url.String()) + req.Header.SetMethod("POST") + req.Header.SetContentType("text/plain") + req.SetBody(msg) + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + err := w.client.Do(req, resp) + if err != nil { + return w.sanitizeError(w.url, err) + } + + if resp.StatusCode() < 200 || resp.StatusCode() > 299 { + return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + } + + w.egressMetric.Add(msgCount) + + return nil +} + func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error { msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) if err != nil { - return err + log.Printf("failed to parse syslog, dropping faulty message, err: %s", err) + return nil } for _, msg := range msgs { - req := fasthttp.AcquireRequest() - req.SetRequestURI(w.url.String()) - req.Header.SetMethod("POST") - req.Header.SetContentType("text/plain") - req.SetBody(msg) - - resp := fasthttp.AcquireResponse() - - err := w.client.Do(req, resp) + err = w.sendHttpRequest(msg, 1) if err != nil { - return w.sanitizeError(w.url, err) - } - - if resp.StatusCode() < 200 || resp.StatusCode() > 299 { - return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode()) + return err } - - w.egressMetric.Add(1) } return nil @@ -92,7 +104,7 @@ func (*HTTPSWriter) Close() error { return nil } -func httpClient(netConf NetworkTimeoutConfig, tlsConf *tls.Config) *fasthttp.Client { +func httpClient(_ NetworkTimeoutConfig, tlsConf *tls.Config) *fasthttp.Client { return &fasthttp.Client{ MaxConnsPerHost: 5, MaxIdleConnDuration: 90 * time.Second, diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go new file mode 100644 index 000000000..56bc55731 --- /dev/null +++ b/src/pkg/egress/syslog/https_batch.go @@ -0,0 +1,98 @@ +package syslog + +import ( + "bytes" + "crypto/tls" + "log" + "time" + + "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" + metrics "code.cloudfoundry.org/go-metric-registry" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" +) + +const BATCHSIZE = 256 * 1024 + +type HTTPSBatchWriter struct { + HTTPSWriter + msgs chan []byte + batchSize int + sendInterval time.Duration + egrMsgCount float64 +} + +func NewHTTPSBatchWriter( + binding *URLBinding, + netConf NetworkTimeoutConfig, + tlsConf *tls.Config, + egressMetric metrics.Counter, + c *Converter, +) egress.WriteCloser { + client := httpClient(netConf, tlsConf) + binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme + BatchWriter := &HTTPSBatchWriter{ + HTTPSWriter: HTTPSWriter{ + url: binding.URL, + appID: binding.AppID, + hostname: binding.Hostname, + client: client, + egressMetric: egressMetric, + syslogConverter: c, + }, + batchSize: BATCHSIZE, + sendInterval: 1 * time.Second, + egrMsgCount: 0, + msgs: make(chan []byte), + } + go BatchWriter.startSender() + return BatchWriter +} + +// Modified Write function +func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { + msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname) + if err != nil { + log.Printf("Failed to parse syslog, dropping faulty message, err: %s", err) + return nil + } + + for _, msg := range msgs { + //There is no correct way of implementing error based retries in the current architecture. + //Retries for https-batching will be implemented at a later point in time. + w.msgs <- msg + } + return nil +} + +func (w *HTTPSBatchWriter) startSender() { + t := time.NewTimer(w.sendInterval) + + var msgBatch bytes.Buffer + var msgCount float64 + reset := func() { + msgBatch.Reset() + msgCount = 0 + t.Reset(w.sendInterval) + } + for { + select { + case msg := <-w.msgs: + length, buffer_err := msgBatch.Write(msg) + if buffer_err != nil { + log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", length, buffer_err) + reset() + } else { + msgCount++ + if length >= w.batchSize { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck + reset() + } + } + case <-t.C: + if msgBatch.Len() > 0 { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck + reset() + } + } + } +} diff --git a/src/pkg/egress/syslog/https_batch_test.go b/src/pkg/egress/syslog/https_batch_test.go new file mode 100644 index 000000000..35789bcb6 --- /dev/null +++ b/src/pkg/egress/syslog/https_batch_test.go @@ -0,0 +1,135 @@ +package syslog_test + +import ( + "bytes" + "crypto/tls" + "io" + "net/http" + "net/http/httptest" + "time" + + "code.cloudfoundry.org/go-loggregator/v10/rfc5424" + "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" + metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf" + +var _ = Describe("HTTPS_batch", func() { + var ( + netConf syslog.NetworkTimeoutConfig + skipSSLTLSConfig = &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec + } + c = syslog.NewConverter() + drain *SpyDrain + b *syslog.URLBinding + writer egress.WriteCloser + ) + string_to_1024_chars += string_to_1024_chars + + BeforeEach(func() { + drain = newBatchMockDrain(200) + b = buildURLBinding( + drain.URL, + "test-app-id", + "test-hostname", + ) + writer = syslog.NewHTTPSBatchWriter( + b, + netConf, + skipSSLTLSConfig, + &metricsHelpers.SpyMetric{}, + c, + ) + }) + + It("testing simple appending of one log", func() { + env1 := buildLogEnvelope("APP", "1", "message 1", loggregator_v2.Log_OUT) + Expect(writer.Write(env1)).To(Succeed()) + env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT) + Expect(writer.Write(env2)).To(Succeed()) + time.Sleep(1050 * time.Millisecond) + + Expect(drain.getMessagesSize()).Should(Equal(2)) + expected := &rfc5424.Message{ + AppName: "test-app-id", + Hostname: "test-hostname", + Priority: rfc5424.Priority(14), + ProcessID: "[APP/1]", + Message: []byte("message 1\n"), + } + Expect(drain.messages[0].AppName).To(Equal(expected.AppName)) + Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname)) + Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority)) + Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID)) + Expect(drain.messages[0].Message).To(Equal(expected.Message)) + expected = &rfc5424.Message{ + AppName: "test-app-id", + Hostname: "test-hostname", + Priority: rfc5424.Priority(14), + ProcessID: "[APP/2]", + Message: []byte("message 2\n"), + } + Expect(drain.messages[1].AppName).To(Equal(expected.AppName)) + Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname)) + Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority)) + Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID)) + Expect(drain.messages[1].Message).To(Equal(expected.Message)) + }) + + It("test batch dispatching with all logs in a given timeframe", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 10; i++ { + Expect(writer.Write(env1)).To(Succeed()) + time.Sleep(99 * time.Millisecond) + } + Expect(drain.getMessagesSize()).Should(Equal(0)) + time.Sleep(100 * time.Millisecond) + Expect(drain.getMessagesSize()).Should(Equal(10)) + }) + + It("probabilistic test for race condition", func() { + env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT) + for i := 0; i < 10; i++ { + Expect(writer.Write(env1)).To(Succeed()) + time.Sleep(99 * time.Millisecond) + } + time.Sleep(100 * time.Millisecond) + Expect(drain.getMessagesSize()).Should(Equal(10)) + }) +}) + +func newBatchMockDrain(status int) *SpyDrain { + drain := &SpyDrain{} + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + body, err := io.ReadAll(r.Body) + Expect(err).ToNot(HaveOccurred()) + defer r.Body.Close() + + println(body) + + message := &rfc5424.Message{} + + messages := bytes.SplitAfter(body, []byte("\n")) + for _, raw := range messages { + if bytes.Equal(raw, []byte("")) { + continue + } + message = &rfc5424.Message{} + err = message.UnmarshalBinary(raw) + Expect(err).ToNot(HaveOccurred()) + drain.appendMessage(message) + drain.appendHeader(r.Header) + } + w.WriteHeader(status) + }) + server := httptest.NewTLSServer(handler) + drain.Server = server + return drain +} diff --git a/src/pkg/egress/syslog/https_test.go b/src/pkg/egress/syslog/https_test.go index 3f418b85d..9fcec9ba6 100644 --- a/src/pkg/egress/syslog/https_test.go +++ b/src/pkg/egress/syslog/https_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "sync" "code.cloudfoundry.org/go-loggregator/v10/rfc5424" "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" @@ -38,28 +39,7 @@ var _ = Describe("HTTPWriter", func() { ) env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) - Expect(writer.Write(env)).To(HaveOccurred()) - }) - - It("errors on an invalid syslog message", func() { - drain := newMockOKDrain() - - b := buildURLBinding( - drain.URL, - "test-app-id", - "test-hostname", - ) - writer := syslog.NewHTTPSWriter( - b, - netConf, - skipSSLTLSConfig, - &metricsHelpers.SpyMetric{}, - c, - ) - - env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) - env.SourceId = " " - Expect(writer.Write(env)).To(HaveOccurred()) + Expect(writer.Write(env)).To(HaveOccurred()) //nolint }) It("errors when the http POST fails", func() { @@ -78,7 +58,6 @@ var _ = Describe("HTTPWriter", func() { &metricsHelpers.SpyMetric{}, c, ) - env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) Expect(writer.Write(env)).To(HaveOccurred()) }) @@ -106,60 +85,6 @@ var _ = Describe("HTTPWriter", func() { Expect(err.Error()).ToNot(ContainSubstring("password")) }) - It("writes syslog formatted messages to http drain", func() { - drain := newMockOKDrain() - - b := buildURLBinding( - drain.URL, - "test-app-id", - "test-hostname", - ) - - writer := syslog.NewHTTPSWriter( - b, - netConf, - skipSSLTLSConfig, - &metricsHelpers.SpyMetric{}, - c, - ) - - env1 := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) - Expect(writer.Write(env1)).To(Succeed()) - env2 := buildLogEnvelope("CELL", "5", "log from cell", loggregator_v2.Log_ERR) - Expect(writer.Write(env2)).To(Succeed()) - env3 := buildLogEnvelope("CELL", "", "log from cell", loggregator_v2.Log_ERR) - Expect(writer.Write(env3)).To(Succeed()) - - Expect(drain.messages).To(HaveLen(3)) - expected := &rfc5424.Message{ - AppName: "test-app-id", - Hostname: "test-hostname", - Priority: rfc5424.Priority(14), - ProcessID: "[APP/1]", - Message: []byte("just a test\n"), - } - Expect(drain.messages[0].AppName).To(Equal(expected.AppName)) - Expect(drain.messages[0].Hostname).To(Equal(expected.Hostname)) - Expect(drain.messages[0].Priority).To(BeEquivalentTo(expected.Priority)) - Expect(drain.messages[0].ProcessID).To(Equal(expected.ProcessID)) - Expect(drain.messages[0].Message).To(Equal(expected.Message)) - - expected = &rfc5424.Message{ - AppName: "test-app-id", - Hostname: "test-hostname", - Priority: rfc5424.Priority(11), - ProcessID: "[CELL/5]", - Message: []byte("log from cell\n"), - } - Expect(drain.messages[1].AppName).To(Equal(expected.AppName)) - Expect(drain.messages[1].Hostname).To(Equal(expected.Hostname)) - Expect(drain.messages[1].Priority).To(BeEquivalentTo(expected.Priority)) - Expect(drain.messages[1].ProcessID).To(Equal(expected.ProcessID)) - Expect(drain.messages[1].Message).To(Equal(expected.Message)) - - Expect(drain.messages[2].ProcessID).To(Equal("[CELL]")) - }) - It("sets Content-Type to text/plain", func() { drain := newMockOKDrain() @@ -320,19 +245,36 @@ var _ = Describe("HTTPWriter", func() { }) type SpyDrain struct { + mu sync.Mutex *httptest.Server messages []*rfc5424.Message headers []http.Header } +func (d *SpyDrain) appendMessage(message *rfc5424.Message) { + d.mu.Lock() + defer d.mu.Unlock() + d.messages = append(d.messages, message) +} + +func (d *SpyDrain) appendHeader(header http.Header) { + d.mu.Lock() + defer d.mu.Unlock() + d.headers = append(d.headers, header) +} + +func (d *SpyDrain) getMessagesSize() int { + d.mu.Lock() + defer d.mu.Unlock() + return len(d.messages) +} + func newMockOKDrain() *SpyDrain { return newMockDrain(http.StatusOK) } - func newMockErrorDrain() *SpyDrain { return newMockDrain(http.StatusBadRequest) } - func newMockDrain(status int) *SpyDrain { drain := &SpyDrain{} handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -345,8 +287,8 @@ func newMockDrain(status int) *SpyDrain { err = message.UnmarshalBinary(body) Expect(err).ToNot(HaveOccurred()) - drain.messages = append(drain.messages, message) - drain.headers = append(drain.headers, r.Header) + drain.appendMessage(message) + drain.appendHeader(r.Header) w.WriteHeader(status) }) server := httptest.NewTLSServer(handler) diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index 84b45d24c..8ec8249ab 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -106,6 +106,14 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { egressMetric, converter, ) + case "https-batch": + w = NewHTTPSBatchWriter( + ub, + f.netConf, + tlsCfg, + egressMetric, + converter, + ) case "syslog": w = NewTCPWriter( ub, diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 1fbbe35e0..5707bbf56 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -23,7 +23,7 @@ var _ = Describe("EgressFactory", func() { }) Context("when the url begins with https", func() { - It("returns an https writer", func() { + It("returns an single https writer", func() { url, err := url.Parse("https://syslog.example.com") Expect(err).ToNot(HaveOccurred()) urlBinding := &syslog.URLBinding{ @@ -41,6 +41,25 @@ var _ = Describe("EgressFactory", func() { }) }) + Context("when the url begins with https and enables batching", func() { + It("returns an single https writer", func() { + url, err := url.Parse("https-batch://syslog.example.com") + Expect(err).ToNot(HaveOccurred()) + urlBinding := &syslog.URLBinding{ + URL: url, + } + + writer, err := f.NewWriter(urlBinding) + Expect(err).ToNot(HaveOccurred()) + + retryWriter, ok := writer.(*syslog.RetryWriter) + Expect(ok).To(BeTrue()) + + _, ok = retryWriter.Writer.(*syslog.HTTPSBatchWriter) + Expect(ok).To(BeTrue()) + }) + }) + Context("when the url begins with syslog://", func() { It("returns a tcp writer", func() { url, err := url.Parse("syslog://syslog.example.com") diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index b3fda7fcd..0921c95c3 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -14,7 +14,7 @@ import ( //go:generate hel --type IPChecker -var allowedSchemes = []string{"syslog", "syslog-tls", "https"} +var allowedSchemes = []string{"syslog", "syslog-tls", "https", "https-batch"} type IPChecker interface { ResolveAddr(host string) (net.IP, error)