Skip to content

Commit

Permalink
1) Multiple sender issue fix, 2) Max fd issue in select fix, 3) Added…
Browse files Browse the repository at this point in the history
… transfer-id to wdt

Summary: 1) Due to some bugs, we were creating multiple senders for the same
snapshot-id in case of retries. Shared data structures were also not protected by
locks. Ensured that, before creating a sender, the previous sender is destructed
properly.
2) We were getting random crashes around the select/connect system calls. The issue
is that select can only handle fds smaller than FD_SETSIZE. This value by default is
1024. So, in case of higher fds, buffer overflows were occurring in FD_SET. To
fix this, replaced select with poll, which does not have any max fd limit.
3) Added transfer-id to wdt. A separate id verification is needed to ensure that the sender does not connect to the wrong receiver. This can happen in case of thrift timeouts and subsequent retries.

Reviewed By: @ldemailly

Differential Revision: D2024305
  • Loading branch information
uddipta authored and ldemailly committed Jul 17, 2015
1 parent 8c0f52b commit b6ac1c7
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 61 deletions.
37 changes: 19 additions & 18 deletions ClientSocket.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "ClientSocket.h"
#include "SocketUtils.h"
#include "WdtOptions.h"
#include <folly/ScopeGuard.h>
#include <glog/logging.h>
#include <sys/socket.h>
#include <poll.h>
#include <fcntl.h>
#include <chrono>

Expand Down Expand Up @@ -36,7 +38,12 @@ ErrorCode ClientSocket::connect() {
return OK;
}
// Lookup
struct addrinfo *infoList;
struct addrinfo *infoList = nullptr;
folly::ScopeGuard guard = folly::makeGuard([&] {
if (infoList) {
freeaddrinfo(infoList);
}
});
int res = getaddrinfo(dest_.c_str(), port_.c_str(), &sa_, &infoList);
if (res) {
// not errno, can't use PLOG (perror)
Expand All @@ -52,9 +59,10 @@ ErrorCode ClientSocket::connect() {
<< SocketUtils::getNameInfo(info->ai_addr, info->ai_addrlen);
fd_ = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (fd_ == -1) {
PLOG(WARNING) << "Error making socket";
PLOG(WARNING) << "Error making socket for port " << port_;
continue;
}
VLOG(1) << "new socket " << fd_ << " for port " << port_;

// make the socket non blocking
int sockArg = fcntl(fd_, F_GETFL, nullptr);
Expand All @@ -77,35 +85,29 @@ ErrorCode ClientSocket::connect() {
int connectTimeout = WdtOptions::get().connect_timeout_millis;

while (true) {
// we need this loop because select() can return before any file handles
// we need this loop because poll() can return before any file handles
// have changes or before timing out. In that case, we check whether it
// is because of EINTR or not. If true, we have to try select with
// is because of EINTR or not. If true, we have to try poll with
// reduced timeout
int timeElapsed = durationMillis(Clock::now() - startTime);
if (timeElapsed >= connectTimeout) {
LOG(ERROR) << "connect() timed out";
this->close();
return CONN_ERROR;
}
int selectTimeout = connectTimeout - timeElapsed;
struct timeval tv;
tv.tv_sec = selectTimeout / 1000;
tv.tv_usec = (selectTimeout % 1000) * 1000;

fd_set wfds;
FD_ZERO(&wfds);
FD_SET(fd_, &wfds);
int pollTimeout = connectTimeout - timeElapsed;
struct pollfd pollFds[] = {{fd_, POLLOUT}};

int retValue;
if ((retValue = select(fd_ + 1, nullptr, &wfds, nullptr, &tv)) <= 0) {
if ((retValue = poll(pollFds, 1, pollTimeout)) <= 0) {
if (errno == EINTR) {
VLOG(1) << "select() call interrupted. retrying...";
VLOG(1) << "poll() call interrupted. retrying...";
continue;
}
if (retValue == 0) {
LOG(ERROR) << "select() timed out " << port_;
LOG(ERROR) << "poll() timed out " << port_;
} else {
PLOG(ERROR) << "select() failed " << port_;
PLOG(ERROR) << "poll() failed " << port_ << " " << fd_;
}
this->close();
return CONN_ERROR;
Expand All @@ -122,7 +124,7 @@ ErrorCode ClientSocket::connect() {
continue;
}
if (connectResult != 0) {
LOG(WARNING) << "connect did not succeed : " << strerror(connectResult);
LOG(WARNING) << "connect did not succeed : " << connectResult;
this->close();
continue;
}
Expand All @@ -142,7 +144,6 @@ ErrorCode ClientSocket::connect() {
sa_ = *info;
break;
}
freeaddrinfo(infoList);
if (fd_ < 0) {
if (count > 1) {
// Only log this if not redundant with log above (ie --ipv6=false)
Expand Down
26 changes: 14 additions & 12 deletions ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ namespace wdt {
// skipping or being DCHECK
#define WDT_CHECK CHECK

#define ERRORS \
X(OK) /** No error */ \
X(ERROR) /** Generic error */ \
X(ABORT) /** Abort */ \
X(CONN_ERROR) /** Connection Error */ \
X(CONN_ERROR_RETRYABLE) /** Retryable connection error */ \
X(SOCKET_READ_ERROR) /** Socket read error */ \
X(SOCKET_WRITE_ERROR) /** Socket write error */ \
X(BYTE_SOURCE_READ_ERROR) /** Byte source(file) read error */ \
X(FILE_WRITE_ERROR) /** file write error */ \
X(MEMORY_ALLOCATION_ERROR) /** Memory allocation failure */ \
X(PROTOCOL_ERROR) /** WDT protocol error */
// X-macro list of the WDT error codes. Each entry is written as X(NAME); the
// user defines X before expanding ERRORS (for example `#define X(A) A,` to
// produce the enumerators of ErrorCode).
#define ERRORS \
X(OK) /** No error */ \
X(ERROR) /** Generic error */ \
X(ABORT) /** Abort */ \
X(CONN_ERROR) /** Connection Error */ \
X(CONN_ERROR_RETRYABLE) /** Retryable connection error */ \
X(SOCKET_READ_ERROR) /** Socket read error */ \
X(SOCKET_WRITE_ERROR) /** Socket write error */ \
X(BYTE_SOURCE_READ_ERROR) /** Byte source(file) read error */ \
X(FILE_WRITE_ERROR) /** file write error */ \
X(MEMORY_ALLOCATION_ERROR) /** Memory allocation failure */ \
X(PROTOCOL_ERROR) /** WDT protocol error */ \
X(VERSION_MISMATCH) /** Sender and Receiver version mismatch */ \
X(ID_MISMATCH) /** Sender and Receiver id mismatch */

enum ErrorCode {
#define X(A) A,
Expand Down
2 changes: 1 addition & 1 deletion FileCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ bool FileCreator::setFileSize(int fd, size_t fileSize) {
}
int status = posix_fallocate(fd, 0, fileSize);
if (status != 0) {
LOG(ERROR) << "fallocate() failed " << strerror(status);
LOG(ERROR) << "fallocate() failed " << status;
return false;
}
return true;
Expand Down
17 changes: 15 additions & 2 deletions Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,34 @@ bool Protocol::decodeDone(char *src, size_t &off, size_t max,
}

bool Protocol::encodeSettings(char *dest, size_t &off, size_t max,
int32_t protocolVersion,
int64_t readTimeoutMillis,
int64_t writeTimeoutMillis) {
int64_t writeTimeoutMillis,
const std::string &senderId) {
off += folly::encodeVarint(protocolVersion, (uint8_t *)dest + off);
off += folly::encodeVarint(readTimeoutMillis, (uint8_t *)dest + off);
off += folly::encodeVarint(writeTimeoutMillis, (uint8_t *)dest + off);
size_t idLen = senderId.size();
off += folly::encodeVarint(idLen, (uint8_t *)dest + off);
memcpy(dest + off, senderId.data(), idLen);
off += idLen;
WDT_CHECK(off <= max) << "Memory corruption:" << off << " " << max;
return true;
}

bool Protocol::decodeSettings(char *src, size_t &off, size_t max,
int32_t &protocolVersion,
int64_t &readTimeoutMillis,
int64_t &writeTimeoutMillis) {
int64_t &writeTimeoutMillis,
std::string &senderId) {
folly::ByteRange br((uint8_t *)(src + off), max);
try {
protocolVersion = folly::decodeVarint(br);
readTimeoutMillis = folly::decodeVarint(br);
writeTimeoutMillis = folly::decodeVarint(br);
size_t idLen = folly::decodeVarint(br);
senderId.assign((const char *)(br.start()), idLen);
br.advance(idLen);
} catch (const std::exception &ex) {
LOG(ERROR) << "got exception " << folly::exceptionStr(ex);
return false;
Expand Down
17 changes: 11 additions & 6 deletions Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class Protocol {
EXIT_CMD = 0x65, // e)xit
};

/// Max size of sender or receiver id
static const size_t kMaxTransferIdLength = 50;
/// Max size of filename + 4 max varints + 1 byte for cmd + 1 byte for status
static const size_t kMaxHeader = PATH_MAX + 5 * 10 + 2 + 2;
/// min number of bytes that must be send to unblock receiver
Expand All @@ -34,7 +36,7 @@ class Protocol {
/// max size of done command encoding
static const size_t kMaxDone = 2 + 10;
/// max size of settings command encoding
static const size_t kMaxSettings = 1 + 2 * 10;
static const size_t kMaxSettings = 1 + 4 * 10 + kMaxTransferIdLength;

/// encodes id, sequence-id, block-size, block-offset and file-size
/// into dest+off
Expand Down Expand Up @@ -74,19 +76,22 @@ class Protocol {
static bool decodeDone(char *src, size_t &off, size_t max,
int64_t &numBlocks);

/// encodes read and write timeout into dest+off
/// encodes protocol version, read and write timeout, sender-id into dest+off
/// moves the off into dest pointer, not going past max
/// @return false if there isn't enough room to encode
static bool encodeSettings(char *dest, size_t &off, size_t max,
int64_t readTimeoutMillis,
int64_t writeTimeoutMillis);
int32_t protocolVersion, int64_t readTimeoutMillis,
int64_t writeTimeoutMillis,
const std::string &senderId);

/// decodes from src+off and consumes/moves off but not past max
/// sets readTimeoutMillis and writeTimeoutMillis
/// sets protocolVersion, readTimeoutMillis, writeTimeoutMillis and senderId
/// @return false if there isn't enough data in src+off to src+max
static bool decodeSettings(char *src, size_t &off, size_t max,
int32_t &protocolVersion,
int64_t &readTimeoutMillis,
int64_t &writeTimeoutMillis);
int64_t &writeTimeoutMillis,
std::string &senderId);
};
}
} // namespace facebook::wdt
34 changes: 30 additions & 4 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,13 @@ int32_t Receiver::registerPorts(bool stopOnFailure) {
}

void Receiver::setDir(const std::string &destDir) {
this->destDir_ = destDir;
destDir_ = destDir;
}

/// Stores the id of this receiver and logs it.
/// processSettingsCmd() compares this id with the sender id decoded from the
/// settings command; on a mismatch it sets ID_MISMATCH and returns
/// SEND_ABORT_CMD.
/// @param receiverId id of this receiver; its length must not exceed
///                   Protocol::kMaxTransferIdLength
void Receiver::setReceiverId(const std::string &receiverId) {
// WDT_CHECK is defined as CHECK, so an over-long id is a fatal error
WDT_CHECK(receiverId.length() <= Protocol::kMaxTransferIdLength);
receiverId_ = receiverId;
LOG(INFO) << "receiver id " << receiverId_;
}

void Receiver::cancelTransfer() {
Expand Down Expand Up @@ -604,14 +610,29 @@ Receiver::ReceiverState Receiver::processSettingsCmd(ThreadData &data) {
auto &senderReadTimeout = data.senderReadTimeout_;
auto &senderWriteTimeout = data.senderWriteTimeout_;
auto &threadStats = data.threadStats_;
bool success =
Protocol::decodeSettings(buf, off, oldOffset + Protocol::kMaxSettings,
senderReadTimeout, senderWriteTimeout);
auto &options = WdtOptions::get();
int32_t protocolVersion;
std::string senderId;
bool success = Protocol::decodeSettings(
buf, off, oldOffset + Protocol::kMaxSettings, protocolVersion,
senderReadTimeout, senderWriteTimeout, senderId);
if (!success) {
LOG(ERROR) << "Unable to decode settings cmd";
threadStats.setErrorCode(PROTOCOL_ERROR);
return WAIT_FOR_FINISH_WITH_THREAD_ERROR;
}
if (protocolVersion != options.protocol_version) {
LOG(ERROR) << "Receiver and sender protocol version mismatch"
<< protocolVersion << " " << options.protocol_version;
threadStats.setErrorCode(VERSION_MISMATCH);
return SEND_ABORT_CMD;
}
if (receiverId_ != senderId) {
LOG(ERROR) << "Receiver and sender id mismatch " << senderId << " "
<< receiverId_;
threadStats.setErrorCode(ID_MISMATCH);
return SEND_ABORT_CMD;
}
auto msgLen = off - oldOffset;
numRead -= msgLen;
return READ_NEXT_CMD;
Expand Down Expand Up @@ -900,8 +921,13 @@ Receiver::ReceiverState Receiver::sendAbortCmd(ThreadData &data) {
auto &threadStats = data.threadStats_;
char *buf = data.getBuf();
auto &socket = data.socket_;
auto &options = WdtOptions::get();
int32_t protocolVersion = options.protocol_version;
int offset = 0;
buf[offset++] = Protocol::ABORT_CMD;
folly::storeUnaligned<int32_t>(buf + offset,
folly::Endian::little(protocolVersion));
offset += sizeof(int32_t);
buf[offset++] = threadStats.getErrorCode();
int64_t checkpoint = folly::Endian::little(threadStats.getNumBlocks());
folly::storeUnaligned<int64_t>(buf + offset, checkpoint);
Expand Down
7 changes: 7 additions & 0 deletions Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class Receiver {
*/
void setDir(const std::string &destDir);

/// @param receiverId unique id of this receiver
void setReceiverId(const std::string &receiverId);

/**
* Destructor for the receiver should try to join threads.
* Since the threads are part of the object. We can't destroy the
Expand Down Expand Up @@ -426,6 +429,10 @@ class Receiver {
/// Responsible for writing files on the disk
std::unique_ptr<FileCreator> fileCreator_;

/// Unique id of this receiver object, this must match sender-id sent as part
/// of settings
std::string receiverId_;

/**
* Progress tracker thread is a thread which has to be joined when the
* transfer is finished. The root thread in finish() and the progress
Expand Down
23 changes: 18 additions & 5 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ void Sender::setProgressReportIntervalMillis(
progressReportIntervalMillis_ = progressReportIntervalMillis;
}

/// Stores the id of this sender and logs it.
/// sendSettings() passes this id to Protocol::encodeSettings() so that it is
/// sent to the receiver as part of the settings command.
/// @param senderId id of this sender; its length must not exceed
///                 Protocol::kMaxTransferIdLength
void Sender::setSenderId(const std::string &senderId) {
// WDT_CHECK is defined as CHECK, so an over-long id is a fatal error.
// NOTE(review): presumably bounded because Protocol::kMaxSettings budgets
// kMaxTransferIdLength bytes for the encoded id - confirm
WDT_CHECK(senderId.length() <= Protocol::kMaxTransferIdLength);
senderId_ = senderId;
LOG(INFO) << "sender id " << senderId_;
}

void Sender::setProgressReporter(
std::unique_ptr<ProgressReporter> &progressReporter) {
progressReporter_ = std::move(progressReporter);
Expand Down Expand Up @@ -558,7 +564,8 @@ Sender::SenderState Sender::sendSettings(ThreadData &data) {
buf[off++] = Protocol::SETTINGS_CMD;

bool success = Protocol::encodeSettings(
buf, off, Protocol::kMaxSettings, readTimeoutMillis, writeTimeoutMillis);
buf, off, Protocol::kMaxSettings, options.protocol_version,
readTimeoutMillis, writeTimeoutMillis, senderId_);
WDT_CHECK(success);
ssize_t written = socket->write(buf, off);
if (written != off) {
Expand Down Expand Up @@ -776,20 +783,26 @@ Sender::SenderState Sender::processAbortCmd(ThreadData &data) {
ThreadTransferHistory &transferHistory = data.getTransferHistory();

threadStats.setErrorCode(ABORT);
int toRead = 1 + sizeof(int64_t);
int toRead = 1 + sizeof(int32_t) + sizeof(int64_t);
auto numRead = socket->read(buf, toRead);
if (numRead != toRead) {
// can not read checkpoint, but still must exit because of ABORT
LOG(ERROR) << "Error while trying to read ABORT cmd " << numRead << " "
<< toRead;
return END;
}
int64_t checkpoint = folly::loadUnaligned<int64_t>(buf + 1);
checkpoint = folly::Endian::little(checkpoint);
ErrorCode remoteError = (ErrorCode)buf[0];

int offset = 0;
int32_t protocolVersion = folly::loadUnaligned<int32_t>(buf + offset);
protocolVersion = folly::Endian::little(protocolVersion);
offset += sizeof(int32_t);
ErrorCode remoteError = (ErrorCode)buf[offset++];
threadStats.setRemoteErrorCode(remoteError);
int64_t checkpoint = folly::loadUnaligned<int64_t>(buf + offset);
checkpoint = folly::Endian::little(checkpoint);
std::string failedFileName = transferHistory.getSourceId(checkpoint);
LOG(WARNING) << "Received abort on " << data.threadIndex_
<< " remote protocol version " << protocolVersion
<< " remote error code " << errorCodeToStr(remoteError)
<< " file " << failedFileName << " checkpoint " << checkpoint;
abort();
Expand Down
6 changes: 6 additions & 0 deletions Sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class Sender {
/// Sets whether to follow symlink or not
void setFollowSymlinks(const bool followSymlinks);

/// @param senderId unique id of the sender
void setSenderId(const std::string &senderId);

/**
* @param progressReportIntervalMillis_ interval(ms) between progress
* reports. A value of 0 indicates no
Expand Down Expand Up @@ -388,6 +391,9 @@ class Sender {

/// Pointer to DirectorySourceQueue which reads the srcDir and the files
std::unique_ptr<DirectorySourceQueue> dirQueue_;
/// unique id of this sender object. This is sent to the receiver for
/// identification
std::string senderId_;
/// List of ports where the receiver threads are running on the destination
std::vector<int32_t> ports_;
/// Number of active threads, decremented everytime a thread is finished
Expand Down
Loading

0 comments on commit b6ac1c7

Please sign in to comment.