diff --git a/CMakeLists.txt b/CMakeLists.txt index 8f9e947..1197ecf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,6 +12,8 @@ add_definitions("-DHAVE_SCTPCAT_CONFIG_H") set (SctpCat_SOURCES addrinfo.cpp + consolethread.cpp + pingthread.cpp sctpcat.cpp util.cpp ) diff --git a/consolethread.cpp b/consolethread.cpp new file mode 100644 index 0000000..de0b3ef --- /dev/null +++ b/consolethread.cpp @@ -0,0 +1,15 @@ +#include "consolethread.h" +#include + +void consoleThread(ISctpSink& sink) +{ + for (;;) + { + std::string input; + std::getline(std::cin, input); + if (!input.empty()) + { + sink.send(input.c_str(), input.size()); + } + } +} diff --git a/consolethread.h b/consolethread.h new file mode 100644 index 0000000..12b93ef --- /dev/null +++ b/consolethread.h @@ -0,0 +1,8 @@ +#ifndef CONSOLETHREAD_H +#define CONSOLETHREAD_H + +#include "isctpsink.h" + +void consoleThread(ISctpSink& sink); + +#endif // CONSOLETHREAD_H diff --git a/isctpsink.h b/isctpsink.h new file mode 100644 index 0000000..86bf838 --- /dev/null +++ b/isctpsink.h @@ -0,0 +1,12 @@ +#ifndef ISCTPSINK_H +#define ISCTPSINK_H + +#include + +class ISctpSink +{ +public: + virtual void send(const char* buf, size_t len) = 0; +}; + +#endif // ISCTPSINK_H diff --git a/pingthread.cpp b/pingthread.cpp new file mode 100644 index 0000000..d8ca821 --- /dev/null +++ b/pingthread.cpp @@ -0,0 +1,26 @@ +#include "pingthread.h" +#include + +PingThread::PingThread(ISctpSink& sink, int bytes, int interval) + : m_sink(sink), m_bytes(bytes), m_interval(interval) +{ +} + +void PingThread::start() +{ + m_thread = boost::thread(boost::bind(&PingThread::loop, this)); +} + +void PingThread::loop() +{ + std::vector buf(m_bytes); + for (int i = 0; i < m_bytes; ++i) + { + buf[i] = 'A' + (i % ('Z' - 'A')); + } + for (;;) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(m_interval)); + m_sink.send(&buf[0], m_bytes); + } +} diff --git a/pingthread.h b/pingthread.h new file mode 100644 index 0000000..32d6d88 --- /dev/null +++ b/pingthread.h @@ -0,0 +1,22 @@ +#ifndef PINGTHREAD_H +#define PINGTHREAD_H + +#include "isctpsink.h" +#include + +class PingThread +{ +public: + PingThread(ISctpSink& sink, int bytes, int interval); + + void start(); +private: + void loop(); + + ISctpSink& m_sink; + int m_bytes; + int m_interval; + boost::thread m_thread; +}; + +#endif // PINGTHREAD_H diff --git a/sctpcat.cpp b/sctpcat.cpp index a1e104b..69d5a8e 100644 --- a/sctpcat.cpp +++ b/sctpcat.cpp @@ -21,50 +21,29 @@ #include #include #include -#include #include #include "addrinfo.hpp" #include "exception.hpp" +#include "sctpcat.h" #include "util.hpp" +#include "pingthread.h" +#include "consolethread.h" -typedef boost::program_options::variables_map varmap; - -class SctpCat +void disableHb(int fd, sctp_assoc_t assoc_id, const sockaddr_storage& addr, size_t addr_len) { -public: - SctpCat(const varmap& options); - void setup(std::string host, const std::string& port); - void listenSocket(); - void connectSocket(const std::string &host, const std::string &port); - void receiveLoop(); - void pingLoop(int bytes, int interval); - void consoleLoop(); - void send(const char* buf, size_t len); - void setPathMaxRetrans(sctp_assoc_t assoc_id, int count); - void setAssocMaxRetrans(sctp_assoc_t assoc_id, int count); - void setRto(int rtoMin, int rtoMax, int rtoInitial); -private: - void subscribeAllEvents(int fd); - int setupSocket(int ai_family, sockaddr* local_addr, socklen_t local_addr_len); - - void receiveMessages(int fd); - void processMessage(int fd, char* buf, int len, sockaddr* from, socklen_t fromlen, - const sctp_sndrcvinfo& sinfo, int flags); - int m_fd; - sctp_assoc_t m_assoc_id; - static const int s_maxPendingConnections = 10; - bool m_printTicks; - int m_aiFamily; - bool m_listen; - const varmap& m_options; - boost::shared_ptr m_ai; - mutable boost::mutex m_mutex; - typedef boost::mutex::scoped_lock ScopedLock; - std::vector< boost::function > m_newAssocCallbacks; - std::vector< boost::function > m_newPaddrCallbacks; -}; + sctp_paddrparams params; + memset(¶ms, 0, sizeof(params)); + params.spp_flags = SPP_HB_DISABLE; + params.spp_assoc_id = assoc_id; + memcpy(¶ms.spp_address, &addr, addr_len); + if (setsockopt(fd, SOL_SCTP, SCTP_PEER_ADDR_PARAMS, ¶ms, socklen_t(sizeof(params))) != 0) + { + SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno); + } + std::cerr << timestamp() << "Disabled HB on " << sockaddr2string(&addr) << "\n"; +} SctpCat::SctpCat(const varmap& options) : m_fd(-1), m_options(options) @@ -72,6 +51,21 @@ SctpCat::SctpCat(const varmap& options) m_printTicks = options.count("ticks"); m_aiFamily = options.count("ipv6") ? AF_INET6 : AF_INET; m_listen = options.count("listen"); + + if (m_options.count("no-hb-on-secondary")) + { + registerPeerAddressCallback(boost::bind(disableHb, _1, _2, _3, sizeof(sockaddr_storage))); + } +} + +void SctpCat::registerAssociationCallback(boost::function cb) +{ + m_associationCallbacks.push_back(cb); +} + +void SctpCat::registerPeerAddressCallback(boost::function cb) +{ + m_peerAddresssCallbacks.push_back(cb); } int SctpCat::setupSocket(int ai_family, sockaddr* local_addr, socklen_t local_addr_len) @@ -132,20 +126,6 @@ void SctpCat::subscribeAllEvents(int fd) } } -void disableHb(int fd, sctp_assoc_t assoc_id, void* addr, size_t addr_len) -{ - sctp_paddrparams params; - memset(¶ms, 0, sizeof(params)); - params.spp_flags = SPP_HB_DISABLE; - params.spp_assoc_id = assoc_id; - memcpy(¶ms.spp_address, addr, addr_len); - if (setsockopt(fd, SOL_SCTP, SCTP_PEER_ADDR_PARAMS, ¶ms, socklen_t(sizeof(params))) != 0) - { - SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno); - } - std::cerr << timestamp() << "Disabled HB on " << sockaddr2string(reinterpret_cast(addr)) << "\n"; -} - void SctpCat::receiveLoop() { int epollfd = epoll_create1(EPOLL_CLOEXEC); @@ -181,30 +161,14 @@ void SctpCat::receiveLoop() } } -void SctpCat::pingLoop(int bytes, int interval) -{ - std::vector buf(bytes); - for (int i = 0; i < bytes; ++i) - { - buf[i] = 'A' + (i % ('Z' - 'A')); - } - for (;;) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(interval)); - if (m_assoc_id == 0) - { - std::cerr << timestamp() << "No association, not pinging"; - } - else - { - send(&buf[0], bytes); - } - } -} - void SctpCat::send(const char* buf, size_t len) { ScopedLock(m_mutex); + if (m_assoc_id == 0) + { + std::cerr << "send: no association; data discarded (" << len << ") bytes"; + return; + } sctp_sndrcvinfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); sinfo.sinfo_assoc_id = m_assoc_id; @@ -223,19 +187,6 @@ void SctpCat::send(const char* buf, size_t len) } } -void SctpCat::consoleLoop() -{ - for (;;) - { - std::string input; - std::getline(std::cin, input); - if (!input.empty()) - { - send(input.c_str(), input.size()); - } - } -} - void SctpCat::receiveMessages(int fd) { const int msgbufsize = 2000; @@ -244,6 +195,7 @@ void SctpCat::receiveMessages(int fd) socklen_t fromlen = sizeof(from); sockaddr* from_ptr = reinterpret_cast(&from); sctp_sndrcvinfo sinfo; + memset(&sinfo, 0, sizeof(sinfo)); int flags = 0; for (;;) @@ -277,18 +229,20 @@ void SctpCat::processMessage(int fd, char* buf, int len, sockaddr* from, socklen if (notify->sn_assoc_change.sac_state == SCTP_COMM_UP) { m_assoc_id = notify->sn_assoc_change.sac_assoc_id; - ptrdiff_t info_from = offsetof(sctp_assoc_change, sac_info); - ptrdiff_t info_to = len; std::cerr << timestamp() << "COMM_UP on assoc_id " << m_assoc_id << "\n"; + for (size_t i = 0; i < m_associationCallbacks.size(); ++i) + { + m_associationCallbacks[i](fd, m_assoc_id); + } } } if (notify->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE) { if (notify->sn_paddr_change.spc_state == SCTP_ADDR_CONFIRMED) { - if (m_options.count("no-hb-on-secondary")) + for (size_t i = 0; i < m_peerAddresssCallbacks.size(); ++i) { - disableHb(fd, notify->sn_paddr_change.spc_assoc_id, ¬ify->sn_paddr_change.spc_aaddr, sizeof(notify->sn_paddr_change.spc_aaddr)); + m_peerAddresssCallbacks[i](fd, notify->sn_paddr_change.spc_assoc_id, notify->sn_paddr_change.spc_aaddr); } } } @@ -366,6 +320,7 @@ void SctpCat::setPathMaxRetrans(sctp_assoc_t assoc_id, int count) { SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno); } + std::cerr << "PathMaxRetransmissions set to " << count << " on association " << assoc_id << "\n"; } void SctpCat::setAssocMaxRetrans(sctp_assoc_t assoc_id, int count) @@ -378,6 +333,7 @@ void SctpCat::setAssocMaxRetrans(sctp_assoc_t assoc_id, int count) { SCTPCAT_THROW(SctpCatError()) << clib_failure("setsockopt", errno); } + std::cerr << "AssociationMaxRetransmissions set to " << count << " on association " << assoc_id << "\n"; } int main(int argc, char** argv) @@ -490,11 +446,13 @@ int main(int argc, char** argv) } sc.connectSocket(host, port); } + boost::shared_ptr ping; if (vm.count("ping-interval")) { - boost::thread ping_thread(boost::bind(&SctpCat::pingLoop, &sc, vm["ping-bytes"].as(), vm["ping-interval"].as())); + ping = boost::make_shared(boost::ref(sc), vm["ping-bytes"].as(), vm["ping-interval"].as()); + sc.registerAssociationCallback(boost::bind(&PingThread::start, ping.get())); } - boost::thread ping_thread(boost::bind(&SctpCat::consoleLoop, &sc)); + boost::thread console_thread(boost::bind(&consoleThread, boost::ref(sc))); sc.receiveLoop(); } catch (boost::exception & e) diff --git a/sctpcat.h b/sctpcat.h new file mode 100644 index 0000000..36bcb86 --- /dev/null +++ b/sctpcat.h @@ -0,0 +1,51 @@ +#ifndef SCTPCAT_H +#define SCTPCAT_H +#include +#include +#include +#include +#include +#include + +#include "isctpsink.h" + +class SctpCat : public ISctpSink +{ +public: + typedef boost::program_options::variables_map varmap; + + SctpCat(const varmap& options); + void setup(std::string host, const std::string& port); + void listenSocket(); + void connectSocket(const std::string &host, const std::string &port); + void receiveLoop(); + void send(const char* buf, size_t len); + void setPathMaxRetrans(sctp_assoc_t assoc_id, int count); + void setAssocMaxRetrans(sctp_assoc_t assoc_id, int count); + void setRto(int rtoMin, int rtoMax, int rtoInitial); + + void registerAssociationCallback(boost::function); + void registerPeerAddressCallback(boost::function); +private: + void subscribeAllEvents(int fd); + int setupSocket(int ai_family, sockaddr* local_addr, socklen_t local_addr_len); + + void receiveMessages(int fd); + void processMessage(int fd, char* buf, int len, sockaddr* from, socklen_t fromlen, + const sctp_sndrcvinfo& sinfo, int flags); + int m_fd; + sctp_assoc_t m_assoc_id; + static const int s_maxPendingConnections = 10; + bool m_printTicks; + int m_aiFamily; + bool m_listen; + const varmap& m_options; + boost::shared_ptr m_ai; + mutable boost::mutex m_mutex; + typedef boost::mutex::scoped_lock ScopedLock; + std::vector< boost::function > m_associationCallbacks; + std::vector< boost::function > m_peerAddresssCallbacks; +}; + + +#endif // SCTPCAT_H diff --git a/util.cpp b/util.cpp index 281b3a1..48d3393 100644 --- a/util.cpp +++ b/util.cpp @@ -187,7 +187,6 @@ class SctpNotificationPrinter template void SctpNotificationPrinter::process(const T& event) { - m_os << timestamp(); m_os << stringize_sctp_sn_type(reinterpret_cast(&event)->sn_header.sn_type); }