Skip to content

Commit

Permalink
writing model from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fmcclean committed Sep 8, 2020
1 parent 9171102 commit a2e40f5
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ jobs:
POSTGRES_PASSWORD: password
rabbit:
image: rabbitmq
ports:
- 5672:5672
steps:
- name: checkout
uses: actions/checkout@v2
Expand Down
2 changes: 2 additions & 0 deletions citycatpg/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def add(self, con: connection):

def get_model(self, con):

assert self.rain_table is not None or self.rain_total is not None

rainfall_polygons = self.get_rainfall_polygons(con)
if rainfall_polygons is not None:
gids = rainfall_polygons.gid.unique().tolist()
Expand Down
19 changes: 10 additions & 9 deletions citycatpg/server.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import pika
from .run import fetch


def callback(ch, method, properties, body):
print(body)
def run_server(con, queue='runs', host='localhost', port=5672):
def callback(ch, method, properties, body):

if method.routing_key == 'test':
ch.close()
run = fetch(con, body.decode('utf8'))
run.get_model(con)
run.model.write('tests/test_model_from_queue')
if queue == 'test':
ch.close()


def run_server(queue='runs'):

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', port=5672))
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port))
channel = connection.channel()
channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback)
channel.start_consuming()
channel.start_consuming()
5 changes: 3 additions & 2 deletions tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ def test_add_run(self):
with con:
with con.cursor() as cur:
cur.execute('DROP TABLE IF EXISTS runs')

Run(run_duration=500, srid=3035, resolution=90).add(con)
run = Run(run_duration=500, srid=3035, resolution=90, rain_total=100, rain_duration=200)
run.add(con)
return run

def test_fetch(self):
with con:
Expand Down
9 changes: 7 additions & 2 deletions tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from citycatpg import server
from unittest import TestCase
import pika
from .test_run import TestRun, con


class TestServer(TestCase):
Expand All @@ -10,7 +11,11 @@ def test_run_server(self):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', port=5672))
channel = connection.channel()
channel.queue_declare(queue=queue)

test_run = TestRun()
run = test_run.test_add_run()

channel.basic_publish(exchange='',
routing_key=queue,
body='Hello World!')
server.run_server(queue)
body=run.run_id)
server.run_server(queue=queue, con=con)

0 comments on commit a2e40f5

Please sign in to comment.