diff --git a/runtime/observability/telemetry-api/api/telemetry-api.api b/runtime/observability/telemetry-api/api/telemetry-api.api index 7d36837b2..9ad273852 100644 --- a/runtime/observability/telemetry-api/api/telemetry-api.api +++ b/runtime/observability/telemetry-api/api/telemetry-api.api @@ -125,7 +125,7 @@ public final class aws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurement$ public static synthetic fun record$default (Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurement;Ljava/lang/Number;Laws/smithy/kotlin/runtime/util/Attributes;Laws/smithy/kotlin/runtime/telemetry/context/Context;ILjava/lang/Object;)V } -public abstract interface class aws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle { +public abstract interface class aws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle { public abstract fun stop ()V } @@ -138,18 +138,20 @@ public final class aws/smithy/kotlin/runtime/telemetry/metrics/Histogram$Default } public abstract interface class aws/smithy/kotlin/runtime/telemetry/metrics/Meter { - public abstract fun createDoubleGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle; + public abstract fun createAsyncUpDownCounter (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle; + public abstract fun createDoubleGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle; public abstract fun createDoubleHistogram (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram; - public abstract fun createLongGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle; + public abstract fun createLongGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle; public abstract fun createLongHistogram (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram; public abstract fun createMonotonicCounter (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/MonotonicCounter; public abstract fun createUpDownCounter (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/UpDownCounter; } public final class aws/smithy/kotlin/runtime/telemetry/metrics/Meter$DefaultImpls { - public static synthetic fun createDoubleGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle; + public static synthetic fun createAsyncUpDownCounter$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle; + public static synthetic fun createDoubleGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle; public static synthetic fun createDoubleHistogram$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram; - public static synthetic fun createLongGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle; + public static synthetic fun createLongGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle; public static synthetic fun createLongHistogram$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram; public static synthetic fun createMonotonicCounter$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/MonotonicCounter; public static synthetic fun createUpDownCounter$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/UpDownCounter; diff --git a/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Gauge.kt b/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Gauge.kt index c8586f77c..54ae5da19 100644 --- a/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Gauge.kt +++ b/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Gauge.kt @@ -23,22 +23,23 @@ public interface AsyncMeasurement { public fun record( value: T, attributes: Attributes = emptyAttributes(), - context: Context, + context: Context? = null, ) } public typealias LongAsyncMeasurement = AsyncMeasurement public typealias LongGaugeCallback = (LongAsyncMeasurement) -> Unit +public typealias LongUpDownCounterCallback = (LongAsyncMeasurement) -> Unit public typealias DoubleAsyncMeasurement = AsyncMeasurement public typealias DoubleGaugeCallback = (DoubleAsyncMeasurement) -> Unit /** - * A handle to a registered guage + * A handle to a registered async measurement (e.g. Gauge or AsyncUpDownCounter) */ -public interface GaugeHandle { +public interface AsyncMeasurementHandle { /** - * Stop recording this gauge value. The registered callback function will + * Stop recording this async value. The registered callback function will * stop being invoked after calling this function. */ public fun stop() diff --git a/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Meter.kt b/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Meter.kt index ab1e4f3d1..8cbd4cfc7 100644 --- a/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Meter.kt +++ b/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/Meter.kt @@ -23,6 +23,24 @@ public interface Meter { description: String? = null, ): UpDownCounter + /** + * Create a new async up down counter. + * + * @param name the instrument name + * @param callback the callback to invoke when reading the counter value. NOTE: Unlike synchronous counters that + * record delta values, async measurements record absolute/current values. + * @param units the unit of measure + * @param description the human-readable description of the measurement + * @return a [AsyncMeasurementHandle] which can be used for de-registering the counter + * and stopping the callback from being invoked. + */ + public fun createAsyncUpDownCounter( + name: String, + callback: LongUpDownCounterCallback, + units: String? = null, + description: String? = null, + ): AsyncMeasurementHandle + /** * Create a new [MonotonicCounter] * @@ -69,7 +87,7 @@ public interface Meter { * @param callback the callback to invoke when reading the gauge value * @param units the unit of measure * @param description the human-readable description of the measurement - * @return a [GaugeHandle] which can be used for de-registering the gauge + * @return a [AsyncMeasurementHandle] which can be used for de-registering the gauge * and stopping the callback from being invoked. */ public fun createLongGauge( @@ -77,7 +95,7 @@ public interface Meter { callback: LongGaugeCallback, units: String? = null, description: String? = null, - ): GaugeHandle + ): AsyncMeasurementHandle /** * Create a new Gauge. @@ -86,7 +104,7 @@ public interface Meter { * @param callback the callback to invoke when reading the gauge value * @param units the unit of measure * @param description the human-readable description of the measurement - * @return a [GaugeHandle] which can be used for de-registering the gauge + * @return a [AsyncMeasurementHandle] which can be used for de-registering the gauge * and stopping the callback from being invoked. */ public fun createDoubleGauge( @@ -94,5 +112,5 @@ public interface Meter { callback: DoubleGaugeCallback, units: String? = null, description: String? = null, - ): GaugeHandle + ): AsyncMeasurementHandle } diff --git a/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/NoOpMeterProvider.kt b/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/NoOpMeterProvider.kt index 182d52768..bc89ea24d 100644 --- a/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/NoOpMeterProvider.kt +++ b/runtime/observability/telemetry-api/common/src/aws/smithy/kotlin/runtime/telemetry/metrics/NoOpMeterProvider.kt @@ -18,6 +18,13 @@ internal object NoOpMeterProvider : MeterProvider { private object NoOpMeter : Meter { override fun createUpDownCounter(name: String, units: String?, description: String?): UpDownCounter = NoOpUpDownCounter + override fun createAsyncUpDownCounter( + name: String, + callback: LongUpDownCounterCallback, + units: String?, + description: String?, + ): AsyncMeasurementHandle = NoOpAsyncMeasurementHandle + override fun createMonotonicCounter(name: String, units: String?, description: String?): MonotonicCounter = NoOpMonotonicCounter override fun createLongHistogram(name: String, units: String?, description: String?): LongHistogram = NoOpLongHistogram @@ -29,14 +36,14 @@ private object NoOpMeter : Meter { callback: LongGaugeCallback, units: String?, description: String?, - ): GaugeHandle = NoOpGaugeHandle + ): AsyncMeasurementHandle = NoOpAsyncMeasurementHandle override fun createDoubleGauge( name: String, callback: DoubleGaugeCallback, units: String?, description: String?, - ): GaugeHandle = NoOpGaugeHandle + ): AsyncMeasurementHandle = NoOpAsyncMeasurementHandle } private object NoOpUpDownCounter : UpDownCounter { @@ -53,6 +60,6 @@ private object NoOpDoubleHistogram : DoubleHistogram { override fun record(value: Double, attributes: Attributes, context: Context?) {} } -private object NoOpGaugeHandle : GaugeHandle { +private object NoOpAsyncMeasurementHandle : AsyncMeasurementHandle { override fun stop() {} } diff --git a/runtime/observability/telemetry-provider-otel/jvm/src/aws/smithy/kotlin/runtime/telemetry/otel/OtelMeterProvider.kt b/runtime/observability/telemetry-provider-otel/jvm/src/aws/smithy/kotlin/runtime/telemetry/otel/OtelMeterProvider.kt index 00ce5580a..48ef08764 100644 --- a/runtime/observability/telemetry-provider-otel/jvm/src/aws/smithy/kotlin/runtime/telemetry/otel/OtelMeterProvider.kt +++ b/runtime/observability/telemetry-provider-otel/jvm/src/aws/smithy/kotlin/runtime/telemetry/otel/OtelMeterProvider.kt @@ -37,6 +37,24 @@ private class OtelMeter( return OtelUpDownCounterImpl(counter) } + override fun createAsyncUpDownCounter( + name: String, + callback: LongUpDownCounterCallback, + units: String?, + description: String?, + ): AsyncMeasurementHandle { + val observer = otelMeter.upDownCounterBuilder(name) + .apply { + description?.let { setDescription(it) } + units?.let { setUnit(units) } + } + .buildWithCallback { + callback(OtelLongAsyncMeasurementImpl(it)) + } + + return OtelAsyncMeasurementHandleImpl(observer) + } + override fun createMonotonicCounter(name: String, units: String?, description: String?): MonotonicCounter { val counter = otelMeter.counterBuilder(name) .apply { @@ -73,7 +91,7 @@ private class OtelMeter( callback: LongGaugeCallback, units: String?, description: String?, - ): GaugeHandle { + ): AsyncMeasurementHandle { val observer = otelMeter.gaugeBuilder(name) .apply { description?.let { setDescription(it) } @@ -83,7 +101,7 @@ private class OtelMeter( .buildWithCallback { callback(OtelLongAsyncMeasurementImpl(it)) } - return OtelGaugeHandleImpl(observer) + return OtelAsyncMeasurementHandleImpl(observer) } override fun createDoubleGauge( @@ -91,7 +109,7 @@ private class OtelMeter( callback: DoubleGaugeCallback, units: String?, description: String?, - ): GaugeHandle { + ): AsyncMeasurementHandle { val observer = otelMeter.gaugeBuilder(name) .apply { description?.let { setDescription(it) } @@ -100,7 +118,7 @@ private class OtelMeter( .buildWithCallback { callback(OtelDoubleAsyncMeasurementImpl(it)) } - return OtelGaugeHandleImpl(observer) + return OtelAsyncMeasurementHandleImpl(observer) } } @@ -157,7 +175,7 @@ private class OtelDoubleHistogramImpl( } private class OtelLongAsyncMeasurementImpl(private val measurement: ObservableLongMeasurement) : LongAsyncMeasurement { - override fun record(value: Long, attributes: Attributes, context: Context) { + override fun record(value: Long, attributes: Attributes, context: Context?) { if (attributes.isEmpty) { measurement.record(value) } else { @@ -167,7 +185,7 @@ private class OtelLongAsyncMeasurementImpl(private val measurement: ObservableLo } private class OtelDoubleAsyncMeasurementImpl(private val measurement: ObservableDoubleMeasurement) : DoubleAsyncMeasurement { - override fun record(value: Double, attributes: Attributes, context: Context) { + override fun record(value: Double, attributes: Attributes, context: Context?) { if (attributes.isEmpty) { measurement.record(value) } else { @@ -176,7 +194,7 @@ private class OtelDoubleAsyncMeasurementImpl(private val measurement: Observable } } -private class OtelGaugeHandleImpl(private val otelHandle: AutoCloseable) : GaugeHandle { +private class OtelAsyncMeasurementHandleImpl(private val otelHandle: AutoCloseable) : AsyncMeasurementHandle { override fun stop() { otelHandle.close() } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api b/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api index 5f923e70a..2f1b47c11 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api @@ -22,7 +22,9 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConf public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl$BuilderImpl { public fun ()V public final fun getMaxConnectionsPerHost-0hXNFcg ()Lkotlin/UInt; + public fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider; public final fun setMaxConnectionsPerHost-ExVfyTY (Lkotlin/UInt;)V + public fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V } public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Companion { diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts b/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts index d18c42ca2..e3831672b 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/build.gradle.kts @@ -16,6 +16,7 @@ kotlin { dependencies { implementation(project(":runtime:runtime-core")) api(project(":runtime:protocol:http-client")) + implementation(project(":runtime:observability:telemetry-defaults")) implementation("com.squareup.okhttp3:okhttp:$okHttpVersion") implementation("com.squareup.okhttp3:okhttp-coroutines:$okHttpVersion") diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/HttpEngineEventListener.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/HttpEngineEventListener.kt index dbb1663b3..f6b6c83cb 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/HttpEngineEventListener.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/HttpEngineEventListener.kt @@ -4,7 +4,6 @@ */ package aws.smithy.kotlin.runtime.http.engine.okhttp -import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetricAttributes import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.net.HostResolver import aws.smithy.kotlin.runtime.net.toHostAddress @@ -13,13 +12,9 @@ import aws.smithy.kotlin.runtime.telemetry.logging.LoggerProvider import aws.smithy.kotlin.runtime.telemetry.logging.MessageSupplier import aws.smithy.kotlin.runtime.telemetry.logging.getLogger import aws.smithy.kotlin.runtime.telemetry.logging.logger -import aws.smithy.kotlin.runtime.telemetry.metrics.decrement -import aws.smithy.kotlin.runtime.telemetry.metrics.increment import aws.smithy.kotlin.runtime.telemetry.telemetryProvider import aws.smithy.kotlin.runtime.telemetry.trace.SpanStatus import aws.smithy.kotlin.runtime.telemetry.trace.recordException -import aws.smithy.kotlin.runtime.util.AttributeKey -import aws.smithy.kotlin.runtime.util.get import okhttp3.* import java.io.IOException import java.net.InetAddress @@ -29,29 +24,26 @@ import kotlin.time.ExperimentalTime import kotlin.time.TimeMark import kotlin.time.TimeSource -private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.okhttp" +internal const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.okhttp" +// see https://square.github.io/okhttp/features/events/#eventlistener for example callback flow @OptIn(ExperimentalTime::class) internal class HttpEngineEventListener( private val pool: ConnectionPool, private val hr: HostResolver, + private val dispatcher: Dispatcher, + private val metrics: HttpClientMetrics, call: Call, ) : EventListener() { private val provider: TelemetryProvider = call.request().tag()?.callContext?.telemetryProvider ?: TelemetryProvider.None private val traceSpan = provider.tracerProvider .getOrCreateTracer(TELEMETRY_SCOPE) .createSpan("HTTP") - private val logger = call.request().tag()?.callContext?.logger() ?: LoggerProvider.None.getLogger() - - private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, provider) - // FIXME - need idle/warm connection usage but requires tracking delta? Better suited for async UpDownCounter... - init { - // listener is created at same time as a call, the call is then enqueued until dispatcher can execute it - metrics.requests.increment(HttpClientMetricAttributes.QueuedRequest) - } + private val logger = call.request().tag()?.callContext?.logger() ?: LoggerProvider.None.getLogger() - // see https://square.github.io/okhttp/features/events/#eventlistener for flow + // FIXME - this isn't actually when a connection is started - this includes time in queue + // see https://github.com/square/okhttp/blob/7c92ed0879477eddb2fce6b4066d151525d5687f/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt#L167-L175 private var connectStart: TimeMark? = null private inline fun trace(crossinline msg: MessageSupplier) { @@ -64,19 +56,21 @@ internal class HttpEngineEventListener( override fun callStart(call: Call) { connectStart = TimeSource.Monotonic.markNow() - metrics.requests.decrement(HttpClientMetricAttributes.QueuedRequest) - metrics.requests.increment(HttpClientMetricAttributes.InFlightRequest) + metrics.queuedRequests = dispatcher.queuedCallsCount().toLong() + metrics.inFlightRequests = dispatcher.runningCallsCount().toLong() trace { "call started" } } override fun callEnd(call: Call) { - metrics.requests.decrement(HttpClientMetricAttributes.InFlightRequest) + metrics.queuedRequests = dispatcher.queuedCallsCount().toLong() + metrics.inFlightRequests = dispatcher.runningCallsCount().toLong() trace { "call complete" } traceSpan.close() } override fun callFailed(call: Call, ioe: IOException) { - metrics.requests.decrement(HttpClientMetricAttributes.InFlightRequest) + metrics.queuedRequests = dispatcher.queuedCallsCount().toLong() + metrics.inFlightRequests = dispatcher.runningCallsCount().toLong() trace(ioe) { "call failed" } traceSpan.recordException(ioe, true) traceSpan.setStatus(SpanStatus.ERROR) @@ -103,7 +97,8 @@ internal class HttpEngineEventListener( } override fun connectionAcquired(call: Call, connection: Connection) { - metrics.connectionUsage.increment(HttpClientMetricAttributes.AcquiredConnection) + metrics.acquiredConnections = pool.connectionCount().toLong() + metrics.idleConnections = pool.idleConnectionCount().toLong() connectStart?.let { metrics.connectionAcquireDuration.record(it.elapsedNow().inWholeMilliseconds) } @@ -113,7 +108,8 @@ internal class HttpEngineEventListener( } override fun connectionReleased(call: Call, connection: Connection) { - metrics.connectionUsage.decrement(HttpClientMetricAttributes.AcquiredConnection) + metrics.acquiredConnections = pool.connectionCount().toLong() + metrics.idleConnections = pool.idleConnectionCount().toLong() val connId = System.identityHashCode(connection) trace { "connection released: conn(id=$connId)=$connection; connPool: total=${pool.connectionCount()}, idle=${pool.idleConnectionCount()}" } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index 26d22bf2b..73aeb0aa4 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -40,13 +40,16 @@ public class OkHttpEngine( override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke } - private val client = config.buildClient() + private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) + private val client = config.buildClient(metrics) + + init { + metrics.connectionsLimit = config.maxConnections.toLong() + } override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { val callContext = callContext() - // FIXME - publish connection limit - val engineRequest = request.toOkHttpRequest(context, callContext) val engineCall = client.newCall(engineRequest) val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() } @@ -65,13 +68,14 @@ public class OkHttpEngine( override fun shutdown() { client.connectionPool.evictAll() client.dispatcher.executorService.shutdown() + metrics.close() } } /** * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client */ -private fun OkHttpEngineConfig.buildClient(): OkHttpClient { +private fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient { val config = this return OkHttpClient.Builder().apply { @@ -106,7 +110,7 @@ private fun OkHttpEngineConfig.buildClient(): OkHttpClient { dispatcher(dispatcher) // Log events coming from okhttp. Allocate a new listener per-call to facilitate dedicated trace spans. - eventListenerFactory { call -> HttpEngineEventListener(pool, config.hostResolver, call) } + eventListenerFactory { call -> HttpEngineEventListener(pool, config.hostResolver, dispatcher, metrics, call) } // map protocols if (config.tlsContext.alpn.isNotEmpty()) { diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt index 2b6822126..48de768b5 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig.kt @@ -7,6 +7,8 @@ package aws.smithy.kotlin.runtime.http.engine.okhttp import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfigImpl +import aws.smithy.kotlin.runtime.telemetry.Global +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider /** * The configuration parameters for an OkHttp HTTP client engine. @@ -47,5 +49,7 @@ public class OkHttpEngineConfig private constructor(builder: Builder) : HttpClie * The maximum number of connections to open to a single host. Defaults to [maxConnections]. */ public var maxConnectionsPerHost: UInt? = null + + override var telemetryProvider: TelemetryProvider = TelemetryProvider.Global } } diff --git a/runtime/protocol/http-client/api/http-client.api b/runtime/protocol/http-client/api/http-client.api index 5b0f2cd70..ea77cc0f6 100644 --- a/runtime/protocol/http-client/api/http-client.api +++ b/runtime/protocol/http-client/api/http-client.api @@ -77,6 +77,7 @@ public abstract interface class aws/smithy/kotlin/runtime/http/engine/HttpClient public abstract fun getProxySelector ()Laws/smithy/kotlin/runtime/http/engine/ProxySelector; public abstract fun getSocketReadTimeout-UwyO8pc ()J public abstract fun getSocketWriteTimeout-UwyO8pc ()J + public abstract fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider; public abstract fun getTlsContext ()Laws/smithy/kotlin/runtime/http/engine/TlsContext; } @@ -90,6 +91,7 @@ public abstract interface class aws/smithy/kotlin/runtime/http/engine/HttpClient public abstract fun getProxySelector ()Laws/smithy/kotlin/runtime/http/engine/ProxySelector; public abstract fun getSocketReadTimeout-UwyO8pc ()J public abstract fun getSocketWriteTimeout-UwyO8pc ()J + public abstract fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider; public abstract fun getTlsContext ()Laws/smithy/kotlin/runtime/http/engine/TlsContext; public abstract fun setConnectTimeout-LRDsOJo (J)V public abstract fun setConnectionAcquireTimeout-LRDsOJo (J)V @@ -99,6 +101,7 @@ public abstract interface class aws/smithy/kotlin/runtime/http/engine/HttpClient public abstract fun setProxySelector (Laws/smithy/kotlin/runtime/http/engine/ProxySelector;)V public abstract fun setSocketReadTimeout-LRDsOJo (J)V public abstract fun setSocketWriteTimeout-LRDsOJo (J)V + public abstract fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V public abstract fun setTlsContext (Laws/smithy/kotlin/runtime/http/engine/TlsContext;)V public abstract fun tlsContext (Lkotlin/jvm/functions/Function1;)V } diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfig.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfig.kt index 5a8c586a4..d5def05b7 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfig.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfig.kt @@ -7,6 +7,7 @@ package aws.smithy.kotlin.runtime.http.engine import aws.smithy.kotlin.runtime.InternalApi import aws.smithy.kotlin.runtime.http.config.HttpEngineConfigDsl import aws.smithy.kotlin.runtime.net.HostResolver +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -80,6 +81,11 @@ public interface HttpClientEngineConfig { */ public val tlsContext: TlsContext + /** + * The telemetry provider that the HTTP client will be instrumented with + */ + public val telemetryProvider: TelemetryProvider + @InternalApi public fun toBuilderApplicator(): Builder.() -> Unit @@ -162,6 +168,11 @@ public interface HttpClientEngineConfig { */ public var tlsContext: TlsContext + /** + * The telemetry provider that the HTTP client will be instrumented with + */ + public var telemetryProvider: TelemetryProvider + /** * Settings related to TLS and secure connections */ @@ -183,6 +194,7 @@ public open class HttpClientEngineConfigImpl(builder: HttpClientEngineConfig.Bui override val proxySelector: ProxySelector = builder.proxySelector override val hostResolver: HostResolver = builder.hostResolver override val tlsContext: TlsContext = builder.tlsContext + override val telemetryProvider: TelemetryProvider = builder.telemetryProvider override fun toBuilderApplicator(): HttpClientEngineConfig.Builder.() -> Unit = { socketReadTimeout = this@HttpClientEngineConfigImpl.socketReadTimeout @@ -194,6 +206,7 @@ public open class HttpClientEngineConfigImpl(builder: HttpClientEngineConfig.Bui proxySelector = this@HttpClientEngineConfigImpl.proxySelector hostResolver = this@HttpClientEngineConfigImpl.hostResolver tlsContext = this@HttpClientEngineConfigImpl.tlsContext + telemetryProvider = this@HttpClientEngineConfigImpl.telemetryProvider } @InternalApi @@ -207,6 +220,7 @@ public open class HttpClientEngineConfigImpl(builder: HttpClientEngineConfig.Bui override var proxySelector: ProxySelector = EnvironmentProxySelector() override var hostResolver: HostResolver = HostResolver.Default override var tlsContext: TlsContext = TlsContext.Default + override var telemetryProvider: TelemetryProvider = TelemetryProvider.None override fun tlsContext(block: TlsContext.Builder.() -> Unit) { tlsContext = TlsContext(tlsContext.toBuilder().apply(block)) } diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt index 109e7d951..c600d55f5 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt @@ -5,14 +5,29 @@ package aws.smithy.kotlin.runtime.http.engine.internal import aws.smithy.kotlin.runtime.InternalApi +import aws.smithy.kotlin.runtime.io.Closeable import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider +import aws.smithy.kotlin.runtime.telemetry.metrics.LongAsyncMeasurement import aws.smithy.kotlin.runtime.telemetry.metrics.LongHistogram -import aws.smithy.kotlin.runtime.telemetry.metrics.UpDownCounter import aws.smithy.kotlin.runtime.util.Attributes import aws.smithy.kotlin.runtime.util.attributesOf +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.update /** - * Container for common HTTP engine related metrics + * Common attributes for [HttpClientMetrics] + */ +@InternalApi +public object HttpClientMetricAttributes { + public val IdleConnection: Attributes = attributesOf { "state" to "idle" } + public val AcquiredConnection: Attributes = attributesOf { "state" to "acquired" } + public val QueuedRequest: Attributes = attributesOf { "state" to "queued" } + public val InFlightRequest: Attributes = attributesOf { "state" to "in-flight" } +} + +/** + * Container for common HTTP engine related metrics. Engine implementations can re-use this and update + * the various fields in whatever manner fits best (increment/decrement vs current absolute value). * * @param scope the instrumentation scope * @param provider the telemetry provider to instrument with @@ -21,22 +36,131 @@ import aws.smithy.kotlin.runtime.util.attributesOf public class HttpClientMetrics( scope: String, public val provider: TelemetryProvider, -) { +) : Closeable { private val meter = provider.meterProvider.getOrCreateMeter(scope) - public val connectionLimit: UpDownCounter = meter.createUpDownCounter("aws.smithy.http.connections.limit", "{connection}", "Max connections configured for the HTTP client") - public val connectionUsage: UpDownCounter = meter.createUpDownCounter("aws.smithy.http.connections.usage", "{connection}", "Current state of connections (idle, acquired)") + private val _connectionsLimit = atomic(0L) + private val _idleConnections = atomic(0L) + private val _acquiredConnections = atomic(0L) + private val _queuedRequests = atomic(0L) + private val _inFlightRequests = atomic(0L) + + /** + * The amount of time it takes to acquire a connection from the pool + */ public val connectionAcquireDuration: LongHistogram = meter.createLongHistogram("aws.smithy.http.connections.acquire_duration", "ms", "The amount of time requests take to acquire a connection from the pool") - public val requests: UpDownCounter = meter.createUpDownCounter("aws.smithy.http.requests", "{request}", "The current state of requests (queued, in-flight)") -} -/** - * Common attributes for [HttpClientMetrics] - */ -@InternalApi -public object HttpClientMetricAttributes { - public val IdleConnection: Attributes = attributesOf { "state" to "idle" } - public val AcquiredConnection: Attributes = attributesOf { "state" to "acquired" } - public val QueuedRequest: Attributes = attributesOf { "state" to "queued" } - public val InFlightRequest: Attributes = attributesOf { "state" to "in-flight" } + private val connectionLimitHandle = meter.createAsyncUpDownCounter( + "aws.smithy.http.connections.limit", + { it.record(_connectionsLimit.value) }, + "{connection}", + "Max connections configured for the HTTP client", + ) + + private val connectionUsageHandle = meter.createAsyncUpDownCounter( + "aws.smithy.http.connections.usage", + ::recordConnectionState, + "{connection}", + "Current state of connections (idle, acquired)", + ) + + private val requestsHandle = meter.createAsyncUpDownCounter( + "aws.smithy.http.requests", + ::recordRequestsState, + "{request}", + "The current state of requests (queued, in-flight)", + ) + + /** + * The maximum number of connections configured for the client + */ + public var connectionsLimit: Long + get() = _connectionsLimit.value + set(value) { + _connectionsLimit.update { value } + } + + /** + * The number of idle (warm) connections in the pool right now + */ + public var idleConnections: Long + get() = _idleConnections.value + set(value) { + _idleConnections.update { value } + } + + public fun incrementIdleConnections() { + _idleConnections += 1 + } + + public fun decrementIdleConnections() { + _idleConnections -= 1 + } + + /** + * The number of acquired (active) connections used right now + */ + public var acquiredConnections: Long + get() = _acquiredConnections.value + set(value) { + _acquiredConnections.update { value } + } + + public fun incrementAcquiredConnections() { + _acquiredConnections += 1 + } + + public fun decrementAcquiredConnections() { + _acquiredConnections -= 1 + } + + /** + * The number of requests currently queued waiting to be dispatched/executed by the client + */ + public var queuedRequests: Long + get() = _queuedRequests.value + set(value) { + _queuedRequests.update { value } + } + + public fun incrementQueuedRequests() { + _queuedRequests += 1 + } + + public fun decrementQueuedRequests() { + _queuedRequests -= 1 + } + + /** + * The number of requests currently in-flight (actively processing) + */ + public var inFlightRequests: Long + get() = _inFlightRequests.value + set(value) { + _inFlightRequests.update { value } + } + + public fun incrementInFlightRequests() { + _inFlightRequests += 1 + } + + public fun decrementInFlightRequests() { + _inFlightRequests -= 1 + } + + private fun recordRequestsState(measurement: LongAsyncMeasurement) { + measurement.record(inFlightRequests, HttpClientMetricAttributes.InFlightRequest) + measurement.record(queuedRequests, HttpClientMetricAttributes.QueuedRequest) + } + + private fun recordConnectionState(measurement: LongAsyncMeasurement) { + measurement.record(idleConnections, HttpClientMetricAttributes.IdleConnection) + measurement.record(acquiredConnections, HttpClientMetricAttributes.AcquiredConnection) + } + + override fun close() { + connectionLimitHandle.stop() + connectionUsageHandle.stop() + requestsHandle.stop() + } }