Implement basic mapping capabilities for YAML. (#27096)
robertwb committed Jun 15, 2023
1 parent 22c6e72 commit ada507d
Showing 7 changed files with 652 additions and 23 deletions.
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/typehints/schemas.py
@@ -291,7 +291,13 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
result.nullable = True
return result

elif _safe_issubclass(type_, Sequence):
elif type_ == range:
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(
element_type=schema_pb2.FieldType(
atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[int])))

elif _safe_issubclass(type_, Sequence) and not _safe_issubclass(type_, str):
element_type = self.typing_to_runner_api(_get_args(type_)[0])
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(element_type=element_type))
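The two changes in this hunk can be illustrated with plain Python: `range` is special-cased as an array of ints, and the `Sequence` branch now excludes `str` because `str` is itself registered as a `Sequence`. A minimal sketch (not Beam code) of why the guard is needed:

```python
from collections.abc import Sequence

# str is a Sequence (of one-character strings), so without the extra
# "and not _safe_issubclass(type_, str)" guard, a string field would be
# treated as an array of its characters.
print(issubclass(str, Sequence))    # True

# range is also a Sequence, which is why it is special-cased first,
# mapping to an array of ints.
print(issubclass(range, Sequence))  # True
```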
43 changes: 30 additions & 13 deletions sdks/python/apache_beam/yaml/readme_test.py
@@ -68,6 +68,8 @@ def guess_name_and_type(expr):
typ = float
else:
typ = str
elif '+' in expr:
typ = float
else:
part = parts[0]
if '.' in part:
@@ -172,6 +174,8 @@ def replace_recursive(spec, transform_type, arg_name, arg_value):


def create_test_method(test_type, test_name, test_yaml):
test_yaml = test_yaml.replace('pkg.module.fn', 'str')

def test(self):
with TestEnvironment() as env:
spec = yaml.load(test_yaml, Loader=SafeLoader)
@@ -202,6 +206,7 @@ def test(self):


def parse_test_methods(markdown_lines):
# pylint: disable=too-many-nested-blocks
code_lines = None
for ix, line in enumerate(markdown_lines):
line = line.rstrip()
@@ -211,26 +216,38 @@ def parse_test_methods(markdown_lines):
test_type = 'RUN'
test_name = f'test_line_{ix + 2}'
else:
if code_lines and code_lines[0] == 'pipeline:':
yaml_pipeline = '\n'.join(code_lines)
if 'providers:' in yaml_pipeline:
test_type = 'PARSE'
yield test_name, create_test_method(
test_type,
test_name,
yaml_pipeline)
if code_lines:
if code_lines[0].startswith('- type:'):
# Treat this as a fragment of a larger pipeline.
code_lines = [
'pipeline:',
' type: chain',
' transforms:',
' - type: ReadFromCsv',
' path: whatever',
] + [
' ' + line for line in code_lines
] # pylint: disable=not-an-iterable
if code_lines[0] == 'pipeline:':
yaml_pipeline = '\n'.join(code_lines)
if 'providers:' in yaml_pipeline:
test_type = 'PARSE'
yield test_name, create_test_method(
test_type,
test_name,
yaml_pipeline)
code_lines = None
elif code_lines is not None:
code_lines.append(line)
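The scanning loop above accumulates lines into `code_lines` while inside a fenced block; the fence-detection lines themselves are elided from this diff. A minimal standalone sketch of that kind of loop (an assumption about the elided lines, not the actual implementation):

```python
def extract_code_blocks(markdown_lines):
    # code_lines is None outside a fenced block, a list inside one.
    code_lines = None
    for line in markdown_lines:
        line = line.rstrip()
        if line.startswith('```'):
            if code_lines is None:
                code_lines = []              # opening fence: start collecting
            else:
                yield '\n'.join(code_lines)  # closing fence: emit the block
                code_lines = None
        elif code_lines is not None:
            code_lines.append(line)

blocks = list(extract_code_blocks(
    ['intro text', '```', 'pipeline:', '  type: chain', '```', 'outro']))
print(blocks)  # ['pipeline:\n  type: chain']
```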


def createTestSuite():
with open(os.path.join(os.path.dirname(__file__), 'README.md')) as readme:
return type(
'ReadMeTest', (unittest.TestCase, ), dict(parse_test_methods(readme)))
def createTestSuite(name, path):
with open(path) as readme:
return type(name, (unittest.TestCase, ), dict(parse_test_methods(readme)))


ReadMeTest = createTestSuite()
ReadMeTest = createTestSuite(
'ReadMeTest', os.path.join(os.path.dirname(__file__), 'README.md'))

if __name__ == '__main__':
parser = argparse.ArgumentParser()
196 changes: 196 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.md
@@ -0,0 +1,196 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Beam YAML mappings

Beam YAML can perform simple transformations to get data into the correct
shape. The simplest of these is `MapToFields`,
which creates records with new fields defined in terms of the input fields.

## Field renames

To rename fields, one can write

```
- type: MapToFields
fields:
new_col1: col1
new_col2: col2
```

This will result in an output where each record has two fields,
`new_col1` and `new_col2`, whose values are those of `col1` and `col2`,
respectively.

One can specify the `append` parameter, which indicates that the original
fields should be retained, similar to the use of `*` in an SQL `SELECT`
statement. For example,

```
- type: MapToFields
append: true
fields:
new_col1: col1
new_col2: col2
```

will output records that have `new_col1` and `new_col2` as *additional*
fields. When `append` is specified, one can also drop fields, e.g.

```
- type: MapToFields
append: true
drop:
- col3
fields:
new_col1: col1
new_col2: col2
```

which includes all original fields *except* `col3`, in addition to outputting
the two new ones.


## Mapping functions

Of course, one may want to perform transformations beyond simply dropping and
renaming fields. Beam YAML can inline simple UDFs, which requires a language
specification. For example,

```
- type: MapToFields
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
```

In addition, one can provide a full Python callable that takes the row as an
argument to do more complex mappings
(see [PythonCallableSource](https://beam.apache.org/releases/pydoc/current/apache_beam.utils.python_callable.html#apache_beam.utils.python_callable.PythonCallableWithSource)
for acceptable formats). Thus one can write

```
- type: MapToFields
language: python
fields:
new_col:
callable: |
import re
def my_mapping(row):
if re.match("[0-9]+", row.col1) and row.col2 > 0:
return "good"
else:
return "bad"
```
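The callable above can be exercised outside of a pipeline as well. A hypothetical sketch, using `SimpleNamespace` to stand in for a schema'd row:

```python
import re
from types import SimpleNamespace

def my_mapping(row):
    if re.match("[0-9]+", row.col1) and row.col2 > 0:
        return "good"
    else:
        return "bad"

# SimpleNamespace stands in for the row objects the pipeline would pass.
print(my_mapping(SimpleNamespace(col1="123", col2=5)))  # good
print(my_mapping(SimpleNamespace(col1="abc", col2=5)))  # bad
print(my_mapping(SimpleNamespace(col1="123", col2=0)))  # bad
```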

Once one reaches a certain level of complexity, it may be preferable to package
the logic up as a dependency and simply refer to it by its fully qualified
name, e.g.

```
- type: MapToFields
language: python
fields:
new_col:
callable: pkg.module.fn
```

In addition to Python, SQL expressions are currently supported as well:

```
- type: MapToFields
language: sql
fields:
new_col: "UPPER(col1)"
another_col: "col2 + col3"
```

## FlatMap

Sometimes it may be desirable to emit more (or fewer) than one record for each
input record. This can be accomplished by mapping to an iterable type and
noting that the specific field should be exploded, e.g.

```
- type: MapToFields
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "col2 + col3"
explode: new_col
```

will result in three output records for every input record.

If more than one field is to be exploded, one must specify whether the cross
product over all fields should be taken. For example,

```
- type: MapToFields
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
explode: [new_col, another_col]
cross_product: true
```

will emit nine records whereas

```
- type: MapToFields
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
explode: [new_col, another_col]
cross_product: false
```

will only emit three.
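The record counts in the two modes can be sketched in plain Python. This is only an illustration of the counts implied above (nine versus three), under the assumption that `cross_product: false` pairs the exploded values element-wise; it is not the actual implementation:

```python
import itertools

# For an input row with, say, col1 == 'a' and col2 == 2:
new_col = ['A', 'a', 'A']   # col1.upper(), col1.lower(), col1.title()
another_col = [1, 2, 3]     # col2 - 1, col2, col2 + 1

# cross_product: true -- every combination, 3 x 3 = 9 records.
cross = list(itertools.product(new_col, another_col))

# cross_product: false -- values paired element-wise, 3 records.
zipped = list(zip(new_col, another_col))

print(len(cross), len(zipped))  # 9 3
```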

If one is only exploding existing fields, a simpler `Explode` transform may be
used instead

```
- type: Explode
explode: [col1]
```

## Filtering

Sometimes it can be desirable to keep only those records that satisfy certain
criteria. This can be accomplished by specifying a `keep` parameter, e.g.

```
- type: MapToFields
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
keep: "col2 > 0"
```
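The effect of `keep` is simple filtering on the predicate. An illustrative sketch over plain dicts (not the actual Beam implementation):

```python
rows = [
    {'col1': 'a', 'col2': 3},
    {'col1': 'b', 'col2': -1},
    {'col1': 'c', 'col2': 7},
]

# Only records satisfying the keep expression "col2 > 0" survive.
kept = [row for row in rows if row['col2'] > 0]
print(len(kept))  # 2
```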

As with `Explode`, there is a simpler `Filter` transform, useful when no
mapping is being done:

```
- type: Filter
language: sql
keep: "col2 > 0"
```
