feat: Add GRPC error codes to GRPC streaming if enabled by user. (#7499)
indrajit96 authored and mc-nv committed Aug 21, 2024
1 parent e5718ac commit 329f8c8
Showing 12 changed files with 444 additions and 21 deletions.
6 changes: 6 additions & 0 deletions Dockerfile.QA
@@ -267,6 +267,12 @@ RUN cp -r qa/L0_decoupled/models qa/L0_decoupled/python_models/ && \
cp /workspace/tritonbuild/python/examples/decoupled/square_config.pbtxt \
qa/L0_decoupled/python_models/square_int32/.

RUN mkdir -p qa/L0_decoupled_grpc_error && \
cp -r qa/L0_decoupled/. qa/L0_decoupled_grpc_error

RUN mkdir -p qa/L0_grpc_error_state_cleanup && \
cp -r qa/L0_grpc_state_cleanup/. qa/L0_grpc_error_state_cleanup

RUN mkdir -p qa/L0_repoagent_checksum/models/identity_int32/1 && \
cp tritonbuild/identity/install/backends/identity/libtriton_identity.so \
qa/L0_repoagent_checksum/models/identity_int32/1/.
10 changes: 10 additions & 0 deletions docs/customization_guide/inference_protocols.md
@@ -115,6 +115,16 @@ These options can be used to configure the KeepAlive settings:

For client-side documentation, see [Client-Side GRPC KeepAlive](https://github.com/triton-inference-server/client/blob/main/README.md#grpc-keepalive).

#### GRPC Status Codes

Triton implements GRPC error handling for streaming requests when the client enables it through a request header. Upon encountering an error, Triton returns the appropriate GRPC error code and then closes the stream.

* `triton_grpc_error` : Set the header value to `true` when starting the stream.

GRPC status codes provide better visibility and monitoring. For more details, see [gRPC Status Codes](https://grpc.io/docs/guides/status-codes/).

For client-side documentation, see [Client-Side GRPC Status Codes](https://github.com/triton-inference-server/client/tree/main#GRPC-Status-Codes).
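
For illustration, a minimal Python client sketch that opts in to GRPC status codes on a stream and inspects a returned error; the server address, model name, and tensor names (`my_model`, `IN`, `OUT`) are placeholder assumptions:

```python
import queue
from functools import partial

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException, np_to_triton_dtype

results = queue.Queue()


def callback(result_queue, result, error):
    # The stream callback receives either a result or an InferenceServerException.
    result_queue.put(error if error is not None else result)


client = grpcclient.InferenceServerClient("localhost:8001")
client.start_stream(
    callback=partial(callback, results),
    headers={"triton_grpc_error": "true"},  # opt in to GRPC status codes
)

input_data = np.random.randn(2, 2).astype(np.float32)
inputs = [
    grpcclient.InferInput("IN", input_data.shape, np_to_triton_dtype(input_data.dtype))
]
inputs[0].set_data_from_numpy(input_data)
client.async_stream_infer(model_name="my_model", inputs=inputs)

response = results.get()  # blocks until a response or an error arrives
if isinstance(response, InferenceServerException):
    # With triton_grpc_error enabled, the error carries a GRPC status code
    # (e.g. StatusCode.INTERNAL) and the server closes the stream.
    print(response.status())
else:
    print(response.as_numpy("OUT"))

client.stop_stream()
```

Without the header, the same error is delivered in-band through the callback and the stream stays open.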

### Limit Endpoint Access (BETA)

Triton users may want to restrict access to protocols or APIs that are
130 changes: 130 additions & 0 deletions qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -35,6 +35,7 @@
sys.path.append("../../common")

import queue
import threading
import time
import unittest
from functools import partial
@@ -241,6 +242,135 @@ def test_infer_pymodel_error(self):
initial_metrics_value,
)

# Test grpc stream behavior when triton_grpc_error is set to true.
# The stream is expected to close and return a GRPC error when the model returns an error.
def test_triton_grpc_error_error_on(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 2
user_data = UserData()
triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)
stream_end = False
for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
try:
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
result = user_data._completed_requests.get()
if type(result) == InferenceServerException:
# execute_grpc_error intentionally returns error with StatusCode.INTERNAL status on 2nd request
self.assertEqual(str(result.status()), "StatusCode.INTERNAL")
stream_end = True
else:
# Stream is not killed
output_data = result.as_numpy("OUT")
self.assertIsNotNone(output_data, "error: expected 'OUT'")
except Exception as e:
if stream_end == True:
# We expect the stream to have closed
self.assertTrue(
True,
"This should always pass as cancellation should succeed",
)
else:
self.assertFalse(
True, "Unexpected Stream killed without Error from CORE"
)

# Test grpc stream behavior when triton_grpc_error is set to true with multiple open streams.
# Each stream is expected to close and return a GRPC error when the model returns an error.
def test_triton_grpc_error_multithreaded(self):
thread1 = threading.Thread(target=self.test_triton_grpc_error_error_on)
thread2 = threading.Thread(target=self.test_triton_grpc_error_error_on)
# Start the threads
thread1.start()
thread2.start()
# Wait for both threads to finish
thread1.join()
thread2.join()

# Test grpc stream behavior when triton_grpc_error is set to true and the stream is subsequently cancelled.
# Cancellation is expected to succeed.
def test_triton_grpc_error_cancel(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 1
user_data = UserData()
triton_server_url = f"{_tritonserver_ipaddr}:8001"
stream_end = False
triton_client = grpcclient.InferenceServerClient(triton_server_url)

metadata = {"triton_grpc_error": "true"}

triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)

for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
try:
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
result = user_data._completed_requests.get()
if type(result) == InferenceServerException:
stream_end = True
if i == 0:
triton_client.stop_stream(cancel_requests=True)
except Exception as e:
if stream_end == True:
# We expect the stream to have closed
self.assertTrue(
True,
"This should always pass as cancellation should succeed",
)
else:
self.assertFalse(
True, "Unexpected Stream killed without Error from CORE"
)
self.assertTrue(
True,
"This should always pass as cancellation should succeed without any exception",
)

# Test grpc stream behavior when triton_grpc_error is set to false:
# the stream is NOT closed when an error is reported from CORE.
def test_triton_grpc_error_error_off(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 4
response_counter = 0
user_data = UserData()
triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
triton_client.start_stream(callback=partial(callback, user_data))
for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
_ = user_data._completed_requests.get()
response_counter += 1
# We expect response_counter == number_of_requests, which indicates that the stream
# did NOT close after the first reported grpc error because triton_grpc_error mode is off.
self.assertEqual(response_counter, number_of_requests)


if __name__ == "__main__":
unittest.main()
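
The tests above rely on `UserData` and `callback` helpers imported from the shared QA utilities (`sys.path.append("../../common")`). A minimal sketch of the assumed pattern, not the exact shared implementation:

```python
import queue


class UserData:
    def __init__(self):
        # Results and errors for completed requests are collected on one queue.
        self._completed_requests = queue.Queue()


def callback(user_data, result, error):
    # Streaming callback: enqueue the error if one occurred, otherwise the result,
    # so tests can block on user_data._completed_requests.get().
    if error:
        user_data._completed_requests.put(error)
    else:
        user_data._completed_requests.put(result)
```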
8 changes: 8 additions & 0 deletions qa/L0_backend_python/lifecycle/test.sh
@@ -52,6 +52,14 @@ cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 12000000 }" >> config.pbtxt)

mkdir -p models/execute_grpc_error/1/
cp ../../python_models/execute_grpc_error/model.py ./models/execute_grpc_error/1/
cp ../../python_models/execute_grpc_error/config.pbtxt ./models/execute_grpc_error/
(cd models/execute_grpc_error && \
sed -i "s/^name:.*/name: \"execute_grpc_error\"/" config.pbtxt && \
sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 1200000 }" >> config.pbtxt)

mkdir -p models/execute_return_error/1/
cp ../../python_models/execute_return_error/model.py ./models/execute_return_error/1/
cp ../../python_models/execute_return_error/config.pbtxt ./models/execute_return_error/
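For reference, the execute_grpc_error setup above produces the standard Triton model repository layout:

```
models/execute_grpc_error/
├── config.pbtxt
└── 1/
    └── model.py
```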
16 changes: 14 additions & 2 deletions qa/L0_decoupled/decoupled_test.py
@@ -116,7 +116,13 @@ def _stream_infer_with_params(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(callback=partial(callback, user_data))
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)
else:
triton_client.start_stream(callback=partial(callback, user_data))
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
@@ -175,7 +181,13 @@ def _stream_infer(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(callback=partial(callback, user_data))
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)
else:
triton_client.start_stream(callback=partial(callback, user_data))
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
2 changes: 1 addition & 1 deletion qa/L0_decoupled/test.sh
@@ -176,4 +176,4 @@ else
echo -e "\n***\n*** Test Failed\n***"
fi

exit $RET
exit $RET
42 changes: 33 additions & 9 deletions qa/L0_grpc_state_cleanup/cleanup_test.py
@@ -161,9 +161,17 @@ def _stream_infer_with_params(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data),
stream_timeout=stream_timeout,
headers=metadata,
)
else:
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
@@ -229,9 +237,17 @@ def _stream_infer(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data),
stream_timeout=stream_timeout,
headers=metadata,
)
else:
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
@@ -608,9 +624,17 @@ def test_non_decoupled_streaming_multi_response(self):
url="localhost:8001", verbose=True
) as client:
# Establish stream
client.start_stream(
callback=partial(callback, user_data), stream_timeout=16
)
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
client.start_stream(
callback=partial(callback, user_data),
stream_timeout=16,
headers=metadata,
)
else:
client.start_stream(
callback=partial(callback, user_data), stream_timeout=16
)
# Send a request
client.async_stream_infer(
model_name=self.repeat_non_decoupled_model_name,
51 changes: 51 additions & 0 deletions qa/python_models/execute_grpc_error/config.pbtxt
@@ -0,0 +1,51 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

backend: "python"
max_batch_size: 64

input [
{
name: "IN"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

output [
{
name: "OUT"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

instance_group [
{
count: 1
kind : KIND_CPU
}
]
52 changes: 52 additions & 0 deletions qa/python_models/execute_grpc_error/model.py
@@ -0,0 +1,52 @@
# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
def __init__(self):
# Track the total inference count so that every 2nd request returns an error, simulating a model failure
self.inf_count = 1

def execute(self, requests):
"""This function is called on inference request."""
responses = []

# Return an error for every even-numbered request
for request in requests:
input_tensor = pb_utils.get_input_tensor_by_name(request, "IN")
out_tensor = pb_utils.Tensor("OUT", input_tensor.as_numpy())
if self.inf_count % 2:
# Every odd request is success
responses.append(pb_utils.InferenceResponse([out_tensor]))
else:
# Every even request is failure
error = pb_utils.TritonError("An error occurred during execution")
responses.append(pb_utils.InferenceResponse([out_tensor], error))
self.inf_count += 1

return responses