Skip to content

Commit

Permalink
[FLINK-16671][python] Support for defining scopes on Python metric gr…
Browse files Browse the repository at this point in the history
…oup (apache#11470)
  • Loading branch information
hequn8128 committed Mar 26, 2020
1 parent 6b78fe0 commit cb67adc
Show file tree
Hide file tree
Showing 44 changed files with 386 additions and 53 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/python_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,11 @@
<td>String</td>
<td>The amount of memory to be allocated by the Python framework. The sum of the value of this configuration and "python.fn-execution.buffer.memory.size" represents the total memory of a Python worker. The memory will be accounted as managed memory if the actual memory allocated to an operator is no less than the total memory of a Python worker. Otherwise, this configuration takes no effect.</td>
</tr>
<tr>
<td><h5>python.metric.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When it is false, metric for Python will be disabled. You can disable the metric to achieve better performance at some circumstance.</td>
</tr>
</tbody>
</table>
5 changes: 4 additions & 1 deletion flink-python/pyflink/fn_execution/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

from pyflink.fn_execution import flink_fn_execution_pb2
from pyflink.serializers import PickleSerializer
from pyflink.table import FunctionContext
from pyflink.metrics.metricbase import GenericMetricGroup

SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1"
TABLE_FUNCTION_URN = "flink:transform:table_function:v1"
Expand All @@ -45,8 +47,9 @@ def __init__(self, name, spec, counter_factory, sampler, consumers):
self.variable_dict = {}
self.user_defined_funcs = []
self.func = self.generate_func(self.spec.serialized_fn)
base_metric_group = GenericMetricGroup(None, None)
for user_defined_func in self.user_defined_funcs:
user_defined_func.open(None)
user_defined_func.open(FunctionContext(base_metric_group))

def setup(self):
super(StatelessFunctionOperation, self).setup()
Expand Down
21 changes: 21 additions & 0 deletions flink-python/pyflink/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from pyflink.metrics.metricbase import MetricGroup

__all__ = ["MetricGroup"]
78 changes: 78 additions & 0 deletions flink-python/pyflink/metrics/metricbase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from enum import Enum


class MetricGroup(abc.ABC):
    """
    Base class that declares the operations available on a metric group.

    The implementation given here performs no work; sub-classes are expected to
    override it.
    """

    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
        """
        Creates a new MetricGroup and adds it to this group's sub-groups.

        If extra is not None, a key-value pair of MetricGroups is created instead:
        the key group (named by ``name``) is added to this group's sub-groups and
        the value group (named by ``extra``) is added to the key group's sub-groups.
        In that case the value group is the one returned.

        The only difference between calling this method with both arguments and
        calling ``group.add_group(key).add_group(value)`` is that get_all_variables()
        of the value group returns an additional ``"<key>"="value"`` pair.

        :param name: name of the new group, or the key when ``extra`` is given
        :param extra: optional value of the key-value pair
        :return: the newly added group; this base implementation returns None
        """
        return None


class MetricGroupType(Enum):
    """
    The kind of a MetricGroup inside the metric group hierarchy.
    """
    # Used by add_group() when only a name is given.
    generic = 0
    # Used by add_group() for the group named after the key of a key-value pair.
    key = 1
    # Used by add_group() for the group named after the value of a key-value pair.
    value = 2


class GenericMetricGroup(MetricGroup):
    """
    A MetricGroup that records its parent, its name, its type and the list of
    sub-groups that were added to it.
    """

    def __init__(self, parent, name, metric_group_type=MetricGroupType.generic):
        """
        :param parent: the group this group was added to, or None
        :param name: the name of this group
        :param metric_group_type: the MetricGroupType of this group
        """
        self._name = name
        self._parent = parent
        self._metric_group_type = metric_group_type
        self._sub_groups = []

    def _add_group(self, name: str, metric_group_type) -> 'MetricGroup':
        """
        Returns the sub-group with the given name and type, creating and
        registering it first if no such sub-group exists yet.
        """
        # Reuse an already registered sub-group instead of creating a duplicate.
        existing = next(
            (child for child in self._sub_groups
             if name == child._name and metric_group_type == child._metric_group_type),
            None)
        if existing is not None:
            return existing

        created = GenericMetricGroup(self, name, metric_group_type)
        self._sub_groups.append(created)
        return created

    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
        """
        Adds a generic sub-group called ``name``; if ``extra`` is given, adds a key
        group called ``name`` with a value group called ``extra`` below it and
        returns the value group.
        """
        if extra is not None:
            key_group = self._add_group(name, MetricGroupType.key)
            return key_group._add_group(extra, MetricGroupType.value)
        return self._add_group(name, MetricGroupType.generic)
17 changes: 17 additions & 0 deletions flink-python/pyflink/metrics/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
40 changes: 40 additions & 0 deletions flink-python/pyflink/metrics/tests/test_metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import unittest
from pyflink.metrics.metricbase import GenericMetricGroup, MetricGroup


class MetricTests(unittest.TestCase):
    """
    Tests for building a hierarchy of metric groups through add_group().
    """

    def setUp(self):
        # Build a fresh root for every test: a root shared on the class would keep
        # the sub-groups registered by earlier tests and couple the tests together.
        self.base_metric_group = GenericMetricGroup(None, None)

    @staticmethod
    def print_metric_group_path(mg: MetricGroup) -> str:
        """
        Returns the dot-separated path from the root group down to ``mg``.

        The group without a parent is rendered as 'root'; every other group
        contributes its own name.
        """
        if mg._parent is None:
            return 'root'
        return MetricTests.print_metric_group_path(mg._parent) + '.' + mg._name

    def test_add_group(self):
        new_group = self.base_metric_group.add_group('my_group')
        self.assertEqual(MetricTests.print_metric_group_path(new_group), 'root.my_group')

    def test_add_group_with_variable(self):
        # With two arguments the returned group is the value group below the key group.
        new_group = self.base_metric_group.add_group('key', 'value')
        self.assertEqual(MetricTests.print_metric_group_path(new_group), 'root.key.value')
8 changes: 7 additions & 1 deletion flink-python/pyflink/table/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import inspect

from pyflink.java_gateway import get_gateway
from pyflink.metrics import MetricGroup
from pyflink.table.types import DataType, _to_java_type
from pyflink.util import utils

Expand All @@ -33,7 +34,12 @@ class FunctionContext(object):
user-defined function is executed. The information includes the metric group,
and global job parameters, etc.
"""
pass

def __init__(self, base_metric_group):
    """
    Creates a FunctionContext.

    :param base_metric_group: the metric group handed out by get_metric_group()
    """
    self._base_metric_group = base_metric_group

def get_metric_group(self) -> MetricGroup:
    """
    Returns the metric group this FunctionContext was constructed with.
    """
    metric_group = self._base_metric_group
    return metric_group


class UserDefinedFunction(abc.ABC):
Expand Down
2 changes: 2 additions & 0 deletions flink-python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def remove_if_exists(file_path):
'pyflink.dataset',
'pyflink.common',
'pyflink.fn_execution',
'pyflink.metrics',
'pyflink.ml',
'pyflink.lib',
'pyflink.opt',
'pyflink.conf',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.util.Preconditions;

import org.apache.beam.model.pipeline.v1.RunnerApi;
Expand All @@ -43,6 +44,8 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;

import javax.annotation.Nullable;

import java.util.Map;

/**
Expand Down Expand Up @@ -116,17 +119,24 @@ public abstract class AbstractPythonFunctionRunner<IN> implements PythonFunction
*/
protected transient DataOutputViewStreamWrapper baosWrapper;

/**
* The flinkMetricContainer will be set to null if metric is configured to be turned off.
*/
@Nullable private FlinkMetricContainer flinkMetricContainer;

public AbstractPythonFunctionRunner(
String taskName,
FnDataReceiver<byte[]> resultReceiver,
PythonEnvironmentManager environmentManager,
StateRequestHandler stateRequestHandler,
Map<String, String> jobOptions) {
Map<String, String> jobOptions,
@Nullable FlinkMetricContainer flinkMetricContainer) {
this.taskName = Preconditions.checkNotNull(taskName);
this.resultReceiver = Preconditions.checkNotNull(resultReceiver);
this.environmentManager = Preconditions.checkNotNull(environmentManager);
this.stateRequestHandler = Preconditions.checkNotNull(stateRequestHandler);
this.jobOptions = Preconditions.checkNotNull(jobOptions);
this.flinkMetricContainer = flinkMetricContainer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public class PythonConfig implements Serializable {
@Nullable
private final String pythonExec;

/**
* Whether metric is enabled.
*/
private final boolean metricEnabled;

public PythonConfig(Configuration config) {
maxBundleSize = config.get(PythonOptions.MAX_BUNDLE_SIZE);
maxBundleTimeMills = config.get(PythonOptions.MAX_BUNDLE_TIME_MILLS);
Expand All @@ -114,6 +119,7 @@ public PythonConfig(Configuration config) {
pythonRequirementsCacheDirInfo = config.getString(PYTHON_REQUIREMENTS_CACHE, null);
pythonArchivesInfo = config.getString(PYTHON_ARCHIVES, null);
pythonExec = config.getString(PYTHON_EXEC, null);
metricEnabled = config.getBoolean(PythonOptions.PYTHON_METRIC_ENABLED);
}

public int getMaxBundleSize() {
Expand Down Expand Up @@ -155,4 +161,8 @@ public Optional<String> getPythonArchivesInfo() {
public Optional<String> getPythonExec() {
return Optional.ofNullable(pythonExec);
}

/**
 * Returns whether metric is enabled, as read from the configuration when this
 * object was constructed.
 */
public boolean isMetricEnabled() {
	return this.metricEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,13 @@ public class PythonOptions {
"buffer of a Python worker. The memory will be accounted as managed memory if the " +
"actual memory allocated to an operator is no less than the total memory of a Python " +
"worker. Otherwise, this configuration takes no effect.");

/**
* The configuration to enable or disable metric for Python execution.
*/
public static final ConfigOption<Boolean> PYTHON_METRIC_ENABLED = ConfigOptions
.key("python.metric.enabled")
.defaultValue(true)
.withDescription("When it is false, metric for Python will be disabled. You can " +
"disable the metric to achieve better performance at some circumstance.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.python.metric;

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;

/**
 * Helper class for forwarding Python metrics to Java accumulators and metrics.
 *
 * <p>As written here, the class only keeps a reference to the base metric group it is
 * constructed with.
 */
@Internal
public class FlinkMetricContainer {

	/** The metric group passed to the constructor. */
	private final MetricGroup baseMetricGroup;

	/**
	 * Creates a container on top of the given metric group.
	 *
	 * @param baseMetricGroup the metric group to keep as the base group of this container
	 */
	public FlinkMetricContainer(MetricGroup baseMetricGroup) {
		this.baseMetricGroup = baseMetricGroup;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.python.env.ProcessPythonEnvironmentManager;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryReservationException;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -353,4 +354,9 @@ protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOExc
"Execution type '%s' is not supported.", pythonEnv.getExecType()));
}
}

/**
 * Creates the metric container for this operator.
 *
 * @return a new {@link FlinkMetricContainer} wrapping the metric group of the runtime
 *         context, or {@code null} if metric is disabled in the configuration
 */
protected FlinkMetricContainer getFlinkMetricContainer() {
	if (!this.config.isMetricEnabled()) {
		// Metric is turned off: callers receive null instead of a container.
		return null;
	}
	return new FlinkMetricContainer(getRuntimeContext().getMetricGroup());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.python.env.ProcessPythonEnvironmentManager;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
Expand Down Expand Up @@ -294,7 +295,8 @@ private PythonFunctionRunner<Row> createPythonFunctionRunner() throws IOExceptio
createPythonEnvironmentManager(),
udfInputType,
udfOutputType,
jobOptions);
jobOptions,
getFlinkMetricContainer());
}

private PythonEnvironmentManager createPythonEnvironmentManager() throws IOException {
Expand Down Expand Up @@ -357,4 +359,9 @@ public void close() throws Exception {
super.close();
}
}

/**
 * Creates the metric container for this function.
 *
 * @return a new {@link FlinkMetricContainer} wrapping the metric group of the runtime
 *         context, or {@code null} if metric is disabled in the configuration
 */
private FlinkMetricContainer getFlinkMetricContainer() {
	if (!this.config.isMetricEnabled()) {
		// Metric is turned off: callers receive null instead of a container.
		return null;
	}
	return new FlinkMetricContainer(getRuntimeContext().getMetricGroup());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
pythonEnvironmentManager,
userDefinedFunctionInputType,
userDefinedFunctionOutputType,
jobOptions);
jobOptions,
getFlinkMetricContainer());
}
}
Loading

0 comments on commit cb67adc

Please sign in to comment.