Skip to content

Commit

Permalink
fix(pegboard): implement port choosing
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 29, 2024
1 parent d6c0411 commit 7f5b5fe
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 60 deletions.
1 change: 1 addition & 0 deletions lib/pegboard/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ lazy_static = "1.4"
nix = { version = "0.27", default-features = false, features = ["user", "signal"] }
notify = { version = "6.1.1", default-features = false, features = [ "serde" ] }
prometheus = "0.13"
rand = "0.8"
reqwest = { version = "0.11", features = ["stream"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
Expand Down
86 changes: 57 additions & 29 deletions lib/pegboard/manager/src/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use anyhow::*;
use futures_util::{stream::FuturesUnordered, FutureExt, StreamExt};
use indoc::indoc;
use nix::{
errno::Errno,
sys::signal::{kill, Signal},
unistd::Pid,
};
Expand Down Expand Up @@ -295,6 +296,22 @@ impl Container {
})
.await?;

// Unbind ports
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE container_ports
SET delete_ts = ?2
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Exited { exit_code },
Expand All @@ -306,31 +323,31 @@ impl Container {
Ok(())
}

pub async fn stop(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "stopping");
pub async fn signal(self: &Arc<Self>, ctx: &Arc<Ctx>, signal: Signal) -> Result<()> {
tracing::info!(container_id=?self.container_id, ?signal, "sending signal");

let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
if let Err(err) = self2.stop_inner(&ctx2).await {
tracing::error!(?err, "container stop failed");
if let Err(err) = self2.signal_inner(&ctx2, signal).await {
tracing::error!(?err, "container signal failed");
}

self2.cleanup(&ctx2).await
});

Ok(())
}

