Skip to content

Commit

Permalink
Merge pull request #317 from newrelic/fix/akka-body-NR-299709
Browse files Browse the repository at this point in the history
NR-299709: Workaround for issue #310 (#310)
  • Loading branch information
IshikaDawda authored Aug 13, 2024
2 parents 9ec9995 + 689dc34 commit e800262
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package akka.http.scaladsl.server

import akka.Done
import akka.http.scaladsl.model.HttpEntity
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
import akka.util.ByteString
Expand Down Expand Up @@ -55,15 +56,17 @@ class CsecContextWrapper(original: Function1[RequestContext, Future[RouteResult]
override def apply(ctx: RequestContext): Future[RouteResult] = {
try {

var httpRequest = ctx.request;
val httpRequest = ctx.request;
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = httpRequest.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
if (!httpRequest.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, ctx.materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, ctx.materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, httpRequest, body, NewRelic.getAgent.getTransaction.getToken);
original.apply(ctx)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,14 @@ class AkkaAsyncRequestHandler(handler: HttpRequest ⇒ Future[HttpResponse])(imp
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)

AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val futureResponse: Future[HttpResponse] = handler.apply(param)
futureResponse.flatMap(ResponseFutureHelper.wrapResponseAsync(NewRelic.getAgent.getTransaction.getToken, materializer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,14 @@ class AkkaSyncRequestHandler(handler: HttpRequest ⇒ HttpResponse)(implicit mat
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)

if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val response: HttpResponse = handler.apply(param)
ResponseFutureHelper.wrapResponseSync(response, materializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,13 @@ class AkkaAsyncRequestHandler(handler: HttpRequest ⇒ Future[HttpResponse])(imp
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val futureResponse: Future[HttpResponse] = handler.apply(param)
futureResponse.flatMap(ResponseFutureHelper.wrapResponseAsync(NewRelic.getAgent.getTransaction.getToken, materializer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,13 @@ class AkkaSyncRequestHandler(handler: HttpRequest ⇒ HttpResponse)(implicit mat
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val response: HttpResponse = handler.apply(param)
ResponseFutureHelper.wrapResponseSync(response, materializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,13 @@ class AkkaAsyncRequestHandler(handler: HttpRequest ⇒ Future[HttpResponse])(imp
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val futureResponse: Future[HttpResponse] = handler.apply(param)
futureResponse.flatMap(ResponseFutureHelper.wrapResponseAsync(NewRelic.getAgent.getTransaction.getToken, materializer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,13 @@ class AkkaSyncRequestHandler(handler: HttpRequest ⇒ HttpResponse)(implicit mat
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val response: HttpResponse = handler.apply(param)
ResponseFutureHelper.wrapResponseSync(response, materializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -26,11 +26,14 @@ class AkkaAsyncRequestHandler(handler: HttpRequest ⇒ Future[HttpResponse])(imp
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)

if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val futureResponse: Future[HttpResponse] = handler.apply(param)
futureResponse.flatMap(ResponseFutureHelper.wrapResponseAsync(NewRelic.getAgent.getTransaction.getToken, materializer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package akka.http.scaladsl

import akka.Done
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.stream.scaladsl.Sink
Expand All @@ -27,11 +27,14 @@ class AkkaSyncRequestHandler(handler: HttpRequest ⇒ HttpResponse)(implicit mat
val body: lang.StringBuilder = new lang.StringBuilder();
val dataBytes: Source[ByteString, AnyRef] = param.entity.getDataBytes()
val isLockAquired = AkkaCoreUtils.acquireServletLockIfPossible();
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)

if (!param.entity.isInstanceOf[HttpEntity.Chunked]) {
val sink: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
val chunk = byteString.utf8String
body.append(chunk)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
}
val processingResult: Future[Done] = dataBytes.runWith(sink, materializer)
AkkaCoreUtils.preProcessHttpRequest(isLockAquired, param, body, NewRelic.getAgent.getTransaction.getToken);
val response: HttpResponse = handler.apply(param)
ResponseFutureHelper.wrapResponseSync(response, materializer)
Expand Down

0 comments on commit e800262

Please sign in to comment.