Skip to content

Commit

Permalink
depth_feed_subscriber.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
iamtheschmitzer committed May 21, 2013
1 parent 3599eeb commit 5719ec0
Showing 1 changed file with 361 additions and 3 deletions.
364 changes: 361 additions & 3 deletions doc/settAug2013/liquibook_sett.html
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,7 @@ <h2>Subscriber Application</h2>
<code>DepthFeedSubscriber</code> class:
</p>

<div class="listing">publisher_main.cpp: main() helper functions, continued</div>
<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class declaration</div>
<pre class="code">
namespace liquibook { namespace examples {

Expand All @@ -2540,13 +2540,20 @@ <h2>Subscriber Application</h2>
DepthFeedSubscriber(
const QuickFAST::Codecs::TemplateRegistryPtr&amp; templates);

// Handle a reset of the connection
void handle_reset();

// Handle a message
// return false if failure
bool handle_message(BufferPtr&amp; bp, size_t bytes_transferred);
</pre>

// Handle a reset of the connection
void handle_reset();
<p>
The <code>DepthFeedSubscriber</code> class has a number of private members:
</p>

<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class declaration, continued</div>
<pre class="code">
private:
QuickFAST::Codecs::Decoder decoder_;
typedef std::map&lt;std::string, book::Depth&lt;5&gt; &gt; DepthMap;
Expand All @@ -2555,7 +2562,22 @@ <h2>Subscriber Application</h2>

static const uint64_t MSG_TYPE_DEPTH;
static const uint64_t MSG_TYPE_TRADE;
</pre>

<p>
These members include a QuickFAST decoder, an order book for each security
(kept in a <code>std::map</code>), and the expected sequence number. The
sequence number is tracked to validate the feed, so that the subscriber is
sure that every message from the publisher has been handled. Finally, there
are two constants for determining message type.
</p>

<p>
In addition, there are a few private methods, to process incoming methods:
</p>

<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class declaration, continued</div>
<pre class="code">
void log_depth(book::Depth&lt;5&gt;&amp; depth);
bool handle_trade_message(const std::string&amp; symbol,
uint64_t&amp; seq_num,
Expand All @@ -2569,6 +2591,342 @@ <h2>Subscriber Application</h2>
} }
</pre>

<p>
After initializing the static members comes the class constructor:
</p>

<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class implementation</div>
<pre class="code">
namespace liquibook { namespace examples {

const uint64_t DepthFeedSubscriber::MSG_TYPE_DEPTH(11);
const uint64_t DepthFeedSubscriber::MSG_TYPE_TRADE(22);

using QuickFAST::ValueType;

DepthFeedSubscriber::DepthFeedSubscriber(
const QuickFAST::Codecs::TemplateRegistryPtr&amp; templates)
: decoder_(templates),
expected_seq_(1)
{
}
</pre>

<p>
The constructor passes the templates to the decoder, and initializes the
expected sequence number. Next is the <code>handle_reset()</code> method,
which is called when the connection to the publisher is reset:
</p>

<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class implementation, continued</div>
<pre class="code">
void
DepthFeedSubscriber::handle_reset()
{
expected_seq_ = 1;
}
</pre>

<p>
This simple method just resets the expected sequence number to one. Next, is
the meaty method <code>handle_message()</code>:
</p>

<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class implementation, continued</div>
<pre class="code">

bool
DepthFeedSubscriber::handle_message(BufferPtr&amp; bp, size_t bytes_transferred)
{
// Decode the message
QuickFAST::Codecs::DataSourceBuffer source(bp-&gt;c_array(), bytes_transferred);
QuickFAST::Codecs::SingleMessageConsumer consumer;
QuickFAST::Codecs::GenericMessageBuilder builder(consumer);
decoder_.decodeMessage(source, builder);
QuickFAST::Messages::Message&amp; msg(consumer.message());

// Examine message contents
uint64_t seq_num, msg_type, timestamp;
const QuickFAST::StringBuffer* string_buffer;
size_t bids_length, asks_length;
std::string symbol;
if (!msg.getUnsignedInteger(*id_seq_num_, ValueType::UINT32, seq_num)) {
std::cout &lt;&lt; "Could not get seq num from msg" &lt;&lt; std::endl;
return false;
}
if (seq_num != expected_seq_) {
std::cout &lt;&lt; "ERROR: Got Seq num " &lt;&lt; seq_num &lt;&lt; ", expected "
&lt;&lt; expected_seq_ &lt;&lt; std::endl;
return false;
}
if (!msg.getUnsignedInteger(*id_msg_type_, ValueType::UINT32, msg_type)) {
std::cout &lt;&lt; "Could not get msg type from msg" &lt;&lt; std::endl;
return false;
}
if (!msg.getString(*id_symbol_, ValueType::ASCII, string_buffer)) {
std::cout &lt;&lt; "Could not get symbol from msg" &lt;&lt; std::endl;
return false;
}
if (!msg.getUnsignedInteger(*id_timestamp_, ValueType::UINT32, timestamp)) {
std::cout &lt;&lt; "Could not get timestamp from msg" &lt;&lt; std::endl;
return false;
}
bool result = false;
symbol = (std::string)*string_buffer;
switch (msg_type) {
case MSG_TYPE_DEPTH:
result = handle_depth_message(symbol, seq_num, timestamp, msg);
break;
case MSG_TYPE_TRADE:
result = handle_trade_message(symbol, seq_num, timestamp, msg);
break;
default:
std::cout &lt;&lt; "ERROR: Unknown message type " &lt;&lt; msg_type
&lt;&lt; " seq num " &lt;&lt; seq_num &lt;&lt; std::endl;
return false;
}
++expected_seq_;
return result;
}
</pre>

<p>
This method first decodes the FAST message. This is done by stuffing the
message contents into a QuickFAST buffer specific for decoding. Next, a
message builder and consumer are create and associated, and the message is
decoded into the builder. After this, the message is available from the
consumer.
</p>

<p>
The common fields are then checked, by using the type-specific extractors on
the <code>QuickFAST::Messages::Message</code> class, such as
<code>getUnsignedInteger()</code>. This starts with the sequence number,
which is validated against the expected sequence number. Next the message
type, the symbol, and the timestamp are extracted. If any of these fail, the
method exits with an error value.
</p>

<p>
Finally, since the message type is known, the proper type-specific handler
is called.
</p>

<p>
The first helper method logs the contents of the depth for a security:
</p>
<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class implementation, continued</div>
<pre class="code">
void
DepthFeedSubscriber::log_depth(book::Depth&lt;5&gt;&amp; depth)
{
book::DepthLevel* bid = depth.bids();
book::DepthLevel* ask = depth.asks();
printf("----------BID---------- ----------ASK----------\n");
while (bid || ask) {
if (bid &amp;&amp; bid-&gt;order_count()) {
printf("%8.2f %9d [%2d]",
(double)bid-&gt;price() / 100.0,
bid-&gt;aggregate_qty(), bid-&gt;order_count());
if (bid == depth.last_bid_level()) {
bid = NULL;
} else {
++bid;
}
} else {
// Blanklines
printf(" ");
bid = NULL;
}

if (ask &amp;&amp; ask-&gt;order_count()) {
printf(" %8.2f %9d [%2d]\n",
(double)ask-&gt;price() / 100.0,
ask-&gt;aggregate_qty(), ask-&gt;order_count());
if (ask == depth.last_ask_level()) {
ask = NULL;
} else {
++ask;
}
} else {
// Newline
printf("\n");
ask = NULL;
}
}
}
</pre>

<p>
This method is complex, because it logs both bid and ask on the same line.
In its loop, there could be a bid and an ask, only a bid, or only an ask.
</p>

<p>
The next helper handles a depth message:
</p>

<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class implementation, continued</div>
<pre class="code">
bool
DepthFeedSubscriber::handle_depth_message(
const std::string&amp; symbol,
uint64_t&amp; seq_num,
uint64_t&amp; timestamp,
QuickFAST::Messages::Message&amp; msg)
{
size_t bids_length, asks_length;
std::cout &lt;&lt; timestamp
&lt;&lt; " Got depth msg " &lt;&lt; seq_num
&lt;&lt; " for symbol " &lt;&lt; symbol &lt;&lt; std::endl;

// Create or find depth
std::pair&lt;DepthMap::iterator, bool&gt; results = depth_map_.insert(
std::make_pair(symbol, book::Depth&lt;5&gt;()));
book::Depth&lt;5&gt;&amp; depth = results.first-&gt;second;

if (msg.getSequenceLength(*id_bids_, bids_length)) {
for (size_t i = 0; i &lt; bids_length; ++i) {
const QuickFAST::Messages::MessageAccessor* accessor;
if (msg.getSequenceEntry(*id_bids_, i, accessor)) {
uint64_t level_num, price, order_count, aggregate_qty;
if (!accessor-&gt;getUnsignedInteger(*id_level_num_, ValueType::UINT8,
level_num)) {
std::cout &lt;&lt; "Could not get Bid level from depth msg" &lt;&lt; std::endl;
return false;
}
if (!accessor-&gt;getUnsignedInteger(*id_price_, ValueType::UINT32,
price)) {
std::cout &lt;&lt; "Could not get Bid price from depth msg" &lt;&lt; std::endl;
return false;
}
if (!accessor-&gt;getUnsignedInteger(*id_order_count_, ValueType::UINT32,
order_count)) {
std::cout &lt;&lt; "Could not get Bid count from depth msg" &lt;&lt; std::endl;
return false;
}
if (!accessor-&gt;getUnsignedInteger(*id_size_, ValueType::UINT32,
aggregate_qty)) {
std::cout &lt;&lt; "Could not get Bid agg qty from depth msg" &lt;&lt; std::endl;
return false;
}

book::DepthLevel&amp; level = depth.bids()[level_num];
level.set(price, aggregate_qty, order_count);

} else {
std::cout &lt;&lt; "Failed to get bid " &lt;&lt; i &lt;&lt; std::endl;
return false;
}
msg.endSequenceEntry(*id_bids_, i, accessor);
}
}
if (msg.getSequenceLength(*id_asks_, asks_length)) {
for (size_t i = 0; i &lt; asks_length; ++i) {
const QuickFAST::Messages::MessageAccessor* accessor;
if (msg.getSequenceEntry(*id_asks_, i, accessor)) {
uint64_t level_num, price, order_count, aggregate_qty;
if (!accessor-&gt;getUnsignedInteger(*id_level_num_, ValueType::UINT8,
level_num)) {
std::cout &lt;&lt; "Could not get Ask level from depth msg " &lt;&lt; std::endl;
return false;
}
if (!accessor-&gt;getUnsignedInteger(*id_price_, ValueType::UINT32,
price)) {
std::cout &lt;&lt; "Could not get Ask price from depth msg" &lt;&lt; std::endl;
return false;
}
if (!accessor-&gt;getUnsignedInteger(*id_order_count_, ValueType::UINT32,
order_count)) {
std::cout &lt;&lt; "Could not get Ask count from depth msg " &lt;&lt; std::endl;
return false;
}
if (!accessor-&gt;getUnsignedInteger(*id_size_, ValueType::UINT32,
aggregate_qty)) {
std::cout &lt;&lt; "Could not get Ask agg qty from depth msg " &lt;&lt; std::endl;
return false;
}

book::DepthLevel&amp; level = depth.asks()[level_num];
level.set(price, aggregate_qty, order_count);

} else {
std::cout &lt;&lt; "Failed to get ask " &lt;&lt; i &lt;&lt; std::endl;
return false;
}
msg.endSequenceEntry(*id_asks_, i, accessor);
}
}
log_depth(depth);
return true;
}
</pre>

<p>
This method has the common fields passed to it, and logs the symbol of the
message. It then finds the proper depth object for the security, or creates
one if this is the first depth message for the security.
</p>
<p>
Next the method must iterate through the sequnce of changed bids. To do
this, the sequence length is first extracted, and each entry accessed through
an accessor.
</p>
<p>
Each bid field is then accessed using type-specific extractors, including the
level number, the price, the number of orders, and the aggregate quantity.
These updated values are then used to update the depth level for that
security.
</p>

<p>
Similar logic is performed for changed ask levels, and the resulting depth
is logged using <code>log_depth()</code>.
</p>

<p>
The final helper method handles a trade message:
</p>
<div class="listing">depth_feed_subscriber.h: DepthFeedSubscriber class implementation, continued</div>
<pre class="code">
bool
DepthFeedSubscriber::handle_trade_message(
const std::string&amp; symbol,
uint64_t&amp; seq_num,
uint64_t&amp; timestamp,
QuickFAST::Messages::Message&amp; msg)
{
uint64_t qty, cost;
// Get trade fields
if (!msg.getUnsignedInteger(*id_qty_, ValueType::UINT32, qty)) {
std::cout &lt;&lt; "Could not qty from trade msg" &lt;&lt; std::endl;
return false;
}
if (!msg.getUnsignedInteger(*id_cost_, ValueType::UINT32, cost)) {
std::cout &lt;&lt; "Could not get cost from trade msg" &lt;&lt; std::endl;
return false;
}

double price = (double) cost / (qty * 100);
std::cout &lt;&lt; timestamp
&lt;&lt; " Got trade msg " &lt;&lt; seq_num
&lt;&lt; " for symbol " &lt;&lt; symbol
&lt;&lt; ": " &lt;&lt; qty &lt;&lt; "@" &lt;&lt; price
&lt;&lt; std::endl;

return true;
}

} }
</pre>

<p>
Like <code>handle_depth_message()</code>, <code>handle_trade_message</code>
is passed the common fields. It extracts the trade quantity and cost, from
which it calculates the price of the trade (after adjusting for the tick
size). The result is then logged.
</p>

<h2>Final Big Topic</h2>

<p>
Expand Down

0 comments on commit 5719ec0

Please sign in to comment.