diff --git a/pallets/phala/src/lib.rs b/pallets/phala/src/lib.rs index 8dadac914..b8bc6475b 100644 --- a/pallets/phala/src/lib.rs +++ b/pallets/phala/src/lib.rs @@ -18,6 +18,7 @@ pub mod phat; pub mod puppets; pub mod registry; pub mod stake_pool; +pub mod phat_tokenomic; pub mod wapod_workers; use compute::{base_pool, computation, pool_proxy, stake_pool_v2, vault, wrapped_balances}; @@ -49,7 +50,7 @@ pub use phat as pallet_phat; pub use phat_tokenomic as pallet_phat_tokenomic; pub use registry as pallet_registry; pub use stake_pool as pallet_stake_pool; -pub mod phat_tokenomic; +pub use wapod_workers as pallet_wapod_workers; #[cfg(feature = "native")] use sp_core::hashing; diff --git a/pallets/phala/src/wapod_workers.rs b/pallets/phala/src/wapod_workers.rs index 7d4acdd9a..e5fdecf43 100644 --- a/pallets/phala/src/wapod_workers.rs +++ b/pallets/phala/src/wapod_workers.rs @@ -4,6 +4,7 @@ pub use self::pallet::*; #[frame_support::pallet] pub mod pallet { + use crate::{mq, registry}; use frame_support::{ dispatch::DispatchResult, ensure, @@ -17,10 +18,10 @@ pub mod pallet { }; use sp_runtime::{traits::Zero, SaturatedConversion}; use wapod_eco_types::{ - bench_app::{SignedMessage, SigningMessage}, + bench_app::{MetricsToken, SignedMessage, SigningMessage}, crypto::CryptoProvider, - primitives::BoundedString, - ticket::{Prices, TicketDescription}, + primitives::{BoundedString, BoundedVec}, + ticket::{Prices, SignedWorkerDescription, TicketDescription, WorkerDescription}, }; type BalanceOf = @@ -39,9 +40,10 @@ pub mod pallet { impl Fraction { fn saturating_mul_u64(&self, rhs: u64) -> u64 { + let rhs = rhs as u128; let numerator = self.numerator as u128; let denominator = self.denominator as u128; - let result = (numerator * rhs as u128) / denominator; + let result = (numerator * rhs) / u128::max(denominator, 1); result.saturated_into() } } @@ -54,6 +56,14 @@ pub mod pallet { score_ratio: Fraction, } + #[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, MaxEncodedLen)] + #[cfg_attr(feature = "std", derive(scale_info::TypeInfo))] + pub struct WorkerSession { + pub session_id: [u8; 32], + pub last_nonce: [u8; 32], + pub last_metrics_sn: u64, + } + struct SpCrypto; impl CryptoProvider for SpCrypto { fn sr25519_verify(public_key: &[u8], message: &[u8], signature: &[u8]) -> bool { @@ -97,24 +107,18 @@ pub mod pallet { pub struct WorkerListInfo { pub owner: AcountId32, pub prices: Prices, + pub description: BoundedString<1024>, } #[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, MaxEncodedLen)] #[cfg_attr(feature = "std", derive(scale_info::TypeInfo))] - pub struct WorkerState { + pub struct WorkingState { session_id: u32, unresponsive: bool, last_iterations: u64, last_update_time: i64, } - #[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, MaxEncodedLen)] - #[cfg_attr(feature = "std", derive(scale_info::TypeInfo))] - pub struct WorkerDescription { - prices: Prices, - description: BoundedString<1024>, - } - #[pallet::config] pub trait Config: frame_system::Config { type RuntimeEvent: From> + IsType<::RuntimeEvent>; @@ -130,36 +134,51 @@ pub mod pallet { #[pallet::storage] pub type NextTicketId = StorageValue<_, TicketId, ValueQuery>; + /// Active tickets. + /// Each ticket holds a payment infomation to a target app address. Multiple ticket can pay for a same app at the sametime. #[pallet::storage] pub type Tickets = StorageMap<_, Twox64Concat, TicketId, TicketInfo>; #[pallet::storage] pub type NextWorkerListId = StorageValue<_, ListId, ValueQuery>; + /// Worker lists. + /// + /// A worker list is a collection of workers that with the same price. #[pallet::storage] pub type WorkerLists = StorageMap<_, Twox64Concat, ListId, WorkerListInfo>; + /// Concrete workers associated to worker list. #[pallet::storage] pub type WorkerListWorkers = StorageDoubleMap<_, Twox64Concat, ListId, Twox64Concat, WorkerPublicKey, ()>; + /// Information about workers. #[pallet::storage] pub type WorkerDescriptions = StorageMap<_, Twox64Concat, WorkerPublicKey, WorkerDescription>; + /// V3 information about workers. #[pallet::storage] - pub type WorkingWorkers = StorageMap<_, Twox64Concat, WorkerPublicKey, WorkerState>; + pub type WorkerSessions = StorageMap<_, Twox64Concat, WorkerPublicKey, WorkerSession>; + /// Working state of wapod workers. V2 compatible. #[pallet::storage] - pub type BenchmarkAddresses = StorageMap<_, Twox64Concat, Address, BenchAppInfo>; + pub type WorkingWorkers = StorageMap<_, Twox64Concat, WorkerPublicKey, WorkingState>; + /// Allowed app addresses that used to benchmark workers. #[pallet::storage] - pub type PrimaryBenchmarkAddress = StorageValue<_, Address>; + pub type BenchmarkApps = StorageMap<_, Twox64Concat, Address, BenchAppInfo>; + + /// Current recommended app address used to benchmark workers. + #[pallet::storage] + pub type RecommendedBenchmarkApp = StorageValue<_, Address>; #[pallet::error] pub enum Error { UnsupportedManifestVersion, NotAllowed, + WorkerNotFound, WorkerListNotFound, TicketNotFound, SignatureVerificationFailed, @@ -167,11 +186,40 @@ pub mod pallet { InvalidBenchApp, OutdatedMessage, InvalidMessageSender, + PriceMismatch, + SessionMismatch, } #[pallet::event] #[pallet::generate_deposit(fn deposit_event)] - pub enum Event {} + pub enum Event { + TicketCreated { + id: TicketId, + }, + TicketClosed { + id: TicketId, + }, + WorkerListCreated { + id: ListId, + }, + WorkersAddedToList { + list_id: ListId, + workers: Vec, + }, + WorkerRemovedFromList { + list_id: ListId, + worker: WorkerPublicKey, + }, + WorkerDescriptionSet { + worker: WorkerPublicKey, + }, + BenchmarkAppAdded { + address: Address, + }, + RecommendedBenchmarkAppChanged { + address: Address, + }, + } const TICKET_ADDRESS_PREFIX: &[u8] = b"wapod/ticket/"; @@ -190,8 +238,8 @@ pub mod pallet { #[pallet::call] impl Pallet where - T: crate::mq::Config, - T: crate::registry::Config, + T: mq::Config, + T: registry::Config, T::AccountId: Into + From, { /// Create a new ticket @@ -262,6 +310,7 @@ pub mod pallet { ::Currency::transfer(&ticket_account, &owner, deposit, AllowDeath)?; } Tickets::::remove(ticket_id); + Self::deposit_event(Event::TicketClosed { id: ticket_id }); Ok(()) } @@ -278,16 +327,17 @@ pub mod pallet { ); let worker_pubkey = WorkerPublicKey(message.worker_pubkey); ensure!( - crate::registry::Pallet::::worker_exsists(&worker_pubkey), + registry::Pallet::::worker_exsists(&worker_pubkey), Error::::InvalidWorkerPubkey ); - let bench_app_info = BenchmarkAddresses::::get(&message.app_address) - .ok_or(Error::::InvalidBenchApp)?; + let bench_app_info = + BenchmarkApps::::get(&message.app_address).ok_or(Error::::InvalidBenchApp)?; match message.message { SigningMessage::BenchScore { gas_per_second, gas_consumed, timestamp_secs, + matrics_token, } => { use frame_support::traits::UnixTime; @@ -295,11 +345,12 @@ pub mod pallet { let diff = (now - timestamp_secs as i64).abs(); ensure!(diff < 600, Error::::OutdatedMessage); + Self::update_metrics_token(&worker_pubkey, &matrics_token)?; + // Update the worker init score let gas_6secs = gas_per_second.saturating_mul(6); let score = bench_app_info.score_ratio.saturating_mul_u64(gas_6secs); - let p_init = - crate::registry::Pallet::::update_worker_score(&worker_pubkey, score); + let p_init = registry::Pallet::::update_worker_score(&worker_pubkey, score); // If the worker is scheduled working by the chain, simulate a heartbeat message. if let Some(mut working_state) = WorkingWorkers::::get(&worker_pubkey) { let delta_time = now - working_state.last_update_time; @@ -314,12 +365,13 @@ pub mod pallet { let p_instant = p_instant.min(p_max) as u32; let worker = MessageOrigin::Worker(worker_pubkey.into()); + // Minic the worker heartbeat message let worker_report = WorkingReportEvent::HeartbeatV3 { iterations, session_id: working_state.session_id, p_instant, }; - crate::mq::Pallet::::push_bound_message(worker, worker_report); + mq::Pallet::::push_bound_message(worker, worker_report); working_state.last_iterations = iterations; working_state.last_update_time = now; @@ -329,11 +381,140 @@ pub mod pallet { }; Ok(()) } + + /// Create a new worker list + #[pallet::call_index(4)] + #[pallet::weight(Weight::from_parts(10_000u64, 0) + T::DbWeight::get().writes(1u64))] + pub fn create_worker_list( + origin: OriginFor, + prices: Prices, + description: BoundedString<1024>, + ) -> DispatchResult { + let owner = ensure_signed(origin)?; + Self::add_worker_list(WorkerListInfo { + owner: owner.into(), + prices, + description, + }); + Ok(()) + } + + /// Add a worker to a worker list + #[pallet::call_index(5)] + #[pallet::weight(Weight::from_parts(10_000u64, 0) + T::DbWeight::get().writes(1u64))] + pub fn add_workers_to_list( + origin: OriginFor, + list_id: ListId, + workers: BoundedVec, + ) -> DispatchResult { + let owner = ensure_signed(origin)?; + let list_info = WorkerLists::::get(list_id).ok_or(Error::::WorkerListNotFound)?; + ensure!(owner == list_info.owner.into(), Error::::NotAllowed); + for worker in workers.iter() { + let Some(worker_info) = WorkerDescriptions::::get(worker) else { + return Err(Error::::WorkerNotFound.into()); + }; + ensure!( + worker_info.prices == Default::default() + || worker_info.prices == list_info.prices, + Error::::PriceMismatch + ); + WorkerListWorkers::::insert(list_id, worker, ()); + } + Self::deposit_event(Event::WorkersAddedToList { + list_id, + workers: workers.into(), + }); + Ok(()) + } + + /// Remove a worker from a worker list + #[pallet::call_index(6)] + #[pallet::weight(Weight::from_parts(10_000u64, 0) + T::DbWeight::get().writes(1u64))] + pub fn remove_worker_from_list( + origin: OriginFor, + list_id: ListId, + worker: WorkerPublicKey, + ) -> DispatchResult { + let owner = ensure_signed(origin)?; + let list_info = WorkerLists::::get(list_id).ok_or(Error::::WorkerListNotFound)?; + ensure!(owner == list_info.owner.into(), Error::::NotAllowed); + WorkerListWorkers::::remove(list_id, worker); + Self::deposit_event(Event::WorkerRemovedFromList { list_id, worker }); + Ok(()) + } + + /// Set worker description + #[pallet::call_index(7)] + #[pallet::weight(Weight::from_parts(10_000u64, 0) + T::DbWeight::get().writes(1u64))] + pub fn set_worker_description( + origin: OriginFor, + signed_description: SignedWorkerDescription, + ) -> DispatchResult { + let _ = ensure_signed(origin)?; + let worker_pubkey = WorkerPublicKey(signed_description.worker_pubkey); + // Worker price can only be set once + ensure!( + !WorkerDescriptions::::contains_key(&worker_pubkey), + Error::::NotAllowed + ); + ensure!( + registry::Pallet::::worker_exsists(&worker_pubkey), + Error::::InvalidWorkerPubkey + ); + ensure!( + signed_description.verify::(), + Error::::SignatureVerificationFailed + ); + WorkerDescriptions::::insert(&worker_pubkey, signed_description.worker_description); + Self::deposit_event(Event::WorkerDescriptionSet { + worker: worker_pubkey, + }); + Ok(()) + } + + /// Add a benchmark app (governance only) + #[pallet::call_index(8)] + #[pallet::weight(Weight::from_parts(10_000u64, 0) + T::DbWeight::get().writes(1u64))] + pub fn add_benchmark_app( + origin: OriginFor, + ticket: TicketId, + version: u32, + score_ratio: Fraction, + ) -> DispatchResult { + T::GovernanceOrigin::ensure_origin(origin)?; + let address = Tickets::::get(ticket) + .ok_or(Error::::TicketNotFound)? + .app_address; + BenchmarkApps::::insert( + address, + BenchAppInfo { + version, + ticket, + score_ratio, + }, + ); + Self::deposit_event(Event::BenchmarkAppAdded { address }); + Ok(()) + } + + /// Set recommended benchmark app (governance only) + #[pallet::call_index(9)] + #[pallet::weight(Weight::from_parts(10_000u64, 0) + T::DbWeight::get().writes(1u64))] + pub fn set_recommended_benchmark_app( + origin: OriginFor, + address: Address, + ) -> DispatchResult { + T::GovernanceOrigin::ensure_origin(origin)?; + RecommendedBenchmarkApp::::set(Some(address)); + Self::deposit_event(Event::RecommendedBenchmarkAppChanged { address }); + Ok(()) + } } impl Pallet where - T: crate::mq::Config, + T: mq::Config, T::AccountId: Into + From, { fn add_ticket(info: TicketInfo) -> TicketId { @@ -343,9 +524,42 @@ pub mod pallet { id }; Tickets::::insert(id, info); + Self::deposit_event(Event::TicketCreated { id }); + id + } + + fn add_worker_list(info: WorkerListInfo) -> ListId { + let id = { + let id = NextWorkerListId::::get(); + NextWorkerListId::::put(id.wrapping_add(1)); + id + }; + WorkerLists::::insert(id, info); + Self::deposit_event(Event::WorkerListCreated { id }); id } + fn update_metrics_token( + worker_pubkey: &WorkerPublicKey, + matrics_token: &MetricsToken, + ) -> DispatchResult { + let Some(mut worker_session) = WorkerSessions::::get(worker_pubkey) else { + return Err(Error::::WorkerNotFound.into()); + }; + ensure!( + worker_session.last_metrics_sn < matrics_token.metrics_sn, + Error::::SessionMismatch + ); + ensure!( + worker_session.session_id == matrics_token.worker_session, + Error::::OutdatedMessage + ); + worker_session.last_metrics_sn = matrics_token.metrics_sn; + worker_session.last_nonce = matrics_token.nonce; + WorkerSessions::::insert(worker_pubkey, worker_session); + Ok(()) + } + pub fn on_worker_event_received(message: DecodedMessage) -> DispatchResult { ensure!(message.sender.is_pallet(), Error::::InvalidMessageSender); let SystemEvent::WorkerEvent(event) = message.payload else { @@ -359,7 +573,7 @@ pub mod pallet { WorkerEvent::Started { session_id, .. } => { WorkingWorkers::::insert( &worker_pubkey, - WorkerState { + WorkingState { session_id, unresponsive: false, last_iterations: 0, @@ -388,4 +602,4 @@ pub mod pallet { Ok(()) } } -} \ No newline at end of file +} diff --git a/standalone/runtime/src/lib.rs b/standalone/runtime/src/lib.rs index 1c2d16167..2aa8dc102 100644 --- a/standalone/runtime/src/lib.rs +++ b/standalone/runtime/src/lib.rs @@ -39,8 +39,8 @@ use frame_support::{ fungible::HoldConsideration, tokens::{PayFromAccount, UnityAssetBalanceConversion}, AsEnsureOriginWithArg, ConstU128, ConstU32, Currency, EitherOfDiverse, EqualPrivilegeOnly, - Everything, Imbalance, InstanceFilter, KeyOwnerProofSystem, LockIdentifier, OnUnbalanced, - SortedMembers, WithdrawReasons, LinearStoragePrice, + Everything, Imbalance, InstanceFilter, KeyOwnerProofSystem, LinearStoragePrice, + LockIdentifier, OnUnbalanced, SortedMembers, WithdrawReasons, }, weights::{ constants::{ @@ -110,7 +110,7 @@ mod voter_bags; pub use phala_pallets::{ pallet_base_pool, pallet_computation, pallet_mq, pallet_phat, pallet_phat_tokenomic, - pallet_registry, pallet_stake_pool, pallet_stake_pool_v2, pallet_vault, + pallet_registry, pallet_stake_pool, pallet_stake_pool_v2, pallet_vault, pallet_wapod_workers, pallet_wrapped_balances, puppets, }; use phat_offchain_rollup::{anchor as pallet_anchor, oracle as pallet_oracle}; @@ -337,8 +337,9 @@ impl InstanceFilter for ProxyType { | RuntimeCall::Elections(..) | RuntimeCall::Treasury(..) ), - ProxyType::Staking => - matches!(c, RuntimeCall::Staking(..) | RuntimeCall::FastUnstake(..)), + ProxyType::Staking => { + matches!(c, RuntimeCall::Staking(..) | RuntimeCall::FastUnstake(..)) + } ProxyType::StakePoolManager => matches!( c, RuntimeCall::Utility { .. } @@ -450,7 +451,7 @@ impl pallet_babe::Config for Runtime { type KeyOwnerProof = >::Proof; type EquivocationReportSystem = - pallet_babe::EquivocationReportSystem; + pallet_babe::EquivocationReportSystem; } parameter_types! { @@ -713,13 +714,16 @@ impl Get> for OffchainRandomBalancing { max => { let seed = sp_io::offchain::random_seed(); let random = ::decode(&mut TrailingZeroInput::new(&seed)) - .expect("input is padded with zeroes; qed") % - max.saturating_add(1); + .expect("input is padded with zeroes; qed") + % max.saturating_add(1); random as usize - }, + } }; - let config = BalancingConfig { iterations, tolerance: 0 }; + let config = BalancingConfig { + iterations, + tolerance: 0, + }; Some(config) } } @@ -1212,7 +1216,7 @@ impl pallet_grandpa::Config for Runtime { type MaxSetIdSessionEntries = MaxSetIdSessionEntries; type KeyOwnerProof = >::Proof; type EquivocationReportSystem = - pallet_grandpa::EquivocationReportSystem; + pallet_grandpa::EquivocationReportSystem; } parameter_types! { @@ -1280,7 +1284,7 @@ impl pallet_society::Config for Runtime { type ClaimPeriod = ClaimPeriod; type MaxLockDuration = MaxLockDuration; type FounderSetOrigin = - pallet_collective::EnsureProportionMoreThan; + pallet_collective::EnsureProportionMoreThan; type ChallengePeriod = ChallengePeriod; type MaxPayouts = MaxPayouts; type MaxBids = MaxBids; @@ -1526,6 +1530,11 @@ impl pallet_phat_tokenomic::Config for Runtime { type Currency = Balances; } +impl pallet_wapod_workers::Config for Runtime { + type RuntimeEvent = RuntimeEvent; + type Currency = Balances; +} + parameter_types! { pub const QueuePrefix: &'static [u8] = b"_queue/"; pub const QueueCapacity: u32 = 128; @@ -1626,6 +1635,7 @@ construct_runtime!( PhalaBasePool: pallet_base_pool, PhalaPhatContracts: pallet_phat, PhalaPhatTokenomic: pallet_phat_tokenomic, + PhalaWapodWorkers: pallet_wapod_workers, // Rollup and Oracles PhatRollupAnchor: pallet_anchor = 100,