[KYUUBI #6368] Flink engine supports user impersonation #6383

Open · wants to merge 16 commits into master

Conversation

@wForget (Member) commented May 10, 2024

🔍 Description

Issue References 🔗

This pull request fixes #6368

Describe Your Solution 🔧

Support impersonation mode for the Flink SQL engine.

Types of changes 🔖

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Test Plan 🧪

Behavior Without This Pull Request ⚰️

Behavior With This Pull Request 🎉

Tested in the hadoop-testing environment.

Connection:

beeline -u "jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/_HOST@TEST.ORG?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application;kyuubi.engine.share.level=CONNECTION;kyuubi.engine.flink.doAs.enabled=true;"

sql:

select 1;

result:

(screenshot of the query result omitted)

launch engine command:

2024-06-12 03:22:10.242 INFO KyuubiSessionManager-exec-pool: Thread-62 org.apache.kyuubi.engine.EngineRef: Launching engine:
/opt/flink-1.18.1/bin/flink run-application \
	-t yarn-application \
	-Dyarn.ship-files=/opt/flink/opt/flink-sql-client-1.18.1.jar;/opt/flink/opt/flink-sql-gateway-1.18.1.jar;/etc/hive/conf/hive-site.xml \
	-Dyarn.application.name=kyuubi_CONNECTION_FLINK_SQL_spark_6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	-Dyarn.tags=KYUUBI,6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	-Dcontainerized.master.env.FLINK_CONF_DIR=. \
	-Dcontainerized.master.env.HIVE_CONF_DIR=. \
	-Dyarn.security.appmaster.delegation.token.services=kyuubi \
	-Dsecurity.delegation.token.provider.HiveServer2.enabled=false \
	-Dsecurity.delegation.token.provider.hbase.enabled=false \
	-Dexecution.target=yarn-application \
	-Dsecurity.module.factory.classes=org.apache.flink.runtime.security.modules.JaasModuleFactory;org.apache.flink.runtime.security.modules.ZookeeperModuleFactory \
	-Dsecurity.delegation.token.provider.hadoopfs.enabled=false \
	-c org.apache.kyuubi.engine.flink.FlinkSQLEngine /opt/apache-kyuubi-1.10.0-SNAPSHOT-bin/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.10.0-SNAPSHOT.jar \
	--conf kyuubi.session.user=spark \
	--conf kyuubi.client.ipAddress=172.20.0.5 \
	--conf kyuubi.engine.credentials=SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQnlXT5EIMN0U2fUKFRIVvBVIREZTX0RFTEVHQVRJT05fVE9LRU4PMTcyLjIwLjAuNTo4MDIwAA== \
	--conf kyuubi.engine.flink.doAs.enabled=true \
	--conf kyuubi.engine.hive.extra.classpath=/opt/hadoop/share/hadoop/client/*:/opt/hadoop/share/hadoop/mapreduce/* \
	--conf kyuubi.engine.share.level=CONNECTION \
	--conf kyuubi.engine.submit.time=1718162530017 \
	--conf kyuubi.engine.type=FLINK_SQL \
	--conf kyuubi.frontend.protocols=THRIFT_BINARY,REST \
	--conf kyuubi.ha.addresses=hadoop-master1.orb.local:2181 \
	--conf kyuubi.ha.engine.ref.id=6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	--conf kyuubi.ha.namespace=/kyuubi_1.10.0-SNAPSHOT_CONNECTION_FLINK_SQL/spark/6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	--conf kyuubi.server.ipAddress=172.20.0.5 \
	--conf kyuubi.session.connection.url=hadoop-master1.orb.local:10009 \
	--conf kyuubi.session.engine.startup.waitCompletion=false \
	--conf kyuubi.session.real.user=spark

launch engine log:

(screenshot of the launch engine log omitted)

jobmanager log:

2024-06-12 03:22:26,400 INFO  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Loading delegation token providers
2024-06-12 03:22:26,992 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenProvider [] - Renew delegation token with engine credentials: SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQnlXT5EIMN0U2fUKFRIVvBVIREZTX0RFTEVHQVRJT05fVE9LRU4PMTcyLjIwLjAuNTo4MDIwAA==
2024-06-12 03:22:27,100 INFO  org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Add new unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72 6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65 72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 90 0a 77 9e bc 8a 01 90 2e 84 22 bc 16 0f
2024-06-12 03:22:27,104 WARN  org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Ignore token with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020, Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=, realUser=kyuubi/hadoop-master1.orb.local@TEST.ORG, issueDate=1718162529936, maxDate=1718767329936, sequenceNumber=71, masterKeyId=28)
2024-06-12 03:22:27,104 INFO  org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Update delegation tokens. The number of tokens sent by the server is 2. The actual number of updated tokens is 1.
......
2024-06-12 03:22:29,414 INFO  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Starting tokens update task
2024-06-12 03:22:29,415 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - New delegation tokens arrived, sending them to receivers
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updating delegation tokens for current user
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5, 49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118, 101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1, -112, 46, -124, 34, -68, 22, 15]
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112, 46, -124, 34, -112, 71, 28]
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updated delegation tokens for current user successfully

taskmanager log:

2024-06-12 03:45:06,622 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive initial delegation tokens from resource manager
2024-06-12 03:45:06,627 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - New delegation tokens arrived, sending them to receivers
2024-06-12 03:45:06,628 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updating delegation tokens for current user
2024-06-12 03:45:06,629 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5, 49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
2024-06-12 03:45:06,630 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118, 101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1, -112, 46, -124, 34, -68, 22, 15]
2024-06-12 03:45:06,630 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112, 46, -124, 34, -112, 71, 28]
2024-06-12 03:45:06,636 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updated delegation tokens for current user successfully
2024-06-12 03:45:06,636 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation tokens sent to receivers

Related Unit Tests


Checklist 📝

Be nice. Be informative.

@wForget self-assigned this May 10, 2024
@codecov-commenter commented May 10, 2024

Codecov Report

Attention: Patch coverage is 0% with 44 lines in your changes missing coverage. Please review.

Project coverage is 0.00%. Comparing base (04468a6) to head (2ec270e).
Report is 10 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...ache/kyuubi/engine/flink/FlinkProcessBuilder.scala | 0.00% | 32 Missing ⚠️ |
| ...in/scala/org/apache/kyuubi/config/KyuubiConf.scala | 0.00% | 12 Missing ⚠️ |
Additional details and impacted files
@@          Coverage Diff           @@
##           master   #6383   +/-   ##
======================================
  Coverage    0.00%   0.00%           
======================================
  Files         684     684           
  Lines       42281   42327   +46     
  Branches     5768    5771    +3     
======================================
- Misses      42281   42327   +46     


@wForget added this to the v1.10.0 milestone May 14, 2024
@wForget marked this pull request as draft May 24, 2024 07:44
@pan3793 changed the title from "[KYUUBI #6368] Support impersonation mode for flink sql engine" to "[KYUUBI #6368] Flink engine supports user impersonation" May 24, 2024
@@ -68,12 +68,10 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
}.toMap

try {
// Renewer is not needed. But setting a renewer can avoid potential NPE.
val renewer = UserGroupInformation.getCurrentUser.getUserName
Member

this change is worth a dedicated PR.

@@ -58,14 +61,30 @@ class FlinkProcessBuilder(
// flink.execution.target are required in Kyuubi conf currently
val executionTarget: Option[String] = conf.getOption("flink.execution.target")

private lazy val proxyUserEnable: Boolean = {
doAsEnabled && conf.get(ENGINE_FLINK_DOAS_ENABLED) &&
conf.getOption(s"$FLINK_CONF_PREFIX.$FLINK_SECURITY_KEYTAB_KEY").isEmpty &&
Member

in Spark, when doAs is enabled, we actually have a constraint to ensure that the principal should always be the session user.

Member Author

The Flink proxy-user implementation differs from Spark's built-in `--proxy-user`. It relies on `HADOOP_PROXY_USER` and has some limitations, so it is difficult for us to fully control the client's behavior.

Member

we should unify the concept on the Kyuubi layer as much as possible.

private def generateTokenFile(): Option[(String, String)] = {
// We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
// So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
// TODO: Removed this after FLINK-35525, delegation tokens will be passed by `kyuubi` provider
Member

I'm hesitant to mix the two approaches up; maybe we need a switch.

Member Author

FLINK-35525 has not been released in a stable version yet, so the Hadoop token file approach is currently the only effective one.

Member

Flink 1.20 is on the way.

We can simply make the decision based on the Flink version, but considering vendors are likely to backport patches to their internal distributions, an explicit switch is preferred.

Member Author

> an explicit switch is preferred.

Do you mean adding a configuration to control this behavior?

Member

Yes, a new configuration. Given the current approach is hacky and will be eventually removed, we can mark the configuration as internal.

@wForget marked this pull request as ready for review June 12, 2024

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Member

unnecessary

val ENGINE_FLINK_DOAS_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.flink.doAs.enabled")
.doc("Whether to enable using hadoop proxy user to run flink engine. Only takes effect" +
s" in kerberos environment and when `${ENGINE_DO_AS_ENABLED.key}` is set to `true`.")
@pan3793 (Member) Jun 20, 2024

this configuration should be independent of kyuubi.engine.doAs.enabled. And we should fail the Flink engine bootstrap when a user enables this configuration in a non-Kerberized environment.

the docs might be: "When enabled, the session user is used as the proxy user to launch the Flink engine, otherwise, the server user. Note, due to the limitation of Apache Flink, it can only be enabled on Kerberized environment."



@bowenliang123 (Contributor)

Hi @wForget, as we are about to cut the branch for 1.10.0, would you like to polish this PR or postpone it to the next milestone?

@wForget (Member Author) commented Oct 17, 2024

> Hi @wForget, as we are about to cut the branch for 1.10.0, would you like to polish this PR or postpone it to the next milestone?

Yes, I hope to get it into 1.10.0; I will complete it as soon as possible.

@wForget (Member Author) commented Oct 17, 2024

KyuubiDelegationTokenProvider is not loaded:

(screenshots omitted)

@bowenliang123 (Contributor)

Is this a blocker issue with KyuubiDelegationTokenProvider?

@wForget (Member Author) commented Oct 18, 2024

> Is this a blocker issue with KyuubiDelegationTokenProvider?

No, we can enable kyuubi.engine.flink.doAs.generateTokenFile to avoid it, and I will adjust the classpath to try to solve it.

pan3793 pushed a commit that referenced this pull request Oct 18, 2024
# 🔍 Description
## Issue References 🔗

Allow delegation tokens to be used and renewed by the YARN ResourceManager (used in the proxy user mode of the Flink engine; addresses #6383 (comment)).

## Describe Your Solution 🔧

Set hadoop fs delegation token renewer to empty.

## Types of changes 🔖

- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6753 from wForget/renewer.

Closes #6753

f2e1f0a [wforget] Set hadoop fs delegation token renewer to empty

Authored-by: wforget <643348094@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
" it can only be enabled on Kerberized environment.")
.version("1.10.0")
.booleanConf
.createWithDefault(false)
@pan3793 (Member) Oct 18, 2024

can we let the default value of kyuubi.engine.flink.doAs.enabled fall back to kyuubi.engine.doAs.enabled, and make the Flink engine exclusively respect kyuubi.engine.flink.doAs.enabled?

ignore this.

val ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.flink.doAs.generateTokenFile")
.doc("Whether to generate a hadoop token file for flink submit process." +
s" We need to enable it when we set `$ENGINE_FLINK_DOAS_ENABLED=true`" +
Member

Suggested change:
- s" We need to enable it when we set `$ENGINE_FLINK_DOAS_ENABLED=true`" +
+ s" We need to enable it when we set `${ENGINE_FLINK_DOAS_ENABLED.key}=true`" +

Member

this should be an internal flag; we can remove it at any time.

I think the issue could also be addressed by YARN-10333 (correct me if I'm wrong), so the docs might be:

"When ${ENGINE_FLINK_DOAS_ENABLED.key}=true and neither FLINK-35525 (Flink 1.20.0) nor YARN-10333 (Hadoop 3.4.0) is available, enable this configuration to generate a temporary HADOOP_TOKEN_FILE that will be picked up by the Flink engine bootstrap process."

Comment on lines +65 to +66
private lazy val proxyUserEnable: Boolean = {
doAsEnabled && conf.get(ENGINE_FLINK_DOAS_ENABLED)
Member

> ... it can only be enabled on Kerberized environment.

I think the logic here should be:

if (conf.get(ENGINE_FLINK_DOAS_ENABLED)) {
  if (!UserGroupInformation.isSecurityEnabled) {
    // log a warning and fall back to launching as the server user
    warn(s"${ENGINE_FLINK_DOAS_ENABLED.key} is enabled but Kerberos is not, ignoring it")
    false
  } else {
    true
  }
} else {
  false
}

@@ -84,6 +84,7 @@
<module>kyuubi-util</module>
<module>kyuubi-util-scala</module>
<module>kyuubi-zookeeper</module>
<module>extensions/flink/kyuubi-flink-token-provider</module>
Member

move it to line 57

@pan3793 (Member) commented Oct 18, 2024

the code does not seem to mention YARN-10333?

@pan3793 (Member) commented Oct 18, 2024

TODO things:

  1. decide how to distribute kyuubi-flink-token-provider-<version>.jar, either by including it in the Kyuubi binary distribution tarball or by guiding users to download it from Maven Central
  2. document this feature
