Skip to content

Commit

Permalink
[CELEBORN-1804] Task level metric group structure of RemoteShuffleEnv…
Browse files Browse the repository at this point in the history
…ironment should use Shuffle.Remote.<Input|Output>.Buffers
  • Loading branch information
SteNicholas committed Dec 25, 2024
1 parent fde6365 commit 70dbdb4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.celeborn.plugin.flink;

import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*;
Expand Down Expand Up @@ -119,13 +120,13 @@ public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo

public ShuffleIOOwnerContext createShuffleIOOwnerContext(
String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) {
MetricGroup nettyGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
MetricGroup remoteGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
return new ShuffleIOOwnerContext(
checkNotNull(ownerName),
checkNotNull(executionAttemptID),
parentGroup,
nettyGroup.addGroup(METRIC_GROUP_OUTPUT),
nettyGroup.addGroup(METRIC_GROUP_INPUT));
remoteGroup.addGroup(METRIC_GROUP_OUTPUT),
remoteGroup.addGroup(METRIC_GROUP_INPUT));
}

public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.celeborn.plugin.flink;

import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;

import java.time.Duration;

import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -65,7 +63,6 @@ protected static AbstractRemoteShuffleServiceParameters initializePreCreateShuff
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(numBuffers, bufferSize, requestSegmentsTimeout);

registerShuffleMetrics(metricGroup, networkBufferPool);
CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
AbstractRemoteShuffleServiceParameters result =
new AbstractRemoteShuffleServiceParameters(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.metric;

import org.apache.flink.metrics.MetricGroup;

/** Factory for remote shuffle service metrics. */
public class RemoteShuffleMetricFactory {

// task level metric group structure: Shuffle.Remote.<Input|Output>.Buffers

private static final String METRIC_GROUP_SHUFFLE = "Shuffle";
private static final String METRIC_GROUP_REMOTE = "Remote";

private RemoteShuffleMetricFactory() {}

public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGroup) {
return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
}
}

0 comments on commit 70dbdb4

Please sign in to comment.