Skip to content

Commit

Permalink
Avoid unnecessary copy in add-scores stage (#438)
Browse files Browse the repository at this point in the history
This requires ensuring that every Tensor newly built with the `create` method has its strides specified in elements, not bytes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #438
  • Loading branch information
dagardner-nv authored Nov 7, 2022
1 parent ebc31ff commit 52ffe41
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 49 deletions.
5 changes: 5 additions & 0 deletions morpheus/_lib/src/objects/tensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "morpheus/objects/rmm_tensor.hpp"
#include "morpheus/objects/tensor_object.hpp"
#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils::get_element_stride
#include "morpheus/utilities/type_util.hpp"

#include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpyDeviceToHost
Expand Down Expand Up @@ -79,6 +80,10 @@ TensorObject Tensor::create(std::shared_ptr<rmm::device_buffer> buffer,
{
auto md = nullptr;

if (!strides.empty())
{
strides = TensorUtils::get_element_stride<TensorIndex>(strides);
}
auto tensor = std::make_shared<RMMTensor>(buffer, offset, dtype, shape, strides);

return TensorObject(md, tensor);
Expand Down
13 changes: 3 additions & 10 deletions morpheus/_lib/src/stages/add_classification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "morpheus/objects/tensor.hpp"
#include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject
#include "morpheus/utilities/matx_util.hpp"
#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils::get_element_stride
#include "morpheus/utilities/type_util.hpp" // for DType
#include "morpheus/utilities/type_util_detail.hpp" // for DataType

Expand Down Expand Up @@ -77,16 +78,8 @@ AddClassificationsStage::subscribe_fn_t AddClassificationsStage::build_operator(
SRF_CHECK_CUDA(
cudaMemcpy(tmp_buffer->data(), probs.data(), tmp_buffer->size(), cudaMemcpyDeviceToDevice));

// Depending on the input the stride is given in bytes or elements,
// divide the stride elements by the smallest item to ensure tensor_stride is defined in
// terms of elements
std::vector<TensorIndex> tensor_stride(stride.size());
auto min_stride = std::min_element(stride.cbegin(), stride.cend());

std::transform(stride.cbegin(),
stride.cend(),
tensor_stride.begin(),
std::bind(std::divides<>(), std::placeholders::_1, *min_stride));
// Depending on the input the stride is given in bytes or elements, convert to elements
auto tensor_stride = TensorUtils::get_element_stride<TensorIndex, std::size_t>(stride);

// Now call the threshold function
auto thresh_bool_buffer =
Expand Down
46 changes: 7 additions & 39 deletions morpheus/_lib/src/stages/add_scores.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,13 @@
#include "morpheus/objects/tensor.hpp"
#include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject

#include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpyDeviceToDevice
#include <glog/logging.h>
#include <rmm/cuda_stream_view.hpp> // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp> // for device_buffer
#include <srf/channel/status.hpp> // for Status
#include <srf/cuda/common.hpp> // for SRF_CHECK_CUDA
#include <srf/node/sink_properties.hpp> // for SinkProperties<>::sink_type_t
#include <srf/node/source_properties.hpp> // for SourceProperties<>::source_type_t
#include <srf/segment/object.hpp> // for Object

#include <algorithm> // for min_element, transform
#include <cstddef> // for size_t
#include <cstddef> // for size_t
#include <exception>
#include <functional> // for divides, placeholders
#include <map>
#include <memory>
#include <ostream> // for logging
Expand All @@ -59,9 +52,8 @@ AddScoresStage::subscribe_fn_t AddScoresStage::build_operator()
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t x) {
const auto& probs = x->get_probs();
const auto& shape = probs.get_shape();
const auto& stride = probs.get_stride();
const auto& probs = x->get_probs();
const auto& shape = probs.get_shape();

CHECK(shape.size() == 2 && shape[1] == m_num_class_labels)
<< "Label count does not match output of model. Label count: " << m_num_class_labels
Expand All @@ -70,38 +62,16 @@ AddScoresStage::subscribe_fn_t AddScoresStage::build_operator()
const std::size_t num_rows = shape[0];
const std::size_t num_columns = shape[1];

auto tmp_buffer = std::make_shared<rmm::device_buffer>(probs.bytes(), rmm::cuda_stream_per_thread);

SRF_CHECK_CUDA(
cudaMemcpy(tmp_buffer->data(), probs.data(), tmp_buffer->size(), cudaMemcpyDeviceToDevice));

// Depending on the input the stride is given in bytes or elements,
// divide the stride elements by the smallest item to ensure tensor_stride is defined in
// terms of elements
std::vector<TensorIndex> tensor_stride(stride.size());
auto min_stride = std::min_element(stride.cbegin(), stride.cend());

std::transform(stride.cbegin(),
stride.cend(),
tensor_stride.begin(),
std::bind(std::divides<>(), std::placeholders::_1, *min_stride));

auto tensor_obj = Tensor::create(
tmp_buffer,
probs.dtype(),
std::vector<TensorIndex>{static_cast<long long>(shape[0]), static_cast<long long>(shape[1])},
tensor_stride);

std::vector<std::string> columns(m_idx2label.size());
std::vector<TensorObject> tensors(m_idx2label.size());

std::size_t i = 0;
for (const auto& [column_num, column_name] : m_idx2label)
{
columns[i] = column_name;
tensors[i] = tensor_obj.slice(std::vector<TensorIndex>{0, static_cast<TensorIndex>(column_num)},
std::vector<TensorIndex>{static_cast<TensorIndex>(num_rows),
static_cast<TensorIndex>(column_num + 1)});
tensors[i] = probs.slice(std::vector<TensorIndex>{0, static_cast<TensorIndex>(column_num)},
std::vector<TensorIndex>{static_cast<TensorIndex>(num_rows),
static_cast<TensorIndex>(column_num + 1)});

++i;
}
Expand All @@ -122,8 +92,6 @@ std::shared_ptr<srf::segment::Object<AddScoresStage>> AddScoresStageInterfacePro
std::size_t num_class_labels,
std::map<std::size_t, std::string> idx2label)
{
auto stage = builder.construct_object<AddScoresStage>(name, num_class_labels, std::move(idx2label));

return stage;
return builder.construct_object<AddScoresStage>(name, num_class_labels, std::move(idx2label));
}
} // namespace morpheus

0 comments on commit 52ffe41

Please sign in to comment.