Skip to content

Commit

Permalink
feat(pegboard): get container connection e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 25, 2024
1 parent d675f4b commit dba3125
Show file tree
Hide file tree
Showing 53 changed files with 878 additions and 411 deletions.
2 changes: 1 addition & 1 deletion lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl ServiceContextData {
}

pub fn depends_on_cluster_config(&self) -> bool {
self.name() == "cluster-default-update"
self.name() == "cluster-default-update" || self.name() == "pegboard-dc-init"
}

pub fn depends_on_provision_margin(&self) -> bool {
Expand Down
142 changes: 93 additions & 49 deletions lib/pegboard/manager/src/container/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

Expand Down Expand Up @@ -42,6 +45,7 @@ pub struct Container {
config: protocol::ContainerConfig,

pid: Mutex<Option<Pid>>,
exited: AtomicBool,
}

impl Container {
Expand All @@ -51,6 +55,7 @@ impl Container {
config,

pid: Mutex::new(None),
exited: AtomicBool::new(false),
})
}

Expand All @@ -60,6 +65,7 @@ impl Container {
config,

pid: Mutex::new(Some(pid)),
exited: AtomicBool::new(false),
})
}

Expand Down Expand Up @@ -198,7 +204,7 @@ impl Container {
"
UPDATE containers
SET
running_ts = ?2 AND
running_ts = ?2,
pid = ?3
WHERE container_id = ?1
",
Expand Down Expand Up @@ -284,58 +290,31 @@ impl Container {
};

let exit_code = if let ObservationState::Exited = state {
Some(
fs::read_to_string(&exit_code_path)
.await?
.trim()
.parse::<i32>()?,
)
use std::result::Result::Ok;
match fs::read_to_string(&exit_code_path).await {
Ok(contents) => match contents.trim().parse::<i32>() {
Ok(x) => Some(x),
Err(err) => {
tracing::error!(?err, "failed to parse exit code file");

None
}
},
Err(err) => {
tracing::error!(?err, "failed to read exit code file");

None
}
}
} else {
tracing::warn!(?pid, "process died before exit code file was written");

None
};

tracing::info!(container_id=?self.container_id, ?exit_code, "received exit code");

// Update DB
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET
exit_ts = ?2 AND
exit_code = ?3
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.bind(exit_code)
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

// Unbind ports
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE container_ports
SET delete_ts = ?2
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;
tracing::info!(container_id=?self.container_id, ?exit_code, "exited");

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Exited { exit_code },
})
.await?;
self.set_exit_code(ctx, exit_code).await?;

tracing::info!(container_id=?self.container_id, "complete");

Expand Down Expand Up @@ -427,4 +406,69 @@ impl Container {

Ok(())
}

#[tracing::instrument(skip_all)]
pub async fn set_exit_code(&self, ctx: &Ctx, exit_code: Option<i32>) -> Result<()> {
// Update DB
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET
exit_ts = ?2,
exit_code = ?3
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.bind(exit_code)
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

// Unbind ports
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE container_ports
SET delete_ts = ?2
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Exited { exit_code },
})
.await?;

self.exited.store(true, Ordering::SeqCst);

Ok(())
}

