-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-16671][python] Support for defining scopes and variables on Python metric group #11470
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 5229466 (Sat Mar 21 07:52:52 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
…thon metric group
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hequn8128 Thanks a lot for the great work! LGTM overall. Have left just a few minor comments.
<td><h5>python.udf.metric.enabled</h5></td> | ||
<td style="word-wrap: break-word;">true</td> | ||
<td>Boolean</td> | ||
<td>When it is false, metric for Python UDFs will be disabled. You can disable the metric to achieve a better performance at some circumstance.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: a better performance -> better performance
|
||
def add_group(self, name: str, extra: str = None) -> 'MetricGroup': | ||
""" | ||
if extra is not None, creates a new key-value MetricGroup pair. The key group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: if -> If
""" | ||
pass | ||
|
||
def get_scope_components(self) -> []: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[] -> [str]?
""" | ||
pass | ||
|
||
def get_all_variables(self) -> map: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
map[str, str]
child_variables[self._name] = name | ||
|
||
# add scope components | ||
child_compoents = self.get_scope_components().copy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: child_compoents -> child_components
* The configuration to enable or disable metric for Python UDFs. | ||
*/ | ||
public static final ConfigOption<Boolean> PYTHON_UDF_METRIC_ENABLED = ConfigOptions | ||
.key("python.udf.metric.enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about name it as "python.metrics.enabled" or something else as this configuration is not only used for Python UDF, but will also used for the Python DataStream API which maybe introduced in the future?
/** | ||
* Gets the proto representation of the base MetricGroup used for all user-defined functions. | ||
*/ | ||
protected FlinkFnApi.MetricGroupInfo getBaseMetricGroupInfo() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about move this method to AbstractPythonFunctionRunner as the metrics will not only be used by UDF/UDTF, but will also be used by UDAF and the Python DataStream API which maybe introduced in the future?
|
||
class UserDefinedFunctionTests(object): | ||
|
||
def test_chaining_scalar_function(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name doesn't reflect the purpose of the test case.
|
||
def add_group(self, name: str, extra: str = None) -> 'MetricGroup': | ||
""" | ||
if extra is not None, creates a new key-value MetricGroup pair. The key group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should also add documentation about what will happen if extra is None.
* Helper class for forwarding metric group information from Java to Python and forward Python | ||
* metrics to Java accumulators and metrics. | ||
*/ | ||
public class FlinkMetricContainer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mark it as @Internal
@dianfu Thanks a lot for your suggestions. The PR has been updated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hequn8128 Thanks a lot for the update. Have left a few minor comments.
I have thought about the interfaces get_all_variables
, get_scope_components
andget_metric_identifier
again. Although there are these kinds of interfaces in the Java MetricGroup, it seems to me that they are provided to be used by the MetricsReporter. Personally I think it's not necessary to expose them in the Python UDF(also the Java UDF). As I'm not quite familiar with metrics, I'm not quite sure about this. What's your thought about this?
@@ -55,7 +59,7 @@ | |||
|
|||
private static final String MAIN_INPUT_ID = "input"; | |||
|
|||
private final String taskName; | |||
protected final String taskName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary change
/** | ||
* The flinkMetricContainer will be set to null if metric is configured to be turned off. | ||
*/ | ||
@Nullable protected FlinkMetricContainer flinkMetricContainer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to private?
@@ -224,4 +235,15 @@ public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Ex | |||
public abstract ExecutableStage createExecutableStage() throws Exception; | |||
|
|||
public abstract OutputReceiverFactory createOutputReceiverFactory(); | |||
|
|||
/** | |||
* Gets the proto representation of the base MetricGroup used for all user-defined functions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about remove the used for all user-defined functions
as metrics will be used not only for UDF?
@@ -95,4 +95,17 @@ public void testArrowBatchSize() { | |||
final int actualArrowBatchSize = configuration.getInteger(PythonOptions.MAX_ARROW_BATCH_SIZE); | |||
assertThat(actualArrowBatchSize, is(equalTo(expectedArrowBatchSize))); | |||
} | |||
|
|||
@Test | |||
public void testPthonUDFMetricEnabled() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to some other name like testPythonMetric
or testPythonMetricEnable
?
@dianfu Thanks a lot for your nice suggestions. The PR has been updated accordingly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hequn8128 Thanks a lot for the update. +1 to merge. Have only a few small typo comments.
is added to this groups sub-groups, while the value group is added to the key | ||
group's sub-groups. This method returns the value group. | ||
The only difference between calling this method and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments should be updated as get_all_variables doesn't exist any more
@@ -103,6 +103,11 @@ | |||
@Nullable | |||
private final String pythonExec; | |||
|
|||
/** | |||
* Whether metric is enabled for Python UDFs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove for Python UDFs.
?
@@ -95,4 +95,17 @@ public void testArrowBatchSize() { | |||
final int actualArrowBatchSize = configuration.getInteger(PythonOptions.MAX_ARROW_BATCH_SIZE); | |||
assertThat(actualArrowBatchSize, is(equalTo(expectedArrowBatchSize))); | |||
} | |||
|
|||
@Test | |||
public void testPthonMetricEnabled() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: Pthon -> Python
The tests has been passed: https://travis-ci.org/github/hequn8128/flink/builds/667156612 |
What is the purpose of the change
This pull request supports for defining scopes and variables on Python metric group, i.e., adding
get_metric_group()
onFunctionContext
, addingadd_group
inMetricGroup
.Brief change log
get_metric_group()
onFunctionContext
add_group
inMetricGroup
python.metric.enabled
configuration to enable or disable metric for Python UDFs.Verifying this change
This change added tests and can be verified as follows:
testUDFProtoConstructedProperlyForSingleUDF
to verify the metric group proto info.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation