[BEAM-13052] Increment pubsub python version and fix breakages. (apache#16126)

* Increment pubsub python version and fix breakages.

This is needed to unblock adding a dependency on google-cloud-pubsublite

* Increment pubsub python version and fix breakages.

This is needed to unblock adding a dependency on google-cloud-pubsublite

* Fix call syntax for pubsub_matcher and remove return_immediately (which is irrevocably broken)

* Fix delete operations and add logging to matcher exceptions

* Fix test utils test

* Fix import

* Formatting

* Fix serialization method

* Fix acknowledge method

* Fix acknowledge method tests

* Fix timestamp handling, which no longer needs conversion

* Deflake for missing messages

* Deflake for missing messages

* Deflake for missing messages

* Fix DirectRunner transform evaluator

* Pull for longer, print errors, and close the client properly

* Fix unit test

* Fix unit test
dpcollins-google committed Dec 28, 2021
1 parent f2f1bdf commit acff5d1
Showing 18 changed files with 107 additions and 91 deletions.
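Most of the hunks below make the same mechanical change: google-cloud-pubsub 2.x only accepts request fields as keyword arguments, so positional create_topic/create_subscription calls had to be rewritten. A minimal before/after sketch (project and resource names are placeholders and this is an illustration, not code from the commit, assuming google-cloud-pubsub >= 2.0):

from google.cloud import pubsub

publisher = pubsub.PublisherClient()
subscriber = pubsub.SubscriberClient()
topic_path = publisher.topic_path('my-project', 'my-topic')
sub_path = subscriber.subscription_path('my-project', 'my-sub')

# 1.x accepted positional request fields:
#   publisher.create_topic(topic_path)
#   subscriber.create_subscription(sub_path, topic_path)

# 2.x requires keyword arguments for request fields:
publisher.create_topic(name=topic_path)
subscriber.create_subscription(
    name=sub_path, topic=topic_path, ack_deadline_seconds=60)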
@@ -75,13 +75,14 @@ def setUp(self):
     from google.cloud import pubsub
     self.pub_client = pubsub.PublisherClient()
     self.input_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
+        name=self.pub_client.topic_path(
+            self.project, self.INPUT_TOPIC + _unique_id))

     self.sub_client = pubsub.SubscriberClient()
     self.input_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(
+        name=self.sub_client.subscription_path(
             self.project, self.INPUT_SUB + _unique_id),
-        self.input_topic.name)
+        topic=self.input_topic.name)

     # Set up BigQuery environment
     self.dataset_ref = utils.create_bq_dataset(
@@ -76,13 +76,14 @@ def setUp(self):

     self.pub_client = pubsub.PublisherClient()
     self.input_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
+        name=self.pub_client.topic_path(
+            self.project, self.INPUT_TOPIC + _unique_id))

     self.sub_client = pubsub.SubscriberClient()
     self.input_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(
+        name=self.sub_client.subscription_path(
             self.project, self.INPUT_SUB + _unique_id),
-        self.input_topic.name)
+        topic=self.input_topic.name)

     # Set up BigQuery environment
     self.dataset_ref = utils.create_bq_dataset(
@@ -69,17 +69,19 @@ def setup_pubsub(self):
     from google.cloud import pubsub
     self.pub_client = pubsub.PublisherClient()
     self.input_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
     self.output_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

     self.sub_client = pubsub.SubscriberClient()
     self.input_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
-        self.input_topic.name)
+        name=self.sub_client.subscription_path(
+            self.project, INPUT_SUB + self.uuid),
+        topic=self.input_topic.name)
     self.output_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
-        self.output_topic.name,
+        name=self.sub_client.subscription_path(
+            self.project, OUTPUT_SUB + self.uuid),
+        topic=self.output_topic.name,
         ack_deadline_seconds=60)

   def _inject_data(self, topic, data):
14 changes: 8 additions & 6 deletions sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -52,17 +52,19 @@ def setUp(self):
     from google.cloud import pubsub
     self.pub_client = pubsub.PublisherClient()
     self.input_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
     self.output_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

     self.sub_client = pubsub.SubscriberClient()
     self.input_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
-        self.input_topic.name)
+        name=self.sub_client.subscription_path(
+            self.project, INPUT_SUB + self.uuid),
+        topic=self.input_topic.name)
     self.output_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
-        self.output_topic.name,
+        name=self.sub_client.subscription_path(
+            self.project, OUTPUT_SUB + self.uuid),
+        topic=self.output_topic.name,
         ack_deadline_seconds=60)

   def _inject_numbers(self, topic, num_messages):
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -1316,12 +1316,13 @@ def setUp(self):
     from google.cloud import pubsub
     self.pub_client = pubsub.PublisherClient()
     self.input_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(
+            self.project, self.INPUT_TOPIC + self.uuid))
     self.sub_client = pubsub.SubscriberClient()
     self.input_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(
+        name=self.sub_client.subscription_path(
             self.project, self.INPUT_SUB + self.uuid),
-        self.input_topic.name)
+        topic=self.input_topic.name)

     # Set up BQ
     self.dataset_ref = utils.create_bq_dataset(
15 changes: 7 additions & 8 deletions sdks/python/apache_beam/io/gcp/pubsub.py
@@ -116,15 +116,14 @@ def _from_proto_str(proto_msg):
     Returns:
       A new PubsubMessage object.
     """
-    msg = pubsub.types.pubsub_pb2.PubsubMessage()
-    msg.ParseFromString(proto_msg)
+    msg = pubsub.types.PubsubMessage.deserialize(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
     return PubsubMessage(
         msg.data,
         attributes,
         msg.message_id,
-        msg.publish_time.ToDatetime(),
+        msg.publish_time,
         msg.ordering_key)

   def _to_proto_str(self, for_publish=False):
@@ -141,16 +140,16 @@ def _to_proto_str(self, for_publish=False):
     https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
     containing the payload of this object.
     """
-    msg = pubsub.types.pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.PubsubMessage()
     msg.data = self.data
     for key, value in self.attributes.items():
       msg.attributes[key] = value
     if self.message_id and not for_publish:
       msg.message_id = self.message_id
     if self.publish_time and not for_publish:
-      msg.publish_time = msg.publish_time.FromDatetime(self.publish_time)
+      msg.publish_time = self.publish_time
     msg.ordering_key = self.ordering_key
-    return msg.SerializeToString()
+    return pubsub.types.PubsubMessage.serialize(msg)

   @staticmethod
   def _from_message(msg):
@@ -335,9 +334,9 @@ def message_to_proto_str(element):
   @staticmethod
   def bytes_to_proto_str(element):
     # type: (bytes) -> bytes
-    msg = pubsub.types.pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.PubsubMessage()
     msg.data = element
-    return msg.SerializeToString()
+    return pubsub.types.PubsubMessage.serialize(msg)

   def expand(self, pcoll):
     if self.with_attributes:
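The pubsub.py change reflects that pubsub.types.PubsubMessage is now a proto-plus message rather than a raw protobuf, so SerializeToString/ParseFromString become the class-level serialize/deserialize helpers and publish_time no longer needs explicit Timestamp conversion. A small round-trip sketch under that assumption (illustrative only, not code from the commit):

from google.cloud import pubsub

msg = pubsub.types.PubsubMessage()
msg.data = b'payload'
msg.attributes['key'] = 'value'

# Class-level helpers replace msg.SerializeToString() / msg.ParseFromString().
wire_bytes = pubsub.types.PubsubMessage.serialize(msg)
decoded = pubsub.types.PubsubMessage.deserialize(wire_bytes)

assert decoded.data == b'payload'
assert decoded.attributes['key'] == 'value'
# decoded.publish_time is already a datetime-like value when set by the
# service, so the old ToDatetime()/FromDatetime() calls are unnecessary.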
21 changes: 14 additions & 7 deletions sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -21,6 +21,7 @@
 # pytype: skip-file

 import logging
+import time
 import unittest
 import uuid

@@ -139,17 +140,22 @@ def setUp(self):
     from google.cloud import pubsub
     self.pub_client = pubsub.PublisherClient()
     self.input_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
     self.output_topic = self.pub_client.create_topic(
-        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+        name=self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

     self.sub_client = pubsub.SubscriberClient()
     self.input_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
-        self.input_topic.name)
+        name=self.sub_client.subscription_path(
+            self.project, INPUT_SUB + self.uuid),
+        topic=self.input_topic.name)
     self.output_sub = self.sub_client.create_subscription(
-        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
-        self.output_topic.name)
+        name=self.sub_client.subscription_path(
+            self.project, OUTPUT_SUB + self.uuid),
+        topic=self.output_topic.name)
+    # Add a 30 second sleep after resource creation to ensure subscriptions will
+    # receive messages.
+    time.sleep(30)

   def tearDown(self):
     test_utils.cleanup_subscriptions(
@@ -194,7 +200,8 @@ def _test_streaming(self, with_attributes):

     # Generate input data and inject to PubSub.
     for msg in self.INPUT_MESSAGES[self.runner_name]:
-      self.pub_client.publish(self.input_topic.name, msg.data, **msg.attributes)
+      self.pub_client.publish(
+          self.input_topic.name, msg.data, **msg.attributes).result()

     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
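In 2.x, PublisherClient.publish batches in the background and returns a future; calling .result() blocks until the service has accepted the message, which is what makes the injected test input reliably visible before the pipeline starts. A hedged sketch with placeholder names:

from google.cloud import pubsub

publisher = pubsub.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')

publish_futures = [
    publisher.publish(topic_path, b'message-%d' % i, origin='test')
    for i in range(10)
]
for future in publish_futures:
  # Raises if the publish failed; returns the server-assigned message id.
  message_id = future.result(timeout=60)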
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py
@@ -145,11 +145,11 @@ def _setup_pipeline(self):

   def _setup_pubsub(self):
     super()._setup_pubsub()
-    _ = self.pub_client.create_topic(self.topic_name)
+    _ = self.pub_client.create_topic(name=self.topic_name)

     _ = self.sub_client.create_subscription(
-        self.read_sub_name,
-        self.topic_name,
+        name=self.read_sub_name,
+        topic=self.topic_name,
     )


@@ -184,11 +184,11 @@ def test(self):

   def _setup_pubsub(self):
     super()._setup_pubsub()
-    _ = self.pub_client.create_topic(self.matcher_topic_name)
+    _ = self.pub_client.create_topic(name=self.matcher_topic_name)

     _ = self.sub_client.create_subscription(
-        self.read_matcher_sub_name,
-        self.matcher_topic_name,
+        name=self.read_matcher_sub_name,
+        topic=self.matcher_topic_name,
     )

   def _setup_pipeline(self):
33 changes: 13 additions & 20 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -503,10 +503,9 @@ def test_read_messages_success(self, mock_pubsub):
               with_attributes=True))
       assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     mock_pubsub.return_value.acknowledge.assert_has_calls(
-        [mock.call(mock.ANY, [ack_id])])
+        [mock.call(subscription=mock.ANY, ack_ids=[ack_id])])

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_strings_success(self, mock_pubsub):
     data = u'🤷 ¯\\_(ツ)_/¯'
@@ -526,10 +525,9 @@ def test_read_strings_success(self, mock_pubsub):
               'projects/fakeprj/topics/a_topic', None, None))
       assert_that(pcoll, equal_to(expected_elements))
     mock_pubsub.return_value.acknowledge.assert_has_calls(
-        [mock.call(mock.ANY, [ack_id])])
+        [mock.call(subscription=mock.ANY, ack_ids=[ack_id])])

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_data_success(self, mock_pubsub):
     data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
@@ -547,10 +545,9 @@ def test_read_data_success(self, mock_pubsub):
           | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
       assert_that(pcoll, equal_to(expected_elements))
     mock_pubsub.return_value.acknowledge.assert_has_calls(
-        [mock.call(mock.ANY, [ack_id])])
+        [mock.call(subscription=mock.ANY, ack_ids=[ack_id])])

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
     data = b'data'
@@ -583,10 +580,9 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
               timestamp_attribute='time'))
       assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     mock_pubsub.return_value.acknowledge.assert_has_calls(
-        [mock.call(mock.ANY, [ack_id])])
+        [mock.call(subscription=mock.ANY, ack_ids=[ack_id])])

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
     data = b'data'
@@ -619,10 +615,9 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
               timestamp_attribute='time'))
       assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     mock_pubsub.return_value.acknowledge.assert_has_calls(
-        [mock.call(mock.ANY, [ack_id])])
+        [mock.call(subscription=mock.ANY, ack_ids=[ack_id])])

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
     data = b'data'
@@ -656,10 +651,9 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
               timestamp_attribute='nonexistent'))
       assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     mock_pubsub.return_value.acknowledge.assert_has_calls(
-        [mock.call(mock.ANY, [ack_id])])
+        [mock.call(subscription=mock.ANY, ack_ids=[ack_id])])

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
     data = b'data'
@@ -688,8 +682,7 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
       p.run()
     mock_pubsub.return_value.acknowledge.assert_not_called()

-    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls(
-        [mock.call()])
+    mock_pubsub.return_value.close.assert_has_calls([mock.call()])

   def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
     # id_label is unsupported in DirectRunner.
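These tests now assert keyword-argument acknowledge calls and a plain close() on the mocked client: in 2.x the subscriber owns its gRPC transport and exposes close() directly (it also works as a context manager), so reaching into client.api.transport.channel is no longer needed. A sketch of the pattern the tests expect, with a placeholder subscription path:

from google.cloud import pubsub

sub_client = pubsub.SubscriberClient()
try:
  response = sub_client.pull(
      subscription='projects/my-project/subscriptions/my-sub',
      max_messages=10,
      timeout=30)
  ack_ids = [rm.ack_id for rm in response.received_messages]
  if ack_ids:
    sub_client.acknowledge(
        subscription='projects/my-project/subscriptions/my-sub',
        ack_ids=ack_ids)
finally:
  # Replaces sub_client.api.transport.channel.close() from the 1.x client.
  sub_client.close()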
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -127,9 +127,8 @@ def _wait_for_messages(self, expected_num, timeout):
     start_time = time.time()
     while time.time() - start_time <= timeout:
       response = sub_client.pull(
-          self.sub_name,
+          subscription=self.sub_name,
           max_messages=self.max_messages_in_one_pull,
-          return_immediately=True,
           timeout=self.pull_timeout)
       for rm in response.received_messages:
         msg = PubsubMessage._from_message(rm.message)
@@ -149,7 +148,7 @@ def _wait_for_messages(self, expected_num, timeout):

       ack_ids = [rm.ack_id for rm in response.received_messages]
       if ack_ids:
-        sub_client.acknowledge(self.sub_name, ack_ids)
+        sub_client.acknowledge(subscription=self.sub_name, ack_ids=ack_ids)
       if len(total_messages) >= expected_num:
         break
       time.sleep(self.sleep_time)
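With return_immediately gone (the commit message calls it irrevocably broken, and the service deprecates it), the matcher leans on a per-pull timeout plus its own polling loop. A standalone sketch of that shape, with made-up parameter values rather than the matcher's real attributes:

import time

from google.api_core import exceptions as gexc


def pull_until(sub_client, subscription, expected_num, overall_timeout):
  """Polls a subscription until expected_num messages arrive or time runs out."""
  received = []
  deadline = time.time() + overall_timeout
  while time.time() < deadline and len(received) < expected_num:
    try:
      # Each pull now waits up to its own timeout instead of returning
      # immediately when the backlog is empty.
      response = sub_client.pull(
          subscription=subscription, max_messages=50, timeout=10)
    except (gexc.RetryError, gexc.DeadlineExceeded):
      continue
    received.extend(rm.message for rm in response.received_messages)
    ack_ids = [rm.ack_id for rm in response.received_messages]
    if ack_ids:
      sub_client.acknowledge(subscription=subscription, ack_ids=ack_ids)
    time.sleep(1)
  return received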
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -158,7 +158,7 @@ def read_from_pubsub(
     except (gexc.RetryError, gexc.DeadlineExceeded):
       continue
     ack_ids = [msg.ack_id for msg in response.received_messages]
-    sub_client.acknowledge(subscription_path, ack_ids)
+    sub_client.acknowledge(subscription=subscription_path, ack_ids=ack_ids)
     for msg in response.received_messages:
       message = PubsubMessage._from_message(msg.message)
       if with_attributes:
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/io/gcp/tests/utils_test.py
@@ -146,7 +146,8 @@ def test_read_from_pubsub(self):
     output = utils.read_from_pubsub(
         mock_pubsub, subscription_path, number_of_elements=1)
     self.assertEqual([data], output)
-    mock_pubsub.acknowledge.assert_called_once_with(subscription_path, [ack_id])
+    mock_pubsub.acknowledge.assert_called_once_with(
+        subscription=subscription_path, ack_ids=[ack_id])

   def test_read_from_pubsub_with_attributes(self):
     mock_pubsub = mock.Mock()
@@ -164,7 +165,8 @@ def test_read_from_pubsub_with_attributes(self):
         with_attributes=True,
         number_of_elements=1)
     self.assertEqual([message], output)
-    mock_pubsub.acknowledge.assert_called_once_with(subscription_path, [ack_id])
+    mock_pubsub.acknowledge.assert_called_once_with(
+        subscription=subscription_path, ack_ids=[ack_id])

   def test_read_from_pubsub_flaky(self):
     number_of_elements = 10
@@ -253,7 +255,7 @@ def test_read_from_pubsub_invalid_arg(self):
   def _assert_ack_ids_equal(self, mock_pubsub, ack_ids):
     actual_ack_ids = [
         ack_id for args_list in mock_pubsub.acknowledge.call_args_list
-        for ack_id in args_list[0][1]
+        for ack_id in args_list[1]["ack_ids"]
     ]
     self.assertEqual(actual_ack_ids, ack_ids)
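Because acknowledge is now called with keyword arguments, the helper pulls ack ids out of each recorded call's kwargs (call_args[1], equivalently call.kwargs) instead of the positional tuple. A self-contained unittest.mock illustration:

from unittest import mock

client = mock.Mock()
client.acknowledge(subscription='projects/p/subscriptions/s', ack_ids=['a1', 'a2'])
client.acknowledge(subscription='projects/p/subscriptions/s', ack_ids=['a3'])

# Each entry in call_args_list is an (args, kwargs) pair; index [1] holds the
# keyword arguments, so the ack ids are collected from there.
acked = [
    ack_id for call in client.acknowledge.call_args_list
    for ack_id in call[1]['ack_ids']
]
assert acked == ['a1', 'a2', 'a3']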