[FLINK-19131][python] Add support of Python 3.8 in PyFlink
This closes apache#13334.
sunjincheng121 authored and dianfu committed Sep 8, 2020
1 parent 13e0b35 commit c1a12e9
Showing 40 changed files with 147 additions and 2,120 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/python_configuration.html
@@ -24,7 +24,7 @@
<td><h5>python.executable</h5></td>
<td style="word-wrap: break-word;">"python"</td>
<td>String</td>
-<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version &gt;= 7.1.0) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
+<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version &gt;= 7.1.0) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
</tr>
<tr>
<td><h5>python.files</h5></td>
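For context, the `python.executable` option above can also be set from a PyFlink program rather than the command line. A minimal sketch — the interpreter path is a placeholder, and the `TableEnvironment.create` call assumes a PyFlink version from around this commit:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming TableEnvironment with the blink planner (the default of that era).
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = TableEnvironment.create(env_settings)

# Equivalent to passing "-pyexec /usr/local/bin/python3" on the command line.
t_env.get_config().get_configuration().set_string("python.executable", "/usr/local/bin/python3")
```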
4 changes: 2 additions & 2 deletions docs/dev/python/installation.md
@@ -26,11 +26,11 @@ under the License.
{:toc}

## Environment Requirements
-<span class="label label-info">Note</span> Python version (3.5, 3.6 or 3.7) is required for PyFlink. Please run the following command to make sure that it meets the requirements:
+<span class="label label-info">Note</span> Python version (3.5, 3.6, 3.7 or 3.8) is required for PyFlink. Please run the following command to make sure that it meets the requirements:

{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}

## Installation of PyFlink
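The same check can be done from inside Python instead of the shell; a tiny sketch:

```python
import sys

# PyFlink at this commit supports CPython 3.5, 3.6, 3.7 and 3.8.
if not ((3, 5) <= sys.version_info[:2] <= (3, 8)):
    raise RuntimeError("PyFlink requires Python 3.5, 3.6, 3.7 or 3.8; "
                       "found %d.%d" % sys.version_info[:2])
```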
4 changes: 2 additions & 2 deletions docs/dev/python/installation.zh.md
@@ -26,11 +26,11 @@ under the License.
{:toc}

## 环境要求
-<span class="label label-info">注意</span> PyFlink需要特定的Python版本(3.5, 3.6 或 3.7)。请运行以下命令,以确保Python版本满足要求。
+<span class="label label-info">注意</span> PyFlink需要特定的Python版本(3.5, 3.6, 3.7 或 3.8)。请运行以下命令,以确保Python版本满足要求。

{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}

## PyFlink 安装
2 changes: 1 addition & 1 deletion docs/dev/python/table-api-users-guide/udfs/python_udfs.md
@@ -25,7 +25,7 @@ under the License.

User-defined functions are important features, because they significantly extend the expressiveness of Python Table API programs.

-**NOTE:** Python UDF execution requires Python version (3.5, 3.6 or 3.7) with PyFlink installed. It's required on both the client side and the cluster side.
+**NOTE:** Python UDF execution requires Python version (3.5, 3.6, 3.7 or 3.8) with PyFlink installed. It's required on both the client side and the cluster side.

* This will be replaced by the TOC
{:toc}
docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
@@ -25,7 +25,7 @@ under the License.

用户自定义函数是重要的功能,因为它们极大地扩展了Python Table API程序的表达能力。

-**注意:** 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本3.5、3.6或3.7,并安装PyFlink。
+**注意:** 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本(3.5、3.6、3.7 或 3.8),并安装PyFlink。

* This will be replaced by the TOC
{:toc}
docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
@@ -30,7 +30,7 @@ These Python libraries are highly optimized and provide high-performance data st
[non-vectorized user-defined functions]({% link dev/python/table-api-users-guide/udfs/python_udfs.md %}) on how to define vectorized user-defined functions.
Users only need to add an extra parameter `udf_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function.

-**NOTE:** Python UDF execution requires Python version (3.5, 3.6 or 3.7) with PyFlink installed. It's required on both the client side and the cluster side.
+**NOTE:** Python UDF execution requires Python version (3.5, 3.6, 3.7 or 3.8) with PyFlink installed. It's required on both the client side and the cluster side.

* This will be replaced by the TOC
{:toc}
docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.zh.md
@@ -29,7 +29,7 @@ under the License.
向量化用户自定义函数的定义,与[非向量化用户自定义函数]({% link dev/python/table-api-users-guide/udfs/python_udfs.zh.md %})具有相似的方式,
用户只需要在调用`udf`装饰器时添加一个额外的参数`udf_type="pandas"`,将其标记为一个向量化用户自定义函数即可。

-**注意:**要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6或3.7)。客户端和群集端都需要安装它。
+**注意:**要执行Python UDF,需要安装PyFlink的Python版本(3.5、3.6、3.7 或 3.8)。客户端和群集端都需要安装它。

* This will be replaced by the TOC
{:toc}
2 changes: 1 addition & 1 deletion docs/dev/table/sqlClient.md
@@ -222,7 +222,7 @@ Mode "embedded" submits Flink jobs from the local machine.
--pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
-Apache Beam (version == 2.19.0), Pip
+Apache Beam (version == 2.23.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
2 changes: 1 addition & 1 deletion docs/dev/table/sqlClient.zh.md
@@ -223,7 +223,7 @@ Mode "embedded" submits Flink jobs from the local machine.
--pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
-Apache Beam (version == 2.19.0), Pip
+Apache Beam (version == 2.23.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
4 changes: 2 additions & 2 deletions docs/flinkDev/building.md
@@ -64,11 +64,11 @@ mvn clean install -DskipTests -Dfast

If you want to build a PyFlink package that can be used for pip installation, you need to build the Flink project first, as described in [Build Flink](#build-flink).

-2. Python version(3.5, 3.6 or 3.7) is required
+2. Python version(3.5, 3.6, 3.7 or 3.8) is required

```shell
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
```

3. Build PyFlink with Cython extension support (optional)
4 changes: 2 additions & 2 deletions docs/flinkDev/building.zh.md
@@ -65,11 +65,11 @@ mvn clean install -DskipTests -Dfast

如果想构建一个可用于 pip 安装的 PyFlink 包,需要先构建 Flink 工程,如 [构建 Flink](#build-flink) 中所述。

-2. Python 的版本为 3.5, 3.6 或者 3.7.
+2. Python 的版本为 3.5, 3.6, 3.7 或者 3.8.

```shell
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
```

3. 构建 PyFlink 的 Cython 扩展模块(可选的)
8 changes: 4 additions & 4 deletions docs/ops/cli.md
@@ -118,11 +118,11 @@ These examples about how to submit a job in CLI.

<div data-lang="python" markdown="1">

-<span class="label label-info">Note</span> When submitting Python job via `flink run`, Flink will run the command “python”. Please run the following command to confirm that the command “python” in current environment points to a specified Python version 3.5, 3.6 or 3.7:
+<span class="label label-info">Note</span> When submitting Python job via `flink run`, Flink will run the command “python”. Please run the following command to confirm that the command “python” in current environment points to a specified Python version 3.5, 3.6, 3.7 or 3.8:

{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}

- Run Python Table program:
@@ -374,8 +374,8 @@ Action "run" compiles and runs a program.
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on a specified Python
-version 3.5, 3.6 or 3.7, Apache Beam
-(version == 2.19.0), Pip (version >= 7.1.0)
+version 3.5, 3.6 3.7 or 3.8, Apache Beam
+(version == 2.23.0), Pip (version >= 7.1.0)
and SetupTools (version >= 37.0.0).
Please ensure that the specified environment
meets the above requirements.
8 changes: 4 additions & 4 deletions docs/ops/cli.zh.md
@@ -118,11 +118,11 @@ option.

<div data-lang="python" markdown="1">

-<span class="label label-info">注意</span> 通过`flink run`提交Python任务时Flink会调用“python”命令。请执行以下命令以确认当前环境下的指令“python”指向Python的版本为3.5, 3.6 或者 3.7中的一个
+<span class="label label-info">注意</span> 通过`flink run`提交Python任务时Flink会调用“python”命令。请执行以下命令以确认当前环境下的指令“python”指向Python的版本为3.5、3.6、3.7 或 3.8 中的一个

{% highlight bash %}
$ python --version
-# the version printed here must be 3.5, 3.6 or 3.7
+# the version printed here must be 3.5, 3.6, 3.7 or 3.8
{% endhighlight %}

- 提交一个Python Table的作业:
@@ -373,8 +373,8 @@ Action "run" compiles and runs a program.
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on a specified Python
-version 3.5, 3.6 or 3.7, Apache Beam
-(version == 2.19.0), Pip (version >= 7.1.0)
+version 3.5, 3.6 3.7 or 3.8, Apache Beam
+(version == 2.23.0), Pip (version >= 7.1.0)
and SetupTools (version >= 37.0.0).
Please ensure that the specified environment
meets the above requirements.
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -154,7 +154,7 @@ public class CliFrontendParser {
public static final Option PYEXEC_OPTION = new Option("pyexec", "pyExecutable", true,
"Specify the path of the python interpreter used to execute the python UDF worker " +
"(e.g.: --pyExecutable /usr/local/bin/python3). " +
"The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), " +
"The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), " +
"Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " +
"Please ensure that the specified environment meets the above requirements.");

2 changes: 1 addition & 1 deletion flink-python/README.md
@@ -16,7 +16,7 @@ The auto-generated Python docs can be found at [https://ci.apache.org/projects/f

## Python Requirements

-Apache Flink Python API depends on Py4J (currently version 0.10.8.1), CloudPickle (currently version 1.2.2), python-dateutil(currently version 2.8.0), Apache Beam (currently version 2.19.0) and jsonpickle (currently 1.2).
+Apache Flink Python API depends on Py4J (currently version 0.10.8.1), CloudPickle (currently version 1.2.2), python-dateutil(currently version 2.8.0), Apache Beam (currently version 2.23.0) and jsonpickle (currently 1.2).

## Development Notices

2 changes: 1 addition & 1 deletion flink-python/dev/dev-requirements.txt
@@ -14,5 +14,5 @@
# limitations under the License.
setuptools>=18.0
wheel
-apache-beam==2.19.0
+apache-beam==2.23.0
cython==0.29.16
6 changes: 3 additions & 3 deletions flink-python/dev/lint-python.sh
@@ -219,7 +219,7 @@ function install_miniconda() {
# Install some kinds of py env.
function install_py_env() {
-py_env=("3.5" "3.6" "3.7")
+py_env=("3.5" "3.6" "3.7" "3.8")
for ((i=0;i<${#py_env[@]};i++)) do
if [ -d "$CURRENT_DIR/.conda/envs/${py_env[i]}" ]; then
rm -rf "$CURRENT_DIR/.conda/envs/${py_env[i]}"
@@ -357,7 +357,7 @@ function install_environment() {
print_function "STEP" "install miniconda... [SUCCESS]"

# step-3 install python environment whcih includes
-# 3.5 3.6 3.7
+# 3.5 3.6 3.7 3.8
if [ $STEP -lt 3 ] && [ `need_install_component "py_env"` = true ]; then
print_function "STEP" "installing python environment..."
install_py_env
@@ -696,7 +696,7 @@ usage: $0 [options]
-l list all checks supported.
Examples:
./lint-python -s basic => install environment with basic components.
-./lint-python -s py_env => install environment with python env(3.5,3.6,3.7).
+./lint-python -s py_env => install environment with python env(3.5,3.6,3.7,3.8).
./lint-python -s all => install environment with all components such as python env,tox,flake8,sphinx etc.
./lint-python -s tox,flake8 => install environment with tox,flake8.
./lint-python -s tox -f => reinstall environment with tox.
@@ -543,7 +543,7 @@ def set_python_executable(self, python_exec: str):
.. note::
-The python udf worker depends on Apache Beam (version == 2.19.0).
+The python udf worker depends on Apache Beam (version == 2.23.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
10 changes: 2 additions & 8 deletions flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -58,27 +58,19 @@ def check_not_empty(check_str, error_message):
parser = argparse.ArgumentParser()

parser.add_argument("--id", default="", help="Local identifier (required).")
parser.add_argument("--logging_endpoint", default="",
help="Logging endpoint (required).")
parser.add_argument("--provision_endpoint", default="",
help="Provision endpoint (required).")
parser.add_argument("--control_endpoint", default="",
help="Control endpoint (required).")
parser.add_argument("--semi_persist_dir", default="/tmp",
help="Local semi-persistent directory (optional).")

args = parser.parse_known_args()[0]

worker_id = args.id
-logging_endpoint = args.logging_endpoint
provision_endpoint = args.provision_endpoint
-control_endpoint = args.control_endpoint
semi_persist_dir = args.semi_persist_dir

check_not_empty(worker_id, "No id provided.")
-check_not_empty(logging_endpoint, "No logging endpoint provided.")
check_not_empty(provision_endpoint, "No provision endpoint provided.")
-check_not_empty(control_endpoint, "No control endpoint provided.")

logging.info("Initializing python harness: %s" % " ".join(sys.argv))

@@ -89,6 +81,8 @@ def check_not_empty(check_str, error_message):
client = ProvisionServiceStub(channel=channel)
info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
options = json_format.MessageToJson(info.pipeline_options)
+logging_endpoint = info.logging_endpoint.url
+control_endpoint = info.control_endpoint.url

os.environ["WORKER_ID"] = worker_id
os.environ["PIPELINE_OPTIONS"] = options
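The deleted flags reflect a behavior change in the boot script: the logging and control endpoints are no longer passed as command-line arguments; the worker now reads them from the provision service response, as the added lines show. A standalone sketch of that lookup — the endpoint address is a placeholder, and the module paths assume Apache Beam 2.23.0:

```python
import grpc
from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest
from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub

# Placeholder address; the real worker receives it via --provision_endpoint.
channel = grpc.insecure_channel("localhost:50000")
info = ProvisionServiceStub(channel).GetProvisionInfo(GetProvisionInfoRequest()).info

# These replace the removed --logging_endpoint / --control_endpoint arguments.
logging_endpoint = info.logging_endpoint.url
control_endpoint = info.control_endpoint.url
```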
flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -108,8 +108,11 @@ cdef class BeamStatelessFunctionOperation(Operation):
str(tag)] = receiver.opcounter.element_counter.value()
return metrics

-cpdef monitoring_infos(self, transform_id):
-# only pass user metric to Java
+cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id):
+"""
+Only pass user metric to Java
+:param tag_to_pcollection_id: useless for user metric
+"""
return self.user_monitoring_infos(transform_id)

cdef void _update_gauge(self, base_metric_group):
flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -85,8 +85,11 @@ def process(self, o: WindowedValue):
self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
output_stream.maybe_flush()

-def monitoring_infos(self, transform_id):
-# only pass user metric to Java
+def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+"""
+Only pass user metric to Java
+:param tag_to_pcollection_id: useless for user metric
+"""
return super().user_monitoring_infos(transform_id)

def generate_func(self, udfs) -> tuple:
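Both the Cython and the pure-Python operation follow the same pattern: accept the `tag_to_pcollection_id` argument that the newer Beam runner API passes, but keep forwarding only user-defined metrics to Java. A stripped-down illustration of the pattern — the class name and return value are illustrative, not the real PyFlink ones:

```python
class UserMetricsOnlyOperation:
    """Illustrative stand-in for the PyFlink Beam operations changed above."""

    def user_monitoring_infos(self, transform_id):
        # The real operations build user-defined metric protos here; stubbed out.
        return []

    def monitoring_infos(self, transform_id, tag_to_pcollection_id):
        # Required by the newer Beam signature, but irrelevant for user
        # metrics, so tag_to_pcollection_id is deliberately ignored.
        return self.user_monitoring_infos(transform_id)
```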
11 changes: 0 additions & 11 deletions flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
@@ -101,19 +101,8 @@ def test_param_validation(self):

args = [self.runner_path, "--id", "1"]
exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
self.assertIn("No logging endpoint provided.", exit_message)

-args = [self.runner_path, "--id", "1",
-"--logging_endpoint", "localhost:0000"]
-exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
self.assertIn("No provision endpoint provided.", exit_message)

-args = [self.runner_path, "--id", "1",
-"--logging_endpoint", "localhost:0000",
-"--provision_endpoint", "localhost:%d" % self.provision_port]
-exit_message = subprocess.check_output(args, env=self.env).decode("utf-8")
-self.assertIn("No control endpoint provided.", exit_message)

def test_set_working_directory(self):
JProcessPythonEnvironmentManager = \
get_gateway().jvm.org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager
2 changes: 1 addition & 1 deletion flink-python/pyflink/table/table_config.py
@@ -366,7 +366,7 @@ def set_python_executable(self, python_exec):
.. note::
-The python udf worker depends on Apache Beam (version == 2.19.0).
+The python udf worker depends on Apache Beam (version == 2.23.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
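The docstring above belongs to `TableConfig.set_python_executable`; typical usage is a one-liner on the table environment's config. A short sketch — the interpreter path is a placeholder:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())

# The target interpreter must satisfy the requirements in the docstring:
# Python 3.5+ with apache-beam==2.23.0, pip>=7.1.0 and setuptools>=37.0.0.
t_env.get_config().set_python_executable("/usr/local/bin/python3")
```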
9 changes: 6 additions & 3 deletions flink-python/setup.py
@@ -314,9 +314,11 @@ def find_file_path(pattern):
author='Apache Software Foundation',
author_email='dev@flink.apache.org',
python_requires='>=3.5',
-install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0',
+install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.23.0',
'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2',
-'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<0.16.0', 'pytz>=2018.3'],
+'pandas>=0.24.2,<1; python_full_version < "3.5.3"',
+'pandas>=0.25.2,<1; python_full_version >= "3.5.3"',
+'pyarrow>=0.15.1,<0.18.0', 'pytz>=2018.3'],
cmdclass={'build_ext': build_ext},
tests_require=['pytest==4.4.1'],
description='Apache Flink Python API',
@@ -328,7 +330,8 @@ def find_file_path(pattern):
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
-'Programming Language :: Python :: 3.7'],
+'Programming Language :: Python :: 3.7',
+'Programming Language :: Python :: 3.8'],
ext_modules=extensions
)
finally:
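The new pandas pins rely on PEP 508 environment markers, so pip resolves a different pandas range depending on the exact interpreter version at install time. A small sketch of how such markers evaluate, using the `packaging` library (bundled with setuptools):

```python
from packaging.markers import Marker

# The same marker expressions used in install_requires above.
old_pin = Marker('python_full_version < "3.5.3"')
new_pin = Marker('python_full_version >= "3.5.3"')

# Evaluated against the running interpreter: exactly one is True,
# so exactly one of the two pandas requirements applies.
print(old_pin.evaluate(), new_pin.evaluate())
```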
@@ -28,8 +28,8 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.runners.fnexecution.FnService;
-import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCallStreamObserver;
-import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;

/** An implementation of the Beam Fn State service. */
public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase