Skip to content

Commit

Permalink
Reorganize some code and separate the threads
Browse files Browse the repository at this point in the history
  • Loading branch information
ilor committed Oct 24, 2012
1 parent 49d6ed2 commit ed2add1
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 91 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ add_definitions("-DHAVE_SCTPCAT_CONFIG_H")

set (SctpCat_SOURCES
addrinfo.cpp
consolethread.cpp
pingthread.cpp
sctpcat.cpp
util.cpp
)
Expand Down
15 changes: 15 additions & 0 deletions consolethread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "consolethread.h"
#include <iostream>

void consoleThread(ISctpSink& sink)
{
for (;;)
{
std::string input;
std::getline(std::cin, input);
if (!input.empty())
{
sink.send(input.c_str(), input.size());
}
}
}
8 changes: 8 additions & 0 deletions consolethread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef CONSOLETHREAD_H
#define CONSOLETHREAD_H

#include "isctpsink.h"

void consoleThread(ISctpSink& sink);

#endif // CONSOLETHREAD_H
12 changes: 12 additions & 0 deletions isctpsink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#ifndef ISCTPSINK_H
#define ISCTPSINK_H

#include <memory>

class ISctpSink
{
public:
virtual void send(const char* buf, size_t len) = 0;
};

#endif // ISCTPSINK_H
26 changes: 26 additions & 0 deletions pingthread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include "pingthread.h"
#include <boost/thread.hpp>

PingThread::PingThread(ISctpSink& sink, int bytes, int interval)
: m_sink(sink), m_bytes(bytes), m_interval(interval)
{
}

void PingThread::start()
{
m_thread = boost::thread(boost::bind(&PingThread::loop, this));
}

void PingThread::loop()
{
std::vector<char> buf(m_bytes);
for (int i = 0; i < m_bytes; ++i)
{
buf[i] = 'A' + (i % ('Z' - 'A'));
}
for (;;)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(m_interval));
m_sink.send(&buf[0], m_bytes);
}
}
22 changes: 22 additions & 0 deletions pingthread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#ifndef PINGTHREAD_H
#define PINGTHREAD_H

#include "isctpsink.h"
#include <boost/thread.hpp>

class PingThread
{
public:
PingThread(ISctpSink& sink, int bytes, int interval);

void start();
private:
void loop();

ISctpSink& m_sink;
int m_bytes;
int m_interval;
boost::thread m_thread;
};

#endif // PINGTHREAD_H
138 changes: 48 additions & 90 deletions sctpcat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,51 @@
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/program_options.hpp>

#include <boost/thread.hpp>

#include "addrinfo.hpp"
#include "exception.hpp"
#include "sctpcat.h"
#include "util.hpp"
#include "pingthread.h"
#include "consolethread.h"

typedef boost::program_options::variables_map varmap;

class SctpCat
void disableHb(int fd, sctp_assoc_t assoc_id, const sockaddr_storage& addr, size_t addr_len)
{
public:
SctpCat(const varmap& options);
void setup(std::string host, const std::string& port);
void listenSocket();
void connectSocket(const std::string &host, const std::string &port);
void receiveLoop();
void pingLoop(int bytes, int interval);
void consoleLoop();
void send(const char* buf, size_t len);
void setPathMaxRetrans(sctp_assoc_t assoc_id, int count);
void setAssocMaxRetrans(sctp_assoc_t assoc_id, int count);
void setRto(int rtoMin, int rtoMax, int rtoInitial);
private:
void subscribeAllEvents(int fd);
int setupSocket(int ai_family, sockaddr* local_addr, socklen_t local_addr_len);

void receiveMessages(int fd);
void processMessage(int fd, char* buf, int len, sockaddr* from, socklen_t fromlen,
const sctp_sndrcvinfo& sinfo, int flags);
int m_fd;
sctp_assoc_t m_assoc_id;
static const int s_maxPendingConnections = 10;
bool m_printTicks;
int m_aiFamily;
bool m_listen;
const varmap& m_options;
boost::shared_ptr<addrinfo> m_ai;
mutable boost::mutex m_mutex;
typedef boost::mutex::scoped_lock ScopedLock;
std::vector< boost::function<void(sctp_assoc_t)> > m_newAssocCallbacks;
std::vector< boost::function<void(sctp_assoc_t, sockaddr_storage&)> > m_newPaddrCallbacks;
};
sctp_paddrparams params;
memset(&params, 0, sizeof(params));
params.spp_flags = SPP_HB_DISABLE;
params.spp_assoc_id = assoc_id;
memcpy(&params.spp_address, &addr, addr_len);
if (setsockopt(fd, SOL_SCTP, SCTP_PEER_ADDR_PARAMS, &params, socklen_t(sizeof(params))) != 0)
{
SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno);
}
std::cerr << timestamp() << "Disabled HB on " << sockaddr2string(&addr) << "\n";
}

