From 293cb6662ac1f9984df04d8acc507629ccb892d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= Date: Fri, 27 Mar 2020 16:30:04 +0800 Subject: [PATCH] Fixed consume failed, when publish message in consumer. (#1472) * Fixed consume failed, when publish message in consumer. * Update CHANGELOG.md * Fixed connection name not found. * Update KeepaliveConnection.php --- CHANGELOG.md | 4 +++ src/nsq/src/Nsq.php | 39 +++++++++++-------------- src/nsq/src/Pool/NsqConnection.php | 3 ++ src/nsq/src/Pool/NsqPool.php | 5 ++++ src/pool/src/KeepaliveConnection.php | 8 ++++- src/pool/src/SimplePool/PoolFactory.php | 10 +++---- 6 files changed, 41 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1c043a8b2..b8732d5de9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # v1.1.23 - TBD +## Fixed + +- [#1472](https://github.com/hyperf/hyperf/pull/1472) Fixed consume failed, when publish message in consumer. + # v1.1.22 - 2020-03-26 ## Added diff --git a/src/nsq/src/Nsq.php b/src/nsq/src/Nsq.php index 6540b32f8c..d26f66bbbe 100644 --- a/src/nsq/src/Nsq.php +++ b/src/nsq/src/Nsq.php @@ -82,10 +82,9 @@ public function publish(string $topic, $message, float $deferTime = 0.0): bool public function subscribe(string $topic, string $channel, callable $callback): void { - $this->sendSub($topic, $channel); - - while ($this->sendRdy()) { - $this->call(function (Socket $socket) use ($callback) { + $this->call(function (Socket $socket) use ($topic, $channel,$callback) { + $this->sendSub($socket, $topic, $channel); + while ($this->sendRdy($socket)) { $reader = new Subscriber($socket); $reader->recv(); @@ -111,8 +110,8 @@ public function subscribe(string $topic, string $channel, callable $callback): v $socket->sendAll($this->builder->buildFin($message->getMessageId())); } } - }); - } + } + }); } protected function sendMPub(string $topic, array $messages): bool @@ -162,26 +161,22 @@ protected function call(Closure $closure) } } - protected function sendSub(string $topic, string $channel): void + protected function sendSub(Socket $socket, string $topic, string $channel): void { - $this->call(function (Socket $socket) use ($topic, $channel) { - $result = $socket->sendAll($this->builder->buildSub($topic, $channel)); - if ($result === false) { - throw new SocketSendException('SUB send failed, the errorCode is ' . $socket->errCode); - } - $socket->recv(); - }); + $result = $socket->sendAll($this->builder->buildSub($topic, $channel)); + if ($result === false) { + throw new SocketSendException('SUB send failed, the errorCode is ' . $socket->errCode); + } + $socket->recv(); } - protected function sendRdy() + protected function sendRdy(Socket $socket) { - return $this->call(function (Socket $socket) { - $result = $socket->sendAll($this->builder->buildRdy(1)); - if ($result === false) { - throw new SocketSendException('RDY send failed, the errorCode is ' . $socket->errCode); - } + $result = $socket->sendAll($this->builder->buildRdy(1)); + if ($result === false) { + throw new SocketSendException('RDY send failed, the errorCode is ' . $socket->errCode); + } - return $result; - }); + return $result; } } diff --git a/src/nsq/src/Pool/NsqConnection.php b/src/nsq/src/Pool/NsqConnection.php index 4034c90d2b..08b76d190b 100644 --- a/src/nsq/src/Pool/NsqConnection.php +++ b/src/nsq/src/Pool/NsqConnection.php @@ -40,6 +40,9 @@ public function __construct(ContainerInterface $container, Pool $pool, array $co { $this->config = array_merge($this->config, $config); $this->builder = $container->get(MessageBuilder::class); + if ($pool instanceof NsqPool) { + $this->name = 'nsq.connection.' . $pool->getName(); + } parent::__construct($container, $pool); } diff --git a/src/nsq/src/Pool/NsqPool.php b/src/nsq/src/Pool/NsqPool.php index 4810298647..0ea4e32558 100644 --- a/src/nsq/src/Pool/NsqPool.php +++ b/src/nsq/src/Pool/NsqPool.php @@ -45,6 +45,11 @@ public function __construct(ContainerInterface $container, string $name) parent::__construct($container, $options); } + public function getName(): string + { + return $this->name; + } + protected function createConnection(): ConnectionInterface { return make(NsqConnection::class, [ diff --git a/src/pool/src/KeepaliveConnection.php b/src/pool/src/KeepaliveConnection.php index e9516a33c7..31e7b54274 100644 --- a/src/pool/src/KeepaliveConnection.php +++ b/src/pool/src/KeepaliveConnection.php @@ -54,6 +54,11 @@ abstract class KeepaliveConnection implements ConnectionInterface */ protected $connected = false; + /** + * @var string + */ + protected $name = 'keepalive.connection'; + public function __construct(ContainerInterface $container, Pool $pool) { $this->container = $container; @@ -152,7 +157,8 @@ public function close(): bool public function isTimeout(): bool { - return $this->lastUseTime < microtime(true) - $this->pool->getOption()->getMaxIdleTime(); + return $this->lastUseTime < microtime(true) - $this->pool->getOption()->getMaxIdleTime() + && $this->channel->length() > 0; } protected function addHeartbeat() diff --git a/src/pool/src/SimplePool/PoolFactory.php b/src/pool/src/SimplePool/PoolFactory.php index 187ac4c827..d80469feec 100644 --- a/src/pool/src/SimplePool/PoolFactory.php +++ b/src/pool/src/SimplePool/PoolFactory.php @@ -61,6 +61,11 @@ public function get(string $name, callable $callback, array $option = []): Pool return $this->pools[$name]; } + public function getPoolNames(): array + { + return array_keys($this->pools); + } + protected function hasConfig(string $name): bool { return isset($this->configs[$name]); @@ -70,9 +75,4 @@ protected function getConfig(string $name): Config { return $this->configs[$name]; } - - public function getPoolNames(): array - { - return array_keys($this->pools); - } }