[CELEBORN-1757] Add retry when sending RPC to LifecycleManager #3008

Open · wants to merge 21 commits into main
Changes from 4 commits
151 changes: 91 additions & 60 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -20,10 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
Review comment (Member): Seems an unnecessary change? I do not see a new concurrent class involved in this class.

import scala.Tuple2;
import scala.reflect.ClassTag$;
@@ -63,6 +60,7 @@
import org.apache.celeborn.common.rpc.RpcAddress;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.rpc.RpcEnv;
import org.apache.celeborn.common.rpc.RpcTimeoutException;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.*;
import org.apache.celeborn.common.write.DataBatches;
@@ -81,6 +79,7 @@ public class ShuffleClientImpl extends ShuffleClient {

private final int registerShuffleMaxRetries;
private final long registerShuffleRetryWaitMs;
private final long lifecycleManagerRpcTimeoutRetryWaitMs;
private final int maxReviveTimes;
private final boolean testRetryRevive;
private final int pushBufferMaxSize;
@@ -179,6 +178,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
this.userIdentifier = userIdentifier;
registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry();
registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs();
lifecycleManagerRpcTimeoutRetryWaitMs = conf.clientCallLifecycleManagerRetryWaitMs();
maxReviveTimes = conf.clientPushMaxReviveTimes();
testRetryRevive = conf.testRetryRevive();
pushBufferMaxSize = conf.clientPushBufferMaxSize();
@@ -667,7 +667,7 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffleInternal(
StatusCode lastFailedStatusCode = null;
while (numRetries > 0) {
try {
PbRegisterShuffleResponse response = callable.call();
PbRegisterShuffleResponse response = callLifecycleManagerWithRetry(callable);
StatusCode respStatus = Utils.toStatusCode(response.getStatus());
if (StatusCode.SUCCESS.equals(respStatus)) {
ConcurrentHashMap<Integer, PartitionLocation> result = JavaUtils.newConcurrentHashMap();
@@ -1700,14 +1700,14 @@ private void mapEndInternal(
throws IOException {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
PushState pushState = getPushState(mapKey);

Review comment (Member): unnecessary change.

try {
limitZeroInFlight(mapKey, pushState);

Review comment (Member): unnecessary change.

MapperEndResponse response =
lifecycleManagerRef.askSync(
new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId),
ClassTag$.MODULE$.apply(MapperEndResponse.class));
callLifecycleManagerWithRetry(
() ->
lifecycleManagerRef.askSync(
new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId),
ClassTag$.MODULE$.apply(MapperEndResponse.class)));
if (response.status() != StatusCode.SUCCESS) {
throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status());
}
@@ -1741,55 +1741,48 @@ public boolean cleanupShuffle(int shuffleId) {

protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
int shuffleId, boolean isSegmentGranularityVisible) {
{
long getReducerFileGroupStartTime = System.nanoTime();
String exceptionMsg = null;
long getReducerFileGroupStartTime = System.nanoTime();
String exceptionMsg = null;
if (lifecycleManagerRef != null) {
try {
if (lifecycleManagerRef == null) {
exceptionMsg = "Driver endpoint is null!";
logger.warn(exceptionMsg);
} else {
GetReducerFileGroup getReducerFileGroup =
new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);

GetReducerFileGroupResponse response =
lifecycleManagerRef.askSync(
getReducerFileGroup,
conf.clientRpcGetReducerFileGroupAskTimeout(),
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));

switch (response.status()) {
case SUCCESS:
logger.info(
"Shuffle {} request reducer file group success using {} ms, result partition size {}.",
shuffleId,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
response.fileGroup().size());
return Tuple2.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null);
case SHUFFLE_NOT_REGISTERED:
logger.warn(
"Request {} return {} for {}.",
getReducerFileGroup,
response.status(),
shuffleId);
// return empty result
return Tuple2.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null);
case STAGE_END_TIME_OUT:
case SHUFFLE_DATA_LOST:
exceptionMsg =
String.format(
"Request %s return %s for %s.",
getReducerFileGroup, response.status(), shuffleId);
logger.warn(exceptionMsg);
break;
default: // fall out
}
GetReducerFileGroup getReducerFileGroup =
new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);
GetReducerFileGroupResponse response =
callLifecycleManagerWithRetry(
() ->
lifecycleManagerRef.askSync(
getReducerFileGroup,
conf.clientRpcGetReducerFileGroupAskTimeout(),
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)));
switch (response.status()) {
case SUCCESS:
logger.info(
"Shuffle {} request reducer file group success using {} ms, result partition size {}.",
shuffleId,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
response.fileGroup().size());
return Tuple2.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null);
case SHUFFLE_NOT_REGISTERED:
logger.warn(
"Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
// return empty result
return Tuple2.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null);
case STAGE_END_TIME_OUT:
break;
case SHUFFLE_DATA_LOST:
exceptionMsg =
String.format(
"Request %s return %s for %s.",
getReducerFileGroup, response.status(), shuffleId);
logger.warn(exceptionMsg);
break;
default: // fall out
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
@@ -1798,8 +1791,11 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(
logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
exceptionMsg = e.getMessage();
}
return Tuple2.apply(null, exceptionMsg);
} else {
exceptionMsg = "Driver endpoint is null!";
logger.warn(exceptionMsg);
}
return Tuple2.apply(null, exceptionMsg);
}

@Override
@@ -1929,10 +1925,45 @@ public void shutdown() {
public void setupLifecycleManagerRef(String host, int port) {
logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port);
lifecycleManagerRef =
rpcEnv.setupEndpointRef(new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP);
callLifecycleManagerWithRetry(
() ->
rpcEnv.setupEndpointRef(
new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP));
initDataClientFactoryIfNeeded();
}

public <T> T callLifecycleManagerWithRetry(Callable<T> callable) {
return callLifecycleManagerWithRetry(callable, 3);
}

public <T> T callLifecycleManagerWithRetry(Callable<T> callable, int numRetries) {
T result;
while (numRetries > 0) {
numRetries--;
try {
result = callable.call();
return result;
} catch (Exception error) {
if (error instanceof RpcTimeoutException && numRetries > 0) {
logger.warn(
"RpcTimeout while calling LifecycleManager, left retry times: {}", numRetries);
try {
Random random = new Random();
int waitTimeBound = (int) lifecycleManagerRpcTimeoutRetryWaitMs;
long retryWaitMs = random.nextInt(waitTimeBound);
TimeUnit.MILLISECONDS.sleep(retryWaitMs);
} catch (InterruptedException e) {
break;
}
} else {
logger.error("Exception raised while calling LifecycleManager");
break;
}
}
}
return null;
}

@Override
public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) {
lifecycleManagerRef = endpointRef;
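For orientation, below is a minimal standalone sketch of the pattern the new `callLifecycleManagerWithRetry` helper implements: a bounded number of attempts, retrying only on timeout-style failures, with a randomized wait between attempts. The class, exception, and constants are illustrative stand-ins rather than Celeborn APIs, and unlike the PR's helper (which returns null once retries are exhausted) this sketch rethrows the last timeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class RetryOnTimeoutSketch {

  // Illustrative stand-in for an RPC timeout; the PR itself checks for Celeborn's RpcTimeoutException.
  static class FakeRpcTimeoutException extends RuntimeException {
    FakeRpcTimeoutException(String message) {
      super(message);
    }
  }

  // Run the callable up to maxAttempts times. Only timeout-style failures are retried;
  // any other exception from call() propagates immediately. Between attempts, sleep a
  // random interval bounded by retryWaitMs, mirroring the randomized wait in the PR.
  static <T> T callWithRetry(Callable<T> callable, int maxAttempts, long retryWaitMs)
      throws Exception {
    FakeRpcTimeoutException lastTimeout = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return callable.call();
      } catch (FakeRpcTimeoutException e) {
        lastTimeout = e;
        if (attempt < maxAttempts) {
          long waitMs = ThreadLocalRandom.current().nextLong(retryWaitMs);
          TimeUnit.MILLISECONDS.sleep(waitMs);
        }
      }
    }
    throw lastTimeout; // retries exhausted; the PR's helper returns null here instead
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Fails with a simulated timeout twice, then succeeds on the third attempt.
    String result =
        callWithRetry(
            () -> {
              if (++calls[0] < 3) {
                throw new FakeRpcTimeoutException("simulated RPC timeout");
              }
              return "ok";
            },
            3,
            1000L);
    System.out.println(result + " after " + calls[0] + " attempts");
  }
}
```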
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -901,6 +901,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES)
def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT)
def clientCallLifecycleManagerRetryWaitMs: Long = get(CLIENT_CALL_LIFECYCLEMANAGER_RETRY_WAIT)
def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED)
def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
@@ -4884,6 +4885,15 @@
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")

val CLIENT_CALL_LIFECYCLEMANAGER_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.client.callLifecycleManager.retryWait")
.withAlternative("celeborn.shuffle.callLifecycleManager.retryWait")
.categories("client")
.version("0.6.0")
.doc("Wait time before next retry if call LifecycleManager failed.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.client.reserveSlots.maxRetries")
.withAlternative("celeborn.slots.reserve.maxRetries")
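As a usage illustration (not part of the PR), the new entry behaves like other Celeborn time configs: it can be set by key and read back in milliseconds through the getter added above. The no-arg CelebornConf constructor and set(key, value) call below are assumptions based on how the conf is used elsewhere in the code base; the 2s override is arbitrary.

```java
import org.apache.celeborn.common.CelebornConf;

public class RetryWaitConfExample {
  public static void main(String[] args) {
    // Assumed API: CelebornConf's no-arg constructor and set(key, value).
    CelebornConf conf = new CelebornConf();
    // Override the 1s default; time configs accept suffixed values like "2s".
    conf.set("celeborn.client.callLifecycleManager.retryWait", "2s");
    long waitMs = conf.clientCallLifecycleManagerRetryWaitMs(); // expected: 2000
    System.out.println("retry wait ms = " + waitMs);
  }
}
```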
1 change: 1 addition & 0 deletions docs/configuration/client.md
@@ -22,6 +22,7 @@ license: |
| celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval |
| celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | |
| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | |
| celeborn.client.callLifecycleManager.retryWait | 1s | false | Wait time before next retry if call LifecycleManager failed. | 0.6.0 | celeborn.shuffle.callLifecycleManager.retryWait |
| celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | |
| celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | |
2 changes: 1 addition & 1 deletion docs/configuration/network.md
@@ -29,7 +29,7 @@ license: |
| celeborn.&lt;module&gt;.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | |
| celeborn.&lt;module&gt;.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting <module> to `push`, it works for Flink shuffle client push data. | | |
| celeborn.&lt;module&gt;.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
| celeborn.&lt;module&gt;.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
Review comment (Member): cc @SteNicholas It seems the doc generation depends on the developer environment.

Review comment (Member): It cannot pass the GA; this needs to be reverted.

| celeborn.&lt;module&gt;.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | | |
| celeborn.&lt;module&gt;.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | 0.2.0 | |