async fn stop_inner(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
async fn signal_inner(self: &Arc<Self>, ctx: &Arc<Ctx>, signal: Signal) -> Result<()> {
let mut i = 0;

// Signal command might be sent before the container has a valid PID. This loop waits for the PID to
// be set
let pid = loop {
if let Some(pid) = *self.pid.lock().await {
break Some(pid);
}

tracing::warn!(container_id=?self.container_id, "waiting for pid to stop workflow");
tracing::warn!(container_id=?self.container_id, "waiting for pid to signal container");

if i > STOP_PID_RETRIES {
tracing::error!(
Expand All @@ -345,31 +362,42 @@ impl Container {
tokio::time::sleep(STOP_PID_INTERVAL).await;
};

// Kill if PID found
// Kill if PID set
if let Some(pid) = pid {
kill(pid, Signal::SIGTERM)?;
use std::result::Result::{Err, Ok};

match kill(pid, signal) {
Ok(_) => {}
Err(Errno::ESRCH) => {
tracing::warn!(container_id=?self.container_id, ?pid, "pid not found for signalling")
}
Err(err) => return Err(err.into()),
}
}

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET stop_ts = ?2
container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;
// Update stop_ts
if matches!(signal, Signal::SIGTERM) || pid.is_none() {
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET stop_ts = ?2
container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopped,
})
.await?;
ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopped,
})
.await?;
}

Ok(())
}
Expand Down
152 changes: 136 additions & 16 deletions lib/pegboard/manager/src/container/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use nix::{
unistd::{fork, pipe, read, write, ForkResult, Pid},
};
use pegboard::protocol;
use rand::Rng;
use serde_json::{json, Value};
use tokio::{
fs::{self, File},
Expand All @@ -20,9 +21,11 @@ use tokio::{
};

use super::{oci_config, Container};
use crate::ctx::Ctx;
use crate::{ctx::Ctx, utils};

const NETWORK_NAME: &str = "rivet-pegboard";
const MIN_INGRESS_PORT: u16 = 20000;
const MAX_INGRESS_PORT: u16 = 31999;

impl Container {
pub async fn setup_oci_bundle(&self, ctx: &Ctx) -> Result<()> {
Expand Down Expand Up @@ -334,21 +337,7 @@ impl Container {

tracing::info!(container_id=?self.container_id, "writing cni params");

let cni_port_mappings = self
.config
.ports
.iter()
.map(|(_, port)| {
// Pick random port that isn't taken
let host_port = todo!();

json!({
"HostPort": host_port,
"ContainerPort": port.internal_port,
"Protocol": port.proxy_protocol.to_string(),
})
})
.collect::<Vec<_>>();
let cni_port_mappings = self.bind_ports(ctx).await?;

// MARK: Generate CNI parameters
//
Expand Down Expand Up @@ -414,6 +403,137 @@ impl Container {
Ok(())
}

pub(crate) async fn bind_ports(&self, ctx: &Ctx) -> Result<Vec<serde_json::Value>> {
let mut tcp_count = 0;
let mut udp_count = 0;

// Count ports
for (_, port) in &self.config.ports {
match port.proxy_protocol {
protocol::TransportProtocol::Tcp => tcp_count += 1,
protocol::TransportProtocol::Udp => udp_count += 1,
}
}

let max = MAX_INGRESS_PORT - MIN_INGRESS_PORT;
let tcp_offset = rand::thread_rng().gen_range(0..max);
let udp_offset = rand::thread_rng().gen_range(0..max);

// Selects available TCP and UDP ports
let rows = utils::query(|| async {
sqlx::query_as::<_, (i64, i64)>(indoc!(
"
INSERT INTO container_ports (container_id, port, protocol)
SELECT ?1, port, protocol
-- Select TCP ports
FROM (
WITH RECURSIVE
nums(n, i) AS (
SELECT ?4, ?4
UNION ALL
SELECT (n + 1) % (?6 + 1), i + 1
FROM nums
WHERE i < ?6 + ?4
),
available_ports(port) AS (
SELECT nums.n
FROM nums
LEFT JOIN container_ports AS p
ON
nums.n = p.port AND
p.protocol = 0 AND
delete_ts IS NULL
WHERE
p.port IS NULL OR
delete_ts IS NOT NULL
LIMIT ?2
)
SELECT port, 0 AS protocol FROM available_ports
)
UNION ALL
SELECT ?1, port, protocol
-- Select UDP ports
FROM (
WITH RECURSIVE
nums(n, i) AS (
SELECT ?5, ?5
UNION ALL
SELECT (n + 1) % (?6 + 1), i + 1
FROM nums
WHERE i < ?6 + ?5
),
available_ports(port) AS (
SELECT nums.n
FROM nums
LEFT JOIN container_ports AS p
ON
nums.n = p.port AND
p.protocol = 1 AND
delete_ts IS NULL
WHERE
p.port IS NULL OR
delete_ts IS NOT NULL
LIMIT ?3
)
SELECT port, 1 AS protocol FROM available_ports
)
RETURNING port, protocol
",
))
.bind(self.container_id)
.bind(tcp_count as i64) // ?2
.bind(udp_count as i64) // ?3
.bind(tcp_offset as i64) // ?4
.bind(udp_offset as i64) // ?5
.bind(max as i64) // ?6
.fetch_all(&mut *ctx.sql().await?)
.await
})
.await?;

if rows.len() != tcp_count + udp_count {
bail!("not enough available ports");
}

let cni_port_mappings = self
.config
.ports
.iter()
.filter(|(_, port)| matches!(port.proxy_protocol, protocol::TransportProtocol::Tcp))
.zip(
rows.iter()
.filter(|(_, protocol)| *protocol == protocol::TransportProtocol::Tcp as i64),
)
.map(|((_, port), (host_port, _))| {
json!({
"HostPort": host_port,
"ContainerPort": port.internal_port,
"Protocol": port.proxy_protocol.to_string(),
})
})
.chain(
self.config
.ports
.iter()
.filter(|(_, port)| {
matches!(port.proxy_protocol, protocol::TransportProtocol::Udp)
})
.zip(rows.iter().filter(|(_, protocol)| {
*protocol == protocol::TransportProtocol::Udp as i64
}))
.map(|((_, port), (host_port, _))| {
json!({
"HostPort": host_port,
"ContainerPort": port.internal_port,
"Protocol": port.proxy_protocol.to_string(),
})
}),
)
.collect::<Vec<_>>();

Ok(cni_port_mappings)
}

#[tracing::instrument(skip_all)]
pub async fn cleanup(&self, ctx: &Ctx) -> Result<()> {
use std::result::Result::{Err, Ok};
Expand Down
9 changes: 6 additions & 3 deletions lib/pegboard/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,12 @@ impl Ctx {
// Spawn container
container.start(&self).await?;
}
protocol::Command::StopContainer { container_id } => {
protocol::Command::SignalContainer {
container_id,
signal,
} => {
if let Some(container) = self.containers.read().await.get(&container_id) {
container.stop(&self).await?;
container.signal(&self, signal.try_into()?).await?;
} else {
tracing::warn!(
?container_id,
Expand Down Expand Up @@ -318,7 +321,7 @@ impl Ctx {
"
SELECT container_id, config, pid
FROM containers
WHERE stop_ts IS NULL AND exit_ts IS NULL
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
Expand Down
33 changes: 33 additions & 0 deletions lib/pegboard/manager/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,39 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE TABLE IF NOT EXISTS container_ports (
container_id TEXT NOT NULL, -- UUID
port INT NOT NULL,
protocol INT NOT NULL, -- protocol::TransportProtocol
delete_ts INT
)
",
))
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE INDEX IF NOT EXISTS container_ports_id_idx
ON container_ports(container_id)
",
))
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE UNIQUE INDEX IF NOT EXISTS container_ports_unique_idx
ON container_ports(port, protocol)
WHERE delete_ts IS NULL
",
))
.execute(&mut *conn)
.await?;

Ok(())
}

Expand Down
Loading

0 comments on commit 7f5b5fe

Please sign in to comment.