Skip to content

Commit

Permalink
feat(pegboard): get container connection e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 24, 2024
1 parent 394e1df commit 3c42b1f
Show file tree
Hide file tree
Showing 43 changed files with 680 additions and 308 deletions.
2 changes: 1 addition & 1 deletion lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl ServiceContextData {
}

pub fn depends_on_cluster_config(&self) -> bool {
self.name() == "cluster-default-update"
self.name() == "cluster-default-update" || self.name() == "pegboard-dc-init"
}

pub fn depends_on_provision_margin(&self) -> bool {
Expand Down
142 changes: 93 additions & 49 deletions lib/pegboard/manager/src/container/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

Expand Down Expand Up @@ -42,6 +45,7 @@ pub struct Container {
config: protocol::ContainerConfig,

pid: Mutex<Option<Pid>>,
exited: AtomicBool,
}

impl Container {
Expand All @@ -51,6 +55,7 @@ impl Container {
config,

pid: Mutex::new(None),
exited: AtomicBool::new(false),
})
}

Expand All @@ -60,6 +65,7 @@ impl Container {
config,

pid: Mutex::new(Some(pid)),
exited: AtomicBool::new(false),
})
}

Expand Down Expand Up @@ -198,7 +204,7 @@ impl Container {
"
UPDATE containers
SET
running_ts = ?2 AND
running_ts = ?2,
pid = ?3
WHERE container_id = ?1
",
Expand Down Expand Up @@ -284,58 +290,31 @@ impl Container {
};

let exit_code = if let ObservationState::Exited = state {
Some(
fs::read_to_string(&exit_code_path)
.await?
.trim()
.parse::<i32>()?,
)
use std::result::Result::Ok;
match fs::read_to_string(&exit_code_path).await {
Ok(contents) => match contents.trim().parse::<i32>() {
Ok(x) => Some(x),
Err(err) => {
tracing::error!(?err, "failed to parse exit code file");

None
}
},
Err(err) => {
tracing::error!(?err, "failed to read exit code file");

None
}
}
} else {
tracing::warn!(?pid, "process died before exit code file was written");

None
};

tracing::info!(container_id=?self.container_id, ?exit_code, "received exit code");

// Update DB
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET
exit_ts = ?2 AND
exit_code = ?3
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.bind(exit_code)
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

// Unbind ports
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE container_ports
SET delete_ts = ?2
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;
tracing::info!(container_id=?self.container_id, ?exit_code, "exited");

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Exited { exit_code },
})
.await?;
self.set_exit_code(ctx, exit_code).await?;

tracing::info!(container_id=?self.container_id, "complete");

Expand Down Expand Up @@ -427,4 +406,69 @@ impl Container {

Ok(())
}

/// Persists the container's exit and announces it.
///
/// In order, this writes `exit_ts`/`exit_code` on the `containers` row, sets `delete_ts` on
/// the container's `container_ports` rows, emits a `ContainerStateUpdate` event carrying
/// `ContainerState::Exited`, and finally raises the `exited` flag. Every step returns early
/// through `?` on failure, so the flag is only set once all preceding steps have succeeded.
///
/// `exit_code` may be `None` when no exit code is available.
#[tracing::instrument(skip_all)]
pub async fn set_exit_code(&self, ctx: &Ctx, exit_code: Option<i32>) -> Result<()> {
    const MARK_EXITED_SQL: &str = indoc!(
        "
        UPDATE containers
        SET
        exit_ts = ?2,
        exit_code = ?3
        WHERE container_id = ?1
        ",
    );
    const RELEASE_PORTS_SQL: &str = indoc!(
        "
        UPDATE container_ports
        SET delete_ts = ?2
        WHERE container_id = ?1
        ",
    );

    let container_id = self.container_id;

    // Record the exit on the container row. The timestamp is taken inside the closure so it
    // is re-evaluated every time the closure is invoked.
    utils::query(|| async {
        sqlx::query(MARK_EXITED_SQL)
            .bind(container_id)
            .bind(utils::now())
            .bind(exit_code)
            .execute(&mut *ctx.sql().await?)
            .await
    })
    .await?;

    // Mark the container's ports as deleted (unbind them)
    utils::query(|| async {
        sqlx::query(RELEASE_PORTS_SQL)
            .bind(container_id)
            .bind(utils::now())
            .execute(&mut *ctx.sql().await?)
            .await
    })
    .await?;

    // Publish the new state
    let exited_event = protocol::Event::ContainerStateUpdate {
        container_id,
        state: protocol::ContainerState::Exited { exit_code },
    };
    ctx.event(exited_event).await?;

    // Remember that the exit has been recorded so `cleanup` does not record it again
    self.exited.store(true, Ordering::SeqCst);

    Ok(())
}

/// Tears down this container's bookkeeping and on-host state.
///
/// Removes the container from `ctx.containers`, records an exit with no exit code if
/// `set_exit_code` has not already marked the container as exited, and then delegates the
/// remaining teardown to `cleanup_setup`.
#[tracing::instrument(skip_all)]
pub async fn cleanup(&self, ctx: &Ctx) -> Result<()> {
    tracing::info!(container_id=?self.container_id, "cleaning up");

    // Deregister from ctx; release the write guard before doing any further async work
    let mut registry = ctx.containers.write().await;
    registry.remove(&self.container_id);
    drop(registry);

    // No exit was recorded yet, so record one without an exit code
    let already_exited = self.exited.load(Ordering::SeqCst);
    if !already_exited {
        self.set_exit_code(ctx, None).await?;
    }

    self.cleanup_setup(ctx).await
}
}
31 changes: 14 additions & 17 deletions lib/pegboard/manager/src/container/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,25 @@ impl Container {
let oci_bundle_path = container_path.join("oci-bundle");
let netns_path = self.netns_path();

let api_endpoint_env_var = {
let api_endpoint = ctx
.api_endpoint
.read()
.await
.clone()
.context("missing api endpoint")?;

format!("RIVET_API_ENDPOINT={api_endpoint}")
};

// Write base config
fs::write(
container_path.join("oci-bundle-config.base.json"),
serde_json::to_vec(&oci_config::config(
self.config.resources.cpu,
self.config.resources.memory,
self.config.resources.memory_max,
vec!["RIVET_API_ENDPOINT".to_string(), {
ctx.api_endpoint
.read()
.await
.clone()
.context("missing api endpoint")?
}],
vec![api_endpoint_env_var],
))?,
)
.await?;
Expand Down Expand Up @@ -569,17 +574,9 @@ impl Container {
}

#[tracing::instrument(skip_all)]
pub async fn cleanup(&self, ctx: &Ctx) -> Result<()> {
pub async fn cleanup_setup(&self, ctx: &Ctx) -> Result<()> {
use std::result::Result::{Err, Ok};

tracing::info!(container_id=?self.container_id, "cleaning up");

{
// Cleanup ctx
let mut containers = ctx.containers.write().await;
containers.remove(&self.container_id);
}

match Command::new("runc")
.arg("delete")
.arg("--force")
Expand Down Expand Up @@ -656,7 +653,7 @@ impl Container {

// Path to the created namespace
fn netns_path(&self) -> PathBuf {
if let protocol::NetworkMode::Bridge = self.config.network_mode {
if let protocol::NetworkMode::Host = self.config.network_mode {
// Host network
Path::new("/proc/1/ns/net").to_path_buf()
} else {
Expand Down
59 changes: 32 additions & 27 deletions lib/pegboard/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct ContainerRow {
container_id: Uuid,
config: Vec<u8>,
pid: Option<i32>,
stop_ts: Option<i64>,
}

pub struct Ctx {
Expand Down Expand Up @@ -175,7 +176,12 @@ impl Ctx {
self.send_packet(protocol::ToServer::Init {
last_command_idx,
system: protocol::SystemInfo {
cpu: self.system.cpus().first().context("no cpus")?.frequency(),
// Sum of cpu frequency
cpu: self
.system
.cpus()
.iter()
.fold(0, |s, cpu| s + cpu.frequency()),
memory: self.system.total_memory() / (1024 * 1024),
},
})
Expand Down Expand Up @@ -333,7 +339,7 @@ impl Ctx {
let container_rows = utils::query(|| async {
sqlx::query_as::<_, ContainerRow>(indoc!(
"
SELECT container_id, config, pid
SELECT container_id, config, pid, stop_ts
FROM containers
WHERE exit_ts IS NULL
",
Expand All @@ -343,34 +349,33 @@ impl Ctx {
})
.await?;

// NOTE: Sqlite doesn't support arrays, can't parallelize this easily
// Emit stop events
for row in &container_rows {
if row.pid.is_some() {
continue;
// A row without a pid that has not been given a `stop_ts` yet gets marked as stopped here.
// `stop_ts` is a column on the row (selected alongside `pid`), so it must be read through
// `row`; rows that already carry a `stop_ts` are skipped, presumably because they were
// handled by an earlier pass.
if row.pid.is_none() && row.stop_ts.is_none() {
    tracing::error!(container_id=?row.container_id, "container has no pid, stopping");

    // Persist the stop timestamp
    utils::query(|| async {
        sqlx::query(indoc!(
            "
            UPDATE containers
            SET stop_ts = ?2
            WHERE container_id = ?1
            ",
        ))
        .bind(row.container_id)
        .bind(utils::now())
        .execute(&mut *self.sql().await?)
        .await
    })
    .await?;

    // Emit the matching state update event
    self.event(protocol::Event::ContainerStateUpdate {
        container_id: row.container_id,
        state: protocol::ContainerState::Stopped,
    })
    .await?;
}

tracing::error!("container has no pid, stopping");

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET stop_ts = ?2
WHERE container_id = ?1
",
))
.bind(row.container_id)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.await
})
.await?;

self.event(protocol::Event::ContainerStateUpdate {
container_id: row.container_id,
state: protocol::ContainerState::Stopped,
})
.await?;
}

// Start container observers
Expand Down
7 changes: 7 additions & 0 deletions lib/util/core/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,10 @@ impl<T> sqlx::postgres::PgHasArrayType for Raw<T> {
sqlx::postgres::PgTypeInfo::with_name("_jsonb")
}
}

/// Array type mapping for borrowed `Raw<T>` values: reports the Postgres `_jsonb` type.
impl<T> sqlx::postgres::PgHasArrayType for &Raw<T> {
    fn array_type_info() -> sqlx::postgres::PgTypeInfo {
        use sqlx::postgres::PgTypeInfo;

        // Postgres names array types with a leading underscore: `_jsonb` is the JSONB array
        const JSONB_ARRAY: &str = "_jsonb";

        PgTypeInfo::with_name(JSONB_ARRAY)
    }
}
Loading

0 comments on commit 3c42b1f

Please sign in to comment.