better flow control
lsbardel committed Jul 23, 2014
1 parent 765356f commit 9b9a9b9
Showing 6 changed files with 228 additions and 224 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -14,6 +14,7 @@ omit =
*pulsar/apps/test/pep.py
*pulsar/apps/tx/*
examples/httpbin/config.py
examples/httpbin/throttle.py
examples/philosophers/config.py
examples/webmail/*
examples/taskqueue/winservice.py
36 changes: 34 additions & 2 deletions docs/source/tutorials/benchmarking.rst
@@ -7,8 +7,40 @@ Benchmarking
Test Concurrency
======================

The simplest way to benchmark pulsar is to use the :ref:`HttpBin <tutorials-httpbin>`
example application. It is a web server responding to several urls::

python manage.py -b :9060


Test streaming
------------------

The ``stream`` url simulates a streaming response: it sends chunks whose length
is given by the first url parameter, repeated as many times as the second url
parameter.

For example, this measures the download speed when streaming chunks of 4KB
1,048,576 times, for a total of 4GB of data::

curl http://localhost:9060/stream/4096/1048576 > /dev/null
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 4096M 0 4096M 0 0 87.8M 0 --:--:-- 0:00:46 --:--:-- 73.1M

The download speed is relatively slow considering the test is on localhost.
However, when switching the two numbers::

curl http://localhost:9060/stream/1048576/4096 > /dev/null
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 4096M 0 4096M 0 0 658M 0 --:--:-- 0:00:06 --:--:-- 673M

We have a download speed almost 10 times higher. Why?

Because the iterator sends large chunks to pulsar and the buffering is handled
by asyncio rather than by the iterator, which is much more efficient.
In other words, the bigger the chunks, the faster the transfer rate and
the more responsive (in terms of concurrent connections) the server will be.
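The effect of chunk size can be illustrated with a small standalone sketch (illustrative only, not part of the HttpBin application): the same total payload expressed either as many small chunks or as a few large ones. The event loop pays a per-chunk cost (iterator trips, write calls), so fewer, larger chunks mean less Python-level overhead.

```python
def stream(chunk_size, repeat):
    """Yield ``repeat`` chunks of ``chunk_size`` bytes each,
    mimicking what the ``stream`` url does."""
    chunk = b'x' * chunk_size
    for _ in range(repeat):
        yield chunk

# 4KB chunks, 1,048,576 iterations: over a million trips
# through the iterator protocol
small = stream(4096, 1048576)

# 1MB chunks, 4,096 iterations: three orders of magnitude fewer
large = stream(1048576, 4096)

# Both produce 4096 * 1048576 = 4,294,967,296 bytes (4GB) in total
```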

Test slow clients
======================
Expand Down
69 changes: 69 additions & 0 deletions examples/httpbin/throttle.py
@@ -0,0 +1,69 @@
'''Not used yet
'''
from asyncio import Future


class Throttling(object):
    _types = ('read', 'write')

    def __init__(self, protocol, read_limit=None, write_limit=None):
        self.protocol = protocol
        self.limits = (read_limit, write_limit)
        self.this_second = [0, 0]
        self._checks = [None, None]
        # Renamed from ``_throttle`` so that it does not shadow
        # the ``_throttle`` method below
        self._throttled = [None, None]

    def data_received(self, data):
        '''Receive new data and pause the receiving end of the transport
        if the read limit per second is surpassed
        '''
        if not self._checks[0]:
            self._check_limit(0)
        self._register(data, 0)
        self.protocol.data_received(data)

    def write(self, data):
        '''Write ``data`` and return either an empty tuple or
        an :class:`~asyncio.Future` called back once the write limit
        per second is not surpassed.
        '''
        if not self._checks[1]:
            self._check_limit(1)
        self._register(data, 1)
        result = self.protocol.write(data)
        waiter = self._throttled[1]
        # ``gather`` is expected to be provided elsewhere; it combines
        # ``result`` with the throttling ``waiter``
        return self.gather(result, waiter)

    # INTERNALS
    def _register(self, data, rw):
        self.this_second[rw] += len(data)

    def _check_limit(self, rw):
        loop = self.protocol._loop
        limit = self.limits[rw]
        if limit and self.this_second[rw] > limit:
            self._throttle(rw)
            delay = (float(self.this_second[rw]) / limit) - 1.0
            loop.call_later(delay, self._un_throttle, rw)
        self.this_second[rw] = 0
        self._checks[rw] = loop.call_later(1, self._check_limit, rw)

    def _throttle(self, rw):
        self.protocol.logger.debug('Throttling %s', self._types[rw])
        if rw:
            assert not self._throttled[rw]
            self._throttled[rw] = Future(loop=self.protocol._loop)
        else:
            self._throttled[rw] = True
            t = self.protocol._transport
            t.pause_reading()

    def _un_throttle(self, rw):
        self.protocol.logger.debug('Un-throttling %s', self._types[rw])
        if rw:
            waiter = self._throttled[rw]
            self._throttled[rw] = None
            if waiter and not waiter.done():
                waiter.set_result(None)
        else:
            t = self.protocol._transport
            if t:
                t.resume_reading()
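The per-second accounting idea behind ``Throttling`` can be sketched in isolation. This is a hypothetical, simplified model (the ``RateWindow`` name and structure are illustrative, not pulsar API): count the bytes seen in the current one-second window and, once the limit is exceeded, report how long to pause before resuming.

```python
class RateWindow:
    def __init__(self, limit):
        self.limit = limit      # bytes allowed per second (None disables)
        self.this_second = 0    # bytes seen in the current window

    def register(self, data):
        # Account for a chunk of data in the current window
        self.this_second += len(data)

    def delay(self):
        """Return the pause (in seconds) needed to fall back under
        the limit; 0.0 means no throttling is required."""
        if self.limit and self.this_second > self.limit:
            pause = self.this_second / self.limit - 1.0
            self.this_second = 0
            return pause
        return 0.0
```

For example, registering 1500 bytes against a 1000 bytes-per-second limit yields a half-second pause:

```python
w = RateWindow(1000)
w.register(b'x' * 1500)
w.delay()  # 0.5
```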
