Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude all attributes starting with "goog_" from outbound Pub/Sub headers #845

Merged
merged 2 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class PubSubHeaderMapper implements HeaderMapper<Map<String, String>> {
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
"!" + GcpPubSubHeaders.ORIGINAL_MESSAGE,
"!" + GcpPubSubHeaders.CLIENT,
"!" + NativeMessageHeaderAccessor.NATIVE_HEADERS,
"!" + MessageHistory.HEADER_NAME,
"*"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ private GcpPubSubHeaders() {

private static final String PREFIX = "gcp_pubsub_";

/**
* The client header text.
*/
public static final String CLIENT = "googclient_*";

/**
* The topic header text.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ public class PubSubHeaderMapperTests {
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testFilterGoogleClientHeaders() {
PubSubHeaderMapper mapper = new PubSubHeaderMapper();
Map<String, Object> originalHeaders = new HashMap<>();
originalHeaders.put("my header", "pantagruel's nativity");
MessageHeaders internalHeaders = new MessageHeaders(originalHeaders);

originalHeaders.put("googclient_deliveryattempt", "header attached when DLQ is enabled");
originalHeaders.put("googclient_anyHeader", "any other possible headers");

Map<String, String> filteredHeaders = new HashMap<>();
mapper.fromHeaders(internalHeaders, filteredHeaders);
assertThat(filteredHeaders)
.hasSize(1)
.containsEntry("my header", "pantagruel's nativity");
}


@Test
public void testFilterHeaders() {
PubSubHeaderMapper mapper = new PubSubHeaderMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,26 @@

package com.example;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.output.TeeOutputStream;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assume.assumeThat;

/**
* These tests verifies that the pubsub-polling-binder-sample works.
Expand All @@ -48,40 +44,21 @@
*
* @since 1.2
*/
@RunWith(SpringRunner.class)
//Please use "-Dit.pubsub=true" to enable the tests
@EnabledIfSystemProperty(named = "it.pubsub", matches = "true")
@ExtendWith(OutputCaptureExtension.class)
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
"spring.cloud.stream.bindings.input.destination=sub1",
"spring.cloud.stream.bindings.output.destination=sub1" })
@DirtiesContext
public class SampleAppIntegrationTest {
class SampleAppIntegrationTest {

@Autowired
private TestRestTemplate restTemplate;

private static PrintStream systemOut;

private static ByteArrayOutputStream baos;

@BeforeClass
public static void prepare() {
assumeThat(
"PUB/SUB-sample integration tests are disabled. Please use '-Dit.pubsub=true' "
+ "to enable them. ",
System.getProperty("it.pubsub"), is("true"));

systemOut = System.out;
baos = new ByteArrayOutputStream();
TeeOutputStream out = new TeeOutputStream(systemOut, baos);
System.setOut(new PrintStream(out));
}

@AfterClass
public static void bringBack() {
System.setOut(systemOut);
}

@Test
public void testSample() throws Exception {
void testSample(CapturedOutput capturedOutput) throws Exception {
MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
String message = "test message " + UUID.randomUUID();

Expand All @@ -90,7 +67,7 @@ public void testSample() throws Exception {

this.restTemplate.postForObject("/newMessage", map, String.class);

Callable<Boolean> logCheck = () -> baos.toString().contains("New message received from testUserName via polling: " + message);
Callable<Boolean> logCheck = () -> capturedOutput.toString().contains("New message received from testUserName via polling: " + message);
Awaitility.await().atMost(60, TimeUnit.SECONDS)
.until(logCheck);

Expand Down