Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate gRPC status code when not ok #519

Merged
merged 11 commits into from
Oct 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,9 @@ protected void onKafkaData(
if (grpcStatus != null &&
!HEADER_VALUE_GRPC_OK.value().equals(grpcStatus.value().value()))
{
OctetsFW value = grpcStatus.value();
String16FW status = statusRW
.set(grpcStatus.value().buffer(), grpcStatus.offset(), grpcStatus.sizeof())
.set(value.buffer(), value.offset(), value.sizeof())
.build();
doGrpcAbort(traceId, authorization, status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public void shouldRejectUnaryRpc() throws Exception
k3po.finish();
}

@Test
@Configuration("produce.proxy.rpc.yaml")
@Specification({
"${grpc}/unary.rpc.error/client",
"${kafka}/unary.rpc.error/server"})
public void shouldRejectUnaryRpcWithError() throws Exception
{
k3po.finish();
}

@Test
@Configuration("produce.proxy.rpc.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class GrpcClientFactory implements GrpcStreamFactory
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
private final GrpcMessageFW.Builder grpcMessageRW = new GrpcMessageFW.Builder();

private final GrpcAbortExFW grpcAbortedStatusRO;

private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer metadataBuffer;
private final MutableDirectBuffer extBuffer;
Expand All @@ -139,6 +141,11 @@ public GrpcClientFactory(
this.grpcTypeId = context.supplyTypeId(GrpcBinding.NAME);
this.bindings = new Long2ObjectHashMap<>();
this.helper = new HttpGrpcResponseHeaderHelper();

this.grpcAbortedStatusRO = grpcAbortExRW.wrap(new UnsafeBuffer(new byte[32]), 0, 32)
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();
}

@Override
Expand Down Expand Up @@ -234,6 +241,7 @@ private final class GrpcClient
private int replyPad;

private int state;
private String grpcStatus;

private GrpcClient(
MessageConsumer application,
Expand Down Expand Up @@ -403,7 +411,14 @@ private void onAppWindow(
replyPad = padding;
state = GrpcState.openReply(state);

delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax);
if (GrpcState.replyClosing(state))
{
doAppAbortDeferred(traceId, authorization);
}
else
{
delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax);
}

assert replyAck <= replySeq;

Expand Down Expand Up @@ -450,21 +465,49 @@ private void doAppData(
assert replySeq <= replyAck + replyMax;
}

private void doAppAbort(
private void doAppAbortDeferring(
long traceId,
long authorization,
String16FW grpcStatus)
{
this.grpcStatus = grpcStatus != null ? grpcStatus.asString() : null;
this.state = GrpcState.closingReply(state);

if (GrpcState.replyOpened(state))
jfallows marked this conversation as resolved.
Show resolved Hide resolved
{
doAppAbortDeferred(traceId, authorization);
}
}

private void doAppAbortDeferred(
long traceId,
long authorization)
{
if (!GrpcState.replyClosed(state))
GrpcAbortExFW abortEx = grpcStatus != null
? grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(grpcStatus)
.build()
: grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_INTERNAL_ERROR)
.build();
jfallows marked this conversation as resolved.
Show resolved Hide resolved

doAppAbort(traceId, authorization, abortEx);
}

private void doAppAbort(
long traceId,
long authorization,
Flyweight extension)
{
if (GrpcState.replyOpening(state) &&
!GrpcState.replyClosed(state))
{
state = GrpcState.closeReply(state);

GrpcAbortExFW abortEx = grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

doAbort(application, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, abortEx);
traceId, authorization, extension);
}
}

Expand Down Expand Up @@ -500,15 +543,19 @@ private void doAppWindow(

private void doAppReset(
long traceId,
long authorization,
Flyweight extension)
long authorization)
{
if (!GrpcState.initialClosed(state))
{
state = GrpcState.closeInitial(state);

GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

doReset(application, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, extension);
traceId, authorization, resetEx);
}
}
}
Expand Down Expand Up @@ -725,21 +772,13 @@ private void onNetBegin(
replyMax = maximum;
state = GrpcState.openingReply(state);

delegate.doAppBegin(traceId, authorization, affinity);

if (!HTTP_HEADER_VALUE_STATUS_200.equals(status) ||
grpcStatus != null && !HEADER_VALUE_GRPC_OK.equals(grpcStatus))
{
final String16FW newGrpcStatus = grpcStatus == null ? HEADER_VALUE_GRPC_INTERNAL_ERROR : grpcStatus;
GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(newGrpcStatus)
.build();

delegate.doAppReset(traceId, authorization, resetEx);
doNetAbort(traceId, authorization);
}
else
{
delegate.doAppBegin(traceId, authorization, affinity);
delegate.doAppAbortDeferring(traceId, authorization, grpcStatus);
doNetReset(traceId, authorization);
}
}