#[tracing::instrument(skip_all)]
pub async fn cleanup(&self, ctx: &Ctx) -> Result<()> {
tracing::info!(container_id=?self.container_id, "cleaning up");

{
// Cleanup ctx
let mut containers = ctx.containers.write().await;
containers.remove(&self.container_id);
}

if !self.exited.load(Ordering::SeqCst) {
self.set_exit_code(ctx, None).await?;
}

self.cleanup_setup(ctx).await
}
}
60 changes: 34 additions & 26 deletions lib/pegboard/manager/src/container/setup.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
io::Write,
os::unix::process::CommandExt,
path::{Path, PathBuf},
process::Stdio,
Expand All @@ -9,7 +10,7 @@ use futures_util::StreamExt;
use indoc::indoc;
use nix::{
sys::wait::{waitpid, WaitStatus},
unistd::{fork, pipe, read, write, ForkResult, Pid},
unistd::{fork, pipe, read, setsid, write, ForkResult, Pid},
};
use pegboard::protocol;
use rand::Rng;
Expand All @@ -35,20 +36,25 @@ impl Container {
let oci_bundle_path = container_path.join("oci-bundle");
let netns_path = self.netns_path();

let api_endpoint_env_var = {
let api_endpoint = ctx
.api_endpoint
.read()
.await
.clone()
.context("missing api endpoint")?;

format!("RIVET_API_ENDPOINT={api_endpoint}")
};

// Write base config
fs::write(
container_path.join("oci-bundle-config.base.json"),
serde_json::to_vec(&oci_config::config(
self.config.resources.cpu,
self.config.resources.memory,
self.config.resources.memory_max,
vec!["RIVET_API_ENDPOINT".to_string(), {
ctx.api_endpoint
.read()
.await
.clone()
.context("missing api endpoint")?
}],
vec![api_endpoint_env_var],
))?,
)
.await?;
Expand Down Expand Up @@ -446,6 +452,7 @@ impl Container {
}

let max = MAX_INGRESS_PORT - MIN_INGRESS_PORT;
// Add random spread to port selection
let tcp_offset = rand::thread_rng().gen_range(0..max);
let udp_offset = rand::thread_rng().gen_range(0..max);

Expand All @@ -454,8 +461,8 @@ impl Container {
sqlx::query_as::<_, (i64, i64)>(indoc!(
"
INSERT INTO container_ports (container_id, port, protocol)
SELECT ?1, port, protocol
-- Select TCP ports
SELECT ?1, port, protocol
FROM (
WITH RECURSIVE
nums(n, i) AS (
Expand All @@ -481,8 +488,8 @@ impl Container {
SELECT port, 0 AS protocol FROM available_ports
)
UNION ALL
SELECT ?1, port, protocol
-- Select UDP ports
SELECT ?1, port, protocol
FROM (
WITH RECURSIVE
nums(n, i) AS (
Expand Down Expand Up @@ -569,17 +576,9 @@ impl Container {
}

#[tracing::instrument(skip_all)]
pub async fn cleanup(&self, ctx: &Ctx) -> Result<()> {
pub async fn cleanup_setup(&self, ctx: &Ctx) -> Result<()> {
use std::result::Result::{Err, Ok};

tracing::info!(container_id=?self.container_id, "cleaning up");

{
// Cleanup ctx
let mut containers = ctx.containers.write().await;
containers.remove(&self.container_id);
}

match Command::new("runc")
.arg("delete")
.arg("--force")
Expand Down Expand Up @@ -656,7 +655,7 @@ impl Container {

// Path to the created namespace
fn netns_path(&self) -> PathBuf {
if let protocol::NetworkMode::Bridge = self.config.network_mode {
if let protocol::NetworkMode::Host = self.config.network_mode {
// Host network
Path::new("/proc/1/ns/net").to_path_buf()
} else {
Expand All @@ -674,6 +673,7 @@ pub fn spawn_orphaned_container_runner(
// Prepare the arguments for the container runner
let runner_args = vec![container_path.to_str().context("bad path")?];

// TODO: Do pipes have to be manually deleted here?
// Pipe communication between processes
let (pipe_read, pipe_write) = pipe()?;

Expand All @@ -689,9 +689,9 @@ pub fn spawn_orphaned_container_runner(
// Read the second child's PID from the pipe
let mut buf = [0u8; 4];
read(pipe_read, &mut buf)?;
let second_child_pid = Pid::from_raw(i32::from_le_bytes(buf));
let orphan_pid = Pid::from_raw(i32::from_le_bytes(buf));

Ok(second_child_pid)
Ok(orphan_pid)
}
WaitStatus::Exited(_, status) => {
bail!("Child process exited with status {}", status)
Expand All @@ -704,13 +704,23 @@ pub fn spawn_orphaned_container_runner(
match unsafe { fork() } {
Result::Ok(ForkResult::Parent { child }) => {
// Write the second child's PID to the pipe
let child_pid_bytes = child.as_raw().to_le_bytes();
write(pipe_write, &child_pid_bytes)?;
let orphan_pid_bytes = child.as_raw().to_le_bytes();
write(pipe_write, &orphan_pid_bytes)?;

// Write orphan PID to the containers cgroup so that it is no longer part of the parent cgroup.
// This is important for allowing systemd to restart pegboard without restarting orphans.
let mut cgroup_procs = std::fs::File::options()
.append(true)
.open(Path::new(utils::CGROUP_PATH).join("cgroup.procs"))?;
cgroup_procs.write_all(format!("{}\n", child.as_raw()).as_bytes())?;

// Exit the intermediate child
std::process::exit(0);
}
Result::Ok(ForkResult::Child) => {
// Disassociate from the parent by creating a new session
setsid().context("setsid failed")?;

// Exit immediately on fail in order to not leak process
let err = std::process::Command::new(&container_runner_path)
.args(&runner_args)
Expand All @@ -724,8 +734,6 @@ pub fn spawn_orphaned_container_runner(
}
Err(err) => {
// Exit immediately in order to not leak child process.
//
// The first fork doesn't need to exit on error since it
eprintln!("process second fork failed: {err:?}");
std::process::exit(1);
}
Expand Down
Loading

0 comments on commit dba3125

Please sign in to comment.