Skip to content

Commit

Permalink
Merge pull request #52 from HewlettPackard/message-ordering
Browse files Browse the repository at this point in the history
Add optional support for message reordering
  • Loading branch information
timothyb89 authored Jul 1, 2020
2 parents fc03670 + 29f13c7 commit f5f6047
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 5 deletions.
15 changes: 15 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,21 @@ pub struct Config {
#[structopt(long, env = "WD_REGEXES")]
pub regexes: Option<RegexConfig>,

/// If set, attempts to ensure messages are displayed in semantic order by
/// placing them in a priority queue for a short period before being written
/// to the renderer. By default, messages are held for one second; this can be
/// overridden with `--buffer-ms`.
///
/// Particularly useful when paired with readers that ingest from multiple
/// sources at once, like the Kubernetes reader.
#[structopt(long, short = "o")]
pub ordered: bool,

/// Sets the length of time, in milliseconds, that messages should be buffered
/// to attempt to reorder them. If set, implies `--ordered`.
#[structopt(long)]
pub buffer_ms: Option<u64>,

#[structopt(flatten)]
pub kubernetes: KubernetesConfig
}
27 changes: 22 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,28 @@ fn main() -> Result<(), Box<dyn Error>> {
let (exit_req_tx, exit_req_rx) = channel();
let (exit_resp_tx, exit_resp_rx) = channel();

reader_impl(
Arc::clone(&config),
entry_tx,
exit_req_rx, exit_resp_tx
);
if config.ordered || config.buffer_ms.is_some() {
// if --ordered or --buffer-ms, wrap the reader in read_ordered
let (ord_tx, ord_rx) = channel();

reader_impl(
Arc::clone(&config),
ord_tx,
exit_req_rx, exit_resp_tx
);

reader::read_ordered(
Arc::clone(&config),
ord_rx,
entry_tx
);
} else {
reader_impl(
Arc::clone(&config),
entry_tx,
exit_req_rx, exit_resp_tx
);
}

renderer.join().expect("renderer thread did not exit cleanly");

Expand Down
2 changes: 2 additions & 0 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ pub mod stdin;
pub mod stdin_hack;
pub mod kubernetes;
pub mod null;
pub mod ordered;

pub use types::Reader;
pub use stdin::read_stdin;
pub use stdin_hack::read_stdin_hack;
pub use kubernetes::read_kubernetes_selector;
pub use null::read_null;
pub(crate) use ordered::read_ordered;
145 changes: 145 additions & 0 deletions src/reader/ordered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// (C) Copyright 2020 Hewlett Packard Enterprise Development LP

use std::collections::BinaryHeap;
use std::cmp::{Ord, Ordering, PartialEq, PartialOrd};
use std::sync::Arc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use chrono::Utc;
use simple_error::SimpleResult;

use crate::config::Config;
use crate::parser::MessageKind;
use crate::renderer::{LogEntry, MessageEntry};

/// The default length of time messages should be held in the buffer
const DEFAULT_BUFFER_MS: u64 = 1000;

/// A wrapped struct since we need an extra timestamp
struct TimestampedEntry {
/// Monotonic instant that this message was received from the underlying
/// reader, used to evict messages that have been held for too long
received: Instant,

/// UTC timestamp as determined by the underlying reader, potentially from
/// message metadata
///
/// This should be the desired sort order where possible
timestamp: i64,

/// The wrapped MessageEntry
entry: MessageEntry
}

impl TimestampedEntry {
fn new(entry: MessageEntry) -> Self {
// fall back to the system timestamp if none exists
let timestamp = if let Some(timestamp) = &entry.message.timestamp {
timestamp.timestamp_millis()
} else if let Some(meta) = &entry.message.reader_metadata {
if let Some(timestamp) = &meta.timestamp {
timestamp.timestamp_millis()
} else {
Utc::now().timestamp_millis()
}
} else {
Utc::now().timestamp_millis()
};

TimestampedEntry {
received: Instant::now(),
timestamp,
entry
}
}
}

impl Ord for TimestampedEntry {
fn cmp(&self, other: &Self) -> Ordering {
// note: intentionally inverted (we want a min-heap)
other.timestamp.cmp(&self.timestamp)
}
}

impl PartialOrd for TimestampedEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.timestamp.partial_cmp(&self.timestamp)
}
}

impl PartialEq for TimestampedEntry {
fn eq(&self, other: &Self) -> bool {
self.timestamp == other.timestamp
}
}

impl Eq for TimestampedEntry {}

/// A wrapping reader that attempts to reorder incoming message from another
/// reader such that timestamps stay (more) sequential.
pub fn read_ordered(
config: Arc<Config>,
rx: Receiver<LogEntry>,
tx: Sender<LogEntry>,
) -> JoinHandle<SimpleResult<()>> {
thread::Builder::new().name("read_ordered".to_string()).spawn(move || {
let buffer_duration = Duration::from_millis(
config.buffer_ms.unwrap_or(DEFAULT_BUFFER_MS)
);

tx.send(LogEntry::internal(&format!(
"note: attempting to reorder messages, buffer: {}ms",
buffer_duration.as_millis()
))).ok();

// TODO: we could probably async-ify this and remove the need for sleep(100)
// This could create issues with multiple runtimes for e.g. the kubernetes
// reader, though.
let mut heap: BinaryHeap<TimestampedEntry> = BinaryHeap::new();

'outer: loop {
thread::sleep(Duration::from_millis(100));

// first, drain all incoming messages into the heap
for unbuffered_entry in rx.try_iter() {
if let Some(message) = unbuffered_entry.message {
// immediately pass through internal messages
if let MessageKind::Internal = message.message.kind {
tx.send(LogEntry {
message: Some(message),
eof: None
}).ok();
} else {
heap.push(TimestampedEntry::new(message));
}

} else if let Some(_) = unbuffered_entry.eof {
// quit and send immediately (buffered messages will be discarded)
tx.send(LogEntry::eof()).ok();
break 'outer;
}
}

// now, drain (in order) all messages from the top of the heap that pass
// the eviction deadline
let current_time = Instant::now();
while let Some(entry) = heap.peek() {
if current_time.duration_since(entry.received) >= buffer_duration {
// if this unwrap fails, we have bigger problems...
let real_entry = heap.pop().unwrap().entry;

tx.send(LogEntry {
message: Some(real_entry),
eof: None
}).ok();
} else {
break;
}
}
}

Ok(())
}).unwrap()
}

0 comments on commit f5f6047

Please sign in to comment.