Skip to content

Commit

Permalink
add ut and rebase main
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 4, 2024
1 parent 11cebf2 commit a52901f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package org.apache.celeborn.common.metrics.source

import java.lang
import java.util.{Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -117,7 +116,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels), NamedGauge(name, gauge, labels ++ staticLabels))
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
Expand Down Expand Up @@ -484,7 +484,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
var leftMetricsNum = metricsCapacity
val sb = new mutable.StringBuilder
val appMetricsSnapshot = ArrayBuffer[String]()
leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum =
fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb, appMetricsSnapshot)
Expand All @@ -510,24 +511,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
if (leftNum <= 0) {
return 0
}
val addList = metricList.take(leftNum)
addList.foreach {
case c: NamedCounter =>
sb.append(getCounterMetrics(c))
case g: NamedGauge[_] =>
sb.append(getGaugeMetrics(g))
case m: NamedMeter =>
sb.append(getMeterMetrics(m))
case h: NamedHistogram =>
sb.append(getHistogramMetrics(h))
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case t: NamedTimer =>
sb.append(getTimerMetrics(t))
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case s =>
sb.append(s.toString)
var addNum = 0
val appCount0Metrics = ArrayBuffer[String]()
for (m <- metricList if addNum < leftNum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class CelebornSourceSuite extends CelebornFunSuite {
mockSource.stopTimer("Timer1", "key1")
mockSource.stopTimer("Timer2", "key2", user3)

mockSource.timerMetricsMap.add("testTimerMetricsMap")
mockSource.timerMetrics.add("testTimerMetricsMap")

val res = mockSource.getMetrics()
var extraLabelsStr = extraLabels
Expand Down Expand Up @@ -143,4 +143,22 @@ class CelebornSourceSuite extends CelebornFunSuite {
assert(!res3.contains(exps3(i)))
}
}

test("test getAndClearTimerMetrics in timerMetrics") {
val conf = new CelebornConf()
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
val role = "mock"
val mockSource = new AbstractSource(conf, role) {
override def sourceName: String = "mockSource"
}
val exp1 = "testTimerMetrics1"
val exp2 = "testTimerMetrics2"
mockSource.timerMetrics.add(exp1)
val res1 = mockSource.getMetrics()
mockSource.timerMetrics.add(exp2)
val res2 = mockSource.getMetrics()

assert(res1.contains(exp1) && !res1.contains(exp2))
assert(res2.contains(exp2) && !res2.contains(exp1))
}
}

0 comments on commit a52901f

Please sign in to comment.