Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
Add yaml to programming guide. (apache#30269)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Feb 28, 2024
1 parent fbde0ce commit 4f966a8
Show file tree
Hide file tree
Showing 10 changed files with 731 additions and 13 deletions.
17 changes: 17 additions & 0 deletions sdks/python/apache_beam/transforms/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,23 @@ def extract_output(self, accumulator):
return accumulator


@with_input_types(T)
@with_output_types(T)
class ConcatListCombineFn(core.CombineFn):
"""CombineFn for concatenating lists together."""
def create_accumulator(self):
return []

def add_input(self, accumulator, element):
return accumulator + element

def merge_accumulators(self, accumulators):
return sum(accumulators, [])

def extract_output(self, accumulator):
return accumulator


@with_input_types(Tuple[K, V])
@with_output_types(Dict[K, V])
class ToDict(CombinerWithoutDefaults):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ def convert_to_typing_type(typ):
if isinstance(typ, typehints.IterableTypeConstraint):
return typing.Iterable[convert_to_typing_type(typ.inner_type)]
if isinstance(typ, typehints.UnionConstraint):
if not typ.union_types:
# Gracefully handle the empty union type.
return typing.Any
return typing.Union[tuple(convert_to_typing_types(typ.union_types))]
if isinstance(typ, typehints.SetTypeConstraint):
return typing.Set[convert_to_typing_type(typ.inner_type)]
Expand Down
11 changes: 7 additions & 4 deletions sdks/python/apache_beam/utils/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ def of(seconds):
Corresponding Timestamp object.
"""

if not isinstance(seconds, (int, float, Timestamp)):
raise TypeError(
'Cannot interpret %s %s as Timestamp.' % (seconds, type(seconds)))
if isinstance(seconds, Timestamp):
return seconds
return Timestamp(seconds)
elif isinstance(seconds, (int, float)):
return Timestamp(seconds)
elif isinstance(seconds, datetime.datetime):
return Timestamp.from_utc_datetime(seconds)
else:
raise TypeError(
'Cannot interpret %s %s as Timestamp.' % (seconds, type(seconds)))

@staticmethod
def now():
Expand Down
Loading

0 comments on commit 4f966a8

Please sign in to comment.