Skip to content

Commit

Permalink
Add metric representing failed connection attempts on proxy and node …
Browse files Browse the repository at this point in the history
…levels (#136)

Fixes #135.
  • Loading branch information
lukasz-antoniak authored Nov 29, 2024
1 parent 23f1a1d commit e6411a5
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 10 deletions.
2 changes: 2 additions & 0 deletions integration-tests/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func checkMetrics(
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedWritesOnTarget)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsTarget)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedReadsOrigin)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsOrigin)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.FailedConnectionsTarget)))

require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightWrites)))
require.Contains(t, lines, fmt.Sprintf("%v 0", getPrometheusName(prefix, metrics.InFlightReadsOrigin)))
Expand Down
98 changes: 98 additions & 0 deletions integration-tests/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package integration_tests
import (
"context"
"fmt"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/datastax/zdm-proxy/integration-tests/client"
"github.com/datastax/zdm-proxy/integration-tests/setup"
"github.com/datastax/zdm-proxy/integration-tests/utils"
"github.com/datastax/zdm-proxy/proxy/pkg/config"
"github.com/datastax/zdm-proxy/proxy/pkg/health"
"github.com/datastax/zdm-proxy/proxy/pkg/httpzdmproxy"
"github.com/datastax/zdm-proxy/proxy/pkg/metrics"
"github.com/datastax/zdm-proxy/proxy/pkg/runner"
"github.com/datastax/zdm-proxy/proxy/pkg/zdmproxy"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -42,6 +50,10 @@ func TestWithHttpHandlers(t *testing.T) {
t.Run("testHttpEndpointsWithUnavailableNode", func(t *testing.T) {
testHttpEndpointsWithUnavailableNode(t, metricsHandler, readinessHandler)
})

t.Run("testMetricsWithUnavailableNode", func(t *testing.T) {
testMetricsWithUnavailableNode(t, metricsHandler)
})
}

func testHttpEndpointsWithProxyNotInitialized(
Expand Down Expand Up @@ -137,6 +149,92 @@ func testHttpEndpointsWithProxyInitialized(
require.Equal(t, health.UP, report.Status)
}

// testMetricsWithUnavailableNode verifies that a failed connection attempt to
// the origin cluster is reported by both the proxy level failed connections
// counter and the origin node level failed connections counter.
//
// The test starts the metrics HTTP server and a proxy, waits for the metrics
// endpoint to be initialized, disables the origin connection listener and
// drops its connections, sends a single query through the proxy and then polls
// the metrics endpoint until both counters report a value of 1.
func testMetricsWithUnavailableNode(
	t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback) {

	simulacronSetup, err := setup.NewSimulacronTestSetupWithSession(t, false, false)
	require.Nil(t, err)
	defer simulacronSetup.Cleanup()

	conf := setup.NewTestConfig(simulacronSetup.Origin.GetInitialContactPoint(), simulacronSetup.Target.GetInitialContactPoint())
	modifyConfForHealthTests(conf, 2)

	// Computed once so the server and every metrics request share one address.
	httpAddr := fmt.Sprintf("%s:%d", conf.MetricsAddress, conf.MetricsPort)

	waitGroup := &sync.WaitGroup{}
	ctx, cancelFunc := context.WithCancel(context.Background())

	// Deferred calls run in reverse order: the HTTP server is shut down first,
	// then the context is cancelled (which lets the proxy goroutine shut the
	// proxy down) and finally the goroutines are awaited.
	defer waitGroup.Wait()
	defer cancelFunc()

	srv := httpzdmproxy.StartHttpServer(httpAddr, waitGroup)
	defer func(srv *http.Server, ctx context.Context) {
		if err := srv.Shutdown(ctx); err != nil {
			// Errorf keeps a separator between the message and the error text;
			// log.Error would concatenate two string operands without a space.
			log.Errorf("Failed to shutdown metrics server: %v", err)
		}
	}(srv, ctx)

	b := &backoff.Backoff{
		Factor: 2,
		Jitter: false,
		Min:    100 * time.Millisecond,
		Max:    500 * time.Millisecond,
	}
	proxy := atomic.Value{}
	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		p, err := zdmproxy.RunWithRetries(conf, ctx, b)
		if err == nil {
			metricsHandler.SetHandler(p.GetMetricHandler().GetHttpHandler())
			proxy.Store(&p)
			<-ctx.Done()
			p.Shutdown()
		}
	}()

	// check that metrics endpoint has been initialized
	utils.RequireWithRetries(t, func() (error, bool) {
		return utils.CheckMetricsEndpointResult(httpAddr, true), false
	}, 10, 100*time.Millisecond)

	// stop origin cluster
	err = simulacronSetup.Origin.DisableConnectionListener()
	require.Nil(t, err, "failed to disable origin connection listener: %v", err)
	err = simulacronSetup.Origin.DropAllConnections()
	require.Nil(t, err, "failed to drop origin connections: %v", err)

	// send a request
	// NOTE(review): testClient is never closed here; if the client type offers
	// a shutdown method, consider deferring it - confirm against the client package.
	testClient, err := client.NewTestClient(context.Background(), "127.0.0.1:14002")
	require.Nil(t, err)
	queryMsg := &message.Query{
		Query: "SELECT * FROM table1",
	}
	// The outcome of the request is deliberately ignored: the assertions below
	// only look at the metrics.
	_, _, _ = testClient.SendMessage(context.Background(), primitive.ProtocolVersion4, queryMsg)

	originEndpoint := fmt.Sprintf("%v:9042", simulacronSetup.Origin.GetInitialContactPoint())
	// search for:
	// zdm_proxy_failed_connections_total{cluster="origin"} 1
	// zdm_origin_failed_connections_total{node="127.0.0.40:9042"} 1
	proxyMetricLine := fmt.Sprintf("%v 1", getPrometheusName("zdm", metrics.FailedConnectionsOrigin))
	nodeMetricLine := fmt.Sprintf("%v 1", getPrometheusNameWithNodeLabel("zdm", metrics.FailedOriginConnections, originEndpoint))

	utils.RequireWithRetries(t, func() (error, bool) {
		// expect connection failure to origin cluster
		statusCode, rspStr, err := utils.GetMetrics(httpAddr)
		if err != nil {
			// A failed fetch is reported as a non-fatal error so that it is
			// retried instead of aborting the test on the first attempt.
			return fmt.Errorf("could not fetch metrics: %w", err), false
		}
		if statusCode != http.StatusOK {
			return fmt.Errorf("unexpected metrics endpoint status code: %d", statusCode), false
		}
		if !strings.Contains(rspStr, proxyMetricLine) {
			return fmt.Errorf("did not observe failed connection attempts at proxy metric"), false
		}
		if !strings.Contains(rspStr, nodeMetricLine) {
			return fmt.Errorf("did not observe failed connection attempts at node metric"), false
		}
		return nil, false
	}, 10, 500*time.Millisecond)
}

func testHttpEndpointsWithUnavailableNode(
t *testing.T, metricsHandler *httpzdmproxy.HandlerWithFallback, healthHandler *httpzdmproxy.HandlerWithFallback) {

Expand Down
15 changes: 14 additions & 1 deletion proxy/pkg/metrics/node_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,26 @@ var (
"origin_connections_total",
"Number of connections to Origin Cassandra currently open",
)
// FailedOriginConnections describes the node level metric that reports the
// number of failed connection attempts to Origin Cassandra.
FailedOriginConnections = NewMetric(
"origin_failed_connections_total",
"Number of failed connection attempts to Origin Cassandra",
)
OpenTargetConnections = NewMetric(
"target_connections_total",
"Number of connections to Target Cassandra currently open",
)
// FailedTargetConnections describes the node level metric that reports the
// number of failed connection attempts to Target Cassandra.
FailedTargetConnections = NewMetric(
"target_failed_connections_total",
"Number of failed connection attempts to Target Cassandra",
)
OpenAsyncConnections = NewMetric(
"async_connections_total",
"Number of connections currently open for async requests",
)
// FailedAsyncConnections describes the node level metric that reports the
// number of failed connection attempts made for async requests.
FailedAsyncConnections = NewMetric(
"async_failed_connections_total",
"Number of failed connection attempts for async requests",
)

InFlightRequestsAsync = NewMetric(
"async_inflight_requests_total",
Expand Down Expand Up @@ -283,7 +295,8 @@ type NodeMetricsInstance struct {
ReadDurations Histogram
WriteDurations Histogram

OpenConnections Gauge
OpenConnections Gauge
FailedConnections Counter

InFlightRequests Gauge

Expand Down
30 changes: 25 additions & 5 deletions proxy/pkg/metrics/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ const (
failedRequestsClusterTarget = "target"
failedRequestsClusterBoth = "both"

failedConnectionsName = "proxy_failed_connections_total"
failedConnectionsDescription = "Running total of failed requests due to inability to connect to given cluster"
failedConnectionsClusterLabel = "cluster"

failedReadsName = "proxy_failed_reads_total"
failedReadsDescription = "Running total of failed reads"
failedReadsClusterLabel = "cluster"
Expand All @@ -28,6 +32,20 @@ const (
)

var (
// FailedConnectionsOrigin describes the proxy level failed connections metric
// for the origin cluster. It shares its name and description with
// FailedConnectionsTarget and differs only in the value of the cluster label.
FailedConnectionsOrigin = NewMetricWithLabels(
failedConnectionsName,
failedConnectionsDescription,
map[string]string{
failedConnectionsClusterLabel: failedRequestsClusterOrigin,
},
)
// FailedConnectionsTarget describes the proxy level failed connections metric
// for the target cluster. It shares its name and description with
// FailedConnectionsOrigin and differs only in the value of the cluster label.
FailedConnectionsTarget = NewMetricWithLabels(
failedConnectionsName,
failedConnectionsDescription,
map[string]string{
failedConnectionsClusterLabel: failedRequestsClusterTarget,
},
)
FailedReadsOrigin = NewMetricWithLabels(
failedReadsName,
failedReadsDescription,
Expand Down Expand Up @@ -124,11 +142,13 @@ var (
)

type ProxyMetrics struct {
FailedReadsOrigin Counter
FailedReadsTarget Counter
FailedWritesOnOrigin Counter
FailedWritesOnTarget Counter
FailedWritesOnBoth Counter
FailedConnectionsOrigin Counter
FailedConnectionsTarget Counter
FailedReadsOrigin Counter
FailedReadsTarget Counter
FailedWritesOnOrigin Counter
FailedWritesOnTarget Counter
FailedWritesOnBoth Counter

PSCacheSize GaugeFunc
PSCacheMissCount Counter
Expand Down
2 changes: 2 additions & 0 deletions proxy/pkg/zdmproxy/clienthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func NewClientHandler(
clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx,
false, nil, handshakeDone, originFrameProcessor, originCCProtoVer)
if err != nil {
metricHandler.GetProxyMetrics().FailedConnectionsOrigin.Add(1)
clientHandlerCancelFunc()
return nil, err
}
Expand All @@ -218,6 +219,7 @@ func NewClientHandler(
clientHandlerContext, clientHandlerCancelFunc, respChannel, readScheduler, writeScheduler, requestsDoneCtx,
false, nil, handshakeDone, targetFrameProcessor, targetCCProtoVer)
if err != nil {
metricHandler.GetProxyMetrics().FailedConnectionsTarget.Add(1)
clientHandlerCancelFunc()
return nil, err
}
Expand Down
13 changes: 9 additions & 4 deletions proxy/pkg/zdmproxy/clusterconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,20 @@ func (cc *ClusterConnector) run() {
func openConnectionToCluster(connInfo *ClusterConnectionInfo, context context.Context, connectorType ClusterConnectorType, nodeMetrics *metrics.NodeMetrics) (net.Conn, context.Context, error) {
clusterType := connInfo.connConfig.GetClusterType()
log.Infof("[%s] Opening request connection to %v (%v).", connectorType, clusterType, connInfo.endpoint.GetEndpointIdentifier())
nodeMetricsInstance, err := GetNodeMetricsByClusterConnector(nodeMetrics, connectorType)
if err != nil {
log.Errorf("Failed to track open connection metrics for endpoint %v: %v.", connInfo.endpoint.GetEndpointIdentifier(), err)
}

conn, timeoutCtx, err := openConnection(connInfo.connConfig, connInfo.endpoint, context, true)
if err != nil {
if nodeMetricsInstance != nil {
nodeMetricsInstance.FailedConnections.Add(1)
}
return nil, timeoutCtx, err
}

nodeMetricsInstance, err := GetNodeMetricsByClusterConnector(nodeMetrics, connectorType)
if err != nil {
log.Errorf("Failed to track open connection metrics for conn %v: %v.", conn.RemoteAddr().String(), err)
} else {
if nodeMetricsInstance != nil {
nodeMetricsInstance.OpenConnections.Add(1)
}

Expand Down
30 changes: 30 additions & 0 deletions proxy/pkg/zdmproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,16 @@ func sleepWithContext(d time.Duration, ctx context.Context, reconnectCh chan boo
}

func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*metrics.ProxyMetrics, error) {
failedConnectionsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsOrigin)
if err != nil {
return nil, err
}

failedConnectionsTarget, err := metricFactory.GetOrCreateCounter(metrics.FailedConnectionsTarget)
if err != nil {
return nil, err
}

failedReadsOrigin, err := metricFactory.GetOrCreateCounter(metrics.FailedReadsOrigin)
if err != nil {
return nil, err
Expand Down Expand Up @@ -760,6 +770,8 @@ func (p *ZdmProxy) CreateProxyMetrics(metricFactory metrics.MetricFactory) (*met
}

proxyMetrics := &metrics.ProxyMetrics{
FailedConnectionsOrigin: failedConnectionsOrigin,
FailedConnectionsTarget: failedConnectionsTarget,
FailedReadsOrigin: failedReadsOrigin,
FailedReadsTarget: failedReadsTarget,
FailedWritesOnOrigin: failedWritesOnOrigin,
Expand Down Expand Up @@ -841,6 +853,11 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
return nil, err
}

failedOriginConnections, err := metrics.CreateCounterNodeMetric(metricFactory, originNodeDescription, metrics.FailedOriginConnections)
if err != nil {
return nil, err
}

// inflight requests metric for non async requests are implemented as proxy level metrics (not node metrics)
inflightRequests, err := noopmetrics.NewNoopMetricFactory().GetOrCreateGauge(nil)
if err != nil {
Expand All @@ -865,6 +882,7 @@ func (p *ZdmProxy) CreateOriginNodeMetrics(
ReadDurations: originReadRequestDuration,
WriteDurations: originWriteRequestDuration,
OpenConnections: openOriginConnections,
FailedConnections: failedOriginConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: originUsedStreamIds,
}, nil
Expand Down Expand Up @@ -932,6 +950,11 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
return nil, err
}

failedAsyncConnections, err := metrics.CreateCounterNodeMetric(metricFactory, asyncNodeDescription, metrics.FailedAsyncConnections)
if err != nil {
return nil, err
}

inflightRequestsAsync, err := metrics.CreateGaugeNodeMetric(metricFactory, asyncNodeDescription, metrics.InFlightRequestsAsync)
if err != nil {
return nil, err
Expand All @@ -955,6 +978,7 @@ func (p *ZdmProxy) CreateAsyncNodeMetrics(
ReadDurations: asyncReadRequestDuration,
WriteDurations: asyncWriteRequestDuration,
OpenConnections: openAsyncConnections,
FailedConnections: failedAsyncConnections,
InFlightRequests: inflightRequestsAsync,
UsedStreamIds: asyncUsedStreamIds,
}, nil
Expand Down Expand Up @@ -1022,6 +1046,11 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
return nil, err
}

failedTargetConnections, err := metrics.CreateCounterNodeMetric(metricFactory, targetNodeDescription, metrics.FailedTargetConnections)
if err != nil {
return nil, err
}

// inflight requests metric for non async requests are implemented as proxy level metrics (not node metrics)
inflightRequests, err := noopmetrics.NewNoopMetricFactory().GetOrCreateGauge(nil)
if err != nil {
Expand All @@ -1046,6 +1075,7 @@ func (p *ZdmProxy) CreateTargetNodeMetrics(
ReadDurations: targetReadRequestDuration,
WriteDurations: targetWriteRequestDuration,
OpenConnections: openTargetConnections,
FailedConnections: failedTargetConnections,
InFlightRequests: inflightRequests,
UsedStreamIds: targetUsedStreamIds,
}, nil
Expand Down

0 comments on commit e6411a5

Please sign in to comment.