[server][dvc] Retry/Skip hosts that fail to connect or transfer files during blob transfer (#1218)
jingy-li authored Oct 9, 2024
1 parent 1ce7d5e commit 5cc32a7
Showing 11 changed files with 438 additions and 108 deletions.
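At a high level, this change replaces the earlier try-each-peer-once loop with a per-host retry budget plus skip-to-next-host behavior, falling back to Kafka only when every peer is exhausted. A minimal, self-contained sketch of that control flow follows; the class, method, and exception names (RetrySkipSketch, fetchFromHost, PeerConnectionException, BlobFileNotFoundException) are illustrative stand-ins, not the actual Venice types, which appear in the diff below.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;

public class RetrySkipSketch {
  static final int MAX_RETRIES_PER_HOST = 3;

  // Hypothetical stand-ins for the Venice exception types referenced in the diff.
  static class PeerConnectionException extends Exception {}
  static class BlobFileNotFoundException extends Exception {}

  // Hypothetical per-host fetch; stands in for the Netty-based client call.
  static InputStream fetchFromHost(String host) throws Exception {
    if (host.startsWith("down")) throw new PeerConnectionException();
    if (host.startsWith("empty")) throw new BlobFileNotFoundException();
    return new ByteArrayInputStream(new byte[] { 1 });
  }

  static InputStream fetchFromAnyPeer(List<String> peers) throws Exception {
    for (String peer: peers) {
      String host = peer.split("_")[0]; // instance name comes as <hostName>_<applicationPort>
      int attempts = 0;
      while (attempts < MAX_RETRIES_PER_HOST) {
        try {
          return fetchFromHost(host);
        } catch (PeerConnectionException e) {
          attempts++; // connection trouble: retry the same host up to the budget
        } catch (BlobFileNotFoundException e) {
          break; // host has no snapshot: give up on it and try the next peer
        }
        // any other exception propagates and aborts blob transfer entirely
      }
    }
    // every peer failed: the caller falls back to Kafka-based bootstrapping
    throw new IllegalStateException("no peer could serve the blob");
  }

  public static void main(String[] args) throws Exception {
    List<String> peers = Arrays.asList("down-host_123", "empty-host_123", "good-host_123");
    System.out.println(fetchFromAnyPeer(peers).read()); // prints 1
  }
}
```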
@@ -4,10 +4,17 @@
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.venice.blobtransfer.BlobFinder;
import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VenicePeersConnectionException;
import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -19,6 +26,8 @@
*/
public class NettyP2PBlobTransferManager implements P2PBlobTransferManager<Void> {
private static final Logger LOGGER = LogManager.getLogger(NettyP2PBlobTransferManager.class);
protected static final int MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST = 3;
protected static final int MAX_TIMEOUT_FOR_BLOB_TRANSFER_IN_MIN = 60;
private final P2PBlobTransferService blobTransferService;
// netty client is responsible to make requests against other peers for blob fetching
protected final NettyFileTransferClient nettyClient;
@@ -39,31 +48,125 @@ public void start() throws Exception {
blobTransferService.start();
}

/**
* Get the blob for the given storeName and partition.
* Error cases:
* 1. [Fatal Case] If no peer info is found for the requested blob, a VenicePeersNotFoundException is thrown.
* In this case, blob transfer is not used for bootstrapping at all.
* 2. If a host fails to connect, a VenicePeersConnectionException is thrown and the connection to that peer is retried.
* If it still fails after MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST attempts, the process moves on to the next possible host.
* 3. If the connected host does not have the requested file,
* a VeniceBlobTransferFileNotFoundException is thrown, and the process moves on to the next available host.
* 4. [Fatal Case] If any unexpected exception occurs, such as an InterruptedException, ExecutionException, or TimeoutException
* during the file/metadata transfer, a VeniceException is thrown, and blob transfer is skipped for bootstrapping to save time.
* 5. [Fatal Case] If all peers fail to connect or have no snapshot, a VenicePeersNotFoundException is thrown,
* and Kafka is used for bootstrapping instead.
*
* Success case:
* 1. If the blob is successfully fetched from a peer, an InputStream of the blob is returned.
*
* @param storeName the name of the store
* @param version the version of the store
* @param partition the partition of the store
* @return the InputStream of the blob
* @throws VenicePeersNotFoundException if no peer can serve the requested blob
*/
@Override
public CompletionStage<InputStream> get(String storeName, int version, int partition)
throws VenicePeersNotFoundException {
CompletionStage<InputStream> inputStream;
// error case 1: no peers are found for the requested blob
BlobPeersDiscoveryResponse response = peerFinder.discoverBlobPeers(storeName, version, partition);
if (response == null || response.isError()) {
throw new VenicePeersNotFoundException("Failed to obtain the peers for the requested blob");
}

List<String> discoverPeers = response.getDiscoveryResult();
if (discoverPeers == null || discoverPeers.isEmpty()) {
throw new VenicePeersNotFoundException("No peers found for the requested blob");
}
LOGGER
.info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);

Instant startTime = Instant.now();
for (String peer: discoverPeers) {
try {
// TODO: add some retry logic or strategy to choose the peers differently in case of failure
// instanceName comes as a format of <hostName>_<applicationPort>
String chosenHost = peer.split("_")[0];
LOGGER.info("Chosen host: {}", chosenHost);
inputStream = nettyClient.get(chosenHost, storeName, version, partition);
return inputStream;
} catch (Exception e) {
LOGGER.warn("Failed to connect to peer: {}", peer, e);
String chosenHost = peer.split("_")[0];
int retryCount = 0;
while (retryCount < MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST) {
try {
// instanceName comes as a format of <hostName>_<applicationPort>
LOGGER.info("Attempt {} to connect to host: {}", retryCount + 1, chosenHost);
CompletableFuture<InputStream> inputStreamFuture =
nettyClient.get(chosenHost, storeName, version, partition).toCompletableFuture();
InputStream inputStream = inputStreamFuture.get(MAX_TIMEOUT_FOR_BLOB_TRANSFER_IN_MIN, TimeUnit.MINUTES);
LOGGER.info(
"Successfully fetched blob from peer {} for store {} partition {} version {} in {} seconds",
peer,
storeName,
partition,
version,
Duration.between(startTime, Instant.now()).getSeconds());
return CompletableFuture.completedFuture(inputStream);
} catch (Exception e) {
if (e.getCause() instanceof VenicePeersConnectionException) {
// error case 2: failed to connect to the peer,
// solution: retry connecting to the peer again up to MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST times
LOGGER.warn(
"Get error when connect to peer: {} for store {} version {} partition {}, retrying {}/{}",
peer,
storeName,
version,
partition,
retryCount + 1,
MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST,
e);
retryCount++;
} else if (e.getCause() instanceof VeniceBlobTransferFileNotFoundException) {
// error case 3: the connected host does not have the requested file,
// solution: move to next possible host
LOGGER.warn(
"Peer {} does not have the requested blob for store {} version {} partition {}, moving to next possible host.",
peer,
storeName,
version,
partition,
e);
break;
} else {
// error case 4:
// other exceptions (InterruptedException, ExecutionException, TimeoutException) that are not expected,
// solution: do not use blob transfer to bootstrap at all for saving time
String errorMessage = String.format(
"Failed to connect to peer %s for partition %d store %s version %d with exception. "
+ "Skip bootstrap the partition from blob transfer.",
peer,
partition,
storeName,
version);
LOGGER.error(errorMessage, e);
throw new VeniceException(errorMessage, e);
}
}
}

LOGGER.warn(
"Failed to connect to peer {} for partition {} store {} version {} after {} attempts, "
+ "moving to next possible host to bootstrap the partition.",
peer,
partition,
storeName,
version,
retryCount);
}
throw new VenicePeersNotFoundException("No valid peers found for the requested blob");

// error case 5: no valid peers found for the requested blob after trying all possible hosts,
// solution: do not use blob at all.
String errorMessage = String.format(
"Failed to connect to any peer for partition %d store %s version %d, after trying all possible hosts.",
partition,
storeName,
version);
LOGGER.warn(errorMessage);
throw new VenicePeersNotFoundException(errorMessage);
}

@Override
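One detail worth calling out in the retry loop above: the Netty client completes its future exceptionally, and CompletableFuture.get rethrows that failure wrapped in an ExecutionException, which is why the branches inspect e.getCause() rather than e itself. A small self-contained illustration of that wrapping (the exception class here is a stand-in, not the Venice type):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CauseUnwrappingDemo {
  // Stand-in for VenicePeersConnectionException.
  static class PeerConnectionException extends RuntimeException {
    PeerConnectionException(String msg) { super(msg); }
  }

  public static void main(String[] args) throws InterruptedException, TimeoutException {
    CompletableFuture<String> transfer = new CompletableFuture<>();
    // Simulate the client failing to connect and completing the future exceptionally.
    transfer.completeExceptionally(new PeerConnectionException("connection refused"));

    try {
      transfer.get(1, TimeUnit.MINUTES);
    } catch (ExecutionException e) {
      // get() wraps the original failure, so classification has to go through getCause().
      System.out.println(e.getCause() instanceof PeerConnectionException); // prints true
    }
  }
}
```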
@@ -1,6 +1,7 @@
package com.linkedin.davinci.blobtransfer.client;

import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.exceptions.VenicePeersConnectionException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -52,9 +53,10 @@ public void initChannel(SocketChannel ch) {

public CompletionStage<InputStream> get(String host, String storeName, int version, int partition) {
CompletionStage<InputStream> inputStream = new CompletableFuture<>();
// Connects to the remote host
try {
Channel ch = clientBootstrap.connect(host, serverPort).sync().channel();
// Connects to the remote host
Channel ch = connectToHost(host, storeName, version, partition);

// Request to get the blob file and metadata
// Attach the file handler to the pipeline
// Attach the metadata handler to the pipeline
@@ -65,7 +67,6 @@ public CompletionStage<InputStream> get(String host, String storeName, int version, int partition) {
// Send a GET request
ch.writeAndFlush(prepareRequest(storeName, version, partition));
} catch (Exception e) {
LOGGER.error("Failed to connect to the host: {}", host, e);
if (!inputStream.toCompletableFuture().isCompletedExceptionally()) {
inputStream.toCompletableFuture().completeExceptionally(e);
}
@@ -83,4 +84,22 @@ private FullHttpRequest prepareRequest(String storeName, int version, int partition) {
HttpMethod.GET,
String.format("/%s/%d/%d", storeName, version, partition));
}

/**
* Connects to the host
*/
private Channel connectToHost(String host, String storeName, int version, int partition) {
try {
return clientBootstrap.connect(host, serverPort).sync().channel();
} catch (Exception e) {
String errorMsg = String.format(
"Failed to connect to the host: %s for blob transfer for store: %s, version: %d, partition: %d",
host,
storeName,
version,
partition);
LOGGER.error(errorMsg, e);
throw new VenicePeersConnectionException(errorMsg, e);
}
}
}
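The connectToHost helper above translates whatever the Netty bootstrap throws into a VenicePeersConnectionException, so callers can branch on exception type rather than parsing messages. A rough, self-contained demonstration of the underlying behavior being wrapped, assuming nothing is listening on the chosen port (the port number and class name are arbitrary for the example):

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class ConnectFailureDemo {
  public static void main(String[] args) throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    try {
      Bootstrap bootstrap = new Bootstrap().group(group)
          .channel(NioSocketChannel.class)
          .handler(new ChannelInboundHandlerAdapter());
      // Assuming no server on this port, sync() rethrows the connect failure;
      // connectToHost(...) converts exactly this kind of failure into a
      // VenicePeersConnectionException that the manager's retry loop keys on.
      bootstrap.connect("localhost", 59999).sync().channel();
    } catch (Exception e) {
      System.out.println("connect failed: " + e);
    } finally {
      group.shutdownGracefully();
    }
  }
}
```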
@@ -5,6 +5,7 @@

import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
import com.linkedin.venice.exceptions.VeniceException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
@@ -62,9 +63,16 @@ public P2PFileTransferClientHandler(
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;

if (!response.status().equals(HttpResponseStatus.OK)) {
throw new VeniceException("Failed to fetch file from remote peer. Response: " + response.status());
if (response.status().equals(HttpResponseStatus.NOT_FOUND)) {
throw new VeniceBlobTransferFileNotFoundException(
"Requested files from remote peer are not found. Response: " + response.status());
} else {
throw new VeniceException("Failed to fetch file from remote peer. Response: " + response.status());
}
}

// redirect the message to the next handler if it's a metadata transfer
boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response);
if (isMetadataMessage) {
@@ -155,7 +163,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("Exception caught in when transferring files for {}", payload.getFullResourceName());
LOGGER.error(
"Exception caught in when transferring files for {} with cause {}",
payload.getFullResourceName(),
cause);
inputStreamFuture.toCompletableFuture().completeExceptionally(cause);
ctx.close();
}
@@ -16,11 +9,9 @@
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
@@ -110,32 +108,24 @@ CompletionStage<Void> bootstrapFromBlobs(Store store, int versionNumber, int partitionId) {
String storeName = store.getName();
String baseDir = serverConfig.getRocksDBPath();
try {
CompletableFuture<InputStream> p2pFuture =
blobTransferManager.get(storeName, versionNumber, partitionId).toCompletableFuture();
LOGGER.info(
"Bootstrapping from blobs for store {}, version {}, partition {}",
blobTransferManager.get(storeName, versionNumber, partitionId).toCompletableFuture();
} catch (VenicePeersNotFoundException e) {
LOGGER.warn(
"No valid peers founds for store {}, version {}, partition {}, giving up the blob transfer bootstrap",
storeName,
versionNumber,
partitionId);
return CompletableFuture.runAsync(() -> {
try {
p2pFuture.get(30, TimeUnit.MINUTES);
} catch (Exception e) {
LOGGER.warn(
"Failed bootstrapping from blobs for store {}, version {}, partition {}",
storeName,
versionNumber,
partitionId,
e);
RocksDBUtils.deletePartitionDir(baseDir, storeName, versionNumber, partitionId);
p2pFuture.cancel(true);
// TODO: close channels
}
});
} catch (VenicePeersNotFoundException e) {
LOGGER.warn("No peers founds for store {}, version {}, partition {}", storeName, versionNumber, partitionId);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
LOGGER.error(
"Failed bootstrapping from blobs for store {}, version {}, partition {} with exception, "
+ "giving up the blob transfer bootstrap.",
storeName,
versionNumber,
partitionId,
e);
RocksDBUtils.deletePartitionDir(baseDir, storeName, versionNumber, partitionId);
}
return CompletableFuture.completedFuture(null);
}

@Override
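Because the extracted diff above interleaves removed and added lines, the net behavior is easy to miss: after this change, bootstrapFromBlobs treats blob transfer as best-effort. A VenicePeersNotFoundException is only logged, any other failure additionally cleans up the partially written partition directory, and in every case a completed future is returned so ingestion can continue from Kafka. A hedged, self-contained sketch of that shape (the names are simplified stand-ins, not the actual Venice API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class BestEffortBootstrapSketch {
  // Hypothetical stand-in for VenicePeersNotFoundException.
  static class PeersNotFoundException extends RuntimeException {}

  // Simulated blob transfer; flip the flag to exercise the failure path.
  static void transferBlobs(boolean peersExist) {
    if (!peersExist) throw new PeersNotFoundException();
  }

  // Sketch of the post-change shape: every outcome resolves the returned future.
  static CompletionStage<Void> bootstrapFromBlobs(boolean peersExist) {
    try {
      transferBlobs(peersExist);
    } catch (PeersNotFoundException e) {
      System.out.println("no peers found, skipping blob transfer"); // warn and fall through
    } catch (Exception e) {
      System.out.println("blob transfer failed, cleaning up partial data"); // delete partition dir
    }
    return CompletableFuture.completedFuture(null);
  }

  public static void main(String[] args) {
    bootstrapFromBlobs(false).thenRun(() -> System.out.println("continue ingestion from Kafka"));
  }
}
```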
