Skip to content

Commit

Permalink
Adds the SampleDataRequest/Response
Browse files Browse the repository at this point in the history
This adds the protos for querying any data samples from the SDK.
  • Loading branch information
Sam Rohde committed Jan 25, 2023
1 parent a96afe2 commit 63da1bb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ message InstructionRequest {
FinalizeBundleRequest finalize_bundle = 1004;
MonitoringInfosMetadataRequest monitoring_infos = 1005;
HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
SampleDataRequest sample = 1007;

// DEPRECATED
RegisterRequest register = 1000;
Expand Down Expand Up @@ -137,12 +138,75 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
SampleDataResponse sample = 1007;

// DEPRECATED
RegisterResponse register = 1000;
}
}

// Asks an SDK for the data samples it has taken. An SDK that supports this
// request answers with a `SampleDataResponse`; the SDK being queried must
// have the "beam:protocol:data_sampling:v1" capability.
//
// The two id lists below are filters that are combined as a union
// (equivalent to an OR clause): samples are taken from all specified
// ProcessBundleDescriptor ids and from all specified PCollection ids.
// An empty list will match everything.
message SampleDataRequest {
  // (Optional) Filter: the ProcessBundleDescriptor ids to return samples for.
  repeated string process_bundle_descriptor_ids = 1;

  // (Optional) Filter: the PCollection ids to return samples for.
  repeated string pcollection_ids = 2;
}


// A single element that was sampled while the SDK was processing a bundle.
// It is modelled as its own proto message so that additional per-element
// metadata can be attached.
message SampledElement {
  // Required. The raw bytes of the sampled element: exactly one encoded
  // element, in the nested context.
  bytes element = 1;

  // FUTURE WORK: Capture lull detections and exceptions.
  //
  // Optional. Present if there was an exception while processing the
  // element above.
  //
  // LogEntry exception_entry = 2;
}

// The reply to a `SampleDataRequest`, if supported. It carries samples from
// PCollections, selected by the filters specified in the request.
message SampleDataResponse {
  // Holds the elements sampled from one PCollection.
  message ElementList {
    // Required. The individual elements sampled from a PCollection.
    repeated SampledElement elements = 1;
  }

  // Sampled elements, keyed by the id of the PCollection they came from.
  map<string, ElementList> element_samples = 1;

  // FUTURE WORK: Investigate ways of storing multiple interesting types of
  // sampled elements. Two approaches are possible:
  //
  // 1) Maps of typed elements: include multiple maps here, each with its own
  //    typed element proto message, e.g.
  //
  //      message SlowElement {...}
  //      message ErroredElement {...}
  //      map<string, SlowElement> slow_elements
  //      map<string, ErroredElement> errored_elements
  //
  //    The drawback is that this forces an element into a single category,
  //    so it cannot be classified across multiple characteristics (such as
  //    an element that is both slow and errored).
  //
  // 2) Compositional types: allow for URNs and payloads on the base
  //    SampledElement message for interesting characteristics. The base
  //    message can then be queried for specific URNs. This allows for
  //    multiple attributes on the same element.
  //
  // For a longer conversation, see https://github.com/apache/beam/pull/25065.
}

// A request to provide full MonitoringInfo associated with the entire SDK
// harness process, not specific to a bundle.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,13 @@ message StandardProtocols {
// SDKs ability to store the data in memory reducing the amount of memory
// used overall.
STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"];

// Indicates that this SDK can sample in-flight elements. These samples can
// then be queried using the SampleDataRequest. Each sample is associated
// with exactly one PCollection; that is, samples are taken for each
// PCollection during bundle processing. This is disabled by default and
// enabled with the `enable_data_sampling` experiment.
DATA_SAMPLING = 8 [(beam_urn) = "beam:protocol:data_sampling:v1"];
}
}

Expand Down

0 comments on commit 63da1bb

Please sign in to comment.