Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sv2-mining-proxy lib #1252

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 51 additions & 7 deletions roles/mining-proxy/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use roles_logic_sv2::{
};
use serde::Deserialize;
use std::{net::SocketAddr, sync::Arc};
use tokio::{net::TcpListener, sync::oneshot};
use tracing::info;
use upstream_mining::UpstreamMiningNode;

type RLogic = MiningProxyRoutingLogic<
Expand Down Expand Up @@ -84,10 +86,10 @@ pub fn get_common_routing_logic() -> CommonRoutingLogic<RLogic> {

#[derive(Debug, Deserialize, Clone)]
pub struct UpstreamMiningValues {
address: String,
port: u16,
pub_key: key_utils::Secp256k1PublicKey,
channel_kind: ChannelKind,
pub address: String,
pub port: u16,
pub pub_key: key_utils::Secp256k1PublicKey,
pub channel_kind: ChannelKind,
}

#[derive(Debug, Deserialize, Clone, Copy)]
Expand All @@ -103,9 +105,9 @@ pub struct Configuration {
pub listen_mining_port: u16,
pub max_supported_version: u16,
pub min_supported_version: u16,
downstream_share_per_minute: f32,
expected_total_downstream_hr: f32,
reconnect: bool,
pub downstream_share_per_minute: f32,
pub expected_total_downstream_hr: f32,
pub reconnect: bool,
}
pub async fn initialize_r_logic(
upstreams: &[UpstreamMiningValues],
Expand Down Expand Up @@ -145,3 +147,45 @@ pub async fn initialize_r_logic(
downstream_to_upstream_map: std::collections::HashMap::new(),
}
}

pub async fn start_mining_proxy(config: Configuration) {
let group_id = Arc::new(Mutex::new(GroupId::new()));
ROUTING_LOGIC
.set(Mutex::new(
initialize_r_logic(&config.upstreams, group_id, config.clone()).await,
))
.expect("BUG: Failed to set ROUTING_LOGIC");

info!("Initializing upstream scanner");
initialize_upstreams(config.min_supported_version, config.max_supported_version).await;
info!("Initializing downstream listener");

let socket = SocketAddr::new(
config.listen_address.parse().unwrap(),
config.listen_mining_port,
);
let listener = TcpListener::bind(socket).await.unwrap();

info!("Listening for downstream mining connections on {}", socket);

let (shutdown_tx, shutdown_rx) = oneshot::channel();

let (_, res) = tokio::join!(
// Wait for downstream connection
downstream_mining::listen_for_downstream_mining(listener, shutdown_rx),
// handle SIGTERM/QUIT / ctrl+c
tokio::spawn(async {
tokio::signal::ctrl_c()
.await
.expect("Failed to listen to signals");
let _ = shutdown_tx.send(());
info!("Interrupt received");
})
);

if let Err(e) = res {
panic!("Failed to wait for clean exit: {:?}", e);
}

info!("Shutdown done");
}
48 changes: 3 additions & 45 deletions roles/mining-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@
//! A Downstream that signal the capacity to handle group channels can open more than one channel.
//! A Downstream that signal the incapacity to handle group channels can open only one channel.
#![allow(special_module_name)]
use std::{net::SocketAddr, sync::Arc};

use tokio::{net::TcpListener, sync::oneshot};
use tracing::{error, info};
use tracing::error;

use ext_config::{Config, File, FileFormat};
use lib::Configuration;
use roles_logic_sv2::utils::{GroupId, Mutex};

mod lib;
pub mod lib;

mod args {
use std::path::PathBuf;
Expand Down Expand Up @@ -131,43 +127,5 @@ async fn main() {
}
};

let group_id = Arc::new(Mutex::new(GroupId::new()));
lib::ROUTING_LOGIC
.set(Mutex::new(
lib::initialize_r_logic(&config.upstreams, group_id, config.clone()).await,
))
.expect("BUG: Failed to set ROUTING_LOGIC");

info!("Initializing upstream scanner");
lib::initialize_upstreams(config.min_supported_version, config.max_supported_version).await;
info!("Initializing downstream listener");

let socket = SocketAddr::new(
config.listen_address.parse().unwrap(),
config.listen_mining_port,
);
let listener = TcpListener::bind(socket).await.unwrap();

info!("Listening for downstream mining connections on {}", socket);

let (shutdown_tx, shutdown_rx) = oneshot::channel();

let (_, res) = tokio::join!(
// Wait for downstream connection
lib::downstream_mining::listen_for_downstream_mining(listener, shutdown_rx),
// handle SIGTERM/QUIT / ctrl+c
tokio::spawn(async {
tokio::signal::ctrl_c()
.await
.expect("Failed to listen to signals");
let _ = shutdown_tx.send(());
info!("Interrupt received");
})
);

if let Err(e) = res {
panic!("Failed to wait for clean exit: {:?}", e);
}

info!("Shutdown done");
lib::start_mining_proxy(config).await;
}
1 change: 1 addition & 0 deletions roles/tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = ["noise_sv2"] }
const_sv2 = { path = "../../protocols/v2/const-sv2" }
flate2 = "1.0.32"
key-utils = { path = "../../utils/key-utils" }
mining_proxy_sv2 = { path = "../mining-proxy" }
minreq = { version = "2.12.0", features = ["https"] }
once_cell = "1.19.0"
network_helpers_sv2 = { path = "../roles-utils/network-helpers", features =["with_tokio","with_buffer_pool"] }
Expand Down
28 changes: 28 additions & 0 deletions roles/tests-integration/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,31 @@ pub async fn start_template_provider(tp_port: u16) -> TemplateProvider {
template_provider.generate_blocks(16);
template_provider
}

pub async fn start_mining_sv2_proxy(upstream: SocketAddr) -> SocketAddr {
use mining_proxy_sv2::{ChannelKind, UpstreamMiningValues};
let upstreams = vec![UpstreamMiningValues {
address: upstream.ip().to_string(),
port: upstream.port(),
pub_key: Secp256k1PublicKey::from_str(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72",
)
.unwrap(),
channel_kind: ChannelKind::Extended,
}];
let mining_proxy_listening_address = get_available_address();
let config = mining_proxy_sv2::Configuration {
upstreams,
listen_address: mining_proxy_listening_address.ip().to_string(),
listen_mining_port: mining_proxy_listening_address.port(),
max_supported_version: 2,
min_supported_version: 2,
downstream_share_per_minute: 1.0,
expected_total_downstream_hr: 10_000.0,
reconnect: true,
};
tokio::spawn(async move {
mining_proxy_sv2::start_mining_proxy(config).await;
});
mining_proxy_listening_address
}
Loading