Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-16671][python] Support for defining scopes and variables on Python metric group #11470

Merged
merged 4 commits into from
Mar 26, 2020

Conversation

hequn8128
Copy link
Contributor

@hequn8128 hequn8128 commented Mar 21, 2020

What is the purpose of the change

This pull request supports for defining scopes and variables on Python metric group, i.e., adding get_metric_group() on FunctionContext, adding add_group in MetricGroup.

Brief change log

  • Adds get_metric_group() on FunctionContext
  • Adds add_group in MetricGroup
  • Adds python.metric.enabled configuration to enable or disable metric for Python UDFs.

Verifying this change

This change added tests and can be verified as follows:

  • Adds tests in testUDFProtoConstructedProperlyForSingleUDF to verify the metric group proto info.
  • Adds test_metric to verify operations in Python MetricGroup.
  • Adds test_metric_it_case to verify metric group info which is passed from Java to Python

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (PythonDocs, Note: more documentation will be added in FLINK-16674)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 5229466 (Sat Mar 21 07:52:52 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 21, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hequn8128 Thanks a lot for the great work! LGTM overall. Have left just a few minor comments.

<td><h5>python.udf.metric.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When it is false, metric for Python UDFs will be disabled. You can disable the metric to achieve a better performance at some circumstance.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: a better performance -> better performance


def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
"""
if extra is not None, creates a new key-value MetricGroup pair. The key group
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: if -> If

"""
pass

def get_scope_components(self) -> []:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[] -> [str]?

"""
pass

def get_all_variables(self) -> map:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map[str, str]

child_variables[self._name] = name

# add scope components
child_compoents = self.get_scope_components().copy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: child_compoents -> child_components

* The configuration to enable or disable metric for Python UDFs.
*/
public static final ConfigOption<Boolean> PYTHON_UDF_METRIC_ENABLED = ConfigOptions
.key("python.udf.metric.enabled")
Copy link
Contributor

@dianfu dianfu Mar 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about name it as "python.metrics.enabled" or something else as this configuration is not only used for Python UDF, but will also used for the Python DataStream API which maybe introduced in the future?

/**
* Gets the proto representation of the base MetricGroup used for all user-defined functions.
*/
protected FlinkFnApi.MetricGroupInfo getBaseMetricGroupInfo() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about move this method to AbstractPythonFunctionRunner as the metrics will not only be used by UDF/UDTF, but will also be used by UDAF and the Python DataStream API which maybe introduced in the future?


class UserDefinedFunctionTests(object):

def test_chaining_scalar_function(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name doesn't reflect the purpose of the test case.


def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
"""
if extra is not None, creates a new key-value MetricGroup pair. The key group
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also add documentation about what will happen if extra is None.

* Helper class for forwarding metric group information from Java to Python and forward Python
* metrics to Java accumulators and metrics.
*/
public class FlinkMetricContainer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mark it as @Internal

@hequn8128
Copy link
Contributor Author

@dianfu Thanks a lot for your suggestions. The PR has been updated.

Copy link
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hequn8128 Thanks a lot for the update. Have left a few minor comments.

I have thought about the interfaces get_all_variables, get_scope_components andget_metric_identifier again. Although there are these kinds of interfaces in the Java MetricGroup, it seems to me that they are provided to be used by the MetricsReporter. Personally I think it's not necessary to expose them in the Python UDF(also the Java UDF). As I'm not quite familiar with metrics, I'm not quite sure about this. What's your thought about this?

@@ -55,7 +59,7 @@

private static final String MAIN_INPUT_ID = "input";

private final String taskName;
protected final String taskName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary change

/**
* The flinkMetricContainer will be set to null if metric is configured to be turned off.
*/
@Nullable protected FlinkMetricContainer flinkMetricContainer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to private?

@@ -224,4 +235,15 @@ public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Ex
public abstract ExecutableStage createExecutableStage() throws Exception;

public abstract OutputReceiverFactory createOutputReceiverFactory();

/**
* Gets the proto representation of the base MetricGroup used for all user-defined functions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about remove the used for all user-defined functions as metrics will be used not only for UDF?

@@ -95,4 +95,17 @@ public void testArrowBatchSize() {
final int actualArrowBatchSize = configuration.getInteger(PythonOptions.MAX_ARROW_BATCH_SIZE);
assertThat(actualArrowBatchSize, is(equalTo(expectedArrowBatchSize)));
}

@Test
public void testPthonUDFMetricEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to some other name like testPythonMetric or testPythonMetricEnable?

@hequn8128
Copy link
Contributor Author

@dianfu Thanks a lot for your nice suggestions. The PR has been updated accordingly.

Copy link
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hequn8128 Thanks a lot for the update. +1 to merge. Have only a few small typo comments.

is added to this groups sub-groups, while the value group is added to the key
group's sub-groups. This method returns the value group.
The only difference between calling this method and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments should be updated as get_all_variables doesn't exist any more

@@ -103,6 +103,11 @@
@Nullable
private final String pythonExec;

/**
* Whether metric is enabled for Python UDFs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove for Python UDFs.?

@@ -95,4 +95,17 @@ public void testArrowBatchSize() {
final int actualArrowBatchSize = configuration.getInteger(PythonOptions.MAX_ARROW_BATCH_SIZE);
assertThat(actualArrowBatchSize, is(equalTo(expectedArrowBatchSize)));
}

@Test
public void testPthonMetricEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: Pthon -> Python

@hequn8128
Copy link
Contributor Author

The tests has been passed: https://travis-ci.org/github/hequn8128/flink/builds/667156612

@dianfu dianfu merged commit cb67adc into apache:master Mar 26, 2020
KarmaGYZ pushed a commit to KarmaGYZ/flink that referenced this pull request Mar 31, 2020
KarmaGYZ pushed a commit to KarmaGYZ/flink that referenced this pull request Apr 10, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants