diff --git a/Cargo.lock b/Cargo.lock index cd8805859ac88..df42e798b7144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3824,9 +3824,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.70.0" +version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcc72fdf0c491160a34d4a1bfb03f96da8a5054288d61c816d514b5c2fa49ea" +checksum = "977951a1d7dbce4c6f9c2cbed0711f568df9944010fafa88161a20f7e5163bd3" dependencies = [ "k8s-openapi", "kube-client", @@ -3836,9 +3836,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.70.0" +version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b01c722d55ffedec74cbc259b4508d8a59bf19540006ec87618f76ab156579" +checksum = "625e8a89c79dd4e590217838f32274ccbb0f4ecc1db181f84aba3df635fca729" dependencies = [ "base64 0.13.0", "bytes 1.1.0", @@ -3872,9 +3872,9 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.70.0" +version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd9e3535777edd122cc26fe3fe6357066b33eff63d8b919862edbe7a956a679" +checksum = "0491dcd9adca79a96c63404aa978137f5fe1d3db5dcb6e5c0b073f915dbdd49d" dependencies = [ "chrono", "form_urlencoded", @@ -3889,9 +3889,9 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.70.0" +version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "816c8c086f8bbcf9a4db0b7a68db90b784ef6292a57de35c64cccb90d5edfbe5" +checksum = "bb67002cbacd6cd99d1b1d0890785a84b6408158972f2561317bc4cdc06c981e" dependencies = [ "ahash", "backoff", @@ -7957,9 +7957,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.2.4" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90c125fdea84614a4368fd35786b51d0682ab8d42705e061e92f0b955dea40fb" +checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae" dependencies = [ "base64 0.13.0", "bitflags", diff --git a/Cargo.toml b/Cargo.toml index 8981837bde707..4f54bbd7d274b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -238,7 +238,7 @@ infer = { version = "0.7.0", default-features = false, optional = true} indoc = { version = "1.0.4", default-features = false } inventory = { version = "0.1.10", default-features = false } k8s-openapi = { version = "0.14.0", default-features = false, features = ["api", "v1_16"], optional = true } -kube = { version = "0.70.0", default-features = false, features = ["client", "native-tls", "runtime"], optional = true } +kube = { version = "0.72.0", default-features = false, features = ["client", "native-tls", "runtime"], optional = true } listenfd = { version = "0.5.0", default-features = false, optional = true } logfmt = { version = "0.0.2", default-features = false, optional = true } lru = { version = "0.7.5", default-features = false, optional = true } diff --git a/vendor/kube-client/.cargo-checksum.json b/vendor/kube-client/.cargo-checksum.json index 47af97de83eb1..f3a0aa4d3cefc 100644 --- a/vendor/kube-client/.cargo-checksum.json +++ b/vendor/kube-client/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"bccd1a87a88b6bac9bd3cb074eae802e3f53d5601812ed5dac50eecdce23059f","README.md":"64461d106b16b7c13f1c646813ed55a6501bd02062d3e88385642af6f8ed96e4","src/api/core_methods.rs":"13cb918209159b193635b7a230e48b876d257dc95d9e6773f1950c3e42b2429a","src/api/entry.rs":"c1bb3454975eac9a982587a62002b133747a2a55c2e75847cb53f056131b4ea2","src/api/mod.rs":"7594e94e06c5a6fab1fc491da116d477b1168567ba3778289afabdc369d31d18","src/api/portforward.rs":"36e43a130f7258dc1b87adf5331c5be0fb855302a6ccb27df730716dc634e580","src/api/remote_command.rs":"ccd308f60976d11cb0bd24fdac663de4e4b220c2e3435d8d5578cf6d5721d8f4","src/api/subresource.rs":"16a9c6c8a29a810b4e312ce143c368cffec254c7408a6b885179e4e55e8b93a0","src/api/util/csr.rs":"7c220fe59b2397f2bd10bcc2ab098072b15737228baedb8abcfa65c8e435aa3d","src/api/util/mod.rs":"8bb8aade37e9b68c835a83eff399bbb619bc44641140c50c32565f14257d3086","src/client/auth/mod.rs":"28ada7d2cf13ea991a74abb04616a3c19be1cd67fd890613a10bc680e51b2ee6","src/client/auth/oauth.rs":"cdc6c58d5c0c141eb786db6b7f35a1c250b28820c29330417bd7175a6f39aa93","src/client/body.rs":"0b1488596c30a699f7afe1ebbc705178f61a6e6bdbc7bcd1f6111857c1fe2f62","src/client/config_ext.rs":"22923ebe9c7ec00a7289fd96a76bfd7b982a7ab65779f0f678fb96159438a5f9","src/client/middleware/base_uri.rs":"3b50e8e119b9ae8bbdd21e57331922e5fd067a95d0f74299665008eccde1f1c7","src/client/middleware/extra_headers.rs":"956711fcf36dd0ee5dc0be9dc740f463760051370cab3226d75a1cd42804b78f","src/client/middleware/mod.rs":"ecef232d946a2365dc383006acc6b7fcb7d658914e9bb9cfef35a0b7d873dcc3","src/client/mod.rs":"57fe8619ace507b30fa9a8d431fc1d89767b66de12c1dee068960d48790d4721","src/client/tls.rs":"5d4d870dc7fb115f668d9346eabc7e59fc6c058bb7848a688942b49430afc8e6","src/client/upgrade.rs":"4a29a94ace955d3841b42b15ebb70c38a1dcc5631b9fc7f994c2dc5093ffccfd","src/config/file_config.rs":"002a63ba9ed94a9cd17d756b9e69732f3e0ab63a58b8c7c8f500748f29c352fa","src/config/file_loader.rs":"fce1cbc13421c3742447ae3efb61b47a62e5e4da70e9dd286970fa9b3f8cb6e8","src/config/incluster_config.rs":"3438c25e841032b78b268a5bef37084466c15f3030cc01d9a0bccd1e4923666a","src/config/mod.rs":"36298beacee4c4b9aaa24446f971dbf4cb3c7c03f5c0fdae13370f6a48a7fe41","src/discovery/apigroup.rs":"7014dacc44d30de6817ef20d91ab0516ada5c7d1c1fab167d986750a2430040c","src/discovery/mod.rs":"439d2958c67029ac5ea61193dfc8572f41aaed3b325833ea6cc4f74f2c585aa1","src/discovery/oneshot.rs":"e4b8812acbed969b4389847b964c17d18926486e05a3e76f7fedbe6a83703573","src/discovery/parse.rs":"972287ffff2d7d9a3343e2d244e5f2ca10168807111da7af0cf410470dc0a085","src/error.rs":"8a2ba71c15d724aa473649995f7c47c3fe548e1fd19b8f41768295abd78a5f3c","src/lib.rs":"4a289497d46b64338046e705c866e82ec2127a53cfa6bfc6f7bd087b7ab068b5"},"package":"94b01c722d55ffedec74cbc259b4508d8a59bf19540006ec87618f76ab156579"} \ No newline at end of file +{"files":{"Cargo.toml":"168d20503b3d0417fad36dd4e645b9b555808b4f6edd79137747122593ac9f2c","README.md":"64461d106b16b7c13f1c646813ed55a6501bd02062d3e88385642af6f8ed96e4","src/api/core_methods.rs":"13cb918209159b193635b7a230e48b876d257dc95d9e6773f1950c3e42b2429a","src/api/entry.rs":"c1bb3454975eac9a982587a62002b133747a2a55c2e75847cb53f056131b4ea2","src/api/mod.rs":"7594e94e06c5a6fab1fc491da116d477b1168567ba3778289afabdc369d31d18","src/api/portforward.rs":"617569d6c441ee9a6ec4080cac4371a8e6a34fa21ec35347db783b68e9db54d6","src/api/remote_command.rs":"4bed56f240224d3aff309fe669f253a3219165717a3fc97a0d55b13362d2f6a2","src/api/subresource.rs":"16a9c6c8a29a810b4e312ce143c368cffec254c7408a6b885179e4e55e8b93a0","src/api/util/csr.rs":"7c220fe59b2397f2bd10bcc2ab098072b15737228baedb8abcfa65c8e435aa3d","src/api/util/mod.rs":"8bb8aade37e9b68c835a83eff399bbb619bc44641140c50c32565f14257d3086","src/client/auth/mod.rs":"28ada7d2cf13ea991a74abb04616a3c19be1cd67fd890613a10bc680e51b2ee6","src/client/auth/oauth.rs":"cdc6c58d5c0c141eb786db6b7f35a1c250b28820c29330417bd7175a6f39aa93","src/client/body.rs":"0b1488596c30a699f7afe1ebbc705178f61a6e6bdbc7bcd1f6111857c1fe2f62","src/client/builder.rs":"3c9988718a118fc7cd781ef1d661335c7b4db7df9239c9c0d987db7f152031e7","src/client/config_ext.rs":"22923ebe9c7ec00a7289fd96a76bfd7b982a7ab65779f0f678fb96159438a5f9","src/client/middleware/base_uri.rs":"3b50e8e119b9ae8bbdd21e57331922e5fd067a95d0f74299665008eccde1f1c7","src/client/middleware/extra_headers.rs":"956711fcf36dd0ee5dc0be9dc740f463760051370cab3226d75a1cd42804b78f","src/client/middleware/mod.rs":"ecef232d946a2365dc383006acc6b7fcb7d658914e9bb9cfef35a0b7d873dcc3","src/client/mod.rs":"8da573bb41334711f62532f638075272d66c958a55583c1bd16cf433b95e7a6f","src/client/tls.rs":"5d4d870dc7fb115f668d9346eabc7e59fc6c058bb7848a688942b49430afc8e6","src/client/upgrade.rs":"4a29a94ace955d3841b42b15ebb70c38a1dcc5631b9fc7f994c2dc5093ffccfd","src/config/file_config.rs":"002a63ba9ed94a9cd17d756b9e69732f3e0ab63a58b8c7c8f500748f29c352fa","src/config/file_loader.rs":"964bfa370427c6b8013941a8d77eee2048910b2ba634dc58c838a8f4704a617b","src/config/incluster_config.rs":"ace6378286986e18c005d5b20ad0b5e0c9c2fdb28383b836f8de52fff10ff597","src/config/mod.rs":"08fba1f28f39eb79348d93ecf352791e5acc92fa42e020ee2558bdf9895ff7e6","src/discovery/apigroup.rs":"7014dacc44d30de6817ef20d91ab0516ada5c7d1c1fab167d986750a2430040c","src/discovery/mod.rs":"439d2958c67029ac5ea61193dfc8572f41aaed3b325833ea6cc4f74f2c585aa1","src/discovery/oneshot.rs":"e4b8812acbed969b4389847b964c17d18926486e05a3e76f7fedbe6a83703573","src/discovery/parse.rs":"972287ffff2d7d9a3343e2d244e5f2ca10168807111da7af0cf410470dc0a085","src/error.rs":"e53e9e69fdaffc6cf4a5a5c3c03e678002dd3e1fa0d5f114d37823637818f962","src/lib.rs":"ce4c812f23ee0ac75d79296ae9cdedab73ab9fc69bb58e108486336e018efff7"},"package":"625e8a89c79dd4e590217838f32274ccbb0f4ecc1db181f84aba3df635fca729"} \ No newline at end of file diff --git a/vendor/kube-client/Cargo.toml b/vendor/kube-client/Cargo.toml index 5b6136f3d2d8c..74c8b1be2bf86 100644 --- a/vendor/kube-client/Cargo.toml +++ b/vendor/kube-client/Cargo.toml @@ -13,17 +13,39 @@ edition = "2021" rust-version = "1.56" name = "kube-client" -version = "0.70.0" -authors = ["clux ", "Teo Klestrup Röijezon ", "kazk "] +version = "0.72.0" +authors = [ + "clux ", + "Teo Klestrup Röijezon ", + "kazk ", +] description = "Kubernetes client" readme = "../README.md" -keywords = ["kubernetes", "client"] +keywords = [ + "kubernetes", + "client", +] categories = ["web-programming::http-client"] license = "Apache-2.0" repository = "https://github.com/kube-rs/kube-rs" + [package.metadata.docs.rs] -features = ["client", "native-tls", "rustls-tls", "openssl-tls", "ws", "oauth", "jsonpatch", "admission", "k8s-openapi/v1_22"] -rustdoc-args = ["--cfg", "docsrs"] +features = [ + "client", + "native-tls", + "rustls-tls", + "openssl-tls", + "ws", + "oauth", + "jsonpatch", + "admission", + "k8s-openapi/v1_23", +] +rustdoc-args = [ + "--cfg", + "docsrs", +] + [dependencies.base64] version = "0.13.0" optional = true @@ -59,7 +81,12 @@ optional = true [dependencies.hyper] version = "0.14.13" -features = ["client", "http1", "stream", "tcp"] +features = [ + "client", + "http1", + "stream", + "tcp", +] optional = true [dependencies.hyper-openssl] @@ -88,7 +115,7 @@ features = [] default-features = false [dependencies.kube-core] -version = "^0.70.0" +version = "=0.72.0" [dependencies.openssl] version = "0.10.36" @@ -112,12 +139,15 @@ features = ["dangerous_configuration"] optional = true [dependencies.rustls-pemfile] -version = "0.3.0" +version = "1.0.0" optional = true [dependencies.secrecy] version = "0.8.0" -features = ["alloc", "serde"] +features = [ + "alloc", + "serde", +] [dependencies.serde] version = "1.0.130" @@ -140,7 +170,11 @@ version = "1.0.29" [dependencies.tokio] version = "1.14.0" -features = ["time", "signal", "sync"] +features = [ + "time", + "signal", + "sync", +] optional = true [dependencies.tokio-native-tls] @@ -153,31 +187,47 @@ optional = true [dependencies.tokio-util] version = "0.7.0" -features = ["io", "codec"] +features = [ + "io", + "codec", +] optional = true [dependencies.tower] version = "0.4.6" -features = ["buffer", "filter", "util"] +features = [ + "buffer", + "filter", + "util", +] optional = true [dependencies.tower-http] -version = "0.2.0" -features = ["auth", "map-response-body", "trace"] +version = "0.3.2" +features = [ + "auth", + "map-response-body", + "trace", +] optional = true [dependencies.tracing] version = "0.1.29" features = ["log"] optional = true + [dev-dependencies.k8s-openapi] version = "0.14.0" -features = ["v1_22"] +features = ["v1_23"] default-features = false [dev-dependencies.kube] version = "<1.0.0, >=0.61.0" -features = ["derive", "client", "ws"] +features = [ + "derive", + "client", + "ws", +] [dev-dependencies.schemars] version = "0.8.6" @@ -196,16 +246,64 @@ version = "0.4.0" version = "0.4.0" [features] -__non_core = ["tracing", "serde_yaml", "base64"] +__non_core = [ + "tracing", + "serde_yaml", + "base64", +] admission = ["kube-core/admission"] -client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"] -config = ["__non_core", "pem", "dirs"] -default = ["client", "native-tls"] -deprecated-crd-v1beta1 = ["kube-core/deprecated-crd-v1beta1"] -gzip = ["client", "tower-http/decompression-gzip"] +client = [ + "config", + "__non_core", + "hyper", + "http-body", + "tower", + "tower-http", + "hyper-timeout", + "pin-project", + "chrono", + "jsonpath_lib", + "bytes", + "futures", + "tokio", + "tokio-util", + "either", +] +config = [ + "__non_core", + "pem", + "dirs", +] +default = [ + "client", + "openssl-tls", +] +gzip = [ + "client", + "tower-http/decompression-gzip", +] jsonpatch = ["kube-core/jsonpatch"] -native-tls = ["openssl", "hyper-tls", "tokio-native-tls"] -oauth = ["client", "tame-oauth"] -openssl-tls = ["openssl", "hyper-openssl"] -rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"] -ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws"] +native-tls = [ + "openssl", + "hyper-tls", + "tokio-native-tls", +] +oauth = [ + "client", + "tame-oauth", +] +openssl-tls = [ + "openssl", + "hyper-openssl", +] +rustls-tls = [ + "rustls", + "rustls-pemfile", + "hyper-rustls", +] +ws = [ + "client", + "tokio-tungstenite", + "rand", + "kube-core/ws", +] diff --git a/vendor/kube-client/src/api/portforward.rs b/vendor/kube-client/src/api/portforward.rs index 901b062718104..6dee12b4cba0a 100644 --- a/vendor/kube-client/src/api/portforward.rs +++ b/vendor/kube-client/src/api/portforward.rs @@ -1,9 +1,4 @@ -use std::{ - future::Future, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll, Waker}, -}; +use std::{collections::HashMap, future::Future}; use bytes::{Buf, Bytes}; use futures::{ @@ -62,6 +57,9 @@ pub enum Error { /// Failed to receive a WebSocket message from the server. #[error("failed to receive a WebSocket message: {0}")] ReceiveWebSocketMessage(#[source] ws::Error), + + #[error("failed to complete the background task: {0}")] + Spawn(#[source] tokio::task::JoinError), } type ErrorReceiver = oneshot::Receiver; @@ -73,18 +71,15 @@ enum Message { ToPod(u8, Bytes), } -struct PortforwarderState { - waker: Option, - result: Option>, -} - -// Provides `AsyncRead + AsyncWrite` for each port and **does not** bind to local ports. -// Error channel for each port is only written by the server when there's an exception and -// the port cannot be used (didn't initialize or can't be used anymore). -/// Manage port forwarding. +/// Manages port-forwarded streams. +/// +/// Provides `AsyncRead + AsyncWrite` for each port and **does not** bind to local ports. Error +/// channel for each port is only written by the server when there's an exception and +/// the port cannot be used (didn't initialize or can't be used anymore). pub struct Portforwarder { - ports: Vec, - state: Arc>, + ports: HashMap, + errors: HashMap, + task: tokio::task::JoinHandle>, } impl Portforwarder { @@ -92,90 +87,59 @@ impl Portforwarder { where S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, { - let mut ports = Vec::new(); - let mut errors = Vec::new(); - let mut duplexes = Vec::new(); - for _ in port_nums.iter() { + let mut ports = HashMap::with_capacity(port_nums.len()); + let mut error_rxs = HashMap::with_capacity(port_nums.len()); + let mut error_txs = Vec::with_capacity(port_nums.len()); + let mut task_ios = Vec::with_capacity(port_nums.len()); + for port in port_nums.iter() { let (a, b) = tokio::io::duplex(1024 * 1024); - let (tx, rx) = oneshot::channel(); - ports.push(Port::new(a, rx)); - errors.push(Some(tx)); - duplexes.push(b); - } - - let state = Arc::new(Mutex::new(PortforwarderState { - waker: None, - result: None, - })); - let shared_state = state.clone(); - let port_nums = port_nums.to_owned(); - tokio::spawn(async move { - let result = start_message_loop(stream, port_nums, duplexes, errors).await; - - let mut shared = shared_state.lock().unwrap(); - shared.result = Some(result); - if let Some(waker) = shared.waker.take() { - waker.wake() - } - }); - Portforwarder { ports, state } - } + ports.insert(*port, a); + task_ios.push(b); - /// Get streams for forwarded ports. - pub fn ports(&mut self) -> &mut [Port] { - self.ports.as_mut_slice() - } -} - -impl Future for Portforwarder { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut state = self.state.lock().unwrap(); - if let Some(result) = state.result.take() { - return Poll::Ready(result); - } - - if let Some(waker) = &state.waker { - if waker.will_wake(cx.waker()) { - return Poll::Pending; - } + let (tx, rx) = oneshot::channel(); + error_rxs.insert(*port, rx); + error_txs.push(Some(tx)); } - - state.waker = Some(cx.waker().clone()); - Poll::Pending - } -} - -pub struct Port { - // Data pipe. - stream: Option, - // Error channel. - error: Option, -} - -impl Port { - pub(crate) fn new(stream: DuplexStream, error: ErrorReceiver) -> Self { - Port { - stream: Some(stream), - error: Some(error), + let task = tokio::spawn(start_message_loop( + stream, + port_nums.to_vec(), + task_ios, + error_txs, + )); + + Portforwarder { + ports, + errors: error_rxs, + task, } } - /// Data pipe for sending to and receiving from the forwarded port. + /// Take a port stream by the port on the target resource. /// - /// This returns a `Some` on the first call, then a `None` on every subsequent call - pub fn stream(&mut self) -> Option { - self.stream.take() + /// A value is returned at most once per port. + #[inline] + pub fn take_stream(&mut self, port: u16) -> Option { + self.ports.remove(&port) } - /// Future that resolves with any error message or when the error sender is dropped. + /// Take a future that resolves with any error message or when the error sender is dropped. /// When the future resolves, the port should be considered no longer usable. /// - /// This returns a `Some` on the first call, then a `None` on every subsequent call - pub fn error(&mut self) -> Option>> { - // Ignore Cancellation error. - self.error.take().map(|recv| recv.map(|res| res.ok())) + /// A value is returned at most once per port. + #[inline] + pub fn take_error(&mut self, port: u16) -> Option>> { + self.errors.remove(&port).map(|recv| recv.map(|res| res.ok())) + } + + /// Abort the background task, causing port forwards to fail. + #[inline] + pub fn abort(&self) { + self.task.abort(); + } + + /// Waits for port forwarding task to complete. + pub async fn join(self) -> Result<(), Error> { + self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e))) } } diff --git a/vendor/kube-client/src/api/remote_command.rs b/vendor/kube-client/src/api/remote_command.rs index fa689e496a8aa..26b1be8c1fc7e 100644 --- a/vendor/kube-client/src/api/remote_command.rs +++ b/vendor/kube-client/src/api/remote_command.rs @@ -1,50 +1,84 @@ -use std::{ - future::Future, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll, Waker}, -}; +use std::future::Future; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; use futures::{ + channel::oneshot, future::{ select, Either::{Left, Right}, }, - SinkExt, StreamExt, + FutureExt, SinkExt, StreamExt, }; +use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}; use tokio_tungstenite::{tungstenite as ws, WebSocketStream}; use super::AttachParams; -// Internal state of an attached process -struct AttachedProcessState { - waker: Option, - finished: bool, - status: Option, - stdin_writer: Option, - stdout_reader: Option, - stderr_reader: Option, +type StatusReceiver = oneshot::Receiver; +type StatusSender = oneshot::Sender; + +/// Errors from attaching to a pod. +#[derive(Debug, Error)] +pub enum Error { + /// Failed to read from stdin + #[error("failed to read from stdin: {0}")] + ReadStdin(#[source] std::io::Error), + + /// Failed to send stdin data to the pod + #[error("failed to send a stdin data: {0}")] + SendStdin(#[source] ws::Error), + + /// Failed to write to stdout + #[error("failed to write to stdout: {0}")] + WriteStdout(#[source] std::io::Error), + + /// Failed to write to stderr + #[error("failed to write to stderr: {0}")] + WriteStderr(#[source] std::io::Error), + + /// Failed to receive a WebSocket message from the server. + #[error("failed to receive a WebSocket message: {0}")] + ReceiveWebSocketMessage(#[source] ws::Error), + + // Failed to complete the background task + #[error("failed to complete the background task: {0}")] + Spawn(#[source] tokio::task::JoinError), + + /// Failed to send close message. + #[error("failed to send a WebSocket close message: {0}")] + SendClose(#[source] ws::Error), + + /// Failed to deserialize status object + #[error("failed to deserialize status object: {0}")] + DeserializeStatus(#[source] serde_json::Error), + + /// Failed to send status object + #[error("failed to send status object")] + SendStatus, } const MAX_BUF_SIZE: usize = 1024; /// Represents an attached process in a container for [`attach`] and [`exec`]. /// -/// Resolves when the connection terminates with an optional [`Status`]. /// Provides access to `stdin`, `stdout`, and `stderr` if attached. /// +/// Use [`AttachedProcess::join`] to wait for the process to terminate. +/// /// [`attach`]: crate::Api::attach /// [`exec`]: crate::Api::exec -/// [`Status`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::Status #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] pub struct AttachedProcess { has_stdin: bool, has_stdout: bool, has_stderr: bool, - state: Arc>, + stdin_writer: Option, + stdout_reader: Option, + stderr_reader: Option, + status_rx: Option, + task: tokio::task::JoinHandle>, } impl AttachedProcess { @@ -67,32 +101,25 @@ impl AttachedProcess { } else { (None, None) }; + let (status_tx, status_rx) = oneshot::channel(); - let state = Arc::new(Mutex::new(AttachedProcessState { - waker: None, - finished: false, - status: None, - stdin_writer: Some(stdin_writer), - stdout_reader, - stderr_reader, - })); - let shared_state = state.clone(); - tokio::spawn(async move { - let status = start_message_loop(stream, stdin_reader, stdout_writer, stderr_writer).await; - - let mut shared = shared_state.lock().unwrap(); - shared.finished = true; - shared.status = status; - if let Some(waker) = shared.waker.take() { - waker.wake() - } - }); + let task = tokio::spawn(start_message_loop( + stream, + stdin_reader, + stdout_writer, + stderr_writer, + status_tx, + )); AttachedProcess { has_stdin: ap.stdin, has_stdout: ap.stdout, has_stderr: ap.stderr, - state, + task, + stdin_writer: Some(stdin_writer), + stdout_reader, + stderr_reader, + status_rx: Some(status_rx), } } @@ -106,9 +133,7 @@ impl AttachedProcess { if !self.has_stdin { return None; } - - let mut state = self.state.lock().unwrap(); - state.stdin_writer.take() + self.stdin_writer.take() } /// Async reader for stdout outputs. @@ -121,8 +146,7 @@ impl AttachedProcess { if !self.has_stdout { return None; } - let mut state = self.state.lock().unwrap(); - state.stdout_reader.take() + self.stdout_reader.take() } /// Async reader for stderr outputs. @@ -135,30 +159,25 @@ impl AttachedProcess { if !self.has_stderr { return None; } - - let mut state = self.state.lock().unwrap(); - state.stderr_reader.take() + self.stderr_reader.take() } -} -impl Future for AttachedProcess { - type Output = Option; + /// Abort the background task, causing remote command to fail. + #[inline] + pub fn abort(&self) { + self.task.abort(); + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut state = self.state.lock().unwrap(); - if state.finished { - Poll::Ready(state.status.take()) - } else { - // Update waker if necessary - if let Some(waker) = &state.waker { - if waker.will_wake(cx.waker()) { - return Poll::Pending; - } - } + /// Waits for the remote command task to complete. + pub async fn join(self) -> Result<(), Error> { + self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e))) + } - state.waker = Some(cx.waker().clone()); - Poll::Pending - } + /// Take a future that resolves with any status object or when the sender is dropped. + /// + /// Returns `None` if called more than once. + pub fn take_status(&mut self) -> Option>> { + self.status_rx.take().map(|recv| recv.map(|res| res.ok())) } } @@ -174,7 +193,8 @@ async fn start_message_loop( stdin: impl AsyncRead + Unpin, mut stdout: Option, mut stderr: Option, -) -> Option + status_tx: StatusSender, +) -> Result<(), Error> where S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, { @@ -184,7 +204,6 @@ where let mut server_recv = raw_server_recv.filter_map(filter_message).boxed(); let mut server_msg = server_recv.next(); let mut next_stdin = stdin_stream.next(); - let mut status: Option = None; loop { match select(server_msg, next_stdin).await { @@ -193,31 +212,25 @@ where match message { Ok(Message::Stdout(bin)) => { if let Some(stdout) = stdout.as_mut() { - stdout - .write_all(&bin[1..]) - .await - .expect("stdout pipe is writable"); + stdout.write_all(&bin[1..]).await.map_err(Error::WriteStdout)?; } } Ok(Message::Stderr(bin)) => { if let Some(stderr) = stderr.as_mut() { - stderr - .write_all(&bin[1..]) - .await - .expect("stderr pipe is writable"); + stderr.write_all(&bin[1..]).await.map_err(Error::WriteStderr)?; } } Ok(Message::Status(bin)) => { - if let Ok(s) = serde_json::from_slice::(&bin[1..]) { - status = Some(s); - } + let status = + serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; + status_tx.send(status).map_err(|_| Error::SendStatus)?; + break; } - // Fatal error Err(err) => { - panic!("AttachedProcess: fatal WebSocket error: {:?}", err); + return Err(Error::ReceiveWebSocketMessage(err)); } } server_msg = server_recv.next(); @@ -238,28 +251,26 @@ where server_send .send(ws::Message::binary(vec)) .await - .expect("send stdin"); + .map_err(Error::SendStdin)?; } server_msg = p_server_msg; next_stdin = stdin_stream.next(); } Right((Some(Err(err)), _)) => { - server_send.close().await.expect("send close message"); - panic!("AttachedProcess: failed to read from stdin pipe: {:?}", err); + return Err(Error::ReadStdin(err)); } Right((None, _)) => { // Stdin closed (writer half dropped). // Let the server know and disconnect. - // REVIEW warn? - server_send.close().await.expect("send close message"); + server_send.close().await.map_err(Error::SendClose)?; break; } } } - status + Ok(()) } /// Channeled messages from the server. diff --git a/vendor/kube-client/src/client/builder.rs b/vendor/kube-client/src/client/builder.rs new file mode 100644 index 0000000000000..1916c26cea212 --- /dev/null +++ b/vendor/kube-client/src/client/builder.rs @@ -0,0 +1,181 @@ +use bytes::Bytes; +use http::{Request, Response}; +use hyper::{self, client::HttpConnector}; +use hyper_timeout::TimeoutConnector; +pub use kube_core::response::Status; +use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder}; +use tower_http::{ + classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer, +}; + +use crate::{client::ConfigExt, Client, Config, Error, Result}; + +/// HTTP body of a dynamic backing type. +/// +/// The suggested implementation type is [`hyper::Body`]. +pub type DynBody = dyn http_body::Body + Send + Unpin; + +/// Builder for [`Client`] instances with customized [tower](`Service`) middleware. +pub struct ClientBuilder { + service: Svc, + default_ns: String, +} + +impl ClientBuilder { + /// Construct a [`ClientBuilder`] from scratch with a fully custom [`Service`] stack. + /// + /// This method is only intended for advanced use cases, most users will want to use [`ClientBuilder::try_from`] instead, + /// which provides a default stack as a starting point. + pub fn new(service: Svc, default_namespace: impl Into) -> Self + where + Svc: Service>, + { + Self { + service, + default_ns: default_namespace.into(), + } + } + + /// Add a [`Layer`] to the current [`Service`] stack. + pub fn with_layer>(self, layer: &L) -> ClientBuilder { + let Self { + service: stack, + default_ns, + } = self; + ClientBuilder { + service: layer.layer(stack), + default_ns, + } + } + + /// Build a [`Client`] instance with the current [`Service`] stack. + pub fn build(self) -> Client + where + Svc: Service, Response = Response> + Send + 'static, + Svc::Future: Send + 'static, + Svc::Error: Into, + B: http_body::Body + Send + 'static, + B::Error: Into, + { + Client::new(self.service, self.default_ns) + } +} + +impl TryFrom for ClientBuilder, Response>, BoxError>> { + type Error = Error; + + /// Builds a default [`ClientBuilder`] stack from a given configuration + fn try_from(config: Config) -> Result { + use std::time::Duration; + + use http::header::HeaderMap; + use tracing::Span; + + let timeout = config.timeout; + let default_ns = config.default_namespace.clone(); + + let client: hyper::Client<_, hyper::Body> = { + let mut connector = HttpConnector::new(); + connector.enforce_http(false); + + // Current TLS feature precedence when more than one are set: + // 1. openssl-tls + // 2. native-tls + // 3. rustls-tls + // Create a custom client to use something else. + // If TLS features are not enabled, http connector will be used. + #[cfg(feature = "openssl-tls")] + let connector = config.openssl_https_connector_with_connector(connector)?; + #[cfg(all(not(feature = "openssl-tls"), feature = "native-tls"))] + let connector = hyper_tls::HttpsConnector::from(( + connector, + tokio_native_tls::TlsConnector::from(config.native_tls_connector()?), + )); + #[cfg(all( + not(any(feature = "openssl-tls", feature = "native-tls")), + feature = "rustls-tls" + ))] + let connector = hyper_rustls::HttpsConnector::from(( + connector, + std::sync::Arc::new(config.rustls_client_config()?), + )); + + let mut connector = TimeoutConnector::new(connector); + connector.set_connect_timeout(timeout); + connector.set_read_timeout(timeout); + + hyper::Client::builder().build(connector) + }; + + let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner(); + #[cfg(feature = "gzip")] + let stack = ServiceBuilder::new() + .layer(stack) + .layer(tower_http::decompression::DecompressionLayer::new()) + .into_inner(); + + let service = ServiceBuilder::new() + .layer(stack) + .option_layer(config.auth_layer()?) + .layer(config.extra_headers_layer()?) + .layer( + // Attribute names follow [Semantic Conventions]. + // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md + TraceLayer::new_for_http() + .make_span_with(|req: &Request| { + tracing::debug_span!( + "HTTP", + http.method = %req.method(), + http.url = %req.uri(), + http.status_code = tracing::field::Empty, + otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"), + otel.kind = "client", + otel.status_code = tracing::field::Empty, + ) + }) + .on_request(|_req: &Request, _span: &Span| { + tracing::debug!("requesting"); + }) + .on_response(|res: &Response, _latency: Duration, span: &Span| { + let status = res.status(); + span.record("http.status_code", &status.as_u16()); + if status.is_client_error() || status.is_server_error() { + span.record("otel.status_code", &"ERROR"); + } + }) + // Explicitly disable `on_body_chunk`. The default does nothing. + .on_body_chunk(()) + .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| { + tracing::debug!("stream closed"); + }) + .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| { + // Called when + // - Calling the inner service errored + // - Polling `Body` errored + // - the response was classified as failure (5xx) + // - End of stream was classified as failure + span.record("otel.status_code", &"ERROR"); + match ec { + ServerErrorsFailureClass::StatusCode(status) => { + span.record("http.status_code", &status.as_u16()); + tracing::error!("failed with status {}", status) + } + ServerErrorsFailureClass::Error(err) => { + tracing::error!("failed with error {}", err) + } + } + }), + ) + .service(client); + + Ok(Self::new( + BoxService::new( + MapResponseBodyLayer::new(|body| { + Box::new(http_body::Body::map_err(body, BoxError::from)) as Box + }) + .layer(service), + ), + default_ns, + )) + } +} diff --git a/vendor/kube-client/src/client/mod.rs b/vendor/kube-client/src/client/mod.rs index 347e253cf9fd7..91006c6dce0cb 100644 --- a/vendor/kube-client/src/client/mod.rs +++ b/vendor/kube-client/src/client/mod.rs @@ -11,8 +11,7 @@ use bytes::Bytes; use either::{Either, Left, Right}; use futures::{self, Stream, StreamExt, TryStream, TryStreamExt}; use http::{self, Request, Response, StatusCode}; -use hyper::{client::HttpConnector, Body}; -use hyper_timeout::TimeoutConnector; +use hyper::Body; use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; pub use kube_core::response::Status; use serde::de::DeserializeOwned; @@ -23,15 +22,14 @@ use tokio_util::{ codec::{FramedRead, LinesCodec, LinesCodecError}, io::StreamReader, }; -use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceBuilder, ServiceExt}; -use tower_http::{ - classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer, -}; +use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceExt}; +use tower_http::map_response_body::MapResponseBodyLayer; use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result}; mod auth; mod body; +mod builder; // Add `into_stream()` to `http::Body` use body::BodyStreamExt; mod config_ext; @@ -40,6 +38,7 @@ pub use config_ext::ConfigExt; pub mod middleware; #[cfg(any(feature = "native-tls", feature = "rustls-tls", feature = "openssl-tls"))] mod tls; + #[cfg(feature = "native-tls")] pub use tls::native_tls::Error as NativeTlsError; #[cfg(feature = "openssl-tls")] pub use tls::openssl_tls::Error as OpensslTlsError; @@ -52,6 +51,8 @@ pub use auth::OAuthError; #[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError; +pub use builder::{ClientBuilder, DynBody}; + /// Client for connecting with a Kubernetes cluster. /// /// The easiest way to instantiate the client is either by @@ -450,110 +451,9 @@ fn handle_api_errors(text: &str, s: StatusCode) -> Result<()> { impl TryFrom for Client { type Error = Error; - /// Convert [`Config`] into a [`Client`] + /// Builds a default [`Client`] from a [`Config`], see [`ClientBuilder`] if more customization is required fn try_from(config: Config) -> Result { - use std::time::Duration; - - use http::header::HeaderMap; - use tracing::Span; - - let timeout = config.timeout; - let default_ns = config.default_namespace.clone(); - - let client: hyper::Client<_, Body> = { - let mut connector = HttpConnector::new(); - connector.enforce_http(false); - - // Current TLS feature precedence when more than one are set: - // 1. openssl-tls - // 2. native-tls - // 3. rustls-tls - // Create a custom client to use something else. - // If TLS features are not enabled, http connector will be used. - #[cfg(feature = "openssl-tls")] - let connector = config.openssl_https_connector_with_connector(connector)?; - #[cfg(all(not(feature = "openssl-tls"), feature = "native-tls"))] - let connector = hyper_tls::HttpsConnector::from(( - connector, - tokio_native_tls::TlsConnector::from(config.native_tls_connector()?), - )); - #[cfg(all( - not(any(feature = "openssl-tls", feature = "native-tls")), - feature = "rustls-tls" - ))] - let connector = hyper_rustls::HttpsConnector::from(( - connector, - std::sync::Arc::new(config.rustls_client_config()?), - )); - - let mut connector = TimeoutConnector::new(connector); - connector.set_connect_timeout(timeout); - connector.set_read_timeout(timeout); - - hyper::Client::builder().build(connector) - }; - - let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner(); - #[cfg(feature = "gzip")] - let stack = ServiceBuilder::new() - .layer(stack) - .layer(tower_http::decompression::DecompressionLayer::new()) - .into_inner(); - - let service = ServiceBuilder::new() - .layer(stack) - .option_layer(config.auth_layer()?) - .layer(config.extra_headers_layer()?) - .layer( - // Attribute names follow [Semantic Conventions]. - // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md - TraceLayer::new_for_http() - .make_span_with(|req: &Request| { - tracing::debug_span!( - "HTTP", - http.method = %req.method(), - http.url = %req.uri(), - http.status_code = tracing::field::Empty, - otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"), - otel.kind = "client", - otel.status_code = tracing::field::Empty, - ) - }) - .on_request(|_req: &Request, _span: &Span| { - tracing::debug!("requesting"); - }) - .on_response(|res: &Response, _latency: Duration, span: &Span| { - let status = res.status(); - span.record("http.status_code", &status.as_u16()); - if status.is_client_error() || status.is_server_error() { - span.record("otel.status_code", &"ERROR"); - } - }) - // Explicitly disable `on_body_chunk`. The default does nothing. - .on_body_chunk(()) - .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| { - tracing::debug!("stream closed"); - }) - .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| { - // Called when - // - Calling the inner service errored - // - Polling `Body` errored - // - the response was classified as failure (5xx) - // - End of stream was classified as failure - span.record("otel.status_code", &"ERROR"); - match ec { - ServerErrorsFailureClass::StatusCode(status) => { - span.record("http.status_code", &status.as_u16()); - tracing::error!("failed with status {}", status) - } - ServerErrorsFailureClass::Error(err) => { - tracing::error!("failed with error {}", err) - } - } - }), - ) - .service(client); - Ok(Self::new(service, default_ns)) + Ok(ClientBuilder::try_from(config)?.build()) } } diff --git a/vendor/kube-client/src/config/file_loader.rs b/vendor/kube-client/src/config/file_loader.rs index 2897f0485f262..7cf8552a89abe 100644 --- a/vendor/kube-client/src/config/file_loader.rs +++ b/vendor/kube-client/src/config/file_loader.rs @@ -111,7 +111,9 @@ impl ConfigLoader { if let Some(proxy) = nonempty(self.cluster.proxy_url.clone()) .or_else(|| nonempty(std::env::var("HTTP_PROXY").ok())) + .or_else(|| nonempty(std::env::var("http_proxy").ok())) .or_else(|| nonempty(std::env::var("HTTPS_PROXY").ok())) + .or_else(|| nonempty(std::env::var("https_proxy").ok())) { Ok(Some( proxy diff --git a/vendor/kube-client/src/config/incluster_config.rs b/vendor/kube-client/src/config/incluster_config.rs index b71acdcb1c019..4b80349b31705 100644 --- a/vendor/kube-client/src/config/incluster_config.rs +++ b/vendor/kube-client/src/config/incluster_config.rs @@ -1,12 +1,5 @@ -use std::env; - use thiserror::Error; -// Old method to connect to kubernetes -const SERVICE_HOSTENV: &str = "KUBERNETES_SERVICE_HOST"; -const SERVICE_PORTENV: &str = "KUBERNETES_SERVICE_PORT"; -// New method to connect to kubernetes -const SERVICE_DNS: &str = "kubernetes.default.svc"; // Mounted credential files const SERVICE_TOKENFILE: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token"; const SERVICE_CERTFILE: &str = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"; @@ -15,14 +8,6 @@ const SERVICE_DEFAULT_NS: &str = "/var/run/secrets/kubernetes.io/serviceaccount/ /// Errors from loading in-cluster config #[derive(Error, Debug)] pub enum Error { - /// Required envionment variables were not set - #[error( - "missing environment variables {} and/or {}", - SERVICE_HOSTENV, - SERVICE_PORTENV - )] - MissingEnvironmentVariables, - /// Failed to read the default namespace for the service account #[error("failed to read the default namespace: {0}")] ReadDefaultNamespace(#[source] std::io::Error), @@ -40,35 +25,8 @@ pub enum Error { ParseCertificates(#[source] pem::PemError), } -/// Returns Kubernetes address from specified environment variables. -pub fn kube_server() -> Result { - kube_host_port() - .ok_or(Error::MissingEnvironmentVariables)? - .parse::() - .map_err(Error::ParseClusterUrl) -} - pub fn kube_dns() -> http::Uri { - http::Uri::builder() - .scheme("https") - .authority(SERVICE_DNS) - .path_and_query("/") - .build() - .unwrap() -} - -fn kube_host_port() -> Option { - let host = kube_host()?; - let port = kube_port()?; - Some(format!("https://{}:{}", host, port)) -} - -fn kube_host() -> Option { - env::var(SERVICE_HOSTENV).ok() -} - -fn kube_port() -> Option { - env::var(SERVICE_PORTENV).ok() + http::Uri::from_static("https://kubernetes.default.svc/") } pub fn token_file() -> String { @@ -85,27 +43,3 @@ pub fn load_cert() -> Result>, Error> { pub fn load_default_ns() -> Result { std::fs::read_to_string(&SERVICE_DEFAULT_NS).map_err(Error::ReadDefaultNamespace) } - -#[test] -fn test_kube_host() { - let expected = "fake.io"; - env::set_var(SERVICE_HOSTENV, expected); - assert_eq!(kube_host().unwrap(), expected); - kube_dns(); // verify kube_dns always unwraps -} - -#[test] -fn test_kube_port() { - let expected = "8080"; - env::set_var(SERVICE_PORTENV, expected); - assert_eq!(kube_port().unwrap(), expected); -} - -#[test] -fn test_kube_server() { - let host = "fake.io"; - let port = "8080"; - env::set_var(SERVICE_HOSTENV, host); - env::set_var(SERVICE_PORTENV, port); - assert_eq!(kube_server().unwrap(), "https://fake.io:8080"); -} diff --git a/vendor/kube-client/src/config/mod.rs b/vendor/kube-client/src/config/mod.rs index e5e4d1fa88c2a..9af1f7a0988e6 100644 --- a/vendor/kube-client/src/config/mod.rs +++ b/vendor/kube-client/src/config/mod.rs @@ -132,7 +132,7 @@ pub struct Config { /// /// A value of `None` means no timeout pub timeout: Option, - /// Whether to accept invalid ceritifacts + /// Whether to accept invalid certificates pub accept_invalid_certs: bool, /// Stores information to tell the cluster who you are. pub(crate) auth_info: AuthInfo, @@ -192,14 +192,7 @@ impl Config { /// and relies on you having the service account's token mounted, /// as well as having given the service account rbac access to do what you need. pub fn from_cluster_env() -> Result { - let cluster_url = if cfg!(feature = "rustls-tls") { - // try rolling out new method for rustls which does not support ip based urls anyway - // see https://github.com/kube-rs/kube-rs/issues/587 - incluster_config::kube_dns() - } else { - incluster_config::kube_server()? - }; - + let cluster_url = incluster_config::kube_dns(); let default_namespace = incluster_config::load_default_ns()?; let root_cert = incluster_config::load_cert()?; diff --git a/vendor/kube-client/src/error.rs b/vendor/kube-client/src/error.rs index 9cf29cc5b3c65..4b2826459f0c4 100644 --- a/vendor/kube-client/src/error.rs +++ b/vendor/kube-client/src/error.rs @@ -67,7 +67,7 @@ pub enum Error { /// Errors from OpenSSL TLS #[cfg(feature = "openssl-tls")] - #[cfg_attr(docsrs, doc(feature = "openssl-tls"))] + #[cfg_attr(docsrs, doc(cfg(feature = "openssl-tls")))] #[error("openssl tls error: {0}")] OpensslTls(#[source] crate::client::OpensslTlsError), diff --git a/vendor/kube-client/src/lib.rs b/vendor/kube-client/src/lib.rs index 213e5752c1afe..03bf2b34f47ad 100644 --- a/vendor/kube-client/src/lib.rs +++ b/vendor/kube-client/src/lib.rs @@ -173,6 +173,21 @@ mod test { Ok(()) } + #[tokio::test] + #[ignore] // needs cluster (lists pods) + #[cfg(all(feature = "openssl-tls"))] + async fn custom_client_openssl_tls_configuration() -> Result<(), Box> { + let config = Config::infer().await?; + let https = config.openssl_https_connector()?; + let service = ServiceBuilder::new() + .layer(config.base_uri_layer()) + .service(hyper::Client::builder().build(https)); + let client = Client::new(service, config.default_namespace); + let pods: Api = Api::default_namespaced(client); + pods.list(&Default::default()).await?; + Ok(()) + } + #[tokio::test] #[ignore] // needs cluster (lists api resources) #[cfg(all(feature = "discovery"))] @@ -336,7 +351,7 @@ mod test { .collect::>() .await .join(""); - attached.await; + attached.join().await.unwrap(); assert_eq!(out.lines().count(), 3); assert_eq!(out, "1\n2\n3\n"); } @@ -354,15 +369,16 @@ mod test { let mut stdin_writer = attached.stdin().unwrap(); let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); let next_stdout = stdout_stream.next(); - stdin_writer.write(b"echo test string 1\n").await?; + stdin_writer.write_all(b"echo test string 1\n").await?; let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap(); println!("{}", stdout); assert_eq!(stdout, "test string 1\n"); // AttachedProcess resolves with status object. // Send `exit 1` to get a failure status. - stdin_writer.write(b"exit 1\n").await?; - if let Some(status) = attached.await { + stdin_writer.write_all(b"exit 1\n").await?; + let status = attached.take_status().unwrap(); + if let Some(status) = status.await { println!("{:?}", status); assert_eq!(status.status, Some("Failure".to_owned())); assert_eq!(status.reason, Some("NonZeroExitCode".to_owned())); diff --git a/vendor/kube-core/.cargo-checksum.json b/vendor/kube-core/.cargo-checksum.json index 2f4a8cffd4ee7..b329f97b529e3 100644 --- a/vendor/kube-core/.cargo-checksum.json +++ b/vendor/kube-core/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"3d26737f838350041f9cf902e5cb2097ae790ee0e516fb67f71a1a562095be1e","README.md":"8e51230819eb7772f0097b56afee5c0e98552b4d4ebbfc0f075cebebb8feaaf6","src/admission.rs":"7df98be89eaf6a0991603e7aeaef7976c0093bfddd174b0e10e50cd35e7013de","src/crd.rs":"78c4c72b88a2e1c1933065269a60471094dfdf44a3068cea507dee95251ad4b6","src/discovery.rs":"9ffae9a2c7c569e46e07ab3bddb073d4ab5d759f0b17de3e71fae7e9b6958020","src/dynamic.rs":"9f54df338e5b2fe3a5eae0eb92594fad7d56d630f94ec53745226a01580d6531","src/error.rs":"1da7f8299d9667a5101e9ac00d0109003e1157e241a38d7771c53a397b7afe15","src/gvk.rs":"71d2b518549d256c6fdefcd14bf2f0a7da2bd011aa270f02c1bf9e52f617c034","src/lib.rs":"fc763c264fc029708e983e174ddd588bd5cfea1529e9d6fcd1240cfd612cc3b7","src/metadata.rs":"b86319781d8587a8d321d36adc9a4fe32acb1bd2bac99e6c49fe1f493dd10fb8","src/object.rs":"b11e6c20b719e5e006ca4d589149f53ef1a3f37d5312b69f6388492178e1901c","src/params.rs":"ca8322c1a33ee5ecba4a19b5b9175bb66c2b5bff79d2f6c8245c5fdd59f39961","src/request.rs":"600633661b752417cb8d54df6912c8242de56a2084010e61a5d46952009e0ce3","src/resource.rs":"acf2b5c1f4ac5c8b016dac8a3383783ba3afb6537293829191931d05a0af027b","src/response.rs":"03fdfef20f9a57ba4d3a5371be01622b16e6aaf5f14208b648af4ac55805a09f","src/schema.rs":"9cd0f958808c594326db635d0dcc824c9d9907d2a11e32d337c9d16d5988ec08","src/subresource.rs":"2cd256e05bbc87c2cdf8db2b5801e70a1fb521713da7987a5dc86ee4a3508590","src/util.rs":"fcf8c8864872619533d57a9531c57627c243851447c978d3d912fbe4101bf1bf","src/version.rs":"0b99ad86624517ac29738134a1a332aad20e8a6e734ec425a5c40bf7a797db0e","src/watch.rs":"d957654cd413eded3bdb5a768b3f4d2cbf3c5054f2e9e848744982a6e2c27598"},"package":"7dd9e3535777edd122cc26fe3fe6357066b33eff63d8b919862edbe7a956a679"} \ No newline at end of file +{"files":{"Cargo.toml":"071c380762ab3b99e8609cf1042d02b0e21a1ebc51b85d6ac25ed18f43ed3169","README.md":"8e51230819eb7772f0097b56afee5c0e98552b4d4ebbfc0f075cebebb8feaaf6","src/admission.rs":"7df98be89eaf6a0991603e7aeaef7976c0093bfddd174b0e10e50cd35e7013de","src/crd.rs":"7ba31fe22c27b2578a7f0c840960f72c4c2c033fcfcdc3937bb67bd6c6e8795c","src/discovery.rs":"9ffae9a2c7c569e46e07ab3bddb073d4ab5d759f0b17de3e71fae7e9b6958020","src/dynamic.rs":"9f54df338e5b2fe3a5eae0eb92594fad7d56d630f94ec53745226a01580d6531","src/error.rs":"1da7f8299d9667a5101e9ac00d0109003e1157e241a38d7771c53a397b7afe15","src/gvk.rs":"2cccba0e5bd0e58cd625b9c00be6b4a3e4419369efeadc7e0b6e383a9e02cbc6","src/lib.rs":"fc763c264fc029708e983e174ddd588bd5cfea1529e9d6fcd1240cfd612cc3b7","src/metadata.rs":"b86319781d8587a8d321d36adc9a4fe32acb1bd2bac99e6c49fe1f493dd10fb8","src/object.rs":"b11e6c20b719e5e006ca4d589149f53ef1a3f37d5312b69f6388492178e1901c","src/params.rs":"ca8322c1a33ee5ecba4a19b5b9175bb66c2b5bff79d2f6c8245c5fdd59f39961","src/request.rs":"600633661b752417cb8d54df6912c8242de56a2084010e61a5d46952009e0ce3","src/resource.rs":"56837c3f3cf9ef622d66429369c5b90fd3a6eed3946c7d8f92b7f3138b3c1ee0","src/response.rs":"03fdfef20f9a57ba4d3a5371be01622b16e6aaf5f14208b648af4ac55805a09f","src/schema.rs":"6acb53c33eafe10c43854809fbdbd060f4ba72310395476d588ea1e8e62f8a9a","src/subresource.rs":"2cd256e05bbc87c2cdf8db2b5801e70a1fb521713da7987a5dc86ee4a3508590","src/util.rs":"fcf8c8864872619533d57a9531c57627c243851447c978d3d912fbe4101bf1bf","src/version.rs":"0b99ad86624517ac29738134a1a332aad20e8a6e734ec425a5c40bf7a797db0e","src/watch.rs":"d957654cd413eded3bdb5a768b3f4d2cbf3c5054f2e9e848744982a6e2c27598"},"package":"0491dcd9adca79a96c63404aa978137f5fe1d3db5dcb6e5c0b073f915dbdd49d"} \ No newline at end of file diff --git a/vendor/kube-core/Cargo.toml b/vendor/kube-core/Cargo.toml index 4333eef33ba43..d1755e92ee2cb 100644 --- a/vendor/kube-core/Cargo.toml +++ b/vendor/kube-core/Cargo.toml @@ -13,15 +13,28 @@ edition = "2021" rust-version = "1.56" name = "kube-core" -version = "0.70.0" -authors = ["clux ", "kazk "] +version = "0.72.0" +authors = [ + "clux ", + "kazk ", +] description = "Kube shared types, traits and client-less behavior" readme = "../README.md" license = "Apache-2.0" repository = "https://github.com/kube-rs/kube-rs" + [package.metadata.docs.rs] -features = ["ws", "admission", "jsonpatch", "k8s-openapi/v1_22"] -rustdoc-args = ["--cfg", "docsrs"] +features = [ + "ws", + "admission", + "jsonpatch", + "k8s-openapi/v1_23", +] +rustdoc-args = [ + "--cfg", + "docsrs", +] + [dependencies.chrono] version = "0.4.19" features = ["clock"] @@ -58,17 +71,23 @@ version = "1.0.68" [dependencies.thiserror] version = "1.0.29" + +[dev-dependencies.assert-json-diff] +version = "2.0.1" + [dev-dependencies.k8s-openapi] version = "0.14.0" -features = ["v1_22"] +features = ["v1_23"] default-features = false [dev-dependencies.kube] version = "<1.0.0, >=0.53.0" +[dev-dependencies.serde_yaml] +version = "0.8.23" + [features] admission = ["json-patch"] -deprecated-crd-v1beta1 = [] jsonpatch = ["json-patch"] schema = ["schemars"] ws = [] diff --git a/vendor/kube-core/src/crd.rs b/vendor/kube-core/src/crd.rs index 0fd515178b47f..7d72de9551798 100644 --- a/vendor/kube-core/src/crd.rs +++ b/vendor/kube-core/src/crd.rs @@ -4,6 +4,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions as apiexts; /// Types for v1 CustomResourceDefinitions pub mod v1 { + use super::apiexts::v1::CustomResourceDefinition as Crd; /// Extension trait that is implemented by kube-derive /// /// This trait variant is implemented by default (or when `#[kube(apiextensions = "v1")]`) @@ -11,7 +12,7 @@ pub mod v1 { /// Helper to generate the CRD including the JsonSchema /// /// This is using the stable v1::CustomResourceDefinitions (present in kubernetes >= 1.16) - fn crd() -> super::apiexts::v1::CustomResourceDefinition; + fn crd() -> Crd; /// Helper to return the name of this `CustomResourceDefinition` in kubernetes. /// /// This is not the name of an _instance_ of this custom resource but the `CustomResourceDefinition` object itself. @@ -28,36 +29,213 @@ pub mod v1 { /// [`Pod`]: `k8s_openapi::api::core::v1::Pod` fn shortnames() -> &'static [&'static str]; } -} -/// Types for legacy v1beta1 CustomResourceDefinitions -#[cfg(feature = "deprecated-crd-v1beta1")] -pub mod v1beta1 { - /// Extension trait that is implemented by kube-derive for legacy v1beta1::CustomResourceDefinitions + /// Possible errors when merging CRDs + #[derive(Debug, thiserror::Error)] + pub enum MergeError { + /// No crds given + #[error("empty list of CRDs cannot be merged")] + MissingCrds, + + /// Stored api not present + #[error("stored api version {0} not found")] + MissingStoredApi(String), + + /// Root api not present + #[error("root api version {0} not found")] + MissingRootVersion(String), + + /// No versions given in one crd to merge + #[error("given CRD must have versions")] + MissingVersions, + + /// Too many versions given to individual crds + #[error("mergeable CRDs cannot have multiple versions")] + MultiVersionCrd, + + /// Mismatching spec properties on crds + #[error("mismatching {0} property from given CRDs")] + PropertyMismatch(String), + } + + /// Merge a collection of crds into a single multiversion crd /// - /// This trait variant is only implemented with `#[kube(apiextensions = "v1beta1")]` - pub trait CustomResourceExt { - /// Helper to generate the legacy CRD without a JsonSchema - /// - /// This is using v1beta1::CustomResourceDefinitions (which will be removed in kubernetes 1.22) - fn crd() -> super::apiexts::v1beta1::CustomResourceDefinition; - /// Helper to return the name of this `CustomResourceDefinition` in kubernetes. - /// - /// This is not the name of an _instance_ of this custom resource but the `CustomResourceDefinition` object itself. - fn crd_name() -> &'static str; - /// Helper to generate the api information type for use with the dynamic `Api` - fn api_resource() -> crate::discovery::ApiResource; - /// Shortnames of this resource type. - /// - /// For example: [`Pod`] has the shortname alias `po`. - /// - /// NOTE: This function returns *declared* short names (at compile-time, using the `#[kube(shortname = "foo")]`), not the - /// shortnames registered with the Kubernetes API (which is what tools such as `kubectl` look at). - /// - /// [`Pod`]: `k8s_openapi::api::core::v1::Pod` - fn shortnames() -> &'static [&'static str]; + /// Given multiple [`CustomResource`] derived types granting [`CRD`]s via [`CustomResourceExt::crd`], + /// we can merge them into a single [`CRD`] with multiple [`CRDVersion`] objects, marking only + /// the specified apiversion as `storage: true`. + /// + /// This merge algorithm assumes that every [`CRD`]: + /// + /// - exposes exactly one [`CRDVersion`] + /// - uses identical values for `spec.group`, `spec.scope`, and `spec.names.kind` + /// + /// This is always true for [`CustomResource`] derives. + /// + /// ## Usage + /// + /// ```no_run + /// # use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; + /// use kube::core::crd::merge_crds; + /// # let mycrd_v1: CustomResourceDefinition = todo!(); // v1::MyCrd::crd(); + /// # let mycrd_v2: CustomResourceDefinition = todo!(); // v2::MyCrd::crd(); + /// let crds = vec![mycrd_v1, mycrd_v2]; + /// let multi_version_crd = merge_crds(crds, "v1").unwrap(); + /// ``` + /// + /// Note the merge is done by marking the: + /// + /// - crd containing the `stored_apiversion` as the place the other crds merge their [`CRDVersion`] items + /// - stored version is marked with `storage: true`, while all others get `storage: false` + /// + /// [`CustomResourceExt::crd`]: crate::CustomResourceExt::crd + /// [`CRD`]: https://docs.rs/k8s-openapi/latest/k8s_openapi/apiextensions_apiserver/pkg/apis/apiextensions/v1/struct.CustomResourceDefinition.html + /// [`CRDVersion`]: https://docs.rs/k8s-openapi/latest/k8s_openapi/apiextensions_apiserver/pkg/apis/apiextensions/v1/struct.CustomResourceDefinitionVersion.html + /// [`CustomResource`]: https://docs.rs/kube/latest/kube/derive.CustomResource.html + pub fn merge_crds(mut crds: Vec, stored_apiversion: &str) -> Result { + if crds.is_empty() { + return Err(MergeError::MissingCrds); + } + for crd in crds.iter() { + if crd.spec.versions.is_empty() { + return Err(MergeError::MissingVersions); + } + if crd.spec.versions.len() != 1 { + return Err(MergeError::MultiVersionCrd); + } + } + let ver = stored_apiversion; + let found = crds.iter().position(|c| c.spec.versions[0].name == ver); + // Extract the root/first object to start with (the one we will merge into) + let mut root = match found { + None => return Err(MergeError::MissingRootVersion(ver.into())), + Some(idx) => crds.remove(idx), + }; + root.spec.versions[0].storage = true; // main version - set true in case modified + + // Values that needs to be identical across crds: + let group = &root.spec.group; + let kind = &root.spec.names.kind; + let scope = &root.spec.scope; + // sanity; don't merge crds with mismatching groups, versions, or other core properties + for crd in crds.iter() { + if &crd.spec.group != group { + return Err(MergeError::PropertyMismatch("group".to_string())); + } + if &crd.spec.names.kind != kind { + return Err(MergeError::PropertyMismatch("kind".to_string())); + } + if &crd.spec.scope != scope { + return Err(MergeError::PropertyMismatch("scope".to_string())); + } + } + + // combine all version objects into the root object + let versions = &mut root.spec.versions; + while let Some(mut crd) = crds.pop() { + while let Some(mut v) = crd.spec.versions.pop() { + v.storage = false; // secondary versions + versions.push(v); + } + } + Ok(root) + } + + mod tests { + #[test] + fn crd_merge() { + use super::{merge_crds, Crd}; + let crd1 = r#" + apiVersion: apiextensions.k8s.io/v1 + kind: CustomResourceDefinition + metadata: + name: multiversions.kube.rs + spec: + group: kube.rs + names: + categories: [] + kind: MultiVersion + plural: multiversions + shortNames: [] + singular: multiversion + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v1 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true"#; + + let crd2 = r#" + apiVersion: apiextensions.k8s.io/v1 + kind: CustomResourceDefinition + metadata: + name: multiversions.kube.rs + spec: + group: kube.rs + names: + categories: [] + kind: MultiVersion + plural: multiversions + shortNames: [] + singular: multiversion + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true"#; + + let expected = r#" + apiVersion: apiextensions.k8s.io/v1 + kind: CustomResourceDefinition + metadata: + name: multiversions.kube.rs + spec: + group: kube.rs + names: + categories: [] + kind: MultiVersion + plural: multiversions + shortNames: [] + singular: multiversion + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v2 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true + - additionalPrinterColumns: [] + name: v1 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: false"#; + + + let c1: Crd = serde_yaml::from_str(crd1).unwrap(); + let c2: Crd = serde_yaml::from_str(crd2).unwrap(); + let ce: Crd = serde_yaml::from_str(expected).unwrap(); + let combined = merge_crds(vec![c1, c2], "v2").unwrap(); + + let combo_json = serde_json::to_value(&combined).unwrap(); + let exp_json = serde_json::to_value(&ce).unwrap(); + assert_json_diff::assert_json_eq!(combo_json, exp_json); + } } } -/// re-export the current latest version until a newer one is available in cloud providers -pub use v1::CustomResourceExt; +// re-export current latest (v1) +pub use v1::{merge_crds, CustomResourceExt, MergeError}; diff --git a/vendor/kube-core/src/gvk.rs b/vendor/kube-core/src/gvk.rs index 4c8a3ef0ef3cd..df2d5f52b408d 100644 --- a/vendor/kube-core/src/gvk.rs +++ b/vendor/kube-core/src/gvk.rs @@ -1,12 +1,13 @@ //! Type information structs for dynamic resources. use std::str::FromStr; +use crate::TypeMeta; use serde::{Deserialize, Serialize}; use thiserror::Error; #[derive(Debug, Error)] #[error("failed to parse group version: {0}")] -/// Failed to parse group version. +/// Failed to parse group version pub struct ParseGroupVersionError(pub String); /// Core information about an API Resource. @@ -31,6 +32,21 @@ impl GroupVersionKind { } } +impl TryFrom<&TypeMeta> for GroupVersionKind { + type Error = ParseGroupVersionError; + + fn try_from(tm: &TypeMeta) -> Result { + Ok(GroupVersion::from_str(&tm.api_version)?.with_kind(&tm.kind)) + } +} +impl TryFrom for GroupVersionKind { + type Error = ParseGroupVersionError; + + fn try_from(tm: TypeMeta) -> Result { + Ok(GroupVersion::from_str(&tm.api_version)?.with_kind(&tm.kind)) + } +} + /// Core information about a family of API Resources #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)] pub struct GroupVersion { @@ -47,6 +63,15 @@ impl GroupVersion { let group = group_.to_string(); Self { group, version } } + + /// Upgrade a GroupVersion to a GroupVersionKind + pub fn with_kind(self, kind: &str) -> GroupVersionKind { + GroupVersionKind { + group: self.group, + version: self.version, + kind: kind.into(), + } + } } impl FromStr for GroupVersion { @@ -118,3 +143,24 @@ impl GroupVersionResource { } } } + +#[cfg(test)] +mod tests { + #[test] + fn gvk_yaml() { + use crate::{GroupVersionKind, TypeMeta}; + let input = r#"--- +apiVersion: kube.rs/v1 +kind: Example +metadata: + name: doc1 +"#; + let tm: TypeMeta = serde_yaml::from_str(input).unwrap(); + let gvk = GroupVersionKind::try_from(&tm).unwrap(); // takes ref + let gvk2: GroupVersionKind = tm.try_into().unwrap(); // takes value + assert_eq!(gvk.kind, "Example"); + assert_eq!(gvk.group, "kube.rs"); + assert_eq!(gvk.version, "v1"); + assert_eq!(gvk.kind, gvk2.kind); + } +} diff --git a/vendor/kube-core/src/resource.rs b/vendor/kube-core/src/resource.rs index 1cf51bbb91de1..281db65717ce2 100644 --- a/vendor/kube-core/src/resource.rs +++ b/vendor/kube-core/src/resource.rs @@ -1,5 +1,9 @@ pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; -use k8s_openapi::{api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::OwnerReference}; +use k8s_openapi::{ + api::core::v1::ObjectReference, + apimachinery::pkg::apis::meta::v1::{ManagedFieldsEntry, OwnerReference, Time}, +}; + use std::{borrow::Cow, collections::BTreeMap}; /// An accessor trait for a kubernetes Resource. @@ -154,6 +158,10 @@ pub trait ResourceExt: Resource { /// Unique ID (if you delete resource and then create a new /// resource with the same name, it will have different ID) fn uid(&self) -> Option; + /// Returns the creation timestamp + /// + /// This is guaranteed to exist on resources received by the apiserver. + fn creation_timestamp(&self) -> Option