Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing 'push_notifications_only' config :: Aws Only for now #136

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/aws-provider.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ fetch those messages.
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| Option | Description | Default Value |
+==========================+===========================================================================================+===============+
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
| ``push_notifications_only`` | Use only SNS without SQS. Beware of max retention period of 1 hour for messages | ``false`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
+--------------------------+-------------------------------------------------------------------------------------------+---------------+

.. code-block:: php
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ The options and their descriptions are listed below.
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``push_notifications`` | Whether or not to POST notifications to subscribers of a Queue | ``false`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``push_notifications_only`` | AWS Only for now. Disable the Queue Polling if you do not require message for 1 hour+ | ``false`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``notification_retries`` | How many attempts notifications are resent in case of errors - if supported | ``3`` |
+---------------------------------+--------------------------------------------------------------------------------------------+---------------+
| ``message_delay`` | Time in seconds before a published Message is available to be read in a Queue | ``0`` |
Expand Down Expand Up @@ -137,6 +139,7 @@ A working configuration would look like the following
options:
queue_name: my_actual_queue_name
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand All @@ -154,6 +157,7 @@ A working configuration would look like the following
queue_name: my_actual_queue_name.fifo
push_notifications: true
notification_retries: 3
push_notifications_only: false
message_delay: 0
message_timeout: 30
message_expiration: 604800
Expand Down
1 change: 1 addition & 0 deletions integration_tests/Provider/IronMqProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private function getIronMqProvider(array $options = [])
[
'logging_enabled' => false,
'push_notifications' => true,
'push_notifications_only' => false,
'push_type' => 'multicast',
'notification_retries' => 3,
'notification_retries_delay' => 60,
Expand Down
4 changes: 4 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ private function getQueuesNode()
->defaultFalse()
->info('Whether notifications are sent to the subscribers')
->end()
->booleanNode('push_notifications_only')
->defaultFalse()
->info('Where to disable queue polling')
->end()
->scalarNode('push_type')
->defaultValue('multicast')
->info('Whether the push queue is multicast or unicast')
Expand Down
1 change: 1 addition & 0 deletions src/Event/NotificationEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,5 @@ public function getNotification()
{
return $this->notification;
}

}
2 changes: 2 additions & 0 deletions src/Message/Notification.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Notification
* @var ArrayCollection
*/
protected $metadata;


/**
* Constructor.
Expand Down Expand Up @@ -101,4 +102,5 @@ public function getMetadata()
{
return $this->metadata;
}

}
145 changes: 86 additions & 59 deletions src/Provider/AwsProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ public function create()
// Create the SNS Topic
$this->createTopic();

// Add the SQS Queue as a Subscriber to the SNS Topic
$this->subscribeToTopic(
$this->topicArn,
'sqs',
$this->sqs->getQueueArn($this->queueUrl)
);
if(!$this->options['push_notifications_only']) {
// Add the SQS Queue as a Subscriber to the SNS Topic
$this->subscribeToTopic(
$this->topicArn,
'sqs',
$this->sqs->getQueueArn($this->queueUrl)
);
}

// Add configured Subscribers to the SNS Topic
foreach ($this->options['subscribers'] as $subscriber) {
Expand Down Expand Up @@ -151,11 +153,12 @@ public function destroy()

$this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
}


$topicExists = $this->topicExists();
$key = $this->getNameWithPrefix() . '_arn';
$this->cache->delete($key);

if ($this->topicExists() || !empty($this->queueUrl)) {
if ($this->options['push_notifications_only'] || ($topicExists || !empty($this->queueUrl))) {
// Delete the SNS Topic
$topicArn = !empty($this->topicArn)
? $this->topicArn
Expand Down Expand Up @@ -197,22 +200,33 @@ public function publish(array $message, array $options = [])
$publishStart = microtime(true);

// ensures that the SQS Queue and SNS Topic exist
if (!$this->queueExists()) {
$this->create();
if(!$this->options['push_notifications_only']) {
if (!$this->queueExists()) {
$this->create();
}
}

if ($options['push_notifications']) {

if (!$this->topicExists()) {
$this->create();
}

$message = [
'default' => $this->getNameWithPrefix(),
'sqs' => json_encode($message),
'http' => $this->getNameWithPrefix(),
'https' => $this->getNameWithPrefix(),
];

if($this->options['push_notifications_only']) {
$jsonMessage = json_encode($message);
$message = [
'default' => $jsonMessage,
'http' => $jsonMessage,
'https' => $jsonMessage,
];
} else {
$message = [
'default' => $this->getNameWithPrefix(),
'sqs' => json_encode($message),
'http' => $this->getNameWithPrefix(),
'https' => $this->getNameWithPrefix(),
];
}

$result = $this->sns->publish([
'TopicArn' => $this->topicArn,
Expand Down Expand Up @@ -325,22 +339,26 @@ public function receive(array $options = [])
*/
public function delete($id)
{
if (!$this->queueExists()) {
return false;
}
if($this->options['push_notifications_only']) {
return true;
} else {
if (!$this->queueExists()) {
return false;
}

$this->sqs->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $id
]);
$this->sqs->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $id
]);

$context = [
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $id
];
$this->log(200,"Message deleted from SQS Queue", $context);
$context = [
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $id
];
$this->log(200,"Message deleted from SQS Queue", $context);

return true;
return true;
}
}

/**
Expand Down Expand Up @@ -389,40 +407,42 @@ public function queueExists()
*/
public function createQueue()
{
$attributes = [
'VisibilityTimeout' => $this->options['message_timeout'],
'MessageRetentionPeriod' => $this->options['message_expiration'],
'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
];
if(!$this->options['push_notifications_only']) {
$attributes = [
'VisibilityTimeout' => $this->options['message_timeout'],
'MessageRetentionPeriod' => $this->options['message_expiration'],
'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
];

if ($this->isQueueFIFO()) {
$attributes['FifoQueue'] = 'true';
$attributes['ContentBasedDeduplication'] = $this->options['content_based_deduplication'] === true
? 'true'
: 'false';
}
if ($this->isQueueFIFO()) {
$attributes['FifoQueue'] = 'true';
$attributes['ContentBasedDeduplication'] = $this->options['content_based_deduplication'] === true
? 'true'
: 'false';
}

$result = $this->sqs->createQueue(['QueueName' => $this->getNameWithPrefix(), 'Attributes' => $attributes]);
$result = $this->sqs->createQueue(['QueueName' => $this->getNameWithPrefix(), 'Attributes' => $attributes]);

$this->queueUrl = $result->get('QueueUrl');
$this->queueUrl = $result->get('QueueUrl');

$key = $this->getNameWithPrefix() . '_url';
$this->cache->save($key, $this->queueUrl);
$key = $this->getNameWithPrefix() . '_url';
$this->cache->save($key, $this->queueUrl);

$this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);
$this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);

if ($this->options['push_notifications']) {
if ($this->options['push_notifications']) {

$policy = $this->createSqsPolicy();
$policy = $this->createSqsPolicy();

$this->sqs->setQueueAttributes([
'QueueUrl' => $this->queueUrl,
'Attributes' => [
'Policy' => $policy,
]
]);
$this->sqs->setQueueAttributes([
'QueueUrl' => $this->queueUrl,
'Attributes' => [
'Policy' => $policy,
]
]);

$this->log(200, "Created Updated SQS Policy");
$this->log(200, "Created Updated SQS Policy");
}
}
}

Expand Down Expand Up @@ -643,13 +663,20 @@ public function onNotification(NotificationEvent $event, $eventName, EventDispat

return;
}

$messages = $this->receive();
foreach ($messages as $message) {

if($this->options['push_notifications_only']) {
$notification = $event->getNotification();
$message = new Message($notification->getId(), $notification->getBody(), (array)$notification->getMetadata());
$messageEvent = new MessageEvent($this->name, $message);
$dispatcher->dispatch(Events::Message($this->name), $messageEvent);
} else {
$messages = $this->receive();
foreach ($messages as $message) {
$messageEvent = new MessageEvent($this->name, $message);
$dispatcher->dispatch(Events::Message($this->name), $messageEvent);
}
}

}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Resources/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ uecode_qpush:
provider: aws #or ironmq
options:
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand Down
5 changes: 5 additions & 0 deletions tests/Fixtures/config_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ uecode_qpush:
provider: aws
options:
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand All @@ -48,6 +49,7 @@ uecode_qpush:
provider: secondary_aws
options:
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand All @@ -60,6 +62,7 @@ uecode_qpush:
provider: aws
options:
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand All @@ -74,6 +77,7 @@ uecode_qpush:
provider: ironmq
options:
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand All @@ -86,6 +90,7 @@ uecode_qpush:
provider: secondary_ironmq
options:
push_notifications: true
push_notifications_only: false
notification_retries: 3
message_delay: 0
message_timeout: 30
Expand Down
19 changes: 10 additions & 9 deletions tests/Provider/AbstractProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ private function getTestProvider(array $options = [])

$options = array_merge(
[
'logging_enabled' => false,
'push_notifications' => true,
'notification_retries' => 3,
'message_delay' => 0,
'message_timeout' => 30,
'message_expiration' => 604800,
'messages_to_receive' => 1,
'receive_wait_time' => 3,
'subscribers' => [
'logging_enabled' => false,
'push_notifications' => true,
'push_notifications_only' => false,
'notification_retries' => 3,
'message_delay' => 0,
'message_timeout' => 30,
'message_expiration' => 604800,
'messages_to_receive' => 1,
'receive_wait_time' => 3,
'subscribers' => [
[ 'protocol' => 'http', 'endpoint' => 'http://fake.com' ]
]
],
Expand Down
1 change: 1 addition & 0 deletions tests/Provider/AwsProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private function getAwsProvider(array $options = [])
[
'logging_enabled' => false,
'push_notifications' => true,
'push_notifications_only' => false,
'notification_retries' => 3,
'message_delay' => 0,
'message_timeout' => 30,
Expand Down
19 changes: 10 additions & 9 deletions tests/Provider/CustomProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,16 @@ public function testReceive()
protected function getCustomProvider()
{
$options = [
'logging_enabled' => false,
'push_notifications' => true,
'notification_retries' => 3,
'message_delay' => 0,
'message_timeout' => 30,
'message_expiration' => 604800,
'messages_to_receive' => 1,
'receive_wait_time' => 3,
'subscribers' => []
'logging_enabled' => false,
'push_notifications' => true,
'push_notifications_only' => false,
'notification_retries' => 3,
'message_delay' => 0,
'message_timeout' => 30,
'message_expiration' => 604800,
'messages_to_receive' => 1,
'receive_wait_time' => 3,
'subscribers' => []
];

$cache = $this->getMock(
Expand Down
Loading