[FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python

This is a major cleanup of the Python module that drops support for
BatchTableEnvironment and old planner.

Removes usages of:
- DataSet
- BatchTableEnvironment
- Legacy planner
- ExecutionEnvironment
twalthr committed May 28, 2021
1 parent a1da49d commit 9fe0ec7
Showing 54 changed files with 84 additions and 3,575 deletions.
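
For readers migrating their own code, a minimal before/after sketch of what this cleanup implies, based on the API calls visible in the diffs below (a 1.13-era PyFlink install is assumed):

```python
# Before (removed by this commit):
#   from pyflink.dataset import ExecutionEnvironment
#   from pyflink.table import BatchTableEnvironment, TableConfig
#   env = ExecutionEnvironment.get_execution_environment()
#   t_env = BatchTableEnvironment.create(env, TableConfig())

# After: a unified TableEnvironment configured via EnvironmentSettings.
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(environment_settings=env_settings)
```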
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/python/table/table_environment.md
@@ -48,7 +48,7 @@ table_env = TableEnvironment.create(env_settings)
 
 ```python
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
 
 # create a blink streaming TableEnvironment from a StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
2 changes: 1 addition & 1 deletion docs/content/docs/dev/python/table/table_environment.md
@@ -49,7 +49,7 @@ Alternatively, users can create a `StreamTableEnvironment` from an existing `Str
 
 ```python
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
 
 # create a blink streaming TableEnvironment from a StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
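
Both documentation pages now describe the same pattern. As a runnable sketch of what the updated snippet boils down to (again assuming 1.13-era PyFlink):

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# A streaming TableEnvironment is created directly from an existing
# StreamExecutionEnvironment; the TableConfig argument is gone.
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
```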
16 changes: 6 additions & 10 deletions flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -21,8 +21,7 @@
 import sys
 import tempfile
 
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import BatchTableEnvironment, TableConfig
+from pyflink.table import EnvironmentSettings, TableEnvironment
 
 
 def word_count():
@@ -34,9 +33,8 @@ def word_count():
         "License you may not use this file except in compliance " \
         "with the License"
 
-    t_config = TableConfig()
-    env = ExecutionEnvironment.get_execution_environment()
-    t_env = BatchTableEnvironment.create(env, t_config)
+    env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+    t_env = TableEnvironment.create(environment_settings=env_settings)
 
     # used to test pipeline.jars and pipleline.classpaths
     config_key = sys.argv[1]
@@ -68,19 +66,17 @@ def word_count():
             'connector.path' = '{}'
         )
     """.format(result_path)
-    t_env.sql_update(sink_ddl)
+    t_env.execute_sql(sink_ddl)
 
-    t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python")
+    t_env.execute_sql("create temporary system function add_one as 'add_one.add_one' language python")
     t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
 
     elements = [(word, 0) for word in content.split(" ")]
     t_env.from_elements(elements, ["word", "count"]) \
         .select("word, add_one(count) as count, add_one_java(count) as count_java") \
         .group_by("word") \
        .select("word, count(count) as count, count(count_java) as count_java") \
-        .insert_into("Results")
-
-    t_env.execute("word_count")
+        .execute_insert("Results")
 
 
 if __name__ == '__main__':
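
The migrated job illustrates the full replacement pattern: `execute_sql()` instead of `sql_update()`, and `execute_insert()` instead of the `insert_into()`/`execute()` pair. A condensed, self-contained sketch of that pattern (the `print` sink and sample data here are illustrative, not part of the commit):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Unified batch TableEnvironment on the blink planner; no
# ExecutionEnvironment or BatchTableEnvironment involved.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(environment_settings=env_settings)

# DDL is submitted through execute_sql() rather than the removed sql_update().
t_env.execute_sql("""
    CREATE TABLE print_sink (
        word STRING,
        cnt BIGINT
    ) WITH ('connector' = 'print')
""")

# execute_insert() submits the job immediately; no trailing
# t_env.execute("job_name") call is needed anymore.
t_env.from_elements([('hello', 1), ('world', 1)], ['word', 'cnt']) \
    .execute_insert("print_sink") \
    .wait()
```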

This file was deleted.

This file was deleted.

20 changes: 0 additions & 20 deletions flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -155,26 +155,6 @@ PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
     -c org.apache.flink.python.tests.BlinkBatchPythonUdfSqlJob \
     "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
 
-echo "Test flink stream python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-    -p 2 \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -c org.apache.flink.python.tests.FlinkStreamPythonUdfSqlJob \
-    "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
-echo "Test flink batch python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-    -p 2 \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -c org.apache.flink.python.tests.FlinkBatchPythonUdfSqlJob \
-    "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
 echo "Test using python udf in sql client:\n"
 SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
3 changes: 0 additions & 3 deletions flink-python/dev/integration_test.sh
@@ -36,9 +36,6 @@ FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
 # test common module
 test_module "common"
 
-# test dataset module
-test_module "dataset"
-
 # test datastream module
 test_module "datastream"
12 changes: 6 additions & 6 deletions flink-python/dev/pip_test_code.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 # test pyflink shell environment
-from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
+from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
 
 import tempfile
 import os
@@ -28,9 +28,9 @@
         os.remove(sink_path)
     else:
         shutil.rmtree(sink_path)
-b_env.set_parallelism(1)
-t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-bt_env.connect(FileSystem().path(sink_path)) \
+s_env.set_parallelism(1)
+t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
+st_env.connect(FileSystem().path(sink_path)) \
     .with_format(OldCsv()
                  .field_delimiter(',')
                  .field("a", DataTypes.BIGINT())
@@ -40,9 +40,9 @@
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
-    .create_temporary_table("batch_sink")
+    .create_temporary_table("csv_sink")
 
-t.select("a + 1, b, c").execute_insert("batch_sink").wait()
+t.select("a + 1, b, c").execute_insert("csv_sink").wait()
 
 with open(sink_path, 'r') as f:
     lines = f.read()
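
The shell test now exercises the streaming bindings only: `b_env`/`bt_env` are gone and `s_env`/`st_env` are the pre-bound entry points. A quick sanity check inside an interactive PyFlink shell session might look like this (a sketch; assumes the shell has pre-bound these names, as the test above does):

```python
# s_env/st_env replace the removed b_env/bt_env in the PyFlink shell.
s_env.set_parallelism(1)
st_env.from_elements([(1, 'hi', 'hello')], ['a', 'b', 'c']).execute().print()
```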
1 change: 0 additions & 1 deletion flink-python/docs/index.rst
@@ -26,7 +26,6 @@ Welcome to Flink Python API Docs!
    pyflink
    pyflink.common
    pyflink.table
-   pyflink.dataset
    pyflink.datastream
    pyflink.metrics
28 changes: 0 additions & 28 deletions flink-python/docs/pyflink.dataset.rst

This file was deleted.

1 change: 0 additions & 1 deletion flink-python/docs/pyflink.rst
@@ -27,7 +27,6 @@ Subpackages
 
    pyflink.common
    pyflink.table
-   pyflink.dataset
    pyflink.datastream
 
 .. automodule:: pyflink
8 changes: 1 addition & 7 deletions flink-python/pom.xml
@@ -70,7 +70,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -80,12 +80,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
 
 		<!-- Beam dependencies -->
8 changes: 4 additions & 4 deletions flink-python/pyflink/common/tests/test_execution_config.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pyflink.dataset import ExecutionEnvironment
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -24,7 +24,7 @@
 class ExecutionConfigTests(PyFlinkTestCase):
 
     def setUp(self):
-        self.env = ExecutionEnvironment.get_execution_environment()
+        self.env = StreamExecutionEnvironment.get_execution_environment()
         self.execution_config = self.env.get_config()
 
     def test_constant(self):
@@ -253,9 +253,9 @@ def test_get_set_use_snapshot_compression(self):
 
     def test_equals_and_hash(self):
 
-        config1 = ExecutionEnvironment.get_execution_environment().get_config()
+        config1 = StreamExecutionEnvironment.get_execution_environment().get_config()
 
-        config2 = ExecutionEnvironment.get_execution_environment().get_config()
+        config2 = StreamExecutionEnvironment.get_execution_environment().get_config()
 
         self.assertEqual(config1, config2)
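
With `pyflink.dataset` gone, `StreamExecutionEnvironment` becomes the single entry point for obtaining an `ExecutionConfig`, as the updated test shows. A minimal sketch of the same access pattern outside the test harness (the parallelism value is illustrative):

```python
from pyflink.datastream import StreamExecutionEnvironment

# ExecutionConfig is now obtained via the DataStream environment.
env = StreamExecutionEnvironment.get_execution_environment()
config = env.get_config()
config.set_parallelism(2)  # illustrative tweak through the config object
```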
27 changes: 0 additions & 27 deletions flink-python/pyflink/dataset/__init__.py

This file was deleted.

