Skip to content

Commit

Permalink
Add flag to control automatic exception sampling in Python (disabled) (
Browse files Browse the repository at this point in the history
…apache#28418)

* Always-on exception sampling to Python (disable)

* Fix only_sample_exceptions logic

* trigger tests

* fix flaky test

---------

Co-authored-by: Sam Rohde <srohde@google.com>
  • Loading branch information
rohdesamuel and Sam Rohde committed Sep 13, 2023
1 parent 647704e commit 1b42ded
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 7 deletions.
29 changes: 28 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from apache_beam.coders.coder_impl import CoderImpl
from apache_beam.coders.coder_impl import WindowedValueCoderImpl
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.utils.windowed_value import WindowedValue

Expand Down Expand Up @@ -216,17 +218,42 @@ def __init__(
self,
max_samples: int = 10,
sample_every_sec: float = 30,
sample_only_exceptions: bool = False,
clock=None) -> None:
# Key is PCollection id. Is guarded by the _samplers_lock.
self._samplers: Dict[str, OutputSampler] = {}
# Bundles are processed in parallel, so new samplers may be added when the
# runner queries for samples.
self._samplers_lock: threading.Lock = threading.Lock()
self._max_samples = max_samples
self._sample_every_sec = sample_every_sec
self._sample_every_sec = 0.0 if sample_only_exceptions else sample_every_sec
self._samplers_by_output: Dict[str, List[OutputSampler]] = {}
self._clock = clock

_ENABLE_DATA_SAMPLING = 'enable_data_sampling'
_ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING = 'enable_always_on_exception_sampling'
_DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING = 'disable_always_on_exception_sampling'

@staticmethod
def create(sdk_pipeline_options: PipelineOptions, **kwargs):
experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []

# When true, enables only the sampling of exceptions.
always_on_exception_sampling = (
DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING in experiments and
DataSampler._DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING not in experiments)

# When true, enables the sampling of all PCollections and exceptions.
enable_data_sampling = DataSampler._ENABLE_DATA_SAMPLING in experiments

if enable_data_sampling or always_on_exception_sampling:
sample_only_exceptions = (
always_on_exception_sampling and not enable_data_sampling)
return DataSampler(
sample_only_exceptions=sample_only_exceptions, **kwargs)
else:
return None

def stop(self) -> None:
"""Stops all sampling, does not clear samplers in case there are outstanding
samples.
Expand Down
102 changes: 101 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_sampler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from apache_beam.coders import FastPrimitivesCoder
from apache_beam.coders import WindowedValueCoder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker.data_sampler import DataSampler
from apache_beam.runners.worker.data_sampler import OutputSampler
Expand Down Expand Up @@ -56,7 +57,9 @@ def make_test_descriptor(
return descriptor

def setUp(self):
self.data_sampler = DataSampler(sample_every_sec=0.1)
self.data_sampler = DataSampler.create(
PipelineOptions(experiments=[DataSampler._ENABLE_DATA_SAMPLING]),
sample_every_sec=0.1)

def tearDown(self):
self.data_sampler.stop()
Expand Down Expand Up @@ -341,6 +344,103 @@ def test_can_sample_exceptions(self):
samples = self.data_sampler.wait_for_samples([MAIN_PCOLLECTION_ID])
self.assertGreater(len(samples.element_samples), 0)

def test_create_experiments(self):
"""Tests that the experiments correctly make the DataSampler."""
enable_exception_exp = DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING
disable_exception_exp = DataSampler._DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING
enable_sampling_exp = DataSampler._ENABLE_DATA_SAMPLING

self.assertIsNone(DataSampler.create(PipelineOptions()))

exp = [disable_exception_exp]
self.assertIsNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_exception_exp, disable_exception_exp]
self.assertIsNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_exception_exp]
self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_sampling_exp]
self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_sampling_exp, enable_exception_exp, disable_exception_exp]
self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp)))

def test_samples_all_with_both_experiments(self):
"""Tests that the using both sampling experiments samples everything."""
self.data_sampler = DataSampler.create(
PipelineOptions(
experiments=[
DataSampler._ENABLE_DATA_SAMPLING,
DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING
]),
sample_every_sec=0.1)

# Create a descriptor with one transform with two outputs, 'a' and 'b'.
descriptor = self.make_test_descriptor(outputs=['a', 'b'])
self.data_sampler.initialize_samplers(
MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

# Get the samples for the two outputs.
a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)

# Sample an exception for the output 'a', this will show up in the final
# samples response.
exc_info = None
try:
raise Exception('test')
except Exception:
exc_info = sys.exc_info()
a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid')

# Sample a normal element for the output 'b', this will not show up in the
# final samples response.
b_sampler.element_sampler.el = 'b'
b_sampler.element_sampler.has_element = True

samples = self.data_sampler.wait_for_samples(['a', 'b'])
self.assertEqual(len(samples.element_samples), 2)
self.assertTrue(
samples.element_samples['a'].elements[0].HasField('exception'))
self.assertFalse(
samples.element_samples['b'].elements[0].HasField('exception'))

def test_only_sample_exceptions(self):
"""Tests that the exception sampling experiment only samples exceptions."""
self.data_sampler = DataSampler.create(
PipelineOptions(
experiments=[DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING]),
sample_every_sec=0.1)

# Create a descriptor with one transform with two outputs, 'a' and 'b'.
descriptor = self.make_test_descriptor(outputs=['a', 'b'])
self.data_sampler.initialize_samplers(
MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

# Get the samples for the two outputs.
a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)

# Sample an exception for the output 'a', this will show up in the final
# samples response.
exc_info = None
try:
raise Exception('test')
except Exception:
exc_info = sys.exc_info()
a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid')

# Sample a normal element for the output 'b', this will not show up in the
# final samples response.
b_sampler.element_sampler.el = 'b'
b_sampler.element_sampler.has_element = True

samples = self.data_sampler.wait_for_samples([])
self.assertEqual(len(samples.element_samples), 1)
self.assertIsNotNone(samples.element_samples['a'].elements[0].exception)


class OutputSamplerTest(unittest.TestCase):
def tearDown(self):
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,16 @@ def update_counters_start(self, windowed_value):
# type: (WindowedValue) -> None
self.opcounter.update_from(windowed_value)

if self.execution_context is not None:
self.execution_context.output_sampler = self.output_sampler

# The following code is optimized by inlining a function call. Because this
# is called for every element, a function call is too expensive (order of
# 100s of nanoseconds). Furthermore, a lock was purposefully not used
# between here and the DataSampler as an additional operation. The tradeoff
# is that some samples might be dropped, but it is better than the
# alternative which is double sampling the same element.
if self.element_sampler is not None and self.execution_context is not None:
self.execution_context.output_sampler = self.output_sampler
if self.element_sampler is not None:
if not self.element_sampler.has_element:
self.element_sampler.el = windowed_value
self.element_sampler.has_element = True
Expand Down
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,7 @@ def create_harness(environment, dry_run=False):
if dry_run:
return

data_sampler = None
if 'enable_data_sampling' in experiments:
data_sampler = DataSampler()
data_sampler = DataSampler.create(sdk_pipeline_options)

sdk_harness = SdkHarness(
control_address=control_service_descriptor.url,
Expand Down

0 comments on commit 1b42ded

Please sign in to comment.