diff --git a/src/Driver/Http1Driver.php b/src/Driver/Http1Driver.php index 12960ab2..f7a8d2bc 100644 --- a/src/Driver/Http1Driver.php +++ b/src/Driver/Http1Driver.php @@ -825,6 +825,14 @@ protected function write(Request $request, Response $response): void $this->send($lastWrite, $response, $request); if ($response->isUpgraded()) { + if ($request->getMethod() === "CONNECT") { + $status = $response->getStatus(); + if ($status < 200 || $status > 299) { + return; + } + } elseif ($response->getStatus() !== HttpStatus::SWITCHING_PROTOCOLS) { + return; + } $this->upgrade($request, $response); } } finally { @@ -960,7 +968,7 @@ private function filter(Response $response, ?Request $request, string $protocol $shouldClose = $request === null || \array_reduce($requestConnectionHeaders, $closeReduce, false) || \array_reduce($responseConnectionHeaders, $closeReduce, false) - || $protocol === "1.0" && !\array_reduce($requestConnectionHeaders, $keepAliveReduce, false); + || ($protocol === "1.0" && !\array_reduce($requestConnectionHeaders, $keepAliveReduce, false)); if ($contentLength !== null) { unset($headers["transfer-encoding"]); diff --git a/src/Driver/Http2Driver.php b/src/Driver/Http2Driver.php index 5ae86511..84e257d8 100644 --- a/src/Driver/Http2Driver.php +++ b/src/Driver/Http2Driver.php @@ -2,6 +2,7 @@ namespace Amp\Http\Server\Driver; +use Amp\ByteStream\Pipe; use Amp\ByteStream\ReadableIterableStream; use Amp\ByteStream\ReadableStream; use Amp\ByteStream\StreamException; @@ -20,6 +21,7 @@ use Amp\Http\Server\ClientException; use Amp\Http\Server\Driver\Internal\Http2Stream; use Amp\Http\Server\Driver\Internal\StreamHttpDriver; +use Amp\Http\Server\Driver\Internal\UnbufferedBodyStream; use Amp\Http\Server\ErrorHandler; use Amp\Http\Server\Push; use Amp\Http\Server\Request; @@ -326,6 +328,15 @@ private function send(int $id, Response $response, Request $request, Cancellatio return; } + if ($response->isUpgraded()) { + if ($request->getMethod() === "CONNECT") { + $status = $response->getStatus(); + if ($status >= 200 && $status <= 299) { + $this->upgrade($request, $response); + } + } + } + $body = $response->getBody(); $chunk = $body->read($cancellation); @@ -733,7 +744,9 @@ private function readPreface(): string Http2Parser::MAX_HEADER_LIST_SIZE, $this->headerSizeLimit, Http2Parser::MAX_FRAME_SIZE, - self::DEFAULT_MAX_FRAME_SIZE + self::DEFAULT_MAX_FRAME_SIZE, + 0x8, // TODO move to Http2Parser::ENABLE_CONNECT_PROTOCOL + 1 ), Http2Parser::SETTINGS, Http2Parser::NO_FLAG @@ -867,11 +880,23 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool { foreach ($pseudo as $name => $value) { if (!isset(Http2Parser::KNOWN_REQUEST_PSEUDO_HEADERS[$name])) { - throw new Http2StreamException( - "Invalid pseudo header", - $streamId, - Http2Parser::PROTOCOL_ERROR - ); + if ($name === ":protocol") { + if ($pseudo[":method"] !== "CONNECT") { + throw new Http2StreamException( + "The :protocol pseudo header is only allowed for CONNECT methods", + $streamId, + Http2Parser::PROTOCOL_ERROR + ); + } + // Stuff it into the headers for applications to read + $headers[":protocol"] = [$value]; + } else { + throw new Http2StreamException( + "Invalid pseudo header", + $streamId, + Http2Parser::PROTOCOL_ERROR + ); + } } } @@ -952,7 +977,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool throw new Http2StreamException("Shutting down", $streamId, Http2Parser::REFUSED_STREAM); } - if (!isset($pseudo[":method"], $pseudo[":path"], $pseudo[":scheme"], $pseudo[":authority"]) + if (!isset($pseudo[":method"], $pseudo[":authority"]) || isset($headers["connection"]) || $pseudo[":path"] === '' || (isset($headers["te"]) && \implode($headers["te"]) !== "trailers") @@ -964,6 +989,19 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool ); } + // Per RFC 8441 Section 4, Extended CONNECT (recognized by the existence of :protocol) must include :path and :scheme, + // but normal CONNECT must not according to RFC 9113 Section 8.5. + if ($pseudo[":method"] === "CONNECT" && !isset($pseudo[":protocol"]) && !isset($pseudo[":path"]) && !isset($pseudo[":scheme"])) { + $pseudo[":path"] = ""; + $pseudo[":scheme"] = null; + } elseif (!isset($pseudo[":path"], $pseudo[":scheme"])) { + throw new Http2StreamException( + "Invalid header values", + $streamId, + Http2Parser::PROTOCOL_ERROR + ); + } + [':method' => $method, ':path' => $target, ':scheme' => $scheme, ':authority' => $host] = $pseudo; $query = null; @@ -1108,6 +1146,44 @@ function (int $bodySize) use ($streamId) { $stream->pendingResponse = async($this->handleRequest(...), $request); } + /** + * Invokes the upgrade handler of the Response with the socket upgraded from the HTTP server. + */ + private function upgrade(Request $request, Response $response): void + { + $upgradeHandler = $response->getUpgradeHandler(); + if (!$upgradeHandler) { + throw new \Error('Response was not upgraded'); + } + + $client = $request->getClient(); + + // The input RequestBody are parsed raw DATA frames - exactly what we need (see CONNECT) + $inputStream = new UnbufferedBodyStream($request->getBody()); + $request->setBody(""); // hide the body from the upgrade handler, it's available in the UpgradedSocket + + // The output of an upgraded connection is just DATA frames + $outputPipe = new Pipe(0); + + $upgraded = new UpgradedSocket($client, $inputStream, $outputPipe->getSink()); + + try { + $upgradeHandler($upgraded, $request, $response); + } catch (\Throwable $exception) { + $exceptionClass = $exception::class; + + $this->logger->error( + "Unexpected {$exceptionClass} thrown during socket upgrade, closing stream.", + ['exception' => $exception] + ); + + throw new StreamException(previous: $exception); + } + + $response->removeTrailers(); + $response->setBody($outputPipe->getSource()); + } + public function handleData(int $streamId, string $data): void { $length = \strlen($data); @@ -1382,4 +1458,6 @@ public function getApplicationLayerProtocols(): array { return ['h2']; } + + // TODO add necessary functions to implement webtransport over HTTP/2 - have a closer read of the draft RFC... Needs new stream handlers. } diff --git a/src/Driver/Http3Driver.php b/src/Driver/Http3Driver.php index b3095de0..0d6318ba 100644 --- a/src/Driver/Http3Driver.php +++ b/src/Driver/Http3Driver.php @@ -2,6 +2,7 @@ namespace Amp\Http\Server\Driver; +use Amp\ByteStream\Pipe; use Amp\ByteStream\ReadableIterableStream; use Amp\CancelledException; use Amp\DeferredCancellation; @@ -11,6 +12,7 @@ use Amp\Http\Server\ClientException; use Amp\Http\Server\Driver\Internal\ConnectionHttpDriver; use Amp\Http\Server\Driver\Internal\Http3\Http3ConnectionException; +use Amp\Http\Server\Driver\Internal\Http3\Http3DatagramStream; use Amp\Http\Server\Driver\Internal\Http3\Http3Error; use Amp\Http\Server\Driver\Internal\Http3\Http3Frame; use Amp\Http\Server\Driver\Internal\Http3\Http3Parser; @@ -18,6 +20,7 @@ use Amp\Http\Server\Driver\Internal\Http3\Http3StreamException; use Amp\Http\Server\Driver\Internal\Http3\Http3Writer; use Amp\Http\Server\Driver\Internal\Http3\QPack; +use Amp\Http\Server\Driver\Internal\UnbufferedBodyStream; use Amp\Http\Server\ErrorHandler; use Amp\Http\Server\Request; use Amp\Http\Server\RequestBody; @@ -42,11 +45,17 @@ class Http3Driver extends ConnectionHttpDriver /** @var \WeakMap */ private \WeakMap $requestStreams; + private Http3Parser $parser; private Http3Writer $writer; private QPack $qpack; private int $highestStreamId = 0; private bool $stopping = false; private DeferredCancellation $closeCancellation; + /** @var array */ + private array $settings = []; + private DeferredFuture $parsedSettings; + /** @var arrayqpack = new QPack; $this->requestStreams = new \WeakMap; $this->closeCancellation = new DeferredCancellation; + $this->settings[Http3Settings::MAX_FIELD_SECTION_SIZE->value] = $this->headerSizeLimit; + $this->settings[Http3Settings::ENABLE_CONNECT_PROTOCOL->value] = 1; + $this->parsedSettings = new DeferredFuture; + } + + public function getSettings(): \SplObjectStorage + { + return $this->parsedSettings->getFuture()->await(); + } + + public function addSetting(Http3Settings|int $setting, int $value) + { + $this->settings[\is_int($setting) ? $setting : $setting->value] = $value; + } + + /** @param \Closure(string $buf, QuicSocket $stream): void $handler */ + public function addStreamHandler(int $type, \Closure $handler) + { + $this->streamHandlers[$type] = $handler; } // TODO copied from Http2Driver... @@ -92,7 +119,7 @@ protected function write(Request $request, Response $response): void 'date' => [formatDateHeader()], ]; - // Remove headers that are obsolete in HTTP/2. + // Remove headers that are obsolete in HTTP/3. unset($headers["connection"], $headers["keep-alive"], $headers["transfer-encoding"]); $trailers = $response->getTrailers(); @@ -111,6 +138,15 @@ protected function write(Request $request, Response $response): void return; } + if ($response->isUpgraded()) { + if ($request->getMethod() === "CONNECT") { + $status = $response->getStatus(); + if ($status >= 200 && $status <= 299) { + $this->upgrade($stream, $request, $response); + } + } + } + try { $cancellation = $this->closeCancellation->getCancellation(); @@ -138,6 +174,47 @@ protected function write(Request $request, Response $response): void unset($this->requestStreams[$request]); } + /** + * Invokes the upgrade handler of the Response with the socket upgraded from the HTTP server. + */ + private function upgrade(QuicSocket $socket, Request $request, Response $response): void + { + $upgradeHandler = $response->getUpgradeHandler(); + if (!$upgradeHandler) { + throw new \Error('Response was not upgraded'); + } + + $client = $request->getClient(); + + // The input RequestBody are parsed raw DATA frames - exactly what we need (see CONNECT) + $inputStream = new UnbufferedBodyStream($request->getBody()); + $request->setBody(""); // hide the body from the upgrade handler, it's available in the UpgradedSocket + + // The output of an upgraded connection is just DATA frames + $outputPipe = new Pipe(0); + + $settings = $this->parsedSettings->getFuture()->await(); + $datagramStream = empty($settings[Http3Settings::H3_DATAGRAM->value]) ? null : new Http3DatagramStream($this->parser->receiveDatagram(...), $this->writer->writeDatagram(...), $this->writer->maxDatagramSize(...), $socket); + + $upgraded = new UpgradedSocket($client, $inputStream, $outputPipe->getSink(), $datagramStream); + + try { + $upgradeHandler($upgraded, $request, $response); + } catch (\Throwable $exception) { + $exceptionClass = $exception::class; + + $this->logger->error( + "Unexpected {$exceptionClass} thrown during socket upgrade, closing stream.", + ['exception' => $exception] + ); + + $socket->resetSending(Http3Error::H3_INTERNAL_ERROR->value); + } + + $response->removeTrailers(); + $response->setBody($outputPipe->getSource()); + } + public function getApplicationLayerProtocols(): array { return ["h3"]; // that's a property of the server itself...? "h3" is the default mandated by RFC 9114, but section 3.1 allows for custom mechanisms too, technically. @@ -149,19 +226,20 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti \assert(!isset($this->client), "The driver has already been setup"); $this->client = $client; - $this->writer = new Http3Writer($connection, [[Http3Settings::MAX_FIELD_SECTION_SIZE, $this->headerSizeLimit]]); + $this->writer = new Http3Writer($connection, $this->settings); $largestPushId = (1 << 62) - 1; $maxAllowedPushId = 0; $connection->onClose($this->closeCancellation->cancel(...)); - $parser = new Http3Parser($connection, $this->headerSizeLimit, $this->qpack); + $this->parser = $parser = new Http3Parser($connection, $this->headerSizeLimit, $this->qpack); try { foreach ($parser->process() as $frame) { $type = $frame[0]; switch ($type) { case Http3Frame::SETTINGS: - // something to do? + [, $settings] = $frame; + $this->parsedSettings->complete($settings); break; case Http3Frame::HEADERS: @@ -177,17 +255,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti $streamId = $stream->getId(); [$headers, $pseudo] = $generator->current(); - foreach ($pseudo as $name => $value) { - if (!isset(Http2Parser::KNOWN_REQUEST_PSEUDO_HEADERS[$name])) { - throw new Http3StreamException( - "Invalid pseudo header", - $stream, - Http3Error::H3_MESSAGE_ERROR - ); - } - } - - if (!isset($pseudo[":method"], $pseudo[":path"], $pseudo[":scheme"], $pseudo[":authority"]) + if (!isset($pseudo[":method"], $pseudo[":authority"]) || isset($headers["connection"]) || $pseudo[":path"] === '' || (isset($headers["te"]) && \implode($headers["te"]) !== "trailers") @@ -199,6 +267,41 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti ); } + // Per RFC 9220 Section 3 & RFC 8441 Section 4, Extended CONNECT (recognized by the existence of :protocol) must include :path and :scheme, + // but normal CONNECT must not according to RFC 9114 Section 4.4. + if ($pseudo[":method"] === "CONNECT" && !isset($pseudo[":protocol"]) && !isset($pseudo[":path"]) && !isset($pseudo[":scheme"])) { + $pseudo[":path"] = ""; + $pseudo[":scheme"] = null; + } elseif (!isset($pseudo[":path"], $pseudo[":scheme"])) { + throw new Http3StreamException( + "Invalid header values", + $stream, + Http3Error::H3_MESSAGE_ERROR + ); + } + + foreach ($pseudo as $name => $value) { + if (!isset(Http2Parser::KNOWN_REQUEST_PSEUDO_HEADERS[$name])) { + if ($name === ":protocol") { + if ($pseudo[":method"] !== "CONNECT") { + throw new Http3StreamException( + "The :protocol pseudo header is only allowed for CONNECT methods", + $stream, + Http3Error::H3_MESSAGE_ERROR + ); + } + // Stuff it into the headers for applications to read + $headers[":protocol"] = [$value]; + } else { + throw new Http3StreamException( + "Invalid pseudo header", + $stream, + Http3Error::H3_MESSAGE_ERROR + ); + } + } + } + [':method' => $method, ':path' => $target, ':scheme' => $scheme, ':authority' => $host] = $pseudo; $query = null; @@ -455,6 +558,11 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { break; default: + if (isset($this->streamHandlers[$type])) { + [, $buf, $stream] = $frame; + $this->streamHandlers[$type]($buf, $stream); + break; + } $parser->abort(new Http3ConnectionException("An unexpected stream or frame was received", Http3Error::H3_FRAME_UNEXPECTED)); } } diff --git a/src/Driver/HttpDriverMiddleware.php b/src/Driver/HttpDriverMiddleware.php new file mode 100644 index 00000000..7467160a --- /dev/null +++ b/src/Driver/HttpDriverMiddleware.php @@ -0,0 +1,21 @@ + + */ + public function getApplicationLayerProtocols(HttpDriverMiddleware $next): array; +} diff --git a/src/Driver/Internal/ConnectionHttpDriver.php b/src/Driver/Internal/ConnectionHttpDriver.php index 74c5d8c2..fa7415ed 100644 --- a/src/Driver/Internal/ConnectionHttpDriver.php +++ b/src/Driver/Internal/ConnectionHttpDriver.php @@ -152,6 +152,6 @@ public function handleClient( ReadableStream $readableStream, WritableStream $writableStream, ): void { - throw new \Error(get_class($this) . " cannot handle Clients in a stream independent way. Use handleConnection() instead."); + throw new \Error(static::class . " cannot handle Clients in a stream independent way. Use handleConnection() instead."); } } diff --git a/src/Driver/Internal/Http/CapsuleReader.php b/src/Driver/Internal/Http/CapsuleReader.php new file mode 100644 index 00000000..d212c5c6 --- /dev/null +++ b/src/Driver/Internal/Http/CapsuleReader.php @@ -0,0 +1,12 @@ +reader->read()) { + [$type, $len, $content] = $data; + if ($type === self::TYPE) { + if ($len > 65535) { + // Too big, we don't want that + return null; + } + + $buf = \implode(\iterator_to_array($content)); + if (\strlen($buf) < $len) { + return null; + } + + if ($this->datagramSuspension) { + $this->datagramSuspension->resume($buf); + $this->datagramSuspension = null; + $this->cancellation?->unsubscribe($this->cancellationId); + $this->cancellation = null; + } else { + $this->nextDatagram = $buf; + + // We need to await a tick to allow datagram receivers to request a new datagram to avoid needlessly discarding datagram frames + $suspension = EventLoop::getSuspension(); + EventLoop::queue($suspension->resume(...)); + $suspension->suspend(); + } + continue; + } + return $data; + } + return null; + } + + public function send(string $data, ?Cancellation $cancellation = null): void + { + $this->writer->write(self::TYPE, $data); + } + + public function receive(?Cancellation $cancellation = null): ?string + { + if ($this->nextDatagram !== null) { + $data = $this->nextDatagram; + $this->nextDatagram = null; + return $data; + } + + if ($this->datagramSuspension) { + throw new PendingReceiveError; + } + + $this->datagramSuspension = EventLoop::getSuspension(); + $this->cancellation = $cancellation; + $this->cancellationId = $cancellation?->subscribe(function ($e) { + $this->datagramSuspension->throw($e); + $this->datagramSuspension = null; + }); + return $this->datagramSuspension->suspend(); + } + + public function maxDatagramSize(): int + { + return 1192; // 8 bytes overhead plus ... what MTU to assume - QUIC also has overhead itself? + } +} diff --git a/src/Driver/Internal/Http/Rfc9297Reader.php b/src/Driver/Internal/Http/Rfc9297Reader.php new file mode 100644 index 00000000..c611ae99 --- /dev/null +++ b/src/Driver/Internal/Http/Rfc9297Reader.php @@ -0,0 +1,48 @@ +activeReader) { + throw new PendingReadError; + } + $this->activeReader = true; + + $off = 0; + $type = Http3Parser::decodeVarintFromStream($this->stream, $this->buf, $off); + $length = Http3Parser::decodeVarintFromStream($this->stream, $this->buf, $off); + if ($length === -1) { + return null; + } + $this->buf = \substr($this->buf, $off); + + $reader = function () use ($length) { + while (\strlen($this->buf) < $length) { + yield $this->buf; + $length -= \strlen($this->buf); + + if (null === ($this->buf = $this->read())) { + return; + } + } + yield \substr($this->buf, 0, $length); + $this->buf = \substr($this->buf, $length); + $this->activeReader = false; + }; + return [$type, $length, $reader]; + } +} diff --git a/src/Driver/Internal/Http/Rfc9297Writer.php b/src/Driver/Internal/Http/Rfc9297Writer.php new file mode 100644 index 00000000..c0c684e6 --- /dev/null +++ b/src/Driver/Internal/Http/Rfc9297Writer.php @@ -0,0 +1,18 @@ +writer->write(Http3Writer::encodeVarint($type) . Http3Writer::encodeVarint(\strlen($buf)) . $buf); + } +} diff --git a/src/Driver/Internal/Http3/Http3DatagramStream.php b/src/Driver/Internal/Http3/Http3DatagramStream.php new file mode 100644 index 00000000..0d334108 --- /dev/null +++ b/src/Driver/Internal/Http3/Http3DatagramStream.php @@ -0,0 +1,29 @@ +writer)($this->stream, $data, $cancellation); + } + + public function receive(?Cancellation $cancellation = null): ?string + { + return ($this->reader)($this->stream, $cancellation); + } + + public function maxDatagramSize(): int + { + return ($this->maxSize)(); + } +} diff --git a/src/Driver/Internal/Http3/Http3Parser.php b/src/Driver/Internal/Http3/Http3Parser.php index 5d72e22f..ac0ebb2b 100644 --- a/src/Driver/Internal/Http3/Http3Parser.php +++ b/src/Driver/Internal/Http3/Http3Parser.php @@ -2,13 +2,17 @@ namespace Amp\Http\Server\Driver\Internal\Http3; +use Amp\ByteStream\ReadableStream; +use Amp\Cancellation; +use Amp\DeferredCancellation; +use Amp\Http\StructuredFields\Boolean; +use Amp\Http\StructuredFields\Number; use Amp\Http\StructuredFields\Rfc8941; -use Amp\Http\StructuredFields\Rfc8941\Boolean; -use Amp\Http\StructuredFields\Rfc8941\Number; use Amp\Pipeline\ConcurrentIterator; use Amp\Pipeline\Queue; use Amp\Quic\QuicConnection; use Amp\Quic\QuicSocket; +use Amp\Socket\PendingReceiveError; use Revolt\EventLoop; class Http3Parser @@ -16,8 +20,13 @@ class Http3Parser private ?QuicSocket $qpackDecodeStream = null; private ?QuicSocket $qpackEncodeStream = null; private Queue $queue; + /** @var array */ + private array $datagramReceivers = []; + private DeferredCancellation $datagramReceiveEmpty; + /** @var array */ + private array $datagramCloseHandlerInstalled = []; - private static function decodeVarint(string $string, int &$off): int + public static function decodeVarint(string $string, int &$off): int { if (!isset($string[$off])) { return -1; @@ -49,7 +58,7 @@ private static function decodeVarint(string $string, int &$off): int } } - private static function decodeVarintFromStream(QuicSocket $stream, string &$buf, int &$off): int + public static function decodeVarintFromStream(ReadableStream $stream, string &$buf, int &$off): int { while (-1 === $int = self::decodeVarint($buf, $off)) { if (null === $chunk = $stream->read()) { @@ -130,10 +139,10 @@ public static function readFrameWithoutType(QuicSocket $stream, string &$buf, in private function parseSettings(string $contents) { $off = 0; - $settings = []; + $settings = new \SplObjectStorage; while ((-1 !== $key = self::decodeVarint($contents, $off)) && (-1 !== $value = self::decodeVarint($contents, $off))) { if ($key = Http3Settings::tryFrom($key)) { - $settings[] = [$key, $value]; + $settings[$key] = $value; } } $this->queue->push([Http3Frame::SETTINGS, $settings]); @@ -278,8 +287,15 @@ public function process(): ConcurrentIterator try { $off = 0; $buf = $stream->read(); + $type = self::decodeVarintFromStream($stream, $buf, $off); if ($stream->isWritable()) { // client-initiated bidirectional stream + if ($type > 0x0d /* bigger than any default frame */ && $type % 0x1f !== 0x2 /* and not a padding frame */) { + // Unknown frame type. Users may handle it (e.g. WebTransport). + $this->queue->push([$type, \substr($buf, $off), $stream]); + return; + } + $off = 0; $messageGenerator = $this->readHttpMessage($stream, $buf, $off); if (!$messageGenerator->valid()) { return; // Nothing happens. That's allowed. Just bye then. @@ -290,7 +306,6 @@ public function process(): ConcurrentIterator $this->queue->push([Http3Frame::HEADERS, $stream, $messageGenerator]); } else { // unidirectional stream - $type = self::decodeVarintFromStream($stream, $buf, $off); switch (Http3StreamType::tryFrom($type)) { case Http3StreamType::Control: if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x1000)) { @@ -357,7 +372,8 @@ public function process(): ConcurrentIterator break; default: - // Stream was probably reset or unknown type. Just don't care. + // Unknown stream type. Users may handle it (e.g. WebTransport). + $this->queue->push([$type, \substr($buf, $off), $stream]); return; } } @@ -406,4 +422,73 @@ public function abort(Http3ConnectionException $exception) $this->queue->error($exception); } } + + private function datagramReceiver() + { + $this->datagramReceiveEmpty = new DeferredCancellation; + $cancellation = $this->datagramReceiveEmpty->getCancellation(); + EventLoop::queue(function () use ($cancellation) { + while (null !== $buf = $this->connection->receive($cancellation)) { + $off = 0; + $quarterStreamId = self::decodeVarint($buf, $off); + if (isset($this->datagramReceivers[$quarterStreamId])) { + $this->datagramReceivers[$quarterStreamId]->resume(\substr($buf, $off)); + unset($this->datagramReceivers[$quarterStreamId]); + + if (!$this->datagramReceivers) { + return; + } + + // We need to await a tick to allow datagram receivers to request a new datagram to avoid needlessly discarding datagram frames + $suspension = EventLoop::getSuspension(); + EventLoop::queue($suspension->resume(...)); + $suspension->suspend(); + } + } + }); + } + + public function receiveDatagram(QuicSocket $stream, ?Cancellation $cancellation = null): ?string + { + $quarterStreamId = $stream->getId() >> 2; + if (isset($this->datagramReceivers[$quarterStreamId])) { + throw new PendingReceiveError; + } + + if (!$stream->isReadable()) { + return null; + } + + if (!isset($this->datagramCloseHandlerInstalled[$quarterStreamId])) { + $this->datagramCloseHandlerInstalled[$quarterStreamId] = true; + $stream->onClose(function () use ($quarterStreamId) { + $this->datagramReceivers[$quarterStreamId]->resume(); + unset($this->datagramReceivers[$quarterStreamId], $this->datagramCloseHandlerInstalled[$quarterStreamId]); + if (!$this->datagramReceivers) { + $this->datagramReceiveEmpty->cancel(); + } + }); + } + + if (!$this->datagramReceivers) { + $this->datagramReceiver(); + } + + $suspension = EventLoop::getSuspension(); + $this->datagramReceivers[$quarterStreamId] = $suspension; + + $cancellationId = $cancellation?->subscribe(function ($e) use ($suspension, $quarterStreamId) { + unset($this->datagramReceivers[$quarterStreamId]); + if (!$this->datagramReceivers) { + $this->datagramReceiveEmpty->cancel($e); + } + $suspension->throw($e); + }); + + try { + return $suspension->suspend(); + } finally { + $cancellation?->unsubscribe($cancellationId); + } + } } diff --git a/src/Driver/Internal/Http3/Http3Settings.php b/src/Driver/Internal/Http3/Http3Settings.php index 7ef1d8a5..dd383308 100644 --- a/src/Driver/Internal/Http3/Http3Settings.php +++ b/src/Driver/Internal/Http3/Http3Settings.php @@ -1,8 +1,8 @@ -startControlStream(); } - private static function encodeVarint(int $int): string + public static function encodeVarint(int $int): string { if ($int <= 0x3F) { return \chr($int); @@ -54,13 +54,23 @@ private function startControlStream() $this->controlStream->endReceiving(); // unidirectional please $ints = []; - foreach ($this->settings as [$setting, $value]) { - $ints[] = self::encodeVarint($setting->value); + foreach ($this->settings as $setting => $value) { + $ints[] = self::encodeVarint($setting); $ints[] = self::encodeVarint($value); } self::sendFrame($this->controlStream, Http3Frame::SETTINGS, \implode($ints)); } + public function maxDatagramSize() + { + return $this->connection->maxDatagramSize() - 8; // to include the longest quarter stream id varint + } + + public function writeDatagram(QuicSocket $stream, string $buf) + { + $this->connection->send(self::encodeVarint($stream->getId() >> 2) . $buf); + } + public function close() { $this->connection->close(Http3Error::H3_NO_ERROR->value); diff --git a/src/Driver/Internal/StreamHttpDriver.php b/src/Driver/Internal/StreamHttpDriver.php index 60dc199d..eec49c77 100644 --- a/src/Driver/Internal/StreamHttpDriver.php +++ b/src/Driver/Internal/StreamHttpDriver.php @@ -1,4 +1,4 @@ -body->close(); + } + + public function isClosed(): bool + { + return $this->body->isClosed(); + } + + public function onClose(\Closure $onClose): void + { + $this->body->onClose($onClose); + } + + public function read(?Cancellation $cancellation = null): ?string + { + $this->body->increaseSizeLimit($this->dataRead + 1); + $read = $this->body->read($cancellation); + if ($read === null) { + return null; + } + $this->dataRead += \strlen($read); + $this->body->increaseSizeLimit($this->dataRead); + return $read; + } + + public function isReadable(): bool + { + return $this->body->isReadable(); + } +} diff --git a/src/Driver/UpgradedSocket.php b/src/Driver/UpgradedSocket.php index 1173686c..90abc4a9 100644 --- a/src/Driver/UpgradedSocket.php +++ b/src/Driver/UpgradedSocket.php @@ -7,6 +7,7 @@ use Amp\ByteStream\ResourceStream; use Amp\ByteStream\WritableStream; use Amp\Cancellation; +use Amp\Quic\DatagramStream; use Amp\Socket\Socket; use Amp\Socket\SocketAddress; use Amp\Socket\TlsException; @@ -26,6 +27,7 @@ public function __construct( private readonly Client $client, private readonly ReadableStream $readableStream, private readonly WritableStream $writableStream, + public readonly ?DatagramStream $datagramClient = null, ) { } diff --git a/src/Response.php b/src/Response.php index 39d68cc5..d6585a61 100644 --- a/src/Response.php +++ b/src/Response.php @@ -187,10 +187,6 @@ public function removeHeader(string $name): void public function setStatus(int $status, string $reason = null): void { parent::setStatus($this->validateStatusCode($status), $reason); - - if ($this->upgrade && $status !== HttpStatus::SWITCHING_PROTOCOLS) { - $this->upgrade = null; - } } /** @@ -331,9 +327,8 @@ public function isUpgraded(): bool } /** - * Sets a callback to be invoked once the response has been written to the client and changes the status of the - * response to 101 (Switching Protocols) and removes any trailers. The callback may be removed by changing the - * response status to any value other than 101. + * Sets a callback to be invoked once the response has been written to the client. + * The HttpDriver MUST ignore this callback if an incompatible status code was set. * * @param \Closure(Driver\UpgradedSocket, Request, Response):void $upgrade Callback invoked once the response has * been written to the client. The callback is given three parameters: an instance of {@see Driver\UpgradedSocket}, @@ -342,7 +337,6 @@ public function isUpgraded(): bool public function upgrade(\Closure $upgrade): void { $this->upgrade = $upgrade; - $this->setStatus(HttpStatus::SWITCHING_PROTOCOLS); $this->removeTrailers(); } diff --git a/src/SocketHttpServer.php b/src/SocketHttpServer.php index 1ffb9d25..141f1f14 100644 --- a/src/SocketHttpServer.php +++ b/src/SocketHttpServer.php @@ -10,6 +10,7 @@ use Amp\Http\Server\Driver\DefaultHttpDriverFactory; use Amp\Http\Server\Driver\HttpDriver; use Amp\Http\Server\Driver\HttpDriverFactory; +use Amp\Http\Server\Driver\HttpDriverMiddleware; use Amp\Http\Server\Driver\SocketClientFactory; use Amp\Http\Server\Middleware\AllowedMethodsMiddleware; use Amp\Http\Server\Middleware\CompressionMiddleware; @@ -295,7 +296,7 @@ public function start(RequestHandler $requestHandler, ErrorHandler $errorHandler foreach ($this->addresses as [$address, $bindContext]) { if ($bindContext instanceof QuicServerConfig) { $quicConnections[$bindContext] ??= []; - $quicConnections[$bindContext] = array_merge($quicConnections[$bindContext], [$address]); + $quicConnections[$bindContext] = \array_merge($quicConnections[$bindContext], [$address]); } else { $tlsContext = $bindContext?->getTlsContext()?->withApplicationLayerProtocols( $this->httpDriverFactory->getApplicationLayerProtocols(), @@ -383,11 +384,20 @@ private function handleClient( return; } - $this->drivers[$id] = $driver = $this->httpDriverFactory->createHttpDriver( - $requestHandler, - $errorHandler, - $client, - ); + if ($requestHandler instanceof HttpDriverMiddleware) { + $this->drivers[$id] = $driver = $requestHandler->createHttpDriver( + $this->httpDriverFactory, + $requestHandler, + $errorHandler, + $client, + ); + } else { + $this->drivers[$id] = $driver = $this->httpDriverFactory->createHttpDriver( + $requestHandler, + $errorHandler, + $client, + ); + } try { $driver->handleConnection($client, $socket);