Expand Down Expand Up @@ -817,13 +856,13 @@ private void onNetEnd(
final Array32FW<HttpHeaderFW> trailers = endEx != null ? endEx.trailers() : TRAILERS_EMPTY;
final HttpHeaderFW grpcStatus = trailers.matchFirst(t -> t.name().equals(HTTP_HEADER_GRPC_STATUS));

if (grpcStatus != null && HEADER_VALUE_GRPC_OK.equals(grpcStatus.value()))
if (grpcStatus == null || HEADER_VALUE_GRPC_OK.equals(grpcStatus.value()))
{
delegate.doAppEnd(traceId, authorization);
}
else
{
delegate.doAppAbort(traceId, authorization);
delegate.doAppAbortDeferring(traceId, authorization, grpcStatus.value());
}


Expand All @@ -846,7 +885,7 @@ private void onNetAbort(

state = GrpcState.closeReply(state);

delegate.doAppAbort(traceId, authorization);
delegate.doAppAbort(traceId, authorization, grpcAbortedStatusRO);
}

private void onNetReset(
Expand All @@ -857,12 +896,7 @@ private void onNetReset(

state = GrpcState.closeInitial(state);

GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

delegate.doAppReset(traceId, authorization, resetEx);
delegate.doAppReset(traceId, authorization);
}

private void onNetWindow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,14 @@ public void serverSendsWriteAbortOnOpenRequestResponse() throws Exception
}


@Test
@Configuration("client.when.yaml")
@Specification({
"${app}/server.send.write.abort.on.open.response/client",
"${net}/response.with.grpc.error/server"
})
public void shouldAbortResponseWithGrpcError() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -786,9 +786,9 @@ private void doKafkaEnd(
if (KafkaGrpcState.initialOpened(state) &&
!KafkaGrpcState.initialClosed(state))
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
initialSeq = delegate.replySeq;
initialAck = delegate.replyAck;
initialMax = delegate.replyMax;
state = KafkaGrpcState.closeInitial(state);

doKafkaTombstone(traceId, authorization, HEADER_VALUE_GRPC_OK);
Expand All @@ -806,9 +806,9 @@ private void doKafkaAbort(
if (KafkaGrpcState.initialOpening(state) &&
!KafkaGrpcState.initialClosed(state))
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
initialSeq = delegate.replySeq;
initialAck = delegate.replyAck;
initialMax = delegate.replyMax;
state = KafkaGrpcState.closeInitial(state);

doKafkaTombstone(traceId, authorization, status);
Expand Down Expand Up @@ -1453,7 +1453,6 @@ private void onGrpcAbort(
final String16FW status = abortEx != null ? abortEx.status() : HEADER_VALUE_GRPC_ABORTED;

correlater.doKafkaAbort(traceId, authorization, status);

}

private void onGrpcReset(
Expand All @@ -1464,6 +1463,7 @@ private void onGrpcReset(
final int maximum = reset.maximum();
final long traceId = reset.traceId();
final long authorization = reset.authorization();
final OctetsFW extension = reset.extension();

assert acknowledge <= sequence;
assert sequence <= initialSeq;
Expand All @@ -1472,7 +1472,7 @@ private void onGrpcReset(

initialAck = acknowledge;
initialMax = maximum;
state = KafkaGrpcState.closingInitial(state);
state = KafkaGrpcState.closeInitial(state);

cleanup(traceId, authorization);

Expand Down Expand Up @@ -1501,7 +1501,7 @@ private void onGrpcWindow(
initialBud = budgetId;
initialPad = padding;
initialCap = capabilities;
state = KafkaGrpcState.openReply(state);
state = KafkaGrpcState.openInitial(state);

assert initialAck <= initialSeq;

Expand Down Expand Up @@ -1592,7 +1592,7 @@ private void doGrpcBegin(
OctetsFW service,
OctetsFW method)
{
state = KafkaGrpcState.openingReply(state);
state = KafkaGrpcState.openingInitial(state);

grpc = newGrpcStream(this::onGrpcMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, affinity, server.condition.scheme(), server.condition.authority(),
Expand Down Expand Up @@ -1621,7 +1621,8 @@ private void doGrpcAbort(
long traceId,
long authorization)
{
if (KafkaGrpcState.replyOpened(state) && !KafkaGrpcState.replyClosed(state))
if (KafkaGrpcState.initialOpening(state) &&
!KafkaGrpcState.initialClosed(state))
{
final GrpcAbortExFW grpcAbortEx =
grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
Expand All @@ -1639,9 +1640,9 @@ private void doGrpcEnd(
long traceId,
long authorization)
{
if (!KafkaGrpcState.replyClosed(state))
if (!KafkaGrpcState.initialClosed(state))
{
state = KafkaGrpcState.closeReply(state);
state = KafkaGrpcState.closeInitial(state);

doEnd(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization);
Expand All @@ -1666,8 +1667,7 @@ private void doGrpcReset(
long traceId,
long authorization)
{
if (KafkaGrpcState.replyOpening(state) &&
!KafkaGrpcState.replyClosed(state))
if (!KafkaGrpcState.replyClosed(state))
{
state = KafkaGrpcState.closeReply(state);

Expand All @@ -1682,6 +1682,7 @@ private void doGrpcReset(
}
}
}

private void doBegin(
MessageConsumer receiver,
long originId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.scheme("http")
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoUnary")
.metadata("custom", "test")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected

write ${grpc:protobuf()
.string(1, "Hello World")
.build()}
write flush

write close

read zilla:abort.ext ${grpc:abortEx()
.typeId(zilla:id("grpc"))
.status("9")
.build()}
read aborted
Loading
Loading