Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pegboard): implement port choosing #1155

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/pegboard/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ lazy_static = "1.4"
nix = { version = "0.27", default-features = false, features = ["user", "signal"] }
notify = { version = "6.1.1", default-features = false, features = [ "serde" ] }
prometheus = "0.13"
rand = "0.8"
reqwest = { version = "0.11", features = ["stream"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
Expand Down
86 changes: 57 additions & 29 deletions lib/pegboard/manager/src/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use anyhow::*;
use futures_util::{stream::FuturesUnordered, FutureExt, StreamExt};
use indoc::indoc;
use nix::{
errno::Errno,
sys::signal::{kill, Signal},
unistd::Pid,
};
Expand Down Expand Up @@ -295,6 +296,22 @@ impl Container {
})
.await?;

// Unbind ports
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE container_ports
SET delete_ts = ?2
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Exited { exit_code },
Expand All @@ -306,31 +323,31 @@ impl Container {
Ok(())
}

pub async fn stop(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "stopping");
pub async fn signal(self: &Arc<Self>, ctx: &Arc<Ctx>, signal: Signal) -> Result<()> {
tracing::info!(container_id=?self.container_id, ?signal, "sending signal");

let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
if let Err(err) = self2.stop_inner(&ctx2).await {
tracing::error!(?err, "container stop failed");
if let Err(err) = self2.signal_inner(&ctx2, signal).await {
tracing::error!(?err, "container signal failed");
}

self2.cleanup(&ctx2).await
});

Ok(())
}

async fn stop_inner(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
async fn signal_inner(self: &Arc<Self>, ctx: &Arc<Ctx>, signal: Signal) -> Result<()> {
let mut i = 0;

// Signal command might be sent before the container has a valid PID. This loop waits for the PID to
// be set
let pid = loop {
if let Some(pid) = *self.pid.lock().await {
break Some(pid);
}

tracing::warn!(container_id=?self.container_id, "waiting for pid to stop workflow");
tracing::warn!(container_id=?self.container_id, "waiting for pid to signal container");

if i > STOP_PID_RETRIES {
tracing::error!(
Expand All @@ -345,31 +362,42 @@ impl Container {
tokio::time::sleep(STOP_PID_INTERVAL).await;
};

// Kill if PID found
// Kill if PID set
if let Some(pid) = pid {
kill(pid, Signal::SIGTERM)?;
use std::result::Result::{Err, Ok};

match kill(pid, signal) {
Ok(_) => {}
Err(Errno::ESRCH) => {
tracing::warn!(container_id=?self.container_id, ?pid, "pid not found for signalling")
}
Err(err) => return Err(err.into()),
}
}

utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET stop_ts = ?2
container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;
// Update stop_ts
if matches!(signal, Signal::SIGTERM) || pid.is_none() {
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET stop_ts = ?2
container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopped,
})
.await?;
ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopped,
})
.await?;
}

Ok(())
}
Expand Down
152 changes: 136 additions & 16 deletions lib/pegboard/manager/src/container/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use nix::{
unistd::{fork, pipe, read, write, ForkResult, Pid},
};
use pegboard::protocol;
use rand::Rng;
use serde_json::{json, Value};
use tokio::{
fs::{self, File},
Expand All @@ -20,9 +21,11 @@ use tokio::{
};

use super::{oci_config, Container};
use crate::ctx::Ctx;
use crate::{ctx::Ctx, utils};

const NETWORK_NAME: &str = "rivet-pegboard";
const MIN_INGRESS_PORT: u16 = 20000;
const MAX_INGRESS_PORT: u16 = 31999;

impl Container {
pub async fn setup_oci_bundle(&self, ctx: &Ctx) -> Result<()> {
Expand Down Expand Up @@ -334,21 +337,7 @@ impl Container {

tracing::info!(container_id=?self.container_id, "writing cni params");

let cni_port_mappings = self
.config
.ports
.iter()
.map(|(_, port)| {
// Pick random port that isn't taken
let host_port = todo!();

json!({
"HostPort": host_port,
"ContainerPort": port.internal_port,
"Protocol": port.proxy_protocol.to_string(),
})
})
.collect::<Vec<_>>();
let cni_port_mappings = self.bind_ports(ctx).await?;

// MARK: Generate CNI parameters
//
Expand Down Expand Up @@ -414,6 +403,137 @@ impl Container {
Ok(())
}

pub(crate) async fn bind_ports(&self, ctx: &Ctx) -> Result<Vec<serde_json::Value>> {
let mut tcp_count = 0;
let mut udp_count = 0;

// Count ports
for (_, port) in &self.config.ports {
match port.proxy_protocol {
protocol::TransportProtocol::Tcp => tcp_count += 1,
protocol::TransportProtocol::Udp => udp_count += 1,
}
}

let max = MAX_INGRESS_PORT - MIN_INGRESS_PORT;
let tcp_offset = rand::thread_rng().gen_range(0..max);
let udp_offset = rand::thread_rng().gen_range(0..max);

// Selects available TCP and UDP ports
let rows = utils::query(|| async {
sqlx::query_as::<_, (i64, i64)>(indoc!(
"
INSERT INTO container_ports (container_id, port, protocol)
SELECT ?1, port, protocol
-- Select TCP ports
FROM (
WITH RECURSIVE
nums(n, i) AS (
SELECT ?4, ?4
UNION ALL
SELECT (n + 1) % (?6 + 1), i + 1
FROM nums
WHERE i < ?6 + ?4
),
available_ports(port) AS (
SELECT nums.n
FROM nums
LEFT JOIN container_ports AS p
ON
nums.n = p.port AND
p.protocol = 0 AND
delete_ts IS NULL
WHERE
p.port IS NULL OR
delete_ts IS NOT NULL
LIMIT ?2
)
SELECT port, 0 AS protocol FROM available_ports
)
UNION ALL
SELECT ?1, port, protocol
-- Select UDP ports
FROM (
WITH RECURSIVE
nums(n, i) AS (
SELECT ?5, ?5
UNION ALL
SELECT (n + 1) % (?6 + 1), i + 1
FROM nums
WHERE i < ?6 + ?5
),
available_ports(port) AS (
SELECT nums.n
FROM nums
LEFT JOIN container_ports AS p
ON
nums.n = p.port AND
p.protocol = 1 AND
delete_ts IS NULL
WHERE
p.port IS NULL OR
delete_ts IS NOT NULL
LIMIT ?3
)
SELECT port, 1 AS protocol FROM available_ports
)
RETURNING port, protocol
",
))
.bind(self.container_id)
.bind(tcp_count as i64) // ?2
.bind(udp_count as i64) // ?3
.bind(tcp_offset as i64) // ?4
.bind(udp_offset as i64) // ?5
.bind(max as i64) // ?6
.fetch_all(&mut *ctx.sql().await?)
.await
})
.await?;

if rows.len() != tcp_count + udp_count {
bail!("not enough available ports");
}

let cni_port_mappings = self
.config
.ports
.iter()
.filter(|(_, port)| matches!(port.proxy_protocol, protocol::TransportProtocol::Tcp))
.zip(
rows.iter()
.filter(|(_, protocol)| *protocol == protocol::TransportProtocol::Tcp as i64),
)
.map(|((_, port), (host_port, _))| {
json!({
"HostPort": host_port,
"ContainerPort": port.internal_port,
"Protocol": port.proxy_protocol.to_string(),
})
})
.chain(
self.config
.ports
.iter()
.filter(|(_, port)| {
matches!(port.proxy_protocol, protocol::TransportProtocol::Udp)
})
.zip(rows.iter().filter(|(_, protocol)| {
*protocol == protocol::TransportProtocol::Udp as i64
}))
.map(|((_, port), (host_port, _))| {
json!({
"HostPort": host_port,
"ContainerPort": port.internal_port,
"Protocol": port.proxy_protocol.to_string(),
})
}),
)
.collect::<Vec<_>>();

Ok(cni_port_mappings)
}

#[tracing::instrument(skip_all)]
pub async fn cleanup(&self, ctx: &Ctx) -> Result<()> {
use std::result::Result::{Err, Ok};
Expand Down
9 changes: 6 additions & 3 deletions lib/pegboard/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,12 @@ impl Ctx {
// Spawn container
container.start(&self).await?;
}
protocol::Command::StopContainer { container_id } => {
protocol::Command::SignalContainer {
container_id,
signal,
} => {
if let Some(container) = self.containers.read().await.get(&container_id) {
container.stop(&self).await?;
container.signal(&self, signal.try_into()?).await?;
} else {
tracing::warn!(
?container_id,
Expand Down Expand Up @@ -318,7 +321,7 @@ impl Ctx {
"
SELECT container_id, config, pid
FROM containers
WHERE stop_ts IS NULL AND exit_ts IS NULL
WHERE exit_ts IS NULL
",
))
.fetch_all(&mut *self.sql().await?)
Expand Down
33 changes: 33 additions & 0 deletions lib/pegboard/manager/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,39 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE TABLE IF NOT EXISTS container_ports (
container_id TEXT NOT NULL, -- UUID
port INT NOT NULL,
protocol INT NOT NULL, -- protocol::TransportProtocol

delete_ts INT
)
",
))
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE INDEX IF NOT EXISTS container_ports_id_idx
ON container_ports(container_id)
",
))
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE UNIQUE INDEX IF NOT EXISTS container_ports_unique_idx
ON container_ports(port, protocol)
WHERE delete_ts IS NULL
",
))
.execute(&mut *conn)
.await?;

Ok(())
}

Expand Down
Loading
Loading