Skip to content

Commit

Permalink
add memory reuse for RDMA
Browse files Browse the repository at this point in the history
  • Loading branch information
ymjiang authored and eric-haibin-lin committed Aug 18, 2019
1 parent 855c920 commit 56e042c
Showing 1 changed file with 69 additions and 24 deletions.
93 changes: 69 additions & 24 deletions src/rdma_van.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ static const int kMaxConcurrentWorkRequest = kRxDepth + kStartDepth + kReplyDept
static const int kMaxHostnameLength = 16;
static const int kMaxDataFields = 4;
static const size_t kAlignment = 8;
static const size_t kInlineThreshold = 1024 * 32; // 32 KB
static const size_t kInlineThreshold = 1024 * 1024; // 32 KB

//template<class K, class T>
//std::map<K, T> dict(K, T)
//{
// return std::map<K,T>(); //K is your key and T is your data type
//}

template <typename T>
static inline T align_floor(T v, T align) {
Expand Down Expand Up @@ -195,6 +201,7 @@ struct LocalBufferContext {
};

typedef std::unique_ptr<struct ibv_mr, std::function<void(struct ibv_mr *)>> MRPtr;
typedef std::vector<SArray<char>> KV_Type;

struct MessageBuffer {
size_t inline_len;
Expand Down Expand Up @@ -433,6 +440,14 @@ class RDMAVan : public Van {

mempool_.reset();

std::map<char *, struct ibv_mr *>::iterator map_iter;
map_iter = memory_mr_map.begin();
while(map_iter != memory_mr_map.end()){
ibv_dereg_mr(map_iter->second);
map_iter++;
}


cm_event_polling_thread_->join();
cm_event_polling_thread_.reset();

Expand Down Expand Up @@ -532,7 +547,7 @@ class RDMAVan : public Van {
int SendMsg(const Message &msg) override {
int remote_id = msg.meta.recver;
CHECK_NE(remote_id, Meta::kEmpty);

PBMeta meta;
PackMetaPB(msg.meta, &meta);

Expand All @@ -546,7 +561,7 @@ class RDMAVan : public Van {
std::make_tuple(kLocalTransfer, reinterpret_cast<BufferContext *>(buf_ctx)));
return buf_ctx->meta_len + msg.meta.data_size;
}

CHECK_NE(endpoints_.find(remote_id), endpoints_.end());
Endpoint *endpoint = endpoints_[remote_id].get();
MessageBuffer *msg_buf = new MessageBuffer();
Expand All @@ -557,29 +572,56 @@ class RDMAVan : public Van {

CHECK(meta_len);

if (total_len <= kInlineThreshold) {
msg_buf->inline_len = total_len;
msg_buf->inline_buf = mempool_->Alloc(total_len);
meta.SerializeToArray(msg_buf->inline_buf, meta_len);
char *cur = msg_buf->inline_buf + meta_len;
for (auto &sa : msg.data) {
size_t seg_len = sa.size();
memcpy(cur, sa.data(), seg_len);
cur += seg_len;
}
} else {
msg_buf->inline_len = meta_len;
msg_buf->inline_buf = mempool_->Alloc(meta_len);
msg_buf->data = msg.data;
meta.SerializeToArray(msg_buf->inline_buf, meta_len);
for (auto &sa : msg_buf->data) {
// if (total_len <= kInlineThreshold) {
// msg_buf->inline_len = total_len;
// msg_buf->inline_buf = mempool_->Alloc(total_len);
// meta.SerializeToArray(msg_buf->inline_buf, meta_len);
// char *cur = msg_buf->inline_buf + meta_len;
// for (auto &sa : msg.data) {
// size_t seg_len = sa.size();
// memcpy(cur, sa.data(), seg_len);
// cur += seg_len;
// }
// }
// else {
// msg_buf->inline_len = meta_len;
// msg_buf->inline_buf = mempool_->Alloc(meta_len);
// msg_buf->data = msg.data;
// meta.SerializeToArray(msg_buf->inline_buf, meta_len);
// for (auto &sa : msg_buf->data) {
// if (sa.size()) {
// MRPtr ptr(ibv_reg_mr(pd_, sa.data(), sa.size(), 0),
// [](struct ibv_mr *mr) { ibv_dereg_mr(mr); });
// CHECK(ptr.get()) << strerror(errno);
// msg_buf->mrs.push_back(std::make_pair(std::move(ptr), sa.size()));
// }
// }
// }



msg_buf->inline_len = meta_len;
msg_buf->inline_buf = mempool_->Alloc(meta_len);
msg_buf->data = msg.data;
meta.SerializeToArray(msg_buf->inline_buf, meta_len);
for (auto &sa : msg_buf->data) {
if (sa.size()) {
MRPtr ptr(ibv_reg_mr(pd_, sa.data(), sa.size(), 0),
[](struct ibv_mr *mr) { ibv_dereg_mr(mr); });
CHECK(ptr.get()) << strerror(errno);
msg_buf->mrs.push_back(std::make_pair(std::move(ptr), sa.size()));
auto search_map_iterator = memory_mr_map.find(sa.data());
if(search_map_iterator != memory_mr_map.end()){ //if used before
MRPtr ptr(search_map_iterator->second,
[](struct ibv_mr *mr) { });
CHECK(ptr.get()) << strerror(errno);
msg_buf->mrs.push_back(std::make_pair(std::move(ptr), sa.size()));

}else{
struct ibv_mr *temp_mr = ibv_reg_mr(pd_, sa.data(), sa.size(), 0);
memory_mr_map[sa.data()] = temp_mr;
MRPtr ptr(temp_mr,
[](struct ibv_mr *mr) { });
CHECK(ptr.get()) << strerror(errno);
msg_buf->mrs.push_back(std::make_pair(std::move(ptr), sa.size()));
}
}
}
}

WRContext *context = nullptr, *reserved = nullptr;
Expand Down Expand Up @@ -1080,6 +1122,9 @@ class RDMAVan : public Van {
struct rdma_event_channel *event_channel_ = nullptr;
struct ibv_context *context_ = nullptr;

std::map<char *, struct ibv_mr *> memory_mr_map;


// ibverbs protection domain
struct ibv_pd *pd_ = nullptr;
// Completion event channel, to wait for work completions
Expand Down

0 comments on commit 56e042c

Please sign in to comment.