diff --git a/roles/Cargo.lock b/roles/Cargo.lock index f7d1d1a8b..3eba7eec3 100644 --- a/roles/Cargo.lock +++ b/roles/Cargo.lock @@ -1304,6 +1304,7 @@ dependencies = [ "const_sv2", "flate2", "key-utils", + "mining_proxy_sv2", "minreq", "network_helpers_sv2", "once_cell", diff --git a/roles/mining-proxy/src/lib/mod.rs b/roles/mining-proxy/src/lib/mod.rs index 8c7a8563d..d0d9d23de 100644 --- a/roles/mining-proxy/src/lib/mod.rs +++ b/roles/mining-proxy/src/lib/mod.rs @@ -10,6 +10,8 @@ use roles_logic_sv2::{ }; use serde::Deserialize; use std::{net::SocketAddr, sync::Arc}; +use tokio::{net::TcpListener, sync::oneshot}; +use tracing::info; use upstream_mining::UpstreamMiningNode; type RLogic = MiningProxyRoutingLogic< @@ -84,10 +86,10 @@ pub fn get_common_routing_logic() -> CommonRoutingLogic { #[derive(Debug, Deserialize, Clone)] pub struct UpstreamMiningValues { - address: String, - port: u16, - pub_key: key_utils::Secp256k1PublicKey, - channel_kind: ChannelKind, + pub address: String, + pub port: u16, + pub pub_key: key_utils::Secp256k1PublicKey, + pub channel_kind: ChannelKind, } #[derive(Debug, Deserialize, Clone, Copy)] @@ -103,9 +105,9 @@ pub struct Configuration { pub listen_mining_port: u16, pub max_supported_version: u16, pub min_supported_version: u16, - downstream_share_per_minute: f32, - expected_total_downstream_hr: f32, - reconnect: bool, + pub downstream_share_per_minute: f32, + pub expected_total_downstream_hr: f32, + pub reconnect: bool, } pub async fn initialize_r_logic( upstreams: &[UpstreamMiningValues], @@ -145,3 +147,45 @@ pub async fn initialize_r_logic( downstream_to_upstream_map: std::collections::HashMap::new(), } } + +pub async fn start_mining_proxy(config: Configuration) { + let group_id = Arc::new(Mutex::new(GroupId::new())); + ROUTING_LOGIC + .set(Mutex::new( + initialize_r_logic(&config.upstreams, group_id, config.clone()).await, + )) + .expect("BUG: Failed to set ROUTING_LOGIC"); + + info!("Initializing upstream scanner"); + initialize_upstreams(config.min_supported_version, config.max_supported_version).await; + info!("Initializing downstream listener"); + + let socket = SocketAddr::new( + config.listen_address.parse().unwrap(), + config.listen_mining_port, + ); + let listener = TcpListener::bind(socket).await.unwrap(); + + info!("Listening for downstream mining connections on {}", socket); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let (_, res) = tokio::join!( + // Wait for downstream connection + downstream_mining::listen_for_downstream_mining(listener, shutdown_rx), + // handle SIGTERM/QUIT / ctrl+c + tokio::spawn(async { + tokio::signal::ctrl_c() + .await + .expect("Failed to listen to signals"); + let _ = shutdown_tx.send(()); + info!("Interrupt received"); + }) + ); + + if let Err(e) = res { + panic!("Failed to wait for clean exit: {:?}", e); + } + + info!("Shutdown done"); +} diff --git a/roles/mining-proxy/src/main.rs b/roles/mining-proxy/src/main.rs index 4472398e8..84eb9f59b 100644 --- a/roles/mining-proxy/src/main.rs +++ b/roles/mining-proxy/src/main.rs @@ -17,16 +17,12 @@ //! A Downstream that signal the capacity to handle group channels can open more than one channel. //! A Downstream that signal the incapacity to handle group channels can open only one channel. #![allow(special_module_name)] -use std::{net::SocketAddr, sync::Arc}; - -use tokio::{net::TcpListener, sync::oneshot}; -use tracing::{error, info}; +use tracing::error; use ext_config::{Config, File, FileFormat}; use lib::Configuration; -use roles_logic_sv2::utils::{GroupId, Mutex}; -mod lib; +pub mod lib; mod args { use std::path::PathBuf; @@ -131,43 +127,5 @@ async fn main() { } }; - let group_id = Arc::new(Mutex::new(GroupId::new())); - lib::ROUTING_LOGIC - .set(Mutex::new( - lib::initialize_r_logic(&config.upstreams, group_id, config.clone()).await, - )) - .expect("BUG: Failed to set ROUTING_LOGIC"); - - info!("Initializing upstream scanner"); - lib::initialize_upstreams(config.min_supported_version, config.max_supported_version).await; - info!("Initializing downstream listener"); - - let socket = SocketAddr::new( - config.listen_address.parse().unwrap(), - config.listen_mining_port, - ); - let listener = TcpListener::bind(socket).await.unwrap(); - - info!("Listening for downstream mining connections on {}", socket); - - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - - let (_, res) = tokio::join!( - // Wait for downstream connection - lib::downstream_mining::listen_for_downstream_mining(listener, shutdown_rx), - // handle SIGTERM/QUIT / ctrl+c - tokio::spawn(async { - tokio::signal::ctrl_c() - .await - .expect("Failed to listen to signals"); - let _ = shutdown_tx.send(()); - info!("Interrupt received"); - }) - ); - - if let Err(e) = res { - panic!("Failed to wait for clean exit: {:?}", e); - } - - info!("Shutdown done"); + lib::start_mining_proxy(config).await; } diff --git a/roles/tests-integration/Cargo.toml b/roles/tests-integration/Cargo.toml index 5c077e637..ce89cde57 100644 --- a/roles/tests-integration/Cargo.toml +++ b/roles/tests-integration/Cargo.toml @@ -18,6 +18,7 @@ codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = ["noise_sv2"] } const_sv2 = { path = "../../protocols/v2/const-sv2" } flate2 = "1.0.32" key-utils = { path = "../../utils/key-utils" } +mining_proxy_sv2 = { path = "../mining-proxy" } minreq = { version = "2.12.0", features = ["https"] } once_cell = "1.19.0" network_helpers_sv2 = { path = "../roles-utils/network-helpers", features =["with_tokio","with_buffer_pool"] } diff --git a/roles/tests-integration/tests/common/mod.rs b/roles/tests-integration/tests/common/mod.rs index ea2315122..40c9eeb98 100644 --- a/roles/tests-integration/tests/common/mod.rs +++ b/roles/tests-integration/tests/common/mod.rs @@ -282,3 +282,31 @@ pub async fn start_template_provider(tp_port: u16) -> TemplateProvider { template_provider.generate_blocks(16); template_provider } + +pub async fn start_mining_sv2_proxy(upstream: SocketAddr) -> SocketAddr { + use mining_proxy_sv2::{ChannelKind, UpstreamMiningValues}; + let upstreams = vec![UpstreamMiningValues { + address: upstream.ip().to_string(), + port: upstream.port(), + pub_key: Secp256k1PublicKey::from_str( + "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72", + ) + .unwrap(), + channel_kind: ChannelKind::Extended, + }]; + let mining_proxy_listening_address = get_available_address(); + let config = mining_proxy_sv2::Configuration { + upstreams, + listen_address: mining_proxy_listening_address.ip().to_string(), + listen_mining_port: mining_proxy_listening_address.port(), + max_supported_version: 2, + min_supported_version: 2, + downstream_share_per_minute: 1.0, + expected_total_downstream_hr: 10_000.0, + reconnect: true, + }; + tokio::spawn(async move { + mining_proxy_sv2::start_mining_proxy(config).await; + }); + mining_proxy_listening_address +}