Skip to content

Commit

Permalink
feat: support grpc metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Reasno committed Sep 24, 2020
1 parent 56b5103 commit d42cbe4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
22 changes: 14 additions & 8 deletions src/grpc-client/src/BaseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ protected function _simpleRequest(
array $metadata = [],
array $options = []
) {
$options['headers'] = ($options['headers'] ?? []) + $metadata;
$streamId = retry($this->options['retry_attempts'] ?? 3, function () use ($method, $argument, $options) {
$streamId = $this->send($this->buildRequest($method, $argument, $options));
if ($streamId <= 0) {
Expand Down Expand Up @@ -130,7 +131,8 @@ protected function _clientStreamRequest(
$call = new ClientStreamingCall();
$call->setClient($this->_getGrpcClient())
->setMethod($method)
->setDeserialize($deserialize);
->setDeserialize($deserialize)
->setMetadata($metadata);

return $call;
}
Expand All @@ -152,11 +154,13 @@ protected function _serverStreamRequest(
$deserialize,
array $metadata = [],
array $options = []
) {
)
{
$call = new ServerStreamingCall();
$call->setClient($this->_getGrpcClient());
$call->setMethod($method);
$call->setDeserialize($deserialize);
$call->setClient($this->_getGrpcClient())
->setMethod($method)
->setDeserialize($deserialize)
->setMetadata($metadata);

return $call;
}
Expand All @@ -169,13 +173,15 @@ protected function _serverStreamRequest(
*/
protected function _bidiRequest(
string $method,
$deserialize
$deserialize,
array $metadata = [],
array $options = []
): BidiStreamingCall {
$call = new BidiStreamingCall();
$call->setClient($this->_getGrpcClient())
->setMethod($method)
->setDeserialize($deserialize);

->setDeserialize($deserialize)
->setMetadata($metadata);
return $call;
}

Expand Down
10 changes: 8 additions & 2 deletions src/grpc-client/src/GrpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public function isRunning(): bool

public function getHttpClient(): SwooleHttp2Client
{
if (! $this->httpClient instanceof SwooleHttp2Client) {
if (!$this->httpClient instanceof SwooleHttp2Client) {
$this->httpClient = $this->buildHttp2Client();
}
return $this->httpClient;
Expand All @@ -217,14 +217,20 @@ public function getHttpClient(): SwooleHttp2Client
* Open a stream and return the id.
* @param mixed $data
*/
public function openStream(string $path, $data = '', string $method = '', bool $usePipelineRead = false): int
public function openStream(
string $path, $data = '',
string $method = '',
bool $usePipelineRead = false,
array $metadata = []
): int
{
$method = $method ?: ($data ? 'POST' : 'GET');
$request = new Request($method);
$request->path = $path;
if ($data) {
$request->data = $data;
}
$request->headers = $request->headers + $metadata;
$request->pipeline = true;
if ($usePipelineRead) {
// @phpstan-ignore-next-line
Expand Down
19 changes: 17 additions & 2 deletions src/grpc-client/src/StreamingCall.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class StreamingCall
* @var int
*/
protected $streamId = 0;
/**
* @var array
*/
protected $metadata;

public function setClient(GrpcClient $client): self
{
Expand Down Expand Up @@ -67,14 +71,21 @@ public function setStreamId(int $streamId): self
return $this;
}

public function setMetadata(array $metadata): self
{
$this->metadata = $metadata;
return $this;
}

public function send($message = null): void
{
if ($this->getStreamId() <= 0) {
$streamId = $this->client->openStream(
$this->method,
Parser::serializeMessage($message),
'',
true
true,
$this->metadata
);
if ($streamId <= 0) {
throw $this->newException();
Expand All @@ -88,7 +99,11 @@ public function send($message = null): void
public function push($message): void
{
if (! $this->getStreamId()) {
$this->setStreamId($this->client->openStream($this->method, null, '', true));
$this->setStreamId($this->client->openStream($this->method,
null,
'',
true,
$this->metadata));
}
$success = $this->client->write($this->getStreamId(), Parser::serializeMessage($message), false);
if (! $success) {
Expand Down

0 comments on commit d42cbe4

Please sign in to comment.