Skip to content

Commit

Permalink
Allow --namespace to be repeated to watch multiple namespaces
Browse files Browse the repository at this point in the history
This allows users to specify `--namespace` as many times as desired
to watch across namespaces.

Note that all selectors selectors will be matched to pods in each
namespace, so it may be desirable to use multiple selectors that may
not apply to each namespace.
  • Loading branch information
timothyb89 committed Mar 17, 2021
1 parent f5f6047 commit 2994629
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 29 deletions.
18 changes: 9 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl FromStr for RendererType {
"interactive" => Ok(RendererType::Interactive),
_ => bail!(format!("invalid renderer type: {}", s))
}
}
}
}

fn get_auto_reader(config: Arc<Config>) -> reader::Reader {
Expand All @@ -86,7 +86,7 @@ fn get_auto_reader(config: Arc<Config>) -> reader::Reader {
}
}

if config.kubernetes.namespace.is_some() {
if !config.kubernetes.namespaces.is_empty() {
return reader::read_kubernetes_selector;
}

Expand Down Expand Up @@ -136,17 +136,17 @@ impl FromStr for ReaderType {
#[structopt(rename_all = "kebab-case")]
pub struct KubernetesConfig {
/// kubectl path override
///
///
/// Path to kubectl. If unset, searches $PATH.
#[structopt(long, short = "k", env = "WD_KUBECTL")]
pub kubectl: Option<String>,

/// Kubernetes namespace to use read
#[structopt(long, short = "n", env = "WD_NAMESPACE")]
pub namespace: Option<String>,
/// Kubernetes namespace to read. May be repeated for multiple namespaces.
#[structopt(long = "namespace", short = "n", env = "WD_NAMESPACE")]
pub namespaces: Vec<String>,

/// Local kubernetes proxy port
///
///
/// A kubernetes API proxy will be spawned on this port over the loopback
/// interface. If unset, a random port will be selected.
#[structopt(long, short = "p", env = "WD_K8S_PORT")]
Expand Down Expand Up @@ -235,7 +235,7 @@ impl FromStr for RegexConfig {
)]
pub struct Config {
/// Renderer to use, one of: auto, plain, json, styled, interactive
///
///
/// If auto, will is determined by terminal and whether or not output will be
/// redirected. Automatic preference may be overridden with
/// --preferred-renderer, otherwise --renderer will force use of the given
Expand All @@ -247,7 +247,7 @@ pub struct Config {
///
/// When --renderer=auto, this controls the preferred default renderer if no
/// conditions exist that would otherwise select a different renderer.
///
///
/// For example, if you dislike the interactive renderer but still wish to
/// automatically fall back to plaintext output when piped, use
/// --preferred-renderer=styled.
Expand Down
48 changes: 28 additions & 20 deletions src/reader/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ use crate::parser::util::normalize_datetime;

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct Container {
namespace: String,
pod: String,
container: String,
siblings: usize
}

impl Container {
pub fn new(pod: String, container: String, siblings: usize) -> Self {
Container { pod, container, siblings }
pub fn new(
namespace: String, pod: String, container: String, siblings: usize
) -> Self {
Container { namespace, pod, container, siblings }
}
}

Expand Down Expand Up @@ -139,7 +142,9 @@ fn get_containers(pod: &KubernetesPod) -> Vec<Container> {
let siblings = pod.spec.containers.len();
for container in &pod.spec.containers {
ret.push(Container::new(
pod_name.clone(), container.name.clone(),
pod.metadata.namespace.clone(),
pod_name.clone(),
container.name.clone(),
siblings
));
}
Expand Down Expand Up @@ -232,7 +237,7 @@ fn wrap_watch(
))
.query(&query)
.send().map_err(SimpleError::from)?;

if !response.status().is_success() {
return Err(SimpleError::new("failed to list pods in namespace"))
}
Expand Down Expand Up @@ -388,7 +393,7 @@ fn parse_line<'a>(

fn follow_log(
config: Arc<Config>,
namespace: String, port: u16,
port: u16,
container: Container,
tx: Sender<LogEntry>
) {
Expand Down Expand Up @@ -417,7 +422,7 @@ fn follow_log(
}

// check to make sure the container still exists
if should_stop_following(&namespace, port, &container, tx.clone()) {
if should_stop_following(&container.namespace, port, &container, tx.clone()) {
break;
}

Expand All @@ -434,7 +439,7 @@ fn follow_log(
let maybe_response = client
.get(&format!(
"http://localhost:{port}/api/v1/namespaces/{namespace}/pods/{pod}/log",
port = port, namespace = namespace, pod = &container.pod
port = port, namespace = container.namespace, pod = &container.pod
))
.query(&query)
.send();
Expand Down Expand Up @@ -507,7 +512,7 @@ fn follow_log(
thread::sleep(Duration::from_millis(500));

// decide if we should restart the log
if should_stop_following(&namespace, port, &container, tx.clone()) {
if should_stop_following(&container.namespace, port, &container, tx.clone()) {
break;
}
}
Expand Down Expand Up @@ -539,7 +544,7 @@ fn spawn_kubectl(config: Arc<Config>) -> SimpleResult<(Popen, u16)> {
..Default::default()
}).map_err(SimpleError::from)?;

// wait a bit to see if it exits
// wait a bit to see if it exits
thread::sleep(Duration::from_millis(250));

if child.poll().is_some() {
Expand All @@ -565,7 +570,7 @@ fn kubectl_get_namespace() -> SimpleResult<String> {
.stderr(Redirection::Pipe)
.capture()
.map_err(SimpleError::from)?;

if data.success() {
let output = data.stdout_str();
if output.is_empty() {
Expand All @@ -581,28 +586,32 @@ fn kubectl_get_namespace() -> SimpleResult<String> {
}
}

fn list_namespaces(config: &Config) -> SimpleResult<Vec<String>> {
if !config.kubernetes.namespaces.is_empty() {
return Ok(config.kubernetes.namespaces.clone())
} else {
return Ok(vec![kubectl_get_namespace()?])
}
}

pub fn read_kubernetes_selector(
config: Arc<Config>,
tx: Sender<LogEntry>,
exit_req_rx: Receiver<()>,
exit_resp_tx: Sender<()>
) -> JoinHandle<SimpleResult<()>> {
thread::Builder::new().name("read_kubernetes_selector".to_string()).spawn(move || {
let namespace = if let Some(namespace) = &config.kubernetes.namespace {
namespace.clone()
} else {
kubectl_get_namespace()?
};

let (mut kubectl, port) = spawn_kubectl(Arc::clone(&config))?;
tx.send(LogEntry::internal(
&format!("started kubernetes api proxy on port {}", port)
)).ok();

let (event_tx, event_rx) = channel();
watch_events(
Arc::clone(&config), namespace.clone(), port, tx.clone(), event_tx
);
for namespace in list_namespaces(&config)? {
watch_events(
Arc::clone(&config), namespace.clone(), port, tx.clone(), event_tx.clone()
);
}

loop {
thread::sleep(Duration::from_millis(100));
Expand All @@ -616,7 +625,6 @@ pub fn read_kubernetes_selector(
PodEvent::Added(container) => {
follow_log(
Arc::clone(&config),
namespace.clone(),
port,
container,
tx.clone()
Expand Down

0 comments on commit 2994629

Please sign in to comment.