Skip to content

Commit

Permalink
add async updown counter and rework http metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
aajtodd committed Jun 28, 2023
1 parent 3a69e91 commit ccb3234
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 65 deletions.
12 changes: 7 additions & 5 deletions runtime/observability/telemetry-api/api/telemetry-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public final class aws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurement$
public static synthetic fun record$default (Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurement;Ljava/lang/Number;Laws/smithy/kotlin/runtime/util/Attributes;Laws/smithy/kotlin/runtime/telemetry/context/Context;ILjava/lang/Object;)V
}

public abstract interface class aws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle {
public abstract interface class aws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle {
public abstract fun stop ()V
}

Expand All @@ -138,18 +138,20 @@ public final class aws/smithy/kotlin/runtime/telemetry/metrics/Histogram$Default
}

public abstract interface class aws/smithy/kotlin/runtime/telemetry/metrics/Meter {
public abstract fun createDoubleGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle;
public abstract fun createAsyncUpDownCounter (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle;
public abstract fun createDoubleGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle;
public abstract fun createDoubleHistogram (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram;
public abstract fun createLongGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle;
public abstract fun createLongGauge (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle;
public abstract fun createLongHistogram (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram;
public abstract fun createMonotonicCounter (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/MonotonicCounter;
public abstract fun createUpDownCounter (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Laws/smithy/kotlin/runtime/telemetry/metrics/UpDownCounter;
}

public final class aws/smithy/kotlin/runtime/telemetry/metrics/Meter$DefaultImpls {
public static synthetic fun createDoubleGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle;
public static synthetic fun createAsyncUpDownCounter$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle;
public static synthetic fun createDoubleGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle;
public static synthetic fun createDoubleHistogram$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram;
public static synthetic fun createLongGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/GaugeHandle;
public static synthetic fun createLongGauge$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/AsyncMeasurementHandle;
public static synthetic fun createLongHistogram$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/Histogram;
public static synthetic fun createMonotonicCounter$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/MonotonicCounter;
public static synthetic fun createUpDownCounter$default (Laws/smithy/kotlin/runtime/telemetry/metrics/Meter;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/telemetry/metrics/UpDownCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,23 @@ public interface AsyncMeasurement<T : Number> {
public fun record(
value: T,
attributes: Attributes = emptyAttributes(),
context: Context,
context: Context? = null,
)
}

public typealias LongAsyncMeasurement = AsyncMeasurement<Long>
public typealias LongGaugeCallback = (LongAsyncMeasurement) -> Unit
public typealias LongUpDownCounterCallback = (LongAsyncMeasurement) -> Unit

public typealias DoubleAsyncMeasurement = AsyncMeasurement<Double>
public typealias DoubleGaugeCallback = (DoubleAsyncMeasurement) -> Unit

/**
* A handle to a registered guage
* A handle to a registered async measurement (e.g. Gauge or AsyncUpDownCounter)
*/
public interface GaugeHandle {
public interface AsyncMeasurementHandle {
/**
* Stop recording this gauge value. The registered callback function will
* Stop recording this async value. The registered callback function will
* stop being invoked after calling this function.
*/
public fun stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ public interface Meter {
description: String? = null,
): UpDownCounter

/**
* Create a new async up down counter.
*
* @param name the instrument name
* @param callback the callback to invoke when reading the counter value. NOTE: Unlike synchronous counters that
* record delta values, async measurements record absolute/current values.
* @param units the unit of measure
* @param description the human-readable description of the measurement
* @return a [AsyncMeasurementHandle] which can be used for de-registering the counter
* and stopping the callback from being invoked.
*/
public fun createAsyncUpDownCounter(
name: String,
callback: LongUpDownCounterCallback,
units: String? = null,
description: String? = null,
): AsyncMeasurementHandle

/**
* Create a new [MonotonicCounter]
*
Expand Down Expand Up @@ -69,15 +87,15 @@ public interface Meter {
* @param callback the callback to invoke when reading the gauge value
* @param units the unit of measure
* @param description the human-readable description of the measurement
* @return a [GaugeHandle] which can be used for de-registering the gauge
* @return a [AsyncMeasurementHandle] which can be used for de-registering the gauge
* and stopping the callback from being invoked.
*/
public fun createLongGauge(
name: String,
callback: LongGaugeCallback,
units: String? = null,
description: String? = null,
): GaugeHandle
): AsyncMeasurementHandle

/**
* Create a new Gauge.
Expand All @@ -86,13 +104,13 @@ public interface Meter {
* @param callback the callback to invoke when reading the gauge value
* @param units the unit of measure
* @param description the human-readable description of the measurement
* @return a [GaugeHandle] which can be used for de-registering the gauge
* @return a [AsyncMeasurementHandle] which can be used for de-registering the gauge
* and stopping the callback from being invoked.
*/
public fun createDoubleGauge(
name: String,
callback: DoubleGaugeCallback,
units: String? = null,
description: String? = null,
): GaugeHandle
): AsyncMeasurementHandle
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ internal object NoOpMeterProvider : MeterProvider {
private object NoOpMeter : Meter {
override fun createUpDownCounter(name: String, units: String?, description: String?): UpDownCounter = NoOpUpDownCounter

override fun createAsyncUpDownCounter(
name: String,
callback: LongUpDownCounterCallback,
units: String?,
description: String?,
): AsyncMeasurementHandle = NoOpAsyncMeasurementHandle

override fun createMonotonicCounter(name: String, units: String?, description: String?): MonotonicCounter = NoOpMonotonicCounter

override fun createLongHistogram(name: String, units: String?, description: String?): LongHistogram = NoOpLongHistogram
Expand All @@ -29,14 +36,14 @@ private object NoOpMeter : Meter {
callback: LongGaugeCallback,
units: String?,
description: String?,
): GaugeHandle = NoOpGaugeHandle
): AsyncMeasurementHandle = NoOpAsyncMeasurementHandle

override fun createDoubleGauge(
name: String,
callback: DoubleGaugeCallback,
units: String?,
description: String?,
): GaugeHandle = NoOpGaugeHandle
): AsyncMeasurementHandle = NoOpAsyncMeasurementHandle
}

private object NoOpUpDownCounter : UpDownCounter {
Expand All @@ -53,6 +60,6 @@ private object NoOpDoubleHistogram : DoubleHistogram {
override fun record(value: Double, attributes: Attributes, context: Context?) {}
}

private object NoOpGaugeHandle : GaugeHandle {
private object NoOpAsyncMeasurementHandle : AsyncMeasurementHandle {
override fun stop() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ private class OtelMeter(
return OtelUpDownCounterImpl(counter)
}

override fun createAsyncUpDownCounter(
name: String,
callback: LongUpDownCounterCallback,
units: String?,
description: String?,
): AsyncMeasurementHandle {
val observer = otelMeter.upDownCounterBuilder(name)
.apply {
description?.let { setDescription(it) }
units?.let { setUnit(units) }
}
.buildWithCallback {
callback(OtelLongAsyncMeasurementImpl(it))
}

return OtelAsyncMeasurementHandleImpl(observer)
}

override fun createMonotonicCounter(name: String, units: String?, description: String?): MonotonicCounter {
val counter = otelMeter.counterBuilder(name)
.apply {
Expand Down Expand Up @@ -73,7 +91,7 @@ private class OtelMeter(
callback: LongGaugeCallback,
units: String?,
description: String?,
): GaugeHandle {
): AsyncMeasurementHandle {
val observer = otelMeter.gaugeBuilder(name)
.apply {
description?.let { setDescription(it) }
Expand All @@ -83,15 +101,15 @@ private class OtelMeter(
.buildWithCallback {
callback(OtelLongAsyncMeasurementImpl(it))
}
return OtelGaugeHandleImpl(observer)
return OtelAsyncMeasurementHandleImpl(observer)
}

override fun createDoubleGauge(
name: String,
callback: DoubleGaugeCallback,
units: String?,
description: String?,
): GaugeHandle {
): AsyncMeasurementHandle {
val observer = otelMeter.gaugeBuilder(name)
.apply {
description?.let { setDescription(it) }
Expand All @@ -100,7 +118,7 @@ private class OtelMeter(
.buildWithCallback {
callback(OtelDoubleAsyncMeasurementImpl(it))
}
return OtelGaugeHandleImpl(observer)
return OtelAsyncMeasurementHandleImpl(observer)
}
}

Expand Down Expand Up @@ -157,7 +175,7 @@ private class OtelDoubleHistogramImpl(
}

private class OtelLongAsyncMeasurementImpl(private val measurement: ObservableLongMeasurement) : LongAsyncMeasurement {
override fun record(value: Long, attributes: Attributes, context: Context) {
override fun record(value: Long, attributes: Attributes, context: Context?) {
if (attributes.isEmpty) {
measurement.record(value)
} else {
Expand All @@ -167,7 +185,7 @@ private class OtelLongAsyncMeasurementImpl(private val measurement: ObservableLo
}

private class OtelDoubleAsyncMeasurementImpl(private val measurement: ObservableDoubleMeasurement) : DoubleAsyncMeasurement {
override fun record(value: Double, attributes: Attributes, context: Context) {
override fun record(value: Double, attributes: Attributes, context: Context?) {
if (attributes.isEmpty) {
measurement.record(value)
} else {
Expand All @@ -176,7 +194,7 @@ private class OtelDoubleAsyncMeasurementImpl(private val measurement: Observable
}
}

private class OtelGaugeHandleImpl(private val otelHandle: AutoCloseable) : GaugeHandle {
private class OtelAsyncMeasurementHandleImpl(private val otelHandle: AutoCloseable) : AsyncMeasurementHandle {
override fun stop() {
otelHandle.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConf
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Builder : aws/smithy/kotlin/runtime/http/engine/HttpClientEngineConfigImpl$BuilderImpl {
public fun <init> ()V
public final fun getMaxConnectionsPerHost-0hXNFcg ()Lkotlin/UInt;
public fun getTelemetryProvider ()Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;
public final fun setMaxConnectionsPerHost-ExVfyTY (Lkotlin/UInt;)V
public fun setTelemetryProvider (Laws/smithy/kotlin/runtime/telemetry/TelemetryProvider;)V
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig$Companion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ kotlin {
dependencies {
implementation(project(":runtime:runtime-core"))
api(project(":runtime:protocol:http-client"))
implementation(project(":runtime:observability:telemetry-defaults"))

implementation("com.squareup.okhttp3:okhttp:$okHttpVersion")
implementation("com.squareup.okhttp3:okhttp-coroutines:$okHttpVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetricAttributes
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.net.HostResolver
import aws.smithy.kotlin.runtime.net.toHostAddress
Expand All @@ -13,13 +12,9 @@ import aws.smithy.kotlin.runtime.telemetry.logging.LoggerProvider
import aws.smithy.kotlin.runtime.telemetry.logging.MessageSupplier
import aws.smithy.kotlin.runtime.telemetry.logging.getLogger
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import aws.smithy.kotlin.runtime.telemetry.metrics.decrement
import aws.smithy.kotlin.runtime.telemetry.metrics.increment
import aws.smithy.kotlin.runtime.telemetry.telemetryProvider
import aws.smithy.kotlin.runtime.telemetry.trace.SpanStatus
import aws.smithy.kotlin.runtime.telemetry.trace.recordException
import aws.smithy.kotlin.runtime.util.AttributeKey
import aws.smithy.kotlin.runtime.util.get
import okhttp3.*
import java.io.IOException
import java.net.InetAddress
Expand All @@ -29,29 +24,26 @@ import kotlin.time.ExperimentalTime
import kotlin.time.TimeMark
import kotlin.time.TimeSource

private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.okhttp"
internal const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.okhttp"

// see https://square.github.io/okhttp/features/events/#eventlistener for example callback flow
@OptIn(ExperimentalTime::class)
internal class HttpEngineEventListener(
private val pool: ConnectionPool,
private val hr: HostResolver,
private val dispatcher: Dispatcher,
private val metrics: HttpClientMetrics,
call: Call,
) : EventListener() {
private val provider: TelemetryProvider = call.request().tag<SdkRequestTag>()?.callContext?.telemetryProvider ?: TelemetryProvider.None
private val traceSpan = provider.tracerProvider
.getOrCreateTracer(TELEMETRY_SCOPE)
.createSpan("HTTP")
private val logger = call.request().tag<SdkRequestTag>()?.callContext?.logger<OkHttpEngine>() ?: LoggerProvider.None.getLogger<OkHttpEngine>()

private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, provider)

// FIXME - need idle/warm connection usage but requires tracking delta? Better suited for async UpDownCounter...
init {
// listener is created at same time as a call, the call is then enqueued until dispatcher can execute it
metrics.requests.increment(HttpClientMetricAttributes.QueuedRequest)
}
private val logger = call.request().tag<SdkRequestTag>()?.callContext?.logger<OkHttpEngine>() ?: LoggerProvider.None.getLogger<OkHttpEngine>()

// see https://square.github.io/okhttp/features/events/#eventlistener for flow
// FIXME - this isn't actually when a connection is started - this includes time in queue
// see https://github.com/square/okhttp/blob/7c92ed0879477eddb2fce6b4066d151525d5687f/okhttp/src/jvmMain/kotlin/okhttp3/internal/connection/RealCall.kt#L167-L175
private var connectStart: TimeMark? = null

private inline fun trace(crossinline msg: MessageSupplier) {
Expand All @@ -64,19 +56,21 @@ internal class HttpEngineEventListener(

override fun callStart(call: Call) {
connectStart = TimeSource.Monotonic.markNow()
metrics.requests.decrement(HttpClientMetricAttributes.QueuedRequest)
metrics.requests.increment(HttpClientMetricAttributes.InFlightRequest)
metrics.queuedRequests = dispatcher.queuedCallsCount().toLong()
metrics.inFlightRequests = dispatcher.runningCallsCount().toLong()
trace { "call started" }
}

override fun callEnd(call: Call) {
metrics.requests.decrement(HttpClientMetricAttributes.InFlightRequest)
metrics.queuedRequests = dispatcher.queuedCallsCount().toLong()
metrics.inFlightRequests = dispatcher.runningCallsCount().toLong()
trace { "call complete" }
traceSpan.close()
}

override fun callFailed(call: Call, ioe: IOException) {
metrics.requests.decrement(HttpClientMetricAttributes.InFlightRequest)
metrics.queuedRequests = dispatcher.queuedCallsCount().toLong()
metrics.inFlightRequests = dispatcher.runningCallsCount().toLong()
trace(ioe) { "call failed" }
traceSpan.recordException(ioe, true)
traceSpan.setStatus(SpanStatus.ERROR)
Expand All @@ -103,7 +97,8 @@ internal class HttpEngineEventListener(
}

override fun connectionAcquired(call: Call, connection: Connection) {
metrics.connectionUsage.increment(HttpClientMetricAttributes.AcquiredConnection)
metrics.acquiredConnections = pool.connectionCount().toLong()
metrics.idleConnections = pool.idleConnectionCount().toLong()
connectStart?.let {
metrics.connectionAcquireDuration.record(it.elapsedNow().inWholeMilliseconds)
}
Expand All @@ -113,7 +108,8 @@ internal class HttpEngineEventListener(
}

override fun connectionReleased(call: Call, connection: Connection) {
metrics.connectionUsage.decrement(HttpClientMetricAttributes.AcquiredConnection)
metrics.acquiredConnections = pool.connectionCount().toLong()
metrics.idleConnections = pool.idleConnectionCount().toLong()
val connId = System.identityHashCode(connection)
trace { "connection released: conn(id=$connId)=$connection; connPool: total=${pool.connectionCount()}, idle=${pool.idleConnectionCount()}" }
}
Expand Down
Loading

0 comments on commit ccb3234

Please sign in to comment.