Skip to content

Commit

Permalink
Fixed consume failed, when publish message in consumer. (hyperf#1472)
Browse files Browse the repository at this point in the history
* Fixed consume failed, when publish message in consumer.

* Update CHANGELOG.md

* Fixed connection name not found.

* Update KeepaliveConnection.php
  • Loading branch information
limingxinleo committed Mar 27, 2020
1 parent eff0723 commit 293cb66
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# v1.1.23 - TBD

## Fixed

- [#1472](https://github.com/hyperf/hyperf/pull/1472) Fixed consume failed, when publish message in consumer.

# v1.1.22 - 2020-03-26

## Added
Expand Down
39 changes: 17 additions & 22 deletions src/nsq/src/Nsq.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ public function publish(string $topic, $message, float $deferTime = 0.0): bool

public function subscribe(string $topic, string $channel, callable $callback): void
{
$this->sendSub($topic, $channel);

while ($this->sendRdy()) {
$this->call(function (Socket $socket) use ($callback) {
$this->call(function (Socket $socket) use ($topic, $channel,$callback) {
$this->sendSub($socket, $topic, $channel);
while ($this->sendRdy($socket)) {
$reader = new Subscriber($socket);
$reader->recv();

Expand All @@ -111,8 +110,8 @@ public function subscribe(string $topic, string $channel, callable $callback): v
$socket->sendAll($this->builder->buildFin($message->getMessageId()));
}
}
});
}
}
});
}

protected function sendMPub(string $topic, array $messages): bool
Expand Down Expand Up @@ -162,26 +161,22 @@ protected function call(Closure $closure)
}
}

protected function sendSub(string $topic, string $channel): void
protected function sendSub(Socket $socket, string $topic, string $channel): void
{
$this->call(function (Socket $socket) use ($topic, $channel) {
$result = $socket->sendAll($this->builder->buildSub($topic, $channel));
if ($result === false) {
throw new SocketSendException('SUB send failed, the errorCode is ' . $socket->errCode);
}
$socket->recv();
});
$result = $socket->sendAll($this->builder->buildSub($topic, $channel));
if ($result === false) {
throw new SocketSendException('SUB send failed, the errorCode is ' . $socket->errCode);
}
$socket->recv();
}

protected function sendRdy()
protected function sendRdy(Socket $socket)
{
return $this->call(function (Socket $socket) {
$result = $socket->sendAll($this->builder->buildRdy(1));
if ($result === false) {
throw new SocketSendException('RDY send failed, the errorCode is ' . $socket->errCode);
}
$result = $socket->sendAll($this->builder->buildRdy(1));
if ($result === false) {
throw new SocketSendException('RDY send failed, the errorCode is ' . $socket->errCode);
}

return $result;
});
return $result;
}
}
3 changes: 3 additions & 0 deletions src/nsq/src/Pool/NsqConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public function __construct(ContainerInterface $container, Pool $pool, array $co
{
$this->config = array_merge($this->config, $config);
$this->builder = $container->get(MessageBuilder::class);
if ($pool instanceof NsqPool) {
$this->name = 'nsq.connection.' . $pool->getName();
}
parent::__construct($container, $pool);
}

Expand Down
5 changes: 5 additions & 0 deletions src/nsq/src/Pool/NsqPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public function __construct(ContainerInterface $container, string $name)
parent::__construct($container, $options);
}

public function getName(): string
{
return $this->name;
}

protected function createConnection(): ConnectionInterface
{
return make(NsqConnection::class, [
Expand Down
8 changes: 7 additions & 1 deletion src/pool/src/KeepaliveConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ abstract class KeepaliveConnection implements ConnectionInterface
*/
protected $connected = false;

/**
* @var string
*/
protected $name = 'keepalive.connection';

public function __construct(ContainerInterface $container, Pool $pool)
{
$this->container = $container;
Expand Down Expand Up @@ -152,7 +157,8 @@ public function close(): bool

public function isTimeout(): bool
{
return $this->lastUseTime < microtime(true) - $this->pool->getOption()->getMaxIdleTime();
return $this->lastUseTime < microtime(true) - $this->pool->getOption()->getMaxIdleTime()
&& $this->channel->length() > 0;
}

protected function addHeartbeat()
Expand Down
10 changes: 5 additions & 5 deletions src/pool/src/SimplePool/PoolFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public function get(string $name, callable $callback, array $option = []): Pool
return $this->pools[$name];
}

public function getPoolNames(): array
{
return array_keys($this->pools);
}

protected function hasConfig(string $name): bool
{
return isset($this->configs[$name]);
Expand All @@ -70,9 +75,4 @@ protected function getConfig(string $name): Config
{
return $this->configs[$name];
}

public function getPoolNames(): array
{
return array_keys($this->pools);
}
}

0 comments on commit 293cb66

Please sign in to comment.