From 299462926778938dc0c72077003542e10fdd5a1f Mon Sep 17 00:00:00 2001 From: Tim Buckley Date: Wed, 17 Mar 2021 11:39:29 -0600 Subject: [PATCH] Allow `--namespace` to be repeated to watch multiple namespaces This allows users to specify `--namespace` as many times as desired to watch across namespaces. Note that all selectors selectors will be matched to pods in each namespace, so it may be desirable to use multiple selectors that may not apply to each namespace. --- src/config.rs | 18 +++++++-------- src/reader/kubernetes.rs | 48 +++++++++++++++++++++++----------------- 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/config.rs b/src/config.rs index da6eb5b..68b7b10 100644 --- a/src/config.rs +++ b/src/config.rs @@ -71,7 +71,7 @@ impl FromStr for RendererType { "interactive" => Ok(RendererType::Interactive), _ => bail!(format!("invalid renderer type: {}", s)) } - } + } } fn get_auto_reader(config: Arc) -> reader::Reader { @@ -86,7 +86,7 @@ fn get_auto_reader(config: Arc) -> reader::Reader { } } - if config.kubernetes.namespace.is_some() { + if !config.kubernetes.namespaces.is_empty() { return reader::read_kubernetes_selector; } @@ -136,17 +136,17 @@ impl FromStr for ReaderType { #[structopt(rename_all = "kebab-case")] pub struct KubernetesConfig { /// kubectl path override - /// + /// /// Path to kubectl. If unset, searches $PATH. #[structopt(long, short = "k", env = "WD_KUBECTL")] pub kubectl: Option, - /// Kubernetes namespace to use read - #[structopt(long, short = "n", env = "WD_NAMESPACE")] - pub namespace: Option, + /// Kubernetes namespace to read. May be repeated for multiple namespaces. + #[structopt(long = "namespace", short = "n", env = "WD_NAMESPACE")] + pub namespaces: Vec, /// Local kubernetes proxy port - /// + /// /// A kubernetes API proxy will be spawned on this port over the loopback /// interface. If unset, a random port will be selected. #[structopt(long, short = "p", env = "WD_K8S_PORT")] @@ -235,7 +235,7 @@ impl FromStr for RegexConfig { )] pub struct Config { /// Renderer to use, one of: auto, plain, json, styled, interactive - /// + /// /// If auto, will is determined by terminal and whether or not output will be /// redirected. Automatic preference may be overridden with /// --preferred-renderer, otherwise --renderer will force use of the given @@ -247,7 +247,7 @@ pub struct Config { /// /// When --renderer=auto, this controls the preferred default renderer if no /// conditions exist that would otherwise select a different renderer. - /// + /// /// For example, if you dislike the interactive renderer but still wish to /// automatically fall back to plaintext output when piped, use /// --preferred-renderer=styled. diff --git a/src/reader/kubernetes.rs b/src/reader/kubernetes.rs index 9858de9..4807b73 100644 --- a/src/reader/kubernetes.rs +++ b/src/reader/kubernetes.rs @@ -22,14 +22,17 @@ use crate::parser::util::normalize_datetime; #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct Container { + namespace: String, pod: String, container: String, siblings: usize } impl Container { - pub fn new(pod: String, container: String, siblings: usize) -> Self { - Container { pod, container, siblings } + pub fn new( + namespace: String, pod: String, container: String, siblings: usize + ) -> Self { + Container { namespace, pod, container, siblings } } } @@ -139,7 +142,9 @@ fn get_containers(pod: &KubernetesPod) -> Vec { let siblings = pod.spec.containers.len(); for container in &pod.spec.containers { ret.push(Container::new( - pod_name.clone(), container.name.clone(), + pod.metadata.namespace.clone(), + pod_name.clone(), + container.name.clone(), siblings )); } @@ -232,7 +237,7 @@ fn wrap_watch( )) .query(&query) .send().map_err(SimpleError::from)?; - + if !response.status().is_success() { return Err(SimpleError::new("failed to list pods in namespace")) } @@ -388,7 +393,7 @@ fn parse_line<'a>( fn follow_log( config: Arc, - namespace: String, port: u16, + port: u16, container: Container, tx: Sender ) { @@ -417,7 +422,7 @@ fn follow_log( } // check to make sure the container still exists - if should_stop_following(&namespace, port, &container, tx.clone()) { + if should_stop_following(&container.namespace, port, &container, tx.clone()) { break; } @@ -434,7 +439,7 @@ fn follow_log( let maybe_response = client .get(&format!( "http://localhost:{port}/api/v1/namespaces/{namespace}/pods/{pod}/log", - port = port, namespace = namespace, pod = &container.pod + port = port, namespace = container.namespace, pod = &container.pod )) .query(&query) .send(); @@ -507,7 +512,7 @@ fn follow_log( thread::sleep(Duration::from_millis(500)); // decide if we should restart the log - if should_stop_following(&namespace, port, &container, tx.clone()) { + if should_stop_following(&container.namespace, port, &container, tx.clone()) { break; } } @@ -539,7 +544,7 @@ fn spawn_kubectl(config: Arc) -> SimpleResult<(Popen, u16)> { ..Default::default() }).map_err(SimpleError::from)?; - // wait a bit to see if it exits + // wait a bit to see if it exits thread::sleep(Duration::from_millis(250)); if child.poll().is_some() { @@ -565,7 +570,7 @@ fn kubectl_get_namespace() -> SimpleResult { .stderr(Redirection::Pipe) .capture() .map_err(SimpleError::from)?; - + if data.success() { let output = data.stdout_str(); if output.is_empty() { @@ -581,6 +586,14 @@ fn kubectl_get_namespace() -> SimpleResult { } } +fn list_namespaces(config: &Config) -> SimpleResult> { + if !config.kubernetes.namespaces.is_empty() { + return Ok(config.kubernetes.namespaces.clone()) + } else { + return Ok(vec![kubectl_get_namespace()?]) + } +} + pub fn read_kubernetes_selector( config: Arc, tx: Sender, @@ -588,21 +601,17 @@ pub fn read_kubernetes_selector( exit_resp_tx: Sender<()> ) -> JoinHandle> { thread::Builder::new().name("read_kubernetes_selector".to_string()).spawn(move || { - let namespace = if let Some(namespace) = &config.kubernetes.namespace { - namespace.clone() - } else { - kubectl_get_namespace()? - }; - let (mut kubectl, port) = spawn_kubectl(Arc::clone(&config))?; tx.send(LogEntry::internal( &format!("started kubernetes api proxy on port {}", port) )).ok(); let (event_tx, event_rx) = channel(); - watch_events( - Arc::clone(&config), namespace.clone(), port, tx.clone(), event_tx - ); + for namespace in list_namespaces(&config)? { + watch_events( + Arc::clone(&config), namespace.clone(), port, tx.clone(), event_tx.clone() + ); + } loop { thread::sleep(Duration::from_millis(100)); @@ -616,7 +625,6 @@ pub fn read_kubernetes_selector( PodEvent::Added(container) => { follow_log( Arc::clone(&config), - namespace.clone(), port, container, tx.clone()