SctpCat::SctpCat(const varmap& options)
: m_fd(-1), m_options(options)
{
m_printTicks = options.count("ticks");
m_aiFamily = options.count("ipv6") ? AF_INET6 : AF_INET;
m_listen = options.count("listen");

if (m_options.count("no-hb-on-secondary"))
{
registerPeerAddressCallback(boost::bind(disableHb, _1, _2, _3, sizeof(sockaddr_storage)));
}
}

void SctpCat::registerAssociationCallback(boost::function<void (int, sctp_assoc_t)> cb)
{
m_associationCallbacks.push_back(cb);
}

void SctpCat::registerPeerAddressCallback(boost::function<void (int, sctp_assoc_t, const sockaddr_storage &)> cb)
{
m_peerAddresssCallbacks.push_back(cb);
}

int SctpCat::setupSocket(int ai_family, sockaddr* local_addr, socklen_t local_addr_len)
Expand Down Expand Up @@ -132,20 +126,6 @@ void SctpCat::subscribeAllEvents(int fd)
}
}

void disableHb(int fd, sctp_assoc_t assoc_id, void* addr, size_t addr_len)
{
sctp_paddrparams params;
memset(&params, 0, sizeof(params));
params.spp_flags = SPP_HB_DISABLE;
params.spp_assoc_id = assoc_id;
memcpy(&params.spp_address, addr, addr_len);
if (setsockopt(fd, SOL_SCTP, SCTP_PEER_ADDR_PARAMS, &params, socklen_t(sizeof(params))) != 0)
{
SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno);
}
std::cerr << timestamp() << "Disabled HB on " << sockaddr2string(reinterpret_cast<sockaddr*>(addr)) << "\n";
}

void SctpCat::receiveLoop()
{
int epollfd = epoll_create1(EPOLL_CLOEXEC);
Expand Down Expand Up @@ -181,30 +161,14 @@ void SctpCat::receiveLoop()
}
}

void SctpCat::pingLoop(int bytes, int interval)
{
std::vector<char> buf(bytes);
for (int i = 0; i < bytes; ++i)
{
buf[i] = 'A' + (i % ('Z' - 'A'));
}
for (;;)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(interval));
if (m_assoc_id == 0)
{
std::cerr << timestamp() << "No association, not pinging";
}
else
{
send(&buf[0], bytes);
}
}
}

void SctpCat::send(const char* buf, size_t len)
{
ScopedLock(m_mutex);
if (m_assoc_id == 0)
{
std::cerr << "send: no association; data discarded (" << len << ") bytes";
return;
}
sctp_sndrcvinfo sinfo;
memset(&sinfo, 0, sizeof(sinfo));
sinfo.sinfo_assoc_id = m_assoc_id;
Expand All @@ -223,19 +187,6 @@ void SctpCat::send(const char* buf, size_t len)
}
}

void SctpCat::consoleLoop()
{
for (;;)
{
std::string input;
std::getline(std::cin, input);
if (!input.empty())
{
send(input.c_str(), input.size());
}
}
}

void SctpCat::receiveMessages(int fd)
{
const int msgbufsize = 2000;
Expand All @@ -244,6 +195,7 @@ void SctpCat::receiveMessages(int fd)
socklen_t fromlen = sizeof(from);
sockaddr* from_ptr = reinterpret_cast<sockaddr*>(&from);
sctp_sndrcvinfo sinfo;
memset(&sinfo, 0, sizeof(sinfo));
int flags = 0;

for (;;)
Expand Down Expand Up @@ -277,18 +229,20 @@ void SctpCat::processMessage(int fd, char* buf, int len, sockaddr* from, socklen
if (notify->sn_assoc_change.sac_state == SCTP_COMM_UP)
{
m_assoc_id = notify->sn_assoc_change.sac_assoc_id;
ptrdiff_t info_from = offsetof(sctp_assoc_change, sac_info);
ptrdiff_t info_to = len;
std::cerr << timestamp() << "COMM_UP on assoc_id " << m_assoc_id << "\n";
for (size_t i = 0; i < m_associationCallbacks.size(); ++i)
{
m_associationCallbacks[i](fd, m_assoc_id);
}
}
}
if (notify->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE)
{
if (notify->sn_paddr_change.spc_state == SCTP_ADDR_CONFIRMED)
{
if (m_options.count("no-hb-on-secondary"))
for (size_t i = 0; i < m_peerAddresssCallbacks.size(); ++i)
{
disableHb(fd, notify->sn_paddr_change.spc_assoc_id, &notify->sn_paddr_change.spc_aaddr, sizeof(notify->sn_paddr_change.spc_aaddr));
m_peerAddresssCallbacks[i](fd, notify->sn_paddr_change.spc_assoc_id, notify->sn_paddr_change.spc_aaddr);
}
}
}
Expand Down Expand Up @@ -366,6 +320,7 @@ void SctpCat::setPathMaxRetrans(sctp_assoc_t assoc_id, int count)
{
SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno);
}
std::cerr << "PathMaxRetransmissions set to " << count << " on association " << assoc_id << "\n";
}

void SctpCat::setAssocMaxRetrans(sctp_assoc_t assoc_id, int count)
Expand All @@ -378,6 +333,7 @@ void SctpCat::setAssocMaxRetrans(sctp_assoc_t assoc_id, int count)
{
SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno);
}
std::cerr << "AssociationMaxRetransmissions set to " << count << " on association " << assoc_id << "\n";
}

int main(int argc, char** argv)
Expand Down Expand Up @@ -490,11 +446,13 @@ int main(int argc, char** argv)
}
sc.connectSocket(host, port);
}
boost::shared_ptr<PingThread> ping;
if (vm.count("ping-interval"))
{
boost::thread ping_thread(boost::bind(&SctpCat::pingLoop, &sc, vm["ping-bytes"].as<int>(), vm["ping-interval"].as<int>()));
ping = boost::make_shared<PingThread>(boost::ref(sc), vm["ping-bytes"].as<int>(), vm["ping-interval"].as<int>());
sc.registerAssociationCallback(boost::bind(&PingThread::start, ping.get()));
}
boost::thread ping_thread(boost::bind(&SctpCat::consoleLoop, &sc));
boost::thread console_thread(boost::bind(&consoleThread, boost::ref(sc)));
sc.receiveLoop();
}
catch (boost::exception & e)
Expand Down
51 changes: 51 additions & 0 deletions sctpcat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#ifndef SCTPCAT_H
#define SCTPCAT_H
#include <boost/program_options.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <netdb.h>
#include <netinet/sctp.h>
#include <string>

#include "isctpsink.h"

class SctpCat : public ISctpSink
{
public:
typedef boost::program_options::variables_map varmap;

SctpCat(const varmap& options);
void setup(std::string host, const std::string& port);
void listenSocket();
void connectSocket(const std::string &host, const std::string &port);
void receiveLoop();
void send(const char* buf, size_t len);
void setPathMaxRetrans(sctp_assoc_t assoc_id, int count);
void setAssocMaxRetrans(sctp_assoc_t assoc_id, int count);
void setRto(int rtoMin, int rtoMax, int rtoInitial);

void registerAssociationCallback(boost::function<void(int, sctp_assoc_t)>);
void registerPeerAddressCallback(boost::function<void(int, sctp_assoc_t, const sockaddr_storage&)>);
private:
void subscribeAllEvents(int fd);
int setupSocket(int ai_family, sockaddr* local_addr, socklen_t local_addr_len);

void receiveMessages(int fd);
void processMessage(int fd, char* buf, int len, sockaddr* from, socklen_t fromlen,
const sctp_sndrcvinfo& sinfo, int flags);
int m_fd;
sctp_assoc_t m_assoc_id;
static const int s_maxPendingConnections = 10;
bool m_printTicks;
int m_aiFamily;
bool m_listen;
const varmap& m_options;
boost::shared_ptr<addrinfo> m_ai;
mutable boost::mutex m_mutex;
typedef boost::mutex::scoped_lock ScopedLock;
std::vector< boost::function<void(int, sctp_assoc_t)> > m_associationCallbacks;
std::vector< boost::function<void(int, sctp_assoc_t, const sockaddr_storage&)> > m_peerAddresssCallbacks;
};


#endif // SCTPCAT_H
1 change: 0 additions & 1 deletion util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class SctpNotificationPrinter
template<typename T>
void SctpNotificationPrinter::process(const T& event)
{
m_os << timestamp();
m_os << stringize_sctp_sn_type(reinterpret_cast<const sctp_notification*>(&event)->sn_header.sn_type);
}

Expand Down

0 comments on commit ed2add1

Please sign in to comment.