fix(pegboard): implement cleanup, rebuild, fix queries
MasterPtato committed Sep 16, 2024
1 parent 3d64456 commit 87abc50
Showing 13 changed files with 716 additions and 420 deletions.
16 changes: 14 additions & 2 deletions docs/libraries/workflow/GOTCHAS.md
@@ -114,13 +114,25 @@ the internal location.
`std::collections::HashMap` does not implement `Hash`. To get around this, use `util::serde::HashableMap`:

```rust
use util::serde::AsHashableExt;

struct Input {
map: HashMap<..., ...>,
}

// ...

ctx
.activity(MyActivityInput {
map: input.map.as_hashable(),
map: input.map.into(),
})
.await?;

// ...

struct MyActivityInput {
map: util::serde::HashableMap<..., ...>,
}

```

## Nested options with serde
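For readers outside the repo, a minimal sketch of why such a wrapper can implement `Hash`: backing the map with a `BTreeMap` gives deterministic iteration order, so a derived `Hash` is stable. The names below mirror `HashableMap`/`AsHashableExt` for readability but are illustrative only, not the actual `util::serde` implementation.

```rust
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;

// Illustrative only: BTreeMap iterates in key order, so deriving `Hash` yields
// a deterministic hash, unlike `HashMap`, which has no `Hash` impl at all.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct HashableMap<K: Ord + Hash, V: Hash>(BTreeMap<K, V>);

impl<K: Ord + Hash, V: Hash> From<HashMap<K, V>> for HashableMap<K, V> {
    fn from(map: HashMap<K, V>) -> Self {
        Self(map.into_iter().collect())
    }
}

// An extension trait in the spirit of `AsHashableExt`, again purely for illustration.
trait AsHashableExt<K: Ord + Hash, V: Hash> {
    fn as_hashable(self) -> HashableMap<K, V>;
}

impl<K: Ord + Hash, V: Hash> AsHashableExt<K, V> for HashMap<K, V> {
    fn as_hashable(self) -> HashableMap<K, V> {
        self.into()
    }
}
```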
4 changes: 3 additions & 1 deletion lib/bolt/core/src/tasks/ssh.rs
@@ -68,7 +68,9 @@ async fn ip_inner(
"root",
"-i",
ssh_key.path(),
ip
"-L",
"9090:10.0.0.84:8080",
ip,
)
.run()
}
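The two new arguments are a standard `ssh -L` local port forward: connections to local port 9090 are tunneled to `10.0.0.84:8080` on the far side of the SSH session. A standalone sketch of an equivalent invocation via `std::process::Command`; the key path and host are placeholders, and the surrounding `bolt` command builder is not shown.

```rust
use std::process::Command;

fn main() -> std::io::Result<()> {
    // Placeholder values for illustration; the real task reads these from bolt's config.
    let ssh_key = "/tmp/ssh_key";
    let ip = "203.0.113.10";

    // -L <local_port>:<dest_host>:<dest_port> forwards localhost:9090 through
    // the tunnel to 10.0.0.84:8080, reachable from the remote host.
    let status = Command::new("ssh")
        .args(["-i", ssh_key, "-L", "9090:10.0.0.84:8080"])
        .arg(format!("root@{ip}"))
        .status()?;

    println!("ssh exited with status: {status}");
    Ok(())
}
```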
133 changes: 73 additions & 60 deletions lib/pegboard/manager/src/container/mod.rs
@@ -1,7 +1,7 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
time::Duration,
};

use anyhow::*;
@@ -23,8 +23,8 @@ mod setup;

/// How often to check for a PID when one is not present and a stop command was received.
const STOP_PID_INTERVAL: Duration = std::time::Duration::from_millis(250);
/// How long to wait until no longer waiting for a PID when a stop command was received.
const STOP_PID_TIMEOUT: Duration = std::time::Duration::from_secs(30);
/// How many times to check for a PID when a stop command was received.
const STOP_PID_RETRIES: usize = 32;
/// How often to check that a PID is still running when observing container state.
const PID_POLL_INTERVAL: Duration = std::time::Duration::from_millis(1000);
const VECTOR_SOCKET_ADDR: &str = "127.0.0.1:5021";
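For a sense of scale, a quick check of the worst-case wait these constants imply, assuming the stop path sleeps `STOP_PID_INTERVAL` between checks as the loop further down does:

```rust
use std::time::Duration;

fn main() {
    let stop_pid_interval = Duration::from_millis(250);
    let stop_pid_retries: u32 = 32;

    // 32 retries * 250 ms = 8 s before the container is considered stopped
    // without a PID, replacing the previous fixed 30 s timeout.
    let max_wait = stop_pid_interval * stop_pid_retries;
    assert_eq!(max_wait, Duration::from_secs(8));
}
```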
@@ -38,40 +38,50 @@ enum ObservationState {

pub struct Container {
container_id: Uuid,
config: protocol::ContainerConfig,

pid: Mutex<Option<Pid>>,
}

impl Container {
pub fn new(container_id: Uuid) -> Arc<Self> {
pub fn new(container_id: Uuid, config: protocol::ContainerConfig) -> Arc<Self> {
Arc::new(Container {
container_id,
config,

pid: Mutex::new(None),
})
}

pub async fn start(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting container");
pub fn with_pid(container_id: Uuid, config: protocol::ContainerConfig, pid: Pid) -> Arc<Self> {
Arc::new(Container {
container_id,
config,

pid: Mutex::new(Some(pid)),
})
}

pub async fn start(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting");

// Write container to DB
let config_json = serde_json::to_vec(&self.config)?;
utils::query(|| async {
// NOTE: On conflict here in case this query runs but the command is not acknowledged
sqlx::query(indoc!(
"
INSERT INTO containers (
container_id,
config,
start_ts
)
VALUES (?1, ?2)
VALUES (?1, ?2, ?3)
ON CONFLICT (container_id) DO NOTHING
",
))
.bind(self.container_id)
.bind(&config_json)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
@@ -84,75 +94,75 @@ impl Container {
})
.await?;

{
let s = self.clone();
let ctx = ctx.clone();
// Lifecycle
let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
use std::result::Result::{Err, Ok};

tokio::spawn(async move {
if let Err(err) = s.setup(&ctx, config).await {
tracing::error!(?err, "container run failed");
match self2.setup(&ctx2).await {
Ok(container_runner_path) => match self2.run(&ctx2, container_runner_path).await {
Ok(pid) => {
if let Err(err) = self2.observe(&ctx2, pid).await {
tracing::error!(container_id=?self2.container_id, ?err, "observe failed");
}
}
Err(err) => {
tracing::error!(container_id=?self2.container_id, ?err, "run failed")
}
},
Err(err) => tracing::error!(container_id=?self2.container_id, ?err, "setup failed"),
}

// Cleanup
let mut containers = ctx.containers.write().await;
containers.remove(&s.container_id);
}
});
}
// Cleanup afterwards
self2.cleanup(&ctx2).await
});

Ok(())
}

async fn setup(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
async fn setup(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<PathBuf> {
tracing::info!(container_id=?self.container_id, "setting up");

let container_path = ctx.container_path(self.container_id);

fs::create_dir(&container_path).await?;

// Download container runner
let container_runner_path = ctx
.fetch_container_runner(&config.container_runner_binary_url)
.fetch_container_runner(&self.config.container_runner_binary_url)
.await?;

setup::cni_bundle(self.container_id, &config, &ctx).await?;
self.setup_cni_bundle(&ctx).await?;

// Run CNI setup script
if let protocol::NetworkMode::Bridge = config.network_mode {
setup::cni_network(self.container_id, &config, &ctx).await?;
if let protocol::NetworkMode::Bridge = self.config.network_mode {
self.setup_cni_network(&ctx).await?;
}

Ok(container_runner_path)
}

async fn run(self: &Arc<Self>, ctx: &Arc<Ctx>, container_runner_path: PathBuf) -> Result<Pid> {
tracing::info!(container_id=?self.container_id, "spawning");

let mut runner_env = vec![
(
"PEGBOARD_META_root_user_enabled",
config.root_user_enabled.to_string(),
self.config.root_user_enabled.to_string(),
),
(
"PEGBOARD_META_vector_socket_addr",
VECTOR_SOCKET_ADDR.to_string(),
),
];
runner_env.extend(config.stakeholder.env());

self.run(ctx, container_runner_path, &runner_env).await?;

Ok(())
}

async fn run(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
container_runner_path: PathBuf,
env: &[(&str, String)],
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "spawning");
runner_env.extend(self.config.stakeholder.env());

// Spawn runner which spawns the container
let pid = setup::spawn_orphaned_container_runner(
container_runner_path,
ctx.container_path(self.container_id),
&env,
&runner_env,
)?;

tracing::info!(container_id=?self.container_id, ?pid, "pid received");
@@ -168,12 +178,14 @@ impl Container {
"
UPDATE containers
SET
running_ts = ?2
running_ts = ?2,
pid = ?3
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.bind(pid.as_raw())
.execute(&mut *ctx.sql().await?)
.await
})
@@ -187,13 +199,13 @@ impl Container {
})
.await?;

self.observe(ctx, pid).await?;

Ok(())
Ok(pid)
}

// Watch container for updates
async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
pub(crate) async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
tracing::info!(container_id=?self.container_id, ?pid, "observing");

let exit_code_path = ctx.container_path(self.container_id).join("exit-code");
let proc_path = Path::new("/proc").join(pid.to_string());

@@ -288,29 +300,29 @@ impl Container {
})
.await?;

tracing::info!(container_id=?self.container_id, "container complete");
tracing::info!(container_id=?self.container_id, "complete");

Ok(())
}

pub async fn stop(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "stopping");

let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
if let Err(err) = self2.stop_inner(&ctx2).await {
tracing::error!(?err, "container stop failed");
}

// Cleanup regardless
let mut containers = ctx2.containers.write().await;
containers.remove(&self2.container_id);
self2.cleanup(&ctx2).await
});

Ok(())
}

async fn stop_inner(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
let now = Instant::now();
let mut i = 0;

let pid = loop {
if let Some(pid) = *self.pid.lock().await {
@@ -319,14 +331,15 @@ impl Container {

tracing::warn!(container_id=?self.container_id, "waiting for pid to stop workflow");

if now.elapsed() > STOP_PID_TIMEOUT {
if i > STOP_PID_RETRIES {
tracing::error!(
container_id=?self.container_id,
"timed out waiting for container to get PID, considering container stopped",
);

break None;
}
i += 1;

tokio::time::sleep(STOP_PID_INTERVAL).await;
};
@@ -353,7 +366,7 @@ impl Container {

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopping,
state: protocol::ContainerState::Stopped,
})
.await?;

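The `ON CONFLICT (container_id) DO NOTHING` clause in `Container::start` is what makes a replayed start command safe: if the row was already written but the command was never acknowledged, the second insert becomes a no-op rather than a UNIQUE-constraint error. A self-contained sketch of the same pattern against an in-memory SQLite database; the table shape and values are simplified placeholders, not the manager's actual schema.

```rust
use sqlx::sqlite::SqlitePoolOptions;

// Assumes sqlx with the `sqlite` feature and a tokio runtime; names and values
// below are placeholders for illustration.
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;

    sqlx::query(
        "CREATE TABLE containers (
            container_id TEXT PRIMARY KEY,
            config BLOB NOT NULL,
            start_ts INTEGER NOT NULL
        )",
    )
    .execute(&pool)
    .await?;

    // Running the same insert twice is harmless: the second attempt hits the
    // primary-key conflict and is skipped instead of failing, so a replayed,
    // unacknowledged start command cannot error out here.
    for _ in 0..2 {
        sqlx::query(
            "INSERT INTO containers (container_id, config, start_ts)
            VALUES (?1, ?2, ?3)
            ON CONFLICT (container_id) DO NOTHING",
        )
        .bind("11111111-2222-3333-4444-555555555555")
        .bind(&b"{}"[..])
        .bind(0i64)
        .execute(&pool)
        .await?;
    }

    Ok(())
}
```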