
[SPARK-25501][SS] Add kafka delegation token support. #22598

Closed
wants to merge 25 commits

gaborgsomogyi
Contributor

What changes were proposed in this pull request?

This PR adds Kafka delegation token support for Structured Streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing).

What this PR contains:

  • Configuration parameters for the feature
  • Delegation token fetching from broker
  • Usage of token through dynamic JAAS configuration
  • Minor refactoring in the existing code

What this PR doesn't contain:

  • Documentation changes because design can change

How was this patch tested?

Existing tests, plus a small number of additional unit tests.

Because this is an external service integration, it was mainly tested on a cluster:

  • 4 node cluster
  • Kafka broker version 1.1.0
  • Topic with 4 partitions
  • security.protocol = SASL_SSL
  • sasl.mechanism = SCRAM-SHA-256
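
For context, delegation tokens are obtained by the driver when the application is submitted with Kerberos credentials. A hedged sketch of how such a test job might be launched — the principal, keytab path, broker list, and jar are placeholders, and the `spark.kafka.*` keys are meant to illustrate the configuration parameters this PR introduces (check the diff for the exact names):

```
spark-submit \
  --master yarn \
  --principal user@EXAMPLE.COM \
  --keytab /path/to/user.keytab \
  --conf spark.kafka.bootstrap.servers=broker1:9093,broker2:9093 \
  --conf spark.kafka.security.protocol=SASL_SSL \
  my-streaming-app.jar
```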

An example of obtaining a token:

```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID         HMAC                           OWNER           RENEWERS                  ISSUEDATE       EXPIRYDATE      MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user    []                        2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```

An example token usage:

```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```
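
The JAAS line in the log is generated dynamically from the token. A minimal Python sketch of how such a SCRAM JAAS string can be assembled — the helper name and arguments here are hypothetical, not Spark's actual API (the real logic lives in `KafkaSecurityHelper`):

```python
def scram_jaas_config(token_id: str, token_hmac: str, service_name: str = "kafka") -> str:
    """Build a SCRAM JAAS config line like the one in the log above.

    The token ID becomes the username and the HMAC becomes the password;
    tokenauth=true tells the ScramLoginModule to authenticate with a
    delegation token instead of a regular SCRAM credential.
    """
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        "tokenauth=true "
        f'serviceName="{service_name}" '
        f'username="{token_id}" '
        f'password="{token_hmac}";'
    )

jaas = scram_jaas_config("D1-v__Q5T_uHx55rW16Jwg", "<hmac>")
```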

@gaborgsomogyi
Contributor Author

ok to test

@HyukjinKwon
Member

What's the difference from #22550? Let's close one of them. +@merlintang

@gaborgsomogyi
Contributor Author

As stated on the JIRA, the PR mentioned by @merlintang crashes on my cluster, so it's not proposed for merge. Please close it.

Contributor

@attilapiros left a comment

Since embedded Kafka is available for testing, I would say a unit test that exercises the DT is quite important for this change (especially to ensure further developments do not break this feature).

@merlintang

@gaborgsomogyi thanks for your PR, I am going through the details and testing it on my local machine.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96814 has finished for PR 22598 at commit f9b4685.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier

 * Made KafkaDelegationTokenIdentifier private
 * Kerberos module name fix for IBM JVM
 * ScramLoginModule comes from the class name which fails compile time if it's moved
 * Extended HadoopDelegationTokenManagerSuite tests
 * Moved token parameter set into ConfigUpdater
@SparkQA

SparkQA commented Oct 3, 2018

Test build #96871 has finished for PR 22598 at commit 2a79d59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin left a comment

From our previous talks the configuration stuff looked ok, given the limitations of how the Kafka connector is initialized. Couldn't really think of a different way that could cover all the cases.

If there's any unit test that can be written for the TokenUtil code that would be great.

 * Removed the additional token enable flag.
 * Increased test coverage.
 * Fixes for @vanzin's comments.
@SparkQA

SparkQA commented Oct 8, 2018

Test build #97114 has finished for PR 22598 at commit a4ab4f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 8, 2018

Test build #97115 has finished for PR 22598 at commit 8c860fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 9, 2018

Test build #97151 has finished for PR 22598 at commit a2c4397.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

 * Added comment for getKrb5LoginModuleName.
 * Used === operator in tests instead of equals.
@SparkQA

SparkQA commented Oct 10, 2018

Test build #97188 has finished for PR 22598 at commit fffd139.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR left a comment

I've finished taking a first look at the diff. Please note that I've added some comments in threads that are marked as resolved.

Btw, it looks like manual testing is the only way to verify this patch. I don't have the time/environment to try it out right now, but I'll do it when I'm ready.

 * Reordered TokenUtil methods to make it more readable.
 * Enhanced additional parameters doc to make things more explicit.
@SparkQA

SparkQA commented Oct 19, 2018

Test build #97612 has finished for PR 22598 at commit 7c22433.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

retest this, please

@SparkQA

SparkQA commented Oct 19, 2018

Test build #97613 has finished for PR 22598 at commit 7c22433.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

retest this, please

@SparkQA

SparkQA commented Nov 27, 2018

Test build #99330 has finished for PR 22598 at commit 36d05d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 29, 2018

Test build #99453 has finished for PR 22598 at commit cbff31c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin left a comment

looks ok pending tests and a few minor things.

 * Move configs to Kafka.scala
 * Minor cosmetic changes
@SparkQA

SparkQA commented Nov 29, 2018

Test build #99469 has finished for PR 22598 at commit 2a0b1c3.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 30, 2018

Test build #99474 has finished for PR 22598 at commit c0519f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 30, 2018

Test build #99476 has finished for PR 22598 at commit a122865.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Nov 30, 2018

Merging to master.

@asfgit closed this in 0166c73 Nov 30, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
Closes apache#22598 from gaborgsomogyi/SPARK-25501.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

Kafka delegation token support was implemented in [PR#22598](apache#22598), but that PR didn't contain documentation because of rapid changes. Now that it has been merged, this PR documents it.

## How was this patch tested?
Jekyll build + manual HTML check

Closes apache#23195 from gaborgsomogyi/SPARK-26236.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@zsxwing
Member

zsxwing commented Mar 19, 2019

@gaborgsomogyi why do all these configs use Spark conf? Does this mean we cannot support multiple tokens? We already support passing Kafka configs via data source options; why not use data source options instead? Sorry if I missed any discussion here.

@gaborgsomogyi
Contributor Author

@zsxwing this part is described in the SPIP, with possible alternatives. Yeah, multiple tokens are not supported.

@gaborgsomogyi
Contributor Author

> We already support passing Kafka configs via data source options; why not use data source options instead?

In short, these parameters are parsed well after the delegation token subsystem has started and the tokens have been obtained.
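
To make the ordering concrete: delegation-token settings ride on the Spark conf, which exists when the token provider runs at application start, while Kafka source options only become visible when the query is defined. A hedged illustration — the `spark.kafka.*` key stands in for the configuration parameters this PR adds (check the diff for exact names):

```
# Spark conf: read at application start, when the delegation token
# subsystem runs and tokens are obtained
spark-submit --conf spark.kafka.bootstrap.servers=broker1:9093 ...

# Data source option: parsed only when the streaming query is built,
# long after the tokens were fetched
spark.readStream.format("kafka").option("kafka.bootstrap.servers", "broker1:9093")
```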

@koertkuipers
Contributor

This is exciting to me.

I tested it on a Kafka 2.2.0 cluster that uses GSSAPI/Kerberos for authentication, by enabling token support in Kafka. To be specific, in server.properties I changed:

```
- sasl.enabled.mechanisms=GSSAPI
+ sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512
+ delegation.token.master.key=somesecret
```

Note that I kept sasl.mechanism.inter.broker.protocol=GSSAPI.

I could see my Spark Structured Streaming job obtain the Kafka token successfully. After that I ran into some issues in the driver that seemed Kafka-specific. I added comments here:

https://issues.apache.org/jira/browse/KAFKA-7631

@gaborgsomogyi
Contributor Author

Good to hear it's being used. Ping me if you see any Spark-related issue. BTW, multi-cluster support is on the way...

@koertkuipers
Contributor

Following up on my earlier test: after adding the SCRAM login module to my broker JAAS configs (so brokers have both Kerberos and SCRAM), the Kafka-specific issue was resolved. So now everything works. I will be doing long-running tests over the next few days/weeks. Thanks again.

@koertkuipers
Contributor

I have been testing long-running Structured Streaming jobs from and to Kafka using delegation tokens. The driver is launched by a user with a Kerberos login, and the principal and keytab are provided to spark-submit. The trigger is hourly.
I see no errors in the executor logs.
In the driver I see the following, starting 24 hours after the job was launched (I assume this is when the Kerberos ticket expires?):

```
2019-05-31 17:05:03 WARN internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2019-05-31 17:05:03 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Member consumer-1-3f19c416-c17a-41f6-b710-67f50dd7a568 sending LeaveGroup request to coordinator node05.company.com:9092 (id: 2147483592 rack: null)
2019-05-31 18:00:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Revoking previously assigned partitions 
[twitter-0]
2019-05-31 18:00:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] (Re-)joining group
2019-05-31 18:00:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Group coordinator node05.company.com:9092 (id: 2147483592 rack: null) is unavailable or invalid, will attempt rediscovery
2019-05-31 18:00:00 INFO network.Selector: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Failed authentication with node06.company.com/10.0.0.56 (Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512)
2019-05-31 18:00:00 ERROR clients.NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Connection to node 56 (node06.company.com/10.0.0.56:9092) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
2019-05-31 18:00:00 INFO network.Selector: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Failed authentication with node10.company.com/10.0.0.60 (Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512)
2019-05-31 18:00:00 ERROR clients.NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-0] Connection to node 60 (node10.company.com/10.0.0.60:9092) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
2019-05-31 18:00:00 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
2019-05-31 18:00:01 INFO consumer.ConsumerConfig: ConsumerConfig values: 
        <redacted>
2019-05-31 18:00:01 INFO authenticator.AbstractLogin: Successfully logged in.
2019-05-31 18:00:01 INFO utils.AppInfoParser: Kafka version: 2.2.0
2019-05-31 18:00:01 INFO utils.AppInfoParser: Kafka commitId: 05fcfde8f69b0349
2019-05-31 18:00:01 INFO consumer.KafkaConsumer: [Consumer clientId=consumer-2, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-1] Subscribed to topic(s): <redacted>
2019-05-31 18:00:01 INFO clients.Metadata: Cluster ID: <redacted>
2019-05-31 18:00:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-2, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-1] Discovered group coordinator node06.company.com:9092 (id: 2147483591 rack: null)
2019-05-31 18:00:01 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-2, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-1] Revoking previously assigned partitions 
[]
2019-05-31 18:00:01 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-2, groupId=spark-kafka-source-84afdf97-b7e0-468e-9ee1-d7ab50e81b85-1237283319-driver-1] (Re-)joining group
```

It seems like login with SCRAM fails a few times and then succeeds again. Is this expected?

@gaborgsomogyi
Contributor Author

Since Kafka doesn't support dynamic parameter settings, yes.

@koertkuipers
Contributor

koertkuipers commented Jun 4, 2019 via email

@koertkuipers
Contributor

We just upgraded our dev cluster from SASL_PLAINTEXT to SASL_SSL, but something is wrong with the certificates (or with Kafka's hostname resolution? not sure yet). So as a workaround I temporarily need to set:
ssl.endpoint.identification.algorithm = "" (blank, which disables hostname verification)
I know how to do this in Spark Structured Streaming for Kafka in general, but it's not clear how I do this for the Kafka token provider.
I can see the Kafka admin client being created:

```
2019-06-07 10:17:06 INFO admin.AdminClientConfig: AdminClientConfig values: 
        <redacted>
        ssl.endpoint.identification.algorithm = https
        <redacted>
```

How can I generally change any of these settings? Is there a key like
--conf spark.kafka.ssl.endpoint.identification.algorithm=""
that I can use? Or perhaps per Kafka cluster, like
--conf spark.kafka.clusters.<cluster>.ssl.endpoint.identification.algorithm=""

In the future I could see other settings in this Kafka admin client configuration that I would like to change as well, so I am looking for a general way to update it in a deployment.

Thanks!

@gaborgsomogyi
Contributor Author

It's not yet possible. I'm on a one-week vacation, but after that I'll add this feature.

@koertkuipers
Contributor

koertkuipers commented Jun 7, 2019 via email
