Skip to content

Commit

Permalink
Fix unittests (#444)
Browse files Browse the repository at this point in the history
* Avoid unnecessary copies in the file source stage
* When a copy is needed, perform it before emitting the current message
* Skip benchmarks by default
* Add a `benchmark` pytest marker and an associated `--run_benchmark` flag

fixes #443

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #444
  • Loading branch information
dagardner-nv authored Nov 7, 2022
1 parent d2e069a commit ebc31ff
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 27 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/github/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export S3_URL="s3://rapids-downloads/ci/morpheus"
export DISPLAY_URL="https://downloads.rapids.ai/ci/morpheus"
export ARTIFACT_ENDPOINT="/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}"
export ARTIFACT_URL="${S3_URL}${ARTIFACT_ENDPOINT}"
export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}/"
export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}/"

# Set sccache env vars
export SCCACHE_S3_KEY_PREFIX=morpheus-${NVARCH}
Expand Down
33 changes: 27 additions & 6 deletions morpheus/_lib/src/stages/file_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,26 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build()
// When index_col_count is 0 this will cause a new range index to be created
auto meta = MessageMeta::create_from_cpp(std::move(data_table), index_col_count);

// Always push at least 1
output.on_next(meta);
// next_meta stores a copy of the upcoming meta
std::shared_ptr<MessageMeta> next_meta = nullptr;

for (cudf::size_type repeat_idx = 1; repeat_idx < m_repeat; ++repeat_idx)
for (cudf::size_type repeat_idx = 0; repeat_idx < m_repeat; ++repeat_idx)
{
// Clone the previous meta object
if (!output.is_subscribed())
{
// Grab the GIL before disposing, just in case
pybind11::gil_scoped_acquire gil;

// Reset meta to allow the DCHECK after the loop to pass
meta.reset();

break;
}

// Clone the meta object before pushing while we still have access to it
if (repeat_idx + 1 < m_repeat)
{
// GIL must come after get_info
pybind11::gil_scoped_acquire gil;

// Use the copy function
Expand All @@ -82,12 +95,20 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build()

df.attr("index") = index + df_len;

meta = MessageMeta::create_from_python(std::move(df));
next_meta = MessageMeta::create_from_python(std::move(df));
}

output.on_next(meta);
DCHECK(meta) << "Cannot push null meta";

output.on_next(std::move(meta));

// Move next_meta into meta
std::swap(meta, next_meta);
}

DCHECK(!meta) << "meta was not properly pushed";
DCHECK(!next_meta) << "next_meta was not properly pushed";

output.on_completed();
};
}
Expand Down
18 changes: 7 additions & 11 deletions morpheus/stages/input/file_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,15 @@ def _generate_frames(self):
df_type="cudf",
)

count = 0

for _ in range(self._repeat_count):
for i in range(self._repeat_count):

x = MessageMeta(df)

yield x

count += 1

# If we are looping, copy and shift the index
if (self._repeat_count > 0):
prev_df = df
df = prev_df.copy()
# If we are looping, copy the object. Do this before we push the object in case it changes
if (i + 1 < self._repeat_count):
df = df.copy()

# Shift the index to allow for unique indices without reading more data
df.index += len(df)

yield x
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ build-backend = "setuptools.build_meta"
# These show up when querying `pytest --markers`
[tool.pytest.ini_options]
markers = [
"benchmark: Benchmarks",
"slow: Slow tests",
"kafka: Tests that require a running instance of kafka",
"use_cpp: Test support C++ nodes and objects",
Expand Down
8 changes: 4 additions & 4 deletions tests/benchmarks/test_bench_e2e_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
pipeline.run()


@pytest.mark.slow
@pytest.mark.benchmark
def test_sid_nlp_e2e(benchmark, tmp_path):

config = Config()
Expand Down Expand Up @@ -160,7 +160,7 @@ def test_sid_nlp_e2e(benchmark, tmp_path):
benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name)


@pytest.mark.slow
@pytest.mark.benchmark
def test_abp_fil_e2e(benchmark, tmp_path):

config = Config()
Expand All @@ -185,7 +185,7 @@ def test_abp_fil_e2e(benchmark, tmp_path):
benchmark(fil_pipeline, config, input_filepath, repeat, output_filepath, model_name)


@pytest.mark.slow
@pytest.mark.benchmark
def test_phishing_nlp_e2e(benchmark, tmp_path):

config = Config()
Expand All @@ -207,7 +207,7 @@ def test_phishing_nlp_e2e(benchmark, tmp_path):
benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name)


@pytest.mark.slow
@pytest.mark.benchmark
def test_cloudtrail_ae_e2e(benchmark, tmp_path):

config = Config()
Expand Down
2 changes: 1 addition & 1 deletion tests/benchmarks/test_bench_monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def build_and_run_pipeline(config: Config, df: cudf.DataFrame):
pipeline.run()


@pytest.mark.slow
@pytest.mark.benchmark
@pytest.mark.parametrize("num_messages", [1, 100, 10000, 1000000])
def test_monitor_stage(benchmark, num_messages):

Expand Down
2 changes: 1 addition & 1 deletion tests/benchmarks/test_bench_serialize_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def build_and_run_pipeline(config: Config,
pipeline.run()


@pytest.mark.slow
@pytest.mark.benchmark
@pytest.mark.parametrize("num_messages", [1, 100, 10000])
@pytest.mark.parametrize("output_type", ["json", "csv"])
def test_monitor_stage(benchmark, num_messages, output_type):
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ def pytest_addoption(parser: pytest.Parser):
help="Run kafka tests that would otherwise be skipped",
)

parser.addoption(
"--run_benchmark",
action="store_true",
dest="run_benchmark",
help="Run benchmark tests that would otherwise be skipped",
)


def pytest_generate_tests(metafunc: pytest.Metafunc):
"""
Expand Down Expand Up @@ -133,6 +140,10 @@ def pytest_runtest_setup(item):
if (item.get_closest_marker("kafka") is not None):
pytest.skip("Skipping Kafka tests by default. Use --run_kafka to enable")

if (not item.config.getoption("--run_benchmark")):
if (item.get_closest_marker("benchmark") is not None):
pytest.skip("Skipping benchmark tests by default. Use --run_benchmark to enable")


def pytest_collection_modifyitems(config, items):
"""
Expand Down
9 changes: 6 additions & 3 deletions tests/test_add_scores_stage_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,16 @@ def test_add_scores_stage_pipe(config, tmp_path, order, pipeline_batch_size, rep
assert output_np.tolist() == expected.tolist()


def test_add_scores_stage_multi_segment_pipe(config, tmp_path):
@pytest.mark.parametrize('repeat', [1, 2, 5])
def test_add_scores_stage_multi_segment_pipe(config, tmp_path, repeat):
# Intentionally using FileSourceStage's repeat argument as this triggers a bug in #443
config.class_labels = ['frogs', 'lizards', 'toads', 'turtles']

input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
out_file = os.path.join(tmp_path, 'results.csv')

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False, repeat=repeat))
pipe.add_segment_boundary(MessageMeta)
pipe.add_stage(DeserializeStage(config))
pipe.add_segment_boundary(MultiMessage)
Expand All @@ -102,7 +104,8 @@ def test_add_scores_stage_multi_segment_pipe(config, tmp_path):

assert os.path.exists(out_file)

expected = np.loadtxt(input_file, delimiter=",", skiprows=1)
expected_data = np.loadtxt(input_file, delimiter=",", skiprows=1)
expected = np.concatenate([expected_data for _ in range(repeat)])

# The output data will contain an additional id column that we will need to slice off
# also somehow 0.7 ends up being 0.7000000000000001
Expand Down

0 comments on commit ebc31ff

Please sign in to comment.