From 63da1bb9089452c1e1f624bb950a765ff833ec0b Mon Sep 17 00:00:00 2001
From: Sam Rohde
Date: Wed, 18 Jan 2023 11:47:28 -0800
Subject: [PATCH] Adds the SampleDataRequest/Response

This adds the protos for querying any data samples from the SDK.
---
 .../model/fn_execution/v1/beam_fn_api.proto | 64 +++++++++++++++++++
 .../model/pipeline/v1/beam_runner_api.proto |  7 ++
 2 files changed, 71 insertions(+)

diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
index 3753fb768f8bd..938596738a82a 100644
--- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
@@ -109,6 +109,7 @@ message InstructionRequest {
     FinalizeBundleRequest finalize_bundle = 1004;
     MonitoringInfosMetadataRequest monitoring_infos = 1005;
     HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
+    SampleDataRequest sample = 1007;
 
     // DEPRECATED
     RegisterRequest register = 1000;
@@ -137,12 +138,75 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. Samples are taken from all
+// specified ProcessBundleDescriptor ids and all specified PCollection ids
+// (equivalent to OR clause). An empty list will match everything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// An element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from PCollections
+// based upon the filters specified in the request.
+message SampleDataResponse {
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated SampledElement elements = 1;
+  }
+
+  // Map from PCollection id to sampled elements.
+  map<string, ElementList> element_samples = 1;
+
+  // FUTURE WORK: Investigate ways of storing multiple interesting types of
+  // sampled elements. There are two ways of accomplishing this:
+  // 1) Maps of typed elements: include multiple maps here with typed element
+  //    proto messages, ex.
+  //
+  //    message SlowElement {...}
+  //    message ErroredElement {...}
+  //    map<string, SlowElement> slow_elements
+  //    map<string, ErroredElement> errored_elements
+  //
+  //    However, this forces an element into a single category. It disallows
+  //    classification across multiple characteristics (like a slow and errored
+  //    element).
+  //
+  // 2) Compositional types: allow for URN and payloads on the base
+  //    SampledElement message for interesting characteristics. The base class
+  //    can then be queried for specific URNs. This allows for multiple
+  //    attributes on the same element.
+  //
+  // For a longer conversation, see https://github.com/apache/beam/pull/25065.
+}
+
 // A request to provide full MonitoringInfo associated with the entire SDK
 // harness process, not specific to a bundle.
 //
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto
index 2f89bb97bd16c..644c6d4bda521 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto
@@ -1638,6 +1638,13 @@ message StandardProtocols {
     // SDKs ability to store the data in memory reducing the amount of memory
     // used overall.
     STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"];
+
+    // Indicates that this SDK can sample in-flight elements. These samples can
+    // then be queried using the SampleDataRequest. Samples are uniquely associated
+    // with a PCollection. Meaning, samples are taken for each PCollection
+    // during bundle processing. This is disabled by default and enabled with the
+    // `enable_data_sampling` experiment.
+    DATA_SAMPLING = 8 [(beam_urn) = "beam:protocol:data_sampling:v1"];
   }
 }
 