From e529bf04cd4b8bfba805a7fdf3e0ea302a0544cf Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Thu, 23 Nov 2017 17:49:56 +0000 Subject: [PATCH 01/26] PseudoDead processes handling not completed --- src/main/scala/app/Messages.scala | 5 +++ src/main/scala/layers/PartialView.scala | 52 ++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 8ee095e..0d0d96f 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -55,6 +55,11 @@ case class GossipRequest(mid: Int) // Heartbeat case class Heartbeat() +case class IsAlive(p: String) + +case class Check() + +case class ReplyIsAlive(p: String) //Application case class MessagesStats(address: String) diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index f362ce8..1f1ba4b 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -23,6 +23,8 @@ class PartialView extends Actor { val aViewSize = 4 val pViewSize = 30 val aliveProcesses = scala.collection.mutable.Map[String, Double]() + var pseudoDead = scala.collection.mutable.Map[String, Double]() + //var checkAlive = scala.collection.mutable.Map[String, Double]() override def receive = { @@ -141,6 +143,26 @@ class PartialView extends Actor { } } + case isAlive: IsAlive => { + + val process = context.actorSelection(s"${isAlive.p}/user/partialView") + process ! Check + + + } + + case checkAlive: Check => { + sender ! ReplyIsAlive + } + + case replyIsAlive: ReplyIsAlive => { + + pseudoDead -= replyIsAlive.p + + val newTimer: Double = System.currentTimeMillis() + aliveProcesses += (replyIsAlive.p -> newTimer) + + } } @@ -231,6 +253,14 @@ class PartialView extends Actor { //log.debug("Checking for dead processes") for ((p, t) <- aliveProcesses) { + // check for processes with heartbeat timers bigger than 10s + if ((System.currentTimeMillis() - t) >= 7000) { + log.debug("Process " + p + " is dead????") + askIfAlive(p) + } + } + + for ((p, t) <- pseudoDead) { // check for processes with heartbeat timers bigger than 10s if ((System.currentTimeMillis() - t) >= 10000) { aliveProcesses -= p @@ -238,11 +268,31 @@ class PartialView extends Actor { passiveView = passiveView.filter(!_.equals(p)) //log.debug("Process " + p + " removed from passive view") - log.debug("Process: " + p + " is dead") + log.debug("Process: " + p + " is DEFINITELY dead") + var process = context.actorSelection(s"${myself}/user/informationDissemination") process ! BroadcastMessage("del", p) } } + + + } + + def askIfAlive(p: String) = { + aliveProcesses -= p + log.debug("Removing " + p + " from alivePROCESSES") + for(aux <- aliveProcesses){ + log.debug("process " + aux + " is ALIVE!!!") + } + + val timer: Double = System.currentTimeMillis() + pseudoDead += (p -> timer) + for(n <- activeView){ + var process = context.actorSelection(s"${n}/user/partialView") + process ! IsAlive(p) + } + + } } From 31bb2e39d9ce9f11191ba861209030e018792889 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Fri, 24 Nov 2017 15:27:18 +0000 Subject: [PATCH 02/26] PseudoDead handling completed --- src/main/scala/app/Messages.scala | 6 +- .../layers/InformationDissemination.scala | 2 +- src/main/scala/layers/PartialView.scala | 76 +++++++++++++------ 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 0d0d96f..a8dddc9 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -57,9 +57,11 @@ case class Heartbeat() case class IsAlive(p: String) -case class Check() +case class Check(from: String) -case class ReplyIsAlive(p: String) +case class ReplyIsAlive(from: String) + +case class AliveMessage(p: String) //Application case class MessagesStats(address: String) diff --git a/src/main/scala/layers/InformationDissemination.scala b/src/main/scala/layers/InformationDissemination.scala index 7f50f17..1125101 100644 --- a/src/main/scala/layers/InformationDissemination.scala +++ b/src/main/scala/layers/InformationDissemination.scala @@ -115,7 +115,7 @@ class InformationDissemination extends Actor { if (gossipMessage.forwardBcastMsg.bCastMessage.messageType.equals("del")) { val mid = (gossipMessage.forwardBcastMsg.bCastMessage.node + "add").hashCode delivered = delivered.filter(!_.mid.equals(mid)) - log.error("Wrong deletion") + } requested = requested.filter(_.equals(gossipMessage.forwardBcastMsg.mid)) diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index 1f1ba4b..2691f6b 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -22,9 +22,9 @@ class PartialView extends Actor { val PRWL = 3 val aViewSize = 4 val pViewSize = 30 - val aliveProcesses = scala.collection.mutable.Map[String, Double]() + var aliveProcesses = scala.collection.mutable.Map[String, Double]() var pseudoDead = scala.collection.mutable.Map[String, Double]() - //var checkAlive = scala.collection.mutable.Map[String, Double]() + var checkAlive = scala.collection.mutable.Map[String, Double]() override def receive = { @@ -144,28 +144,45 @@ class PartialView extends Actor { } case isAlive: IsAlive => { + val timer: Double = System.currentTimeMillis() + checkAlive += ( isAlive.p -> timer ) val process = context.actorSelection(s"${isAlive.p}/user/partialView") - process ! Check - - + process ! Check(sender.path.address.toString) } - case checkAlive: Check => { - sender ! ReplyIsAlive + case checkIsAlive: Check => { + sender ! ReplyIsAlive(checkIsAlive.from) } case replyIsAlive: ReplyIsAlive => { - - pseudoDead -= replyIsAlive.p + checkAlive -= sender.path.address.toString val newTimer: Double = System.currentTimeMillis() - aliveProcesses += (replyIsAlive.p -> newTimer) + aliveProcesses += (sender.path.address.toString -> newTimer) + + val process = context.actorSelection(s"${replyIsAlive.from}/user/partialView") + process ! AliveMessage(sender.path.address.toString) } + case alive: AliveMessage => { + pseudoDead -= alive.p + val newTimer: Double = System.currentTimeMillis() + aliveProcesses += (alive.p -> newTimer) + } + } + /* + - - - - - - - - - - - - - - - FIM MENSAGENS - - - - - - - - - - - - - - + + + + + + - - - - - - - - - - - - - - - - METODOS - - - - - - - - - - - - - - - - */ + def dropRandomNodeFromActiveView() = { val node: String = Random.shuffle(activeView).head @@ -253,7 +270,7 @@ class PartialView extends Actor { //log.debug("Checking for dead processes") for ((p, t) <- aliveProcesses) { - // check for processes with heartbeat timers bigger than 10s + // check for processes with heartbeat timers bigger than 7s if ((System.currentTimeMillis() - t) >= 7000) { log.debug("Process " + p + " is dead????") askIfAlive(p) @@ -263,28 +280,23 @@ class PartialView extends Actor { for ((p, t) <- pseudoDead) { // check for processes with heartbeat timers bigger than 10s if ((System.currentTimeMillis() - t) >= 10000) { - aliveProcesses -= p - activeView = activeView.filter(!_.equals(p)) - passiveView = passiveView.filter(!_.equals(p)) - //log.debug("Process " + p + " removed from passive view") - - log.debug("Process: " + p + " is DEFINITELY dead") - - - var process = context.actorSelection(s"${myself}/user/informationDissemination") - process ! BroadcastMessage("del", p) + removeFromSystem(p) } } + for ((p, t) <- checkAlive) { + // check for processes with heartbeat timers bigger than 10s + if ((System.currentTimeMillis() - t) >= 10000) { + removeFromSystem(p) + } + } } def askIfAlive(p: String) = { + aliveProcesses -= p log.debug("Removing " + p + " from alivePROCESSES") - for(aux <- aliveProcesses){ - log.debug("process " + aux + " is ALIVE!!!") - } val timer: Double = System.currentTimeMillis() pseudoDead += (p -> timer) @@ -295,4 +307,20 @@ class PartialView extends Actor { } + + def removeFromSystem(p: String) = { + + log.debug("Process: " + p + " is DEFINITELY dead") + + aliveProcesses -= p + pseudoDead -= p + checkAlive -= p + activeView = activeView.filter(!_.equals(p)) + passiveView = passiveView.filter(!_.equals(p)) + + var process = context.actorSelection(s"${myself}/user/informationDissemination") + process ! BroadcastMessage("del", p) + } + + } From 6533e84331849ba724273bac1bd581b47e9359f6 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Fri, 24 Nov 2017 22:42:05 +0000 Subject: [PATCH 03/26] Storage implemented --- src/main/scala/app/Messages.scala | 24 ++++++++++++++++++- src/main/scala/layers/Storage.scala | 36 +++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/layers/Storage.scala diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index a8dddc9..7af1da1 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -52,9 +52,21 @@ case class AntiEntropy(knownMessages: List[Int]) case class GossipRequest(mid: Int) + +//Storage + +case class Write(id: String, data: List[Object]) + +case class Read(id: String) + + + + // Heartbeat + case class Heartbeat() +// Verify PseudoDead processes case class IsAlive(p: String) case class Check(from: String) @@ -63,7 +75,11 @@ case class ReplyIsAlive(from: String) case class AliveMessage(p: String) + + + //Application + case class MessagesStats(address: String) case class ReplyMessagesStats( @@ -81,4 +97,10 @@ case class ReplyMessagesStats( antiEntropyReceived: Int, antiEntropySent: Int - ) \ No newline at end of file + ) + + + + + + diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala new file mode 100644 index 0000000..32c5c78 --- /dev/null +++ b/src/main/scala/layers/Storage.scala @@ -0,0 +1,36 @@ +package layers + +import akka.actor.Actor +import app._ + +class Storage extends Actor{ + + var storage = scala.collection.mutable.HashMap[String, List[Object]]() + + override def receive = { + + /*case init: InitStorage => { + myself = init.selfAddress + globalView = globalView :+ myself + + val process = context.actorSelection(s"${init.contactNode}/user/globalView") + process ! ShowGV + } + */ + + case write: Write => { + + storage.put(write.id, write.data) + + } + + case read: Read => { + + storage.get(read.id) + + } + + + + } +} From aaab954a8c34d6301a64c177783010357855a89b Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Fri, 24 Nov 2017 23:27:41 +0000 Subject: [PATCH 04/26] Storage operations --- src/main/scala/app/Messages.scala | 2 +- src/main/scala/layers/GlobalView.scala | 4 ++++ src/main/scala/layers/PartialView.scala | 6 +----- src/main/scala/layers/Storage.scala | 10 ++++++++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 7af1da1..9dcc996 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -55,7 +55,7 @@ case class GossipRequest(mid: Int) //Storage -case class Write(id: String, data: List[Object]) +case class Write(id: String, data: List[Byte]) case class Read(id: String) diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index fb60be3..dbb68be 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -11,12 +11,16 @@ class GlobalView extends Actor { var globalView: List[String] = List.empty var myself: String = "" + var id: String = "" + override def receive = { case init: InitGlobView => { myself = init.selfAddress globalView = globalView :+ myself + id = init.selfAddress.hashCode.toString + val process = context.actorSelection(s"${init.contactNode}/user/globalView") process ! ShowGV } diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index 2691f6b..e50b1b3 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -115,7 +115,7 @@ class PartialView extends Actor { } case AskPassiveView(priority) => { - log.debug("Node: " + sender.path.address.toString + "Asked for a new node with priority: " + + log.debug("Node: " + sender.path.address.toString + " asked for a new node with priority: " + priority) if (priority.equals("force")) { @@ -272,7 +272,6 @@ class PartialView extends Actor { for ((p, t) <- aliveProcesses) { // check for processes with heartbeat timers bigger than 7s if ((System.currentTimeMillis() - t) >= 7000) { - log.debug("Process " + p + " is dead????") askIfAlive(p) } } @@ -296,7 +295,6 @@ class PartialView extends Actor { def askIfAlive(p: String) = { aliveProcesses -= p - log.debug("Removing " + p + " from alivePROCESSES") val timer: Double = System.currentTimeMillis() pseudoDead += (p -> timer) @@ -310,8 +308,6 @@ class PartialView extends Actor { def removeFromSystem(p: String) = { - log.debug("Process: " + p + " is DEFINITELY dead") - aliveProcesses -= p pseudoDead -= p checkAlive -= p diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index 32c5c78..b260266 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -5,7 +5,9 @@ import app._ class Storage extends Actor{ - var storage = scala.collection.mutable.HashMap[String, List[Object]]() + var storage = scala.collection.mutable.HashMap[String, List[Byte]]() + var defaultData: List[Byte] = List.empty + override def receive = { @@ -26,7 +28,11 @@ class Storage extends Actor{ case read: Read => { - storage.get(read.id) + if(storage.exists(_ == read.id)) { + storage.get(read.id) + } + else + defaultData } From 701acb0898fe62d8f8174f1f0e79f1f8004b56c3 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Sat, 25 Nov 2017 02:05:16 +0000 Subject: [PATCH 05/26] Storage finished --- src/main/scala/app/Messages.scala | 1 + src/main/scala/app/Process.scala | 4 +++- src/main/scala/layers/GlobalView.scala | 1 + src/main/scala/layers/Replication.scala | 5 +++++ src/main/scala/layers/Storage.scala | 2 -- 5 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/layers/Replication.scala diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 9dcc996..4216e91 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -26,6 +26,7 @@ case class NotifyGlobalView(address: String) case class ShowGV(address: String) + //Other case class ReplyShowView(replyType: String, myself: String, nodes: List[String]) diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index c71b019..3e36dd2 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -3,7 +3,7 @@ package app import akka.actor.{ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import com.typesafe.scalalogging.Logger -import layers.{GlobalView, InformationDissemination, PartialView} +import layers.{GlobalView, InformationDissemination, PartialView, Storage} object Process extends App { @@ -22,6 +22,8 @@ object Process extends App { val globalView = sys.actorOf(Props[GlobalView], "globalView") val partialView = sys.actorOf(Props[PartialView], "partialView") val informationDissemination = sys.actorOf(Props[InformationDissemination], "informationDissemination") + val storage = sys.actorOf(Props[Storage], "storage") + var contactNode = "" if (args.length > 1) { diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index dbb68be..a0cb8bc 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -23,6 +23,7 @@ class GlobalView extends Actor { val process = context.actorSelection(s"${init.contactNode}/user/globalView") process ! ShowGV + } case message: BroadcastMessage => { diff --git a/src/main/scala/layers/Replication.scala b/src/main/scala/layers/Replication.scala new file mode 100644 index 0000000..564613a --- /dev/null +++ b/src/main/scala/layers/Replication.scala @@ -0,0 +1,5 @@ +package layers + +class Replication { + +} diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index b260266..2f73cbe 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -36,7 +36,5 @@ class Storage extends Actor{ } - - } } From a8408fd7b9190ccb79f4e613d33d12d6085262d1 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Sat, 25 Nov 2017 18:12:32 +0000 Subject: [PATCH 06/26] Start of replication - Paxos implementation --- src/main/scala/app/Messages.scala | 22 +++++++ src/main/scala/app/Process.scala | 5 ++ src/main/scala/layers/Replication.scala | 5 -- src/main/scala/replication/Accepter.scala | 55 ++++++++++++++++++ src/main/scala/replication/Learner.scala | 35 +++++++++++ src/main/scala/replication/Proposer.scala | 71 +++++++++++++++++++++++ 6 files changed, 188 insertions(+), 5 deletions(-) delete mode 100644 src/main/scala/layers/Replication.scala create mode 100644 src/main/scala/replication/Accepter.scala create mode 100644 src/main/scala/replication/Learner.scala create mode 100644 src/main/scala/replication/Proposer.scala diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 4216e91..cad94e9 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -62,6 +62,28 @@ case class Read(id: String) +//Replication + +case class InitPaxos() + +case class AskSeqNum() + +case class ReplySeqNum(seqNum: Int) + +case class Propose(value: String) + +case class Prepare(seqNum: Int, value: String) + +case class Prepare_OK(seqNum: Int, value: String) + +case class Accept(seqNum: Int, value: String) + +case class Accept_OK(seqNum: Int, value: String) + +case class Decided(value: String) + + + // Heartbeat diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index 3e36dd2..77e55ec 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -4,6 +4,7 @@ import akka.actor.{ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import com.typesafe.scalalogging.Logger import layers.{GlobalView, InformationDissemination, PartialView, Storage} +import replication._ object Process extends App { @@ -22,8 +23,12 @@ object Process extends App { val globalView = sys.actorOf(Props[GlobalView], "globalView") val partialView = sys.actorOf(Props[PartialView], "partialView") val informationDissemination = sys.actorOf(Props[InformationDissemination], "informationDissemination") + val storage = sys.actorOf(Props[Storage], "storage") + val proposer = sys.actorOf(Props[Proposer], "proposer") + val accepter = sys.actorOf(Props[Accepter], "accepter") + val learner = sys.actorOf(Props[Learner], "learner") var contactNode = "" if (args.length > 1) { diff --git a/src/main/scala/layers/Replication.scala b/src/main/scala/layers/Replication.scala deleted file mode 100644 index 564613a..0000000 --- a/src/main/scala/layers/Replication.scala +++ /dev/null @@ -1,5 +0,0 @@ -package layers - -class Replication { - -} diff --git a/src/main/scala/replication/Accepter.scala b/src/main/scala/replication/Accepter.scala new file mode 100644 index 0000000..436788d --- /dev/null +++ b/src/main/scala/replication/Accepter.scala @@ -0,0 +1,55 @@ +package replication + +import akka.actor.Actor +import app._ + +class Accepter extends Actor{ + + var allNodes: List[String] = List.empty + var seqNumP: Int = 0 + var seqNumA: Int = 0 + var valueA: String = "" + + override def receive = { + + case init: InitPaxos => { + + val process = context.actorSelection(s"${sender.path.address.toString}/user/globalView") + process ! ShowGV + + } + + case reply: ReplyShowView => { + for(n <- reply.nodes){ + allNodes += n + } + } + + case prepare: Prepare => { + + if(prepare.seqNum > seqNumP){ + seqNumP = prepare.seqNum + + sender ! Prepare_OK(prepare.seqNum, prepare.value) + } + + } + + case accept: Accept => { + if(accept.seqNum >= seqNumP){ + seqNumA = accept.seqNum + valueA = accept.value + + sender ! Accept_OK(accept.seqNum, "") + + for(n <- allNodes){ + val process = context.actorSelection(s"${sender.path.address.toString}/user/learner") + process ! Accept_OK(seqNumA, valueA) + } + } + + } + + + } +} diff --git a/src/main/scala/replication/Learner.scala b/src/main/scala/replication/Learner.scala new file mode 100644 index 0000000..995dbfb --- /dev/null +++ b/src/main/scala/replication/Learner.scala @@ -0,0 +1,35 @@ +package replication + +import akka.actor.Actor +import app._ + +class Learner extends Actor{ + + var decision: String = "" + var seqNumA: Int = 0 + var valueA: String = "" + //var aset + + override def receive = { + + case acceptOk: Accept_OK => { + if(acceptOk.seqNum > seqNumA){ + seqNumA = acceptOk.seqNum + valueA = acceptOk.value + + //aset.reset() + + } + else if(acceptOk.seqNum < seqNumA){ + + } + //aset.add(sender.path.address.toString) + + /*if( aset.size > allNodes.size/2){ + decision = valueA + */ + } + + } + +} diff --git a/src/main/scala/replication/Proposer.scala b/src/main/scala/replication/Proposer.scala new file mode 100644 index 0000000..326f6b4 --- /dev/null +++ b/src/main/scala/replication/Proposer.scala @@ -0,0 +1,71 @@ +package replication + +import akka.actor.Actor +import app._ + + +class Proposer extends Actor{ + + var allNodes: List[String] = List.empty + var seqNum: Int = 0; + var value: String = ""; + var client: String = ""; + + override def receive = { + + case init: InitPaxos => { + + val process = context.actorSelection(s"${sender.path.address.toString}/user/globalView") + process ! ShowGV + + val process2 = context.actorSelection(s"${sender.path.address.toString}/user/proposer") + process2 ! AskSeqNum + + } + + case reply: ReplyShowView => { + for(n <- reply.nodes){ + allNodes += n + } + } + + /* + case askSeqNum: AskSeqNum => { + sender ! ReplySeqNum(seqNum) + } + + case replySeqNum: ReplySeqNum => { + seqNum = replySeqNum.seqNum + } + */ + + case propose: Propose => { + + client = sender.path.address.toString + + for(n <- allNodes){ + val process = context.actorSelection(s"${n}/user/accepter") + process ! Prepare(seqNum, propose.value) + } + + } + + case prepareOk: Prepare_OK => { + value = prepareOk.value + + for(n <- allNodes){ + val process = context.actorSelection(s"${n}/user/accepter") + process ! Accept(seqNum, value) + } + + } + + case acceptOk: Accept_OK => { + //val process = context.actorSelection(s"${client}/user/accepter") + //process ! Decided(value) + } + + } + + +} From 842dae36d391b288ae6fd5e9b6c6b430e0799c6f Mon Sep 17 00:00:00 2001 From: aanciaes Date: Tue, 28 Nov 2017 22:48:07 +0000 Subject: [PATCH 07/26] test commit --- src/main/scala/layers/PartialView.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index e50b1b3..92e7867 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -302,8 +302,6 @@ class PartialView extends Actor { var process = context.actorSelection(s"${n}/user/partialView") process ! IsAlive(p) } - - } def removeFromSystem(p: String) = { From 78d49e46fd475a1ba7f26d20abb4dfe93e88077f Mon Sep 17 00:00:00 2001 From: aanciaes Date: Wed, 29 Nov 2017 00:48:55 +0000 Subject: [PATCH 08/26] Global view indentifier --- src/main/scala/app/Process.scala | 4 ++-- src/main/scala/layers/GlobalView.scala | 9 ++++++--- src/main/scala/layers/Storage.scala | 6 ++++-- src/main/scala/replication/Accepter.scala | 4 ++-- src/main/scala/replication/Proposer.scala | 4 ++-- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index 77e55ec..e38e87c 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -26,9 +26,9 @@ object Process extends App { val storage = sys.actorOf(Props[Storage], "storage") - val proposer = sys.actorOf(Props[Proposer], "proposer") + /*val proposer = sys.actorOf(Props[Proposer], "proposer") val accepter = sys.actorOf(Props[Accepter], "accepter") - val learner = sys.actorOf(Props[Learner], "learner") + val learner = sys.actorOf(Props[Learner], "learner")*/ var contactNode = "" if (args.length > 1) { diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index a0cb8bc..9d9e7e5 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -4,6 +4,8 @@ import akka.actor.Actor import app._ import com.typesafe.scalalogging.Logger +import scala.util.Random + class GlobalView extends Actor { val log = Logger("scala.slick") @@ -11,7 +13,7 @@ class GlobalView extends Actor { var globalView: List[String] = List.empty var myself: String = "" - var id: String = "" + var id: Int = 0 override def receive = { @@ -19,11 +21,12 @@ class GlobalView extends Actor { myself = init.selfAddress globalView = globalView :+ myself - id = init.selfAddress.hashCode.toString + val random = new Random() + id = math.abs(init.selfAddress.hashCode%1000) + println ("Unique Identifier: " + id) val process = context.actorSelection(s"${init.contactNode}/user/globalView") process ! ShowGV - } case message: BroadcastMessage => { diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index 2f73cbe..af7a556 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -22,8 +22,11 @@ class Storage extends Actor{ case write: Write => { - storage.put(write.id, write.data) + + + + storage.put(write.id, write.data) } case read: Read => { @@ -35,6 +38,5 @@ class Storage extends Actor{ defaultData } - } } diff --git a/src/main/scala/replication/Accepter.scala b/src/main/scala/replication/Accepter.scala index 436788d..904f5b0 100644 --- a/src/main/scala/replication/Accepter.scala +++ b/src/main/scala/replication/Accepter.scala @@ -19,11 +19,11 @@ class Accepter extends Actor{ } - case reply: ReplyShowView => { + /*case reply: ReplyShowView => { for(n <- reply.nodes){ allNodes += n } - } + }*/ case prepare: Prepare => { diff --git a/src/main/scala/replication/Proposer.scala b/src/main/scala/replication/Proposer.scala index 326f6b4..e5f3b48 100644 --- a/src/main/scala/replication/Proposer.scala +++ b/src/main/scala/replication/Proposer.scala @@ -23,11 +23,11 @@ class Proposer extends Actor{ } - case reply: ReplyShowView => { + /*case reply: ReplyShowView => { for(n <- reply.nodes){ allNodes += n } - } + }*/ /* case askSeqNum: AskSeqNum => { From e0ee2ecea876b04c732e8928ca899f10dfe8c6a2 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Thu, 30 Nov 2017 15:32:59 +0000 Subject: [PATCH 09/26] Storage implemented --- src/main/scala/app/Application.scala | 10 ++++++++ src/main/scala/app/Messages.scala | 3 ++- src/main/scala/app/Process.scala | 5 ++-- src/main/scala/layers/GlobalView.scala | 30 +++++++++++++++++++++-- src/main/scala/layers/Storage.scala | 5 ++-- src/main/scala/replication/Accepter.scala | 2 ++ src/main/scala/replication/Learner.scala | 2 ++ src/main/scala/replication/Proposer.scala | 2 ++ 8 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/main/scala/app/Application.scala b/src/main/scala/app/Application.scala index 3ef33dc..08fa77e 100644 --- a/src/main/scala/app/Application.scala +++ b/src/main/scala/app/Application.scala @@ -20,6 +20,7 @@ object Application extends App { case "gv" if (words.length == 2) => showGV(words(1)) case "pv" if (words.length == 2) => showPV(words(1)) case "ms" if (words.length == 2) => messagesStats(words(1)) + case "w" if(words.length == 3) => write(words(1), words(2)) //case "msall" if (words.length == 2) => messagesStatsAll() case "clear" => { for (i <- 1 to 20) @@ -42,6 +43,10 @@ object Application extends App { appActor ! MessagesStats(process) } + def write(process: String, data: String) ={ + appActor ! Write(process, data) + } + //def messagesStatsAll() = { //} @@ -93,6 +98,11 @@ object Application extends App { println () println ("-------------------------------------------------------------") } + + case writeStorage : Write => { + val process = sys.actorSelection(s"${writeStorage.id}/user/globalView") + process ! Write(writeStorage.id, writeStorage.data) + } } } } \ No newline at end of file diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index cad94e9..43482ed 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -56,7 +56,8 @@ case class GossipRequest(mid: Int) //Storage -case class Write(id: String, data: List[Byte]) +//case class Write(id: String, data: List[Byte]) +case class Write(id: String, data: String) case class Read(id: String) diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index e38e87c..d64688c 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -3,8 +3,8 @@ package app import akka.actor.{ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import com.typesafe.scalalogging.Logger -import layers.{GlobalView, InformationDissemination, PartialView, Storage} -import replication._ +import layers.{GlobalView, InformationDissemination, PartialView} +//import replication._ object Process extends App { @@ -24,7 +24,6 @@ object Process extends App { val partialView = sys.actorOf(Props[PartialView], "partialView") val informationDissemination = sys.actorOf(Props[InformationDissemination], "informationDissemination") - val storage = sys.actorOf(Props[Storage], "storage") /*val proposer = sys.actorOf(Props[Proposer], "proposer") val accepter = sys.actorOf(Props[Accepter], "accepter") diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 9d9e7e5..42949fb 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -15,6 +15,10 @@ class GlobalView extends Actor { var id: Int = 0 + //var storage = scala.collection.mutable.HashMap[String, List[Byte]]() + var storage = scala.collection.mutable.HashMap[String, String]() + var defaultData: List[Byte] = List.empty + override def receive = { case init: InitGlobView => { @@ -34,8 +38,8 @@ class GlobalView extends Actor { message.messageType match { case "add" => { if (!message.node.equals(myself)) - log.debug ("adding: " + message.node + " to global view") - globalView = globalView :+ message.node + log.debug("adding: " + message.node + " to global view") + globalView = globalView :+ message.node } case "del" => { if (!message.node.equals(myself)) @@ -55,5 +59,27 @@ class GlobalView extends Actor { for (n <- reply.nodes.filter(!_.equals(myself))) globalView = globalView :+ n } + + + + + // - - - - - - - - STORAGE - - - - - - - - + + case write: Write => { + log.debug("Received write with key: " + (write.id.hashCode%1000).toString) + log.debug("Data: " + write.data) + storage.put((write.id.hashCode%1000).toString, write.data) + + } + + case read: Read => { + + if (storage.exists(_ == read.id)) { + storage.get((read.id.hashCode%1000).toString) + } + else + defaultData + + } } } \ No newline at end of file diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index af7a556..3a164cc 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -1,3 +1,4 @@ +/* package layers import akka.actor.Actor @@ -11,14 +12,13 @@ class Storage extends Actor{ override def receive = { - /*case init: InitStorage => { + case init: InitStorage => { myself = init.selfAddress globalView = globalView :+ myself val process = context.actorSelection(s"${init.contactNode}/user/globalView") process ! ShowGV } - */ case write: Write => { @@ -40,3 +40,4 @@ class Storage extends Actor{ } } } +*/ diff --git a/src/main/scala/replication/Accepter.scala b/src/main/scala/replication/Accepter.scala index 904f5b0..d53abfb 100644 --- a/src/main/scala/replication/Accepter.scala +++ b/src/main/scala/replication/Accepter.scala @@ -1,3 +1,4 @@ +/* package replication import akka.actor.Actor @@ -53,3 +54,4 @@ class Accepter extends Actor{ } } +*/ diff --git a/src/main/scala/replication/Learner.scala b/src/main/scala/replication/Learner.scala index 995dbfb..c29081a 100644 --- a/src/main/scala/replication/Learner.scala +++ b/src/main/scala/replication/Learner.scala @@ -1,3 +1,4 @@ +/* package replication import akka.actor.Actor @@ -33,3 +34,4 @@ class Learner extends Actor{ } } +*/ diff --git a/src/main/scala/replication/Proposer.scala b/src/main/scala/replication/Proposer.scala index e5f3b48..af6764e 100644 --- a/src/main/scala/replication/Proposer.scala +++ b/src/main/scala/replication/Proposer.scala @@ -1,3 +1,4 @@ +/* package replication import akka.actor.Actor @@ -69,3 +70,4 @@ class Proposer extends Actor{ } +*/ From 2643d09b1940124a9b4a16ebec31478c0b27fe96 Mon Sep 17 00:00:00 2001 From: aanciaes Date: Thu, 30 Nov 2017 23:52:02 +0000 Subject: [PATCH 10/26] Global view identifier --- src/main/scala/layers/GlobalView.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 42949fb..cc3a29f 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -25,8 +25,7 @@ class GlobalView extends Actor { myself = init.selfAddress globalView = globalView :+ myself - val random = new Random() - id = math.abs(init.selfAddress.hashCode%1000) + id = math.abs(init.selfAddress.reverse.hashCode%1000) println ("Unique Identifier: " + id) val process = context.actorSelection(s"${init.contactNode}/user/globalView") From b5145c2a368426cfda5947b9041291c0c3fb46cd Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Fri, 1 Dec 2017 02:41:12 +0000 Subject: [PATCH 11/26] Write to the nearest Process ID (ForwardWrite not working yet) --- src/main/scala/app/Application.scala | 2 +- src/main/scala/app/Messages.scala | 2 + src/main/scala/layers/GlobalView.scala | 53 +++++++++++++++++++++++--- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/src/main/scala/app/Application.scala b/src/main/scala/app/Application.scala index 08fa77e..a90c59e 100644 --- a/src/main/scala/app/Application.scala +++ b/src/main/scala/app/Application.scala @@ -100,7 +100,7 @@ object Application extends App { } case writeStorage : Write => { - val process = sys.actorSelection(s"${writeStorage.id}/user/globalView") + val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") process ! Write(writeStorage.id, writeStorage.data) } } diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 43482ed..6c1053b 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -61,6 +61,8 @@ case class Write(id: String, data: String) case class Read(id: String) +case class ForwardWrite(id: Int, data: String) + //Replication diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index cc3a29f..e19be34 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -3,8 +3,9 @@ package layers import akka.actor.Actor import app._ import com.typesafe.scalalogging.Logger +import scala.util.control.Breaks._ + -import scala.util.Random class GlobalView extends Actor { @@ -13,6 +14,7 @@ class GlobalView extends Actor { var globalView: List[String] = List.empty var myself: String = "" + var id: Int = 0 //var storage = scala.collection.mutable.HashMap[String, List[Byte]]() @@ -25,6 +27,7 @@ class GlobalView extends Actor { myself = init.selfAddress globalView = globalView :+ myself + id = math.abs(init.selfAddress.reverse.hashCode%1000) println ("Unique Identifier: " + id) @@ -55,8 +58,10 @@ class GlobalView extends Actor { //Since all global views are up to date, on init //Gets contact node global view and copies it to is own case reply: ReplyShowView => { - for (n <- reply.nodes.filter(!_.equals(myself))) + for (n <- reply.nodes.filter(!_.equals(myself))){ globalView = globalView :+ n + + } } @@ -65,10 +70,22 @@ class GlobalView extends Actor { // - - - - - - - - STORAGE - - - - - - - - case write: Write => { - log.debug("Received write with key: " + (write.id.hashCode%1000).toString) - log.debug("Data: " + write.data) - storage.put((write.id.hashCode%1000).toString, write.data) + log.debug("Received write with key: " + (write.id.hashCode%1000)) + log.debug("With the data: " + write.data) + var idWrite = write.id.hashCode%1000 + var hashedProcesses = scala.collection.mutable.HashMap[Int, String]() + for(n<-globalView){ + hashedProcesses.put((n.hashCode%1000), n) + } + hashedProcesses.toSeq.sortBy(_._1) + + if(hashedProcesses.contains(idWrite)) { + storage.put((write.id.hashCode % 1000).toString, write.data) + log.debug("Process id " + write.id.hashCode % 1000 + "EXISTS and ADDED the data " + write.data) + } + else + findProcessForWrite(write.id.hashCode%1000, hashedProcesses, write.data) } case read: Read => { @@ -80,5 +97,31 @@ class GlobalView extends Actor { defaultData } + + case forwardWrite: ForwardWrite => { + log.debug("Process: " + forwardWrite.id.hashCode()%1000 + " STORED the data: " + forwardWrite.data) + storage.put((forwardWrite.id.hashCode()%1000).toString, forwardWrite.data) + } + } + + def findProcessForWrite(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], data: String) ={ + + log.debug("Process " + idProcess + " does NOT EXIST in the System") + + var previousN = 0 + + + for (n <- hashedProcesses) { + if (n._1 > idProcess) { + log.debug("Process " + n + "is too high") + log.debug("Forwarding WRITE to: " + previousN) + + val process = context.actorSelection(s"${hashedProcesses.get(previousN)}/user/globalView") + process ! ForwardWrite(idProcess, data) + break + } + previousN = n._1 + } + } } \ No newline at end of file From ab7ff71e352aa9824fe26d279dc24b2093d05c05 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Fri, 1 Dec 2017 20:00:05 +0000 Subject: [PATCH 12/26] Forward Write update (not working yet) --- src/main/scala/layers/GlobalView.scala | 54 +++++++++++-------- .../layers/InformationDissemination.scala | 16 +++--- src/main/scala/layers/PartialView.scala | 38 ++++++------- 3 files changed, 58 insertions(+), 50 deletions(-) diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index e19be34..783c08a 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -10,6 +10,7 @@ import scala.util.control.Breaks._ class GlobalView extends Actor { val log = Logger("scala.slick") + val hashID_2551 : Int = 474 var globalView: List[String] = List.empty var myself: String = "" @@ -36,11 +37,11 @@ class GlobalView extends Actor { } case message: BroadcastMessage => { - log.debug("Global view receive Broacast Message from: " + sender.path.address.toString) + //log.debug("Global view receive Broacast Message from: " + sender.path.address.toString) message.messageType match { case "add" => { if (!message.node.equals(myself)) - log.debug("adding: " + message.node + " to global view") + //log.debug("adding: " + message.node + " to global view") globalView = globalView :+ message.node } case "del" => { @@ -70,28 +71,29 @@ class GlobalView extends Actor { // - - - - - - - - STORAGE - - - - - - - - case write: Write => { - log.debug("Received write with key: " + (write.id.hashCode%1000)) - log.debug("With the data: " + write.data) - var idWrite = write.id.hashCode%1000 + + var idWrite = math.abs(write.id.reverse.hashCode%1000) + log.debug("Received write with key: " + idWrite) + var hashedProcesses = scala.collection.mutable.HashMap[Int, String]() - for(n<-globalView){ - hashedProcesses.put((n.hashCode%1000), n) + for(n<-globalView){ + hashedProcesses.put(math.abs((n.reverse.hashCode%1000)), n) } hashedProcesses.toSeq.sortBy(_._1) if(hashedProcesses.contains(idWrite)) { - storage.put((write.id.hashCode % 1000).toString, write.data) - log.debug("Process id " + write.id.hashCode % 1000 + "EXISTS and ADDED the data " + write.data) + storage.put((idWrite).toString, write.data) + log.debug("Process id " + idWrite + "EXISTS and ADDED the data " + write.data) } else - findProcessForWrite(write.id.hashCode%1000, hashedProcesses, write.data) + findProcessForWrite(idWrite, hashedProcesses, write.data) } case read: Read => { if (storage.exists(_ == read.id)) { - storage.get((read.id.hashCode%1000).toString) + storage.get(math.abs((read.id.hashCode%1000)).toString) } else defaultData @@ -99,8 +101,8 @@ class GlobalView extends Actor { } case forwardWrite: ForwardWrite => { - log.debug("Process: " + forwardWrite.id.hashCode()%1000 + " STORED the data: " + forwardWrite.data) - storage.put((forwardWrite.id.hashCode()%1000).toString, forwardWrite.data) + log.debug("Process: " + forwardWrite.id + " STORED the data: " + forwardWrite.data) + storage.put((forwardWrite.id).toString, forwardWrite.data) } } @@ -108,20 +110,26 @@ class GlobalView extends Actor { log.debug("Process " + idProcess + " does NOT EXIST in the System") - var previousN = 0 + var previousN = hashID_2551 + for (n <- hashedProcesses) { + log.debug("hashProcess: " + n) + log.debug("processID: " + n._1) + if (n._1 > idProcess) { + log.debug("Process " + n + "is too high") + log.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) - for (n <- hashedProcesses) { - if (n._1 > idProcess) { - log.debug("Process " + n + "is too high") - log.debug("Forwarding WRITE to: " + previousN) + var aux : (Iterable[String],Iterable[String]) = hashedProcesses.get(previousN).splitAt(1) + println("AUX: " + aux._1) - val process = context.actorSelection(s"${hashedProcesses.get(previousN)}/user/globalView") - process ! ForwardWrite(idProcess, data) - break - } - previousN = n._1 + + val process = context.actorSelection(s"${hashedProcesses.get(previousN)}/user/globalView") + process ! ForwardWrite(idProcess, data) + break } + else + previousN = n._1 + } } } \ No newline at end of file diff --git a/src/main/scala/layers/InformationDissemination.scala b/src/main/scala/layers/InformationDissemination.scala index 1125101..53f34d0 100644 --- a/src/main/scala/layers/InformationDissemination.scala +++ b/src/main/scala/layers/InformationDissemination.scala @@ -51,7 +51,7 @@ class InformationDissemination extends Actor { case bcastMessage: BroadcastMessage => { totalReceivedMessages = totalReceivedMessages + 1 - log.debug("Initializing bCast") + //log.debug("Initializing bCast") val mid = (bcastMessage.node + bcastMessage.messageType).hashCode @@ -67,23 +67,23 @@ class InformationDissemination extends Actor { case view: ReplyShowView => { - log.debug("Got self active view") + //log.debug("Got self active view") var gossipTargets: List[String] = List.empty currentNeighbours = view.nodes - for (n <- currentNeighbours) - log.debug("Neigh: " + n) + /*for (n <- currentNeighbours) + log.debug("Neigh: " + n)*/ for (msg <- pending) { gossipTargets = randomSelection(msg.senderAddress, msg.forwardBcastMsg.bCastMessage.node) - for (n <- gossipTargets) - log.debug("Random: " + n) + /*for (n <- gossipTargets) + log.debug("Random: " + n)*/ for (p <- gossipTargets) { val process = context.actorSelection(s"${p}/user/informationDissemination") - log.debug("Sending gossip message to: " + p) + //log.debug("Sending gossip message to: " + p) if (msg.forwardBcastMsg.hop <= r) { process ! GossipMessage(ForwardBcast(msg.forwardBcastMsg.mid, msg.forwardBcastMsg.bCastMessage, msg.forwardBcastMsg.hop + 1)) @@ -106,7 +106,7 @@ class InformationDissemination extends Actor { totalReceivedMessages = totalReceivedMessages + 1 gossipMessagesReceived = gossipMessagesReceived + 1 - log.debug("Receiving gossip message from: " + sender.path.address.toString) + //log.debug("Receiving gossip message from: " + sender.path.address.toString) if (!delivered.exists(m => (m.mid.equals(gossipMessage.forwardBcastMsg.mid)))) { delivered = delivered :+ gossipMessage.forwardBcastMsg diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index 92e7867..03e4cca 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -38,33 +38,33 @@ class PartialView extends Actor { contactNode ! Join() addNodeActiveView(message.contactNode) - log.debug("Init message - Sending join to: " + contactNode) + //log.debug("Init message - Sending join to: " + contactNode) val process = context.actorSelection(s"${myself}/user/informationDissemination") process ! BroadcastMessage("add", myself) } context.system.scheduler.schedule(0 seconds, 5 seconds)(startHeartbeat()) - log.debug("Heartbeat of process: " + myself + " has started") + //log.debug("Heartbeat of process: " + myself + " has started") context.system.scheduler.schedule(0 seconds, 5 seconds)(checkDeadProcesses()) - log.debug("Process: " + myself + " is now checking for dead neighbours") + //log.debug("Process: " + myself + " is now checking for dead neighbours") } case receiveJoin: Join => { - log.debug("receiving join from: " + sender) + //log.debug("receiving join from: " + sender) addNodeActiveView(sender.path.address.toString) activeView.filter(node => !node.equals(sender.path.address.toString)).foreach(node => { val process = context.actorSelection(s"${node}/user/partialView") process ! ForwardJoin(sender.path.address.toString, ARWL, myself) - log.debug("Forwarding join to: " + process) + //log.debug("Forwarding join to: " + process) }) } case receiveForward: ForwardJoin => { - log.debug("Receiving FowardJoin from: " + sender + " with awrl: " + receiveForward.arwl) + //log.debug("Receiving FowardJoin from: " + sender + " with awrl: " + receiveForward.arwl) if (receiveForward.arwl == 0 || activeView.size == 1) { addAndNotify(receiveForward.newNode) @@ -74,7 +74,7 @@ class PartialView extends Actor { addNodePassiveView(receiveForward.newNode) } - log.debug("receiving Foward join (Not added directly)") + //log.debug("receiving Foward join (Not added directly)") try { val node: String = Random.shuffle(activeView.filter(node => @@ -82,11 +82,11 @@ class PartialView extends Actor { && !(node.equals(receiveForward.newNode)) && !(node.equals(receiveForward.contactNode)))).head - log.debug("node shuffled: " + node) + //log.debug("node shuffled: " + node) val process = context.actorSelection(s"${node}/user/partialView") process ! ForwardJoin(receiveForward.newNode, receiveForward.arwl - 1, receiveForward.contactNode) - log.debug("FowardJoin with shuffle to: " + process) + //log.debug("FowardJoin with shuffle to: " + process) } catch { case ex: NoSuchElementException => { @@ -97,17 +97,17 @@ class PartialView extends Actor { } case receiveNotify: Notify => { - log.debug("Receiving notify from: " + sender.path.address.toString) + //log.debug("Receiving notify from: " + sender.path.address.toString) addNodeActiveView(sender.path.address.toString) } case disconnectRandomNode: Disconnect => { - log.debug("Receiving disconnect") + //log.debug("Receiving disconnect") if (activeView.contains(disconnectRandomNode.nodeToDisconnect)) { activeView = activeView.filter(!_.equals(disconnectRandomNode.nodeToDisconnect)) addNodePassiveView(disconnectRandomNode.nodeToDisconnect) aliveProcesses -= disconnectRandomNode.nodeToDisconnect - log.debug("Disconnecting: " + disconnectRandomNode.nodeToDisconnect) + //log.debug("Disconnecting: " + disconnectRandomNode.nodeToDisconnect) //Update active view with node from passive view askPassiveView(disconnectRandomNode.nodeToDisconnect) @@ -121,11 +121,11 @@ class PartialView extends Actor { if (priority.equals("force")) { //forces the process to add sender to his active view even if it is full (drops one randomly) addAndNotify(sender.path.address.toString) - log.debug("Node: "+ sender.path.address.toString + " moved from passive to active view") + //log.debug("Node: "+ sender.path.address.toString + " moved from passive to active view") } else { if (activeView.length < aViewSize) { addAndNotify(sender.path.address.toString) - log.debug("Node: "+ sender.path.address.toString + " moved from passive to active view") + //log.debug("Node: "+ sender.path.address.toString + " moved from passive to active view") } } } @@ -189,9 +189,9 @@ class PartialView extends Actor { activeView = activeView.filter(!_.equals(node)) aliveProcesses -= node addNodePassiveView(node) - log.debug("Disconnecting: " + node) + //log.debug("Disconnecting: " + node) - log.debug("Sending disconnect message: " + node) + //log.debug("Sending disconnect message: " + node) val process = context.actorSelection(s"${node}/user/partialView") process ! Disconnect(myself) } @@ -227,7 +227,7 @@ class PartialView extends Actor { val process = context.actorSelection(s"${newNode}/user/partialView") if (!activeView.contains(newNode) || !((newNode).equals(myself))) process ! Notify() - log.debug("Added Node directly - Notifying: " + process) + //log.debug("Added Node directly - Notifying: " + process) } @@ -238,7 +238,7 @@ class PartialView extends Actor { if (nodeToAsk == null) log.warn("No node was selected to perform passive view ask") else { - log.debug("Asking passive view for a new node: " + nodeToAsk) + //log.debug("Asking passive view for a new node: " + nodeToAsk) val process = context.actorSelection(s"${nodeToAsk}/user/partialView") @@ -261,7 +261,7 @@ class PartialView extends Actor { } def addToAliveProcesses(node: String) = { - log.debug("Process " + node + " added to alive processes of " + myself) + //log.debug("Process " + node + " added to alive processes of " + myself) val timer: Double = System.currentTimeMillis() aliveProcesses += (node -> timer) } From e2291633401424d7c78d6a5e9904c5d1f6754854 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Sat, 2 Dec 2017 22:29:24 +0000 Subject: [PATCH 13/26] Write & ForwardWrite implemented (completed) --- src/main/scala/layers/GlobalView.scala | 28 ++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 783c08a..0c38c2a 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -83,8 +83,19 @@ class GlobalView extends Actor { hashedProcesses.toSeq.sortBy(_._1) if(hashedProcesses.contains(idWrite)) { - storage.put((idWrite).toString, write.data) - log.debug("Process id " + idWrite + "EXISTS and ADDED the data " + write.data) + log.debug("HashID " + idWrite + " exists") + if(!idWrite.equals(myself.reverse.hashCode%1000)){ + log.debug("Its not me tho...") + log.debug("Forwarding to HashID " + idWrite) + val process = context.actorSelection(s"${hashedProcesses.get(idWrite).get}/user/globalView") + process ! ForwardWrite(idWrite, write.data) + } + else { + log.debug("And its me!!") + log.debug("Storing HashID " + idWrite.toString + " with the data: " + write.data) + storage.put(idWrite.toString, write.data) + } + } else findProcessForWrite(idWrite, hashedProcesses, write.data) @@ -101,11 +112,16 @@ class GlobalView extends Actor { } case forwardWrite: ForwardWrite => { - log.debug("Process: " + forwardWrite.id + " STORED the data: " + forwardWrite.data) + log.debug("Process hashID: " + math.abs(myself.reverse.hashCode%1000) + " STORED ID: " + forwardWrite.id + " with the data: " + forwardWrite.data) storage.put((forwardWrite.id).toString, forwardWrite.data) } } + + // - - - - - - - - - - - - - - - - - - - - - - - + + + def findProcessForWrite(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], data: String) ={ log.debug("Process " + idProcess + " does NOT EXIST in the System") @@ -119,11 +135,11 @@ class GlobalView extends Actor { log.debug("Process " + n + "is too high") log.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) - var aux : (Iterable[String],Iterable[String]) = hashedProcesses.get(previousN).splitAt(1) - println("AUX: " + aux._1) + println("id of Previous Process: " + hashedProcesses.get(previousN).get) + - val process = context.actorSelection(s"${hashedProcesses.get(previousN)}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") process ! ForwardWrite(idProcess, data) break } From f317f385f7c7728aee4e4a10c6926b17a3690a59 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Sun, 3 Dec 2017 00:21:22 +0000 Subject: [PATCH 14/26] Write and Read output in Application --- src/main/scala/app/Application.scala | 17 ++++- src/main/scala/app/Messages.scala | 8 ++- src/main/scala/layers/GlobalView.scala | 86 ++++++++++++++++++++++---- 3 files changed, 97 insertions(+), 14 deletions(-) diff --git a/src/main/scala/app/Application.scala b/src/main/scala/app/Application.scala index a90c59e..45e16f8 100644 --- a/src/main/scala/app/Application.scala +++ b/src/main/scala/app/Application.scala @@ -20,7 +20,8 @@ object Application extends App { case "gv" if (words.length == 2) => showGV(words(1)) case "pv" if (words.length == 2) => showPV(words(1)) case "ms" if (words.length == 2) => messagesStats(words(1)) - case "w" if(words.length == 3) => write(words(1), words(2)) + case "write" if(words.length == 3) => write(words(1), words(2)) + case "read" if(words.length == 2) => read(words(1)) //case "msall" if (words.length == 2) => messagesStatsAll() case "clear" => { for (i <- 1 to 20) @@ -47,6 +48,9 @@ object Application extends App { appActor ! Write(process, data) } + def read(process: String) ={ + appActor ! Read(process) + } //def messagesStatsAll() = { //} @@ -78,6 +82,12 @@ object Application extends App { println ("-------------------------------------------------------------") } + case replyStore: ReplyStoreAction => { + println ("-------------------------------------------------------------") + println (s"${replyStore.replyType} from ${replyStore.myself} with the DATA: ${replyStore.data}") + println ("-------------------------------------------------------------") + } + case stats : ReplyMessagesStats => { println ("-------------------------------------------------------------") println(s"Messages from ${sender.path.address.toString}") @@ -103,6 +113,11 @@ object Application extends App { val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") process ! Write(writeStorage.id, writeStorage.data) } + + case readStorage : Read => { + val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") + process ! Read(readStorage.id) + } } } } \ No newline at end of file diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 6c1053b..dfc2c01 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -1,5 +1,7 @@ package app +import akka.actor.ActorRef + //pView case class InitMessage(selfAddress: String, contactNode: String) @@ -31,6 +33,8 @@ case class ShowGV(address: String) case class ReplyShowView(replyType: String, myself: String, nodes: List[String]) +case class ReplyStoreAction(replyType: String, myself: String, data: String) + // Information Dissemination @@ -61,9 +65,9 @@ case class Write(id: String, data: String) case class Read(id: String) -case class ForwardWrite(id: Int, data: String) - +case class ForwardWrite(id: Int, data: String, appID: ActorRef) +case class ForwardRead(id: Int, appID: ActorRef) //Replication diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 0c38c2a..4743e8b 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -1,8 +1,9 @@ package layers -import akka.actor.Actor +import akka.actor.{Actor, ActorRef} import app._ import com.typesafe.scalalogging.Logger + import scala.util.control.Breaks._ @@ -72,7 +73,6 @@ class GlobalView extends Actor { case write: Write => { - var idWrite = math.abs(write.id.reverse.hashCode%1000) log.debug("Received write with key: " + idWrite) @@ -84,36 +84,81 @@ class GlobalView extends Actor { if(hashedProcesses.contains(idWrite)) { log.debug("HashID " + idWrite + " exists") + if(!idWrite.equals(myself.reverse.hashCode%1000)){ log.debug("Its not me tho...") log.debug("Forwarding to HashID " + idWrite) val process = context.actorSelection(s"${hashedProcesses.get(idWrite).get}/user/globalView") - process ! ForwardWrite(idWrite, write.data) + process ! ForwardWrite(idWrite, write.data, sender) } else { log.debug("And its me!!") log.debug("Storing HashID " + idWrite.toString + " with the data: " + write.data) storage.put(idWrite.toString, write.data) + + //Send back to Application + sender ! ReplyStoreAction("Write", myself, write.data) } } else - findProcessForWrite(idWrite, hashedProcesses, write.data) + findProcessForWrite(idWrite, hashedProcesses, write.data, sender) } + case read: Read => { - if (storage.exists(_ == read.id)) { - storage.get(math.abs((read.id.hashCode%1000)).toString) + print("Received Read from application") + var idRead = math.abs(read.id.reverse.hashCode%1000) + var hashedProcesses = scala.collection.mutable.HashMap[Int, String]() + for(n<-globalView){ + hashedProcesses.put(math.abs((n.reverse.hashCode%1000)), n) + } + hashedProcesses.toSeq.sortBy(_._1) + + if(hashedProcesses.contains(idRead)) { + log.debug("HashID " + idRead + " exists") + if(!idRead.equals(myself.reverse.hashCode%1000)) { + log.debug("Its not me tho...") + + if (storage.exists(_ == idRead)) { + log.debug("But I have it stored!!") + storage.get(idRead.toString) + log.debug("Read completed! Data: " + storage.get(idRead.toString)) + sender ! ReplyStoreAction("Read", myself, storage.get(idRead.toString).get) + } + else{ + findProcessForRead(idRead, hashedProcesses, sender) + } + + } + else { //ITS MEEE + log.debug("I have the read! Data: " + storage.get(idRead.toString)) + storage.get(idRead.toString) + sender ! ReplyStoreAction("Read", myself, storage.get(idRead.toString).get) + } + } else defaultData - } case forwardWrite: ForwardWrite => { log.debug("Process hashID: " + math.abs(myself.reverse.hashCode%1000) + " STORED ID: " + forwardWrite.id + " with the data: " + forwardWrite.data) storage.put((forwardWrite.id).toString, forwardWrite.data) + + //Send back to Application + val process = context.actorSelection(s"${forwardWrite.appID.path}") + process ! ReplyStoreAction("Write", myself, forwardWrite.data) + } + + case forwardRead: ForwardRead => { + log.debug("Process hashID: " + math.abs(myself.reverse.hashCode%1000) + " Got stored HashID: " + forwardRead.id + " with the data: " + storage.get(forwardRead.id.toString)) + storage.get(forwardRead.id.toString) + + //Send back to Application + val process = context.actorSelection(s"${forwardRead.appID.path}") + process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.id.toString).get) } } @@ -122,7 +167,7 @@ class GlobalView extends Actor { - def findProcessForWrite(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], data: String) ={ + def findProcessForWrite(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], data: String, appID: ActorRef) ={ log.debug("Process " + idProcess + " does NOT EXIST in the System") @@ -133,14 +178,33 @@ class GlobalView extends Actor { log.debug("processID: " + n._1) if (n._1 > idProcess) { log.debug("Process " + n + "is too high") - log.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) + log.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) + + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") + process ! ForwardWrite(idProcess, data, appID) + break + } + else + previousN = n._1 + } - println("id of Previous Process: " + hashedProcesses.get(previousN).get) + } + def findProcessForRead(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], appID: ActorRef) ={ + log.debug("Process " + idProcess + " does NOT EXIST in the System") + + var previousN = hashID_2551 + + for (n <- hashedProcesses) { + log.debug("hashProcess: " + n) + log.debug("processID: " + n._1) + if (n._1 > idProcess) { + log.debug("Process " + n + "is too high") + log.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") - process ! ForwardWrite(idProcess, data) + process ! ForwardRead(idProcess, appID) break } else From 3f54d0dd95ba57c51bcbd446ebf9f2f999e7a0cc Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Sun, 3 Dec 2017 23:41:35 +0000 Subject: [PATCH 15/26] Update on Read --- src/main/scala/app/Application.scala | 20 ++++++++++++++++++-- src/main/scala/layers/GlobalView.scala | 24 +++++++++++++++++------- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/main/scala/app/Application.scala b/src/main/scala/app/Application.scala index 45e16f8..96ecab2 100644 --- a/src/main/scala/app/Application.scala +++ b/src/main/scala/app/Application.scala @@ -13,8 +13,14 @@ object Application extends App { val appActor = sys.actorOf(Props[appActor], "appActor") while (true) { + //println("A ler outro input...") val line = scala.io.StdIn.readLine() + //println("Linha lida: " + line) var words: Array[String] = line.split("\\s") + //println("Lista de palavras lidas:") + //for(n<-words){ + //println(n) + //} words(0) match { case "gv" if (words.length == 2) => showGV(words(1)) @@ -69,6 +75,16 @@ object Application extends App { process ! ShowPV } + case Write(id, data) => { + val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") + process ! Write(id, data) + } + + case Read(id) => { + val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") + process ! Read(id) + } + case MessagesStats(x) => { val process = sys.actorSelection(s"${x}/user/informationDissemination") process ! MessagesStats @@ -109,7 +125,7 @@ object Application extends App { println ("-------------------------------------------------------------") } - case writeStorage : Write => { + /*case writeStorage : Write => { val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") process ! Write(writeStorage.id, writeStorage.data) } @@ -117,7 +133,7 @@ object Application extends App { case readStorage : Read => { val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") process ! Read(readStorage.id) - } + }*/ } } } \ No newline at end of file diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 4743e8b..6cc596d 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -21,7 +21,7 @@ class GlobalView extends Actor { //var storage = scala.collection.mutable.HashMap[String, List[Byte]]() var storage = scala.collection.mutable.HashMap[String, String]() - var defaultData: List[Byte] = List.empty + //var defaultData: List[Byte] = List.empty override def receive = { @@ -140,7 +140,7 @@ class GlobalView extends Actor { } else - defaultData + findProcessForRead(idRead, hashedProcesses, sender) } case forwardWrite: ForwardWrite => { @@ -153,12 +153,22 @@ class GlobalView extends Actor { } case forwardRead: ForwardRead => { - log.debug("Process hashID: " + math.abs(myself.reverse.hashCode%1000) + " Got stored HashID: " + forwardRead.id + " with the data: " + storage.get(forwardRead.id.toString)) - storage.get(forwardRead.id.toString) - //Send back to Application - val process = context.actorSelection(s"${forwardRead.appID.path}") - process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.id.toString).get) + if(storage.contains(forwardRead.id.toString)) { + log.debug("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + forwardRead.id + " with the data: " + storage.get(forwardRead.id.toString)) + storage.get(forwardRead.id.toString) + + //Send back to Application + val process = context.actorSelection(s"${forwardRead.appID.path}") + process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.id.toString).get) + } + else{ + log.debug("The read with the id: " + forwardRead.id + " does not exist in the System.") + + //Send back to Application + val process = context.actorSelection(s"${forwardRead.appID.path}") + process ! ReplyStoreAction("Read", myself, "Read not found in the System!") + } } } From e1295eff7b8ddfba699f650293befa16489e408c Mon Sep 17 00:00:00 2001 From: aanciaes Date: Mon, 4 Dec 2017 12:19:23 +0000 Subject: [PATCH 16/26] fixing bugs --- src/main/resources/logback.xml | 5 +- src/main/scala/app/Application.scala | 14 +-- src/main/scala/app/Messages.scala | 8 +- src/main/scala/layers/GlobalView.scala | 111 ++++++++++-------- .../layers/InformationDissemination.scala | 2 +- src/main/scala/layers/PartialView.scala | 2 +- 6 files changed, 76 insertions(+), 66 deletions(-) diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index ccf2d6b..d4ab3cc 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -9,9 +9,10 @@ - + + - + \ No newline at end of file diff --git a/src/main/scala/app/Application.scala b/src/main/scala/app/Application.scala index 96ecab2..a1a7a63 100644 --- a/src/main/scala/app/Application.scala +++ b/src/main/scala/app/Application.scala @@ -6,7 +6,7 @@ import com.typesafe.scalalogging.Logger object Application extends App { - val log = Logger("scala.slick") + val log = Logger("phase1") val config = ConfigFactory.load.getConfig("ApplicationConfig") val sys = ActorSystem("akkaSystem", config) @@ -50,12 +50,12 @@ object Application extends App { appActor ! MessagesStats(process) } - def write(process: String, data: String) ={ - appActor ! Write(process, data) + def write(dataId: String, data: String) ={ + appActor ! Write(dataId, data) } - def read(process: String) ={ - appActor ! Read(process) + def read(dataId: String) ={ + appActor ! Read(dataId) } //def messagesStatsAll() = { @@ -75,9 +75,9 @@ object Application extends App { process ! ShowPV } - case Write(id, data) => { + case Write(dataId, data) => { val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") - process ! Write(id, data) + process ! Write(dataId, data) } case Read(id) => { diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index dfc2c01..4ca2661 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -61,13 +61,13 @@ case class GossipRequest(mid: Int) //Storage //case class Write(id: String, data: List[Byte]) -case class Write(id: String, data: String) +case class Write(dataId: String, data: String) -case class Read(id: String) +case class Read(dataId: String) -case class ForwardWrite(id: Int, data: String, appID: ActorRef) +case class ForwardWrite(hashedDataId: Int, data: String, appID: ActorRef) -case class ForwardRead(id: Int, appID: ActorRef) +case class ForwardRead(hashedDataId: Int, appID: ActorRef) //Replication diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 6cc596d..c95d468 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -10,7 +10,9 @@ import scala.util.control.Breaks._ class GlobalView extends Actor { - val log = Logger("scala.slick") + val log = Logger("phase1") + val log2 = Logger("phase2") + val hashID_2551 : Int = 474 var globalView: List[String] = List.empty @@ -73,28 +75,27 @@ class GlobalView extends Actor { case write: Write => { - var idWrite = math.abs(write.id.reverse.hashCode%1000) - log.debug("Received write with key: " + idWrite) + var hashedDataId = math.abs(write.dataId.reverse.hashCode%1000) + log2.debug("Received write with key: " + hashedDataId) - var hashedProcesses = scala.collection.mutable.HashMap[Int, String]() + var hashedProcesses = scala.collection.mutable.TreeMap[Int, String]() for(n<-globalView){ hashedProcesses.put(math.abs((n.reverse.hashCode%1000)), n) } - hashedProcesses.toSeq.sortBy(_._1) - if(hashedProcesses.contains(idWrite)) { - log.debug("HashID " + idWrite + " exists") + if(hashedProcesses.contains(hashedDataId)) { + log2.debug("HashID " + hashedDataId + " exists") - if(!idWrite.equals(myself.reverse.hashCode%1000)){ - log.debug("Its not me tho...") - log.debug("Forwarding to HashID " + idWrite) - val process = context.actorSelection(s"${hashedProcesses.get(idWrite).get}/user/globalView") - process ! ForwardWrite(idWrite, write.data, sender) + if(!hashedDataId.equals(myself.reverse.hashCode%1000)){ + log2.debug("Its not me tho...") + log2.debug("Forwarding to HashID " + hashedDataId) + val process = context.actorSelection(s"${hashedProcesses.get(hashedDataId).get}/user/globalView") + process ! ForwardWrite(hashedDataId, write.data, sender) } else { - log.debug("And its me!!") - log.debug("Storing HashID " + idWrite.toString + " with the data: " + write.data) - storage.put(idWrite.toString, write.data) + log2.debug("And its me!!") + log2.debug("Storing HashID " + hashedDataId.toString + " with the data: " + write.data) + storage.put(hashedDataId.toString, write.data) //Send back to Application sender ! ReplyStoreAction("Write", myself, write.data) @@ -102,29 +103,29 @@ class GlobalView extends Actor { } else - findProcessForWrite(idWrite, hashedProcesses, write.data, sender) + findProcessForWrite(hashedDataId, hashedProcesses, write.data, sender) } case read: Read => { print("Received Read from application") - var idRead = math.abs(read.id.reverse.hashCode%1000) - var hashedProcesses = scala.collection.mutable.HashMap[Int, String]() + var idRead = math.abs(read.dataId.reverse.hashCode%1000) + var hashedProcesses = scala.collection.mutable.TreeMap[Int, String]() for(n<-globalView){ hashedProcesses.put(math.abs((n.reverse.hashCode%1000)), n) } hashedProcesses.toSeq.sortBy(_._1) if(hashedProcesses.contains(idRead)) { - log.debug("HashID " + idRead + " exists") + log2.debug("HashID " + idRead + " exists") if(!idRead.equals(myself.reverse.hashCode%1000)) { - log.debug("Its not me tho...") + log2.debug("Its not me tho...") - if (storage.exists(_ == idRead)) { - log.debug("But I have it stored!!") + if (storage.contains(idRead.toString)) { + log2.debug("But I have it stored!!") storage.get(idRead.toString) - log.debug("Read completed! Data: " + storage.get(idRead.toString)) + log2.debug("Read completed! Data: " + storage.get(idRead.toString)) sender ! ReplyStoreAction("Read", myself, storage.get(idRead.toString).get) } else{ @@ -133,7 +134,7 @@ class GlobalView extends Actor { } else { //ITS MEEE - log.debug("I have the read! Data: " + storage.get(idRead.toString)) + log2.debug("I have the read! Data: " + storage.get(idRead.toString)) storage.get(idRead.toString) sender ! ReplyStoreAction("Read", myself, storage.get(idRead.toString).get) } @@ -144,8 +145,8 @@ class GlobalView extends Actor { } case forwardWrite: ForwardWrite => { - log.debug("Process hashID: " + math.abs(myself.reverse.hashCode%1000) + " STORED ID: " + forwardWrite.id + " with the data: " + forwardWrite.data) - storage.put((forwardWrite.id).toString, forwardWrite.data) + log2.debug("myself hashed: " + math.abs(myself.reverse.hashCode%1000) + " STORED ID: " + forwardWrite.hashedDataId + " with the data: " + forwardWrite.data) + storage.put((forwardWrite.hashedDataId).toString, forwardWrite.data) //Send back to Application val process = context.actorSelection(s"${forwardWrite.appID.path}") @@ -154,16 +155,16 @@ class GlobalView extends Actor { case forwardRead: ForwardRead => { - if(storage.contains(forwardRead.id.toString)) { - log.debug("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + forwardRead.id + " with the data: " + storage.get(forwardRead.id.toString)) - storage.get(forwardRead.id.toString) + if(storage.contains(forwardRead.hashedDataId.toString)) { + log2.debug("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + forwardRead.hashedDataId + " with the data: " + storage.get(forwardRead.hashedDataId.toString)) + storage.get(forwardRead.hashedDataId.toString) //Send back to Application val process = context.actorSelection(s"${forwardRead.appID.path}") - process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.id.toString).get) + process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.hashedDataId.toString).get) } else{ - log.debug("The read with the id: " + forwardRead.id + " does not exist in the System.") + log2.debug("The read with the id: " + forwardRead.hashedDataId + " does not exist in the System.") //Send back to Application val process = context.actorSelection(s"${forwardRead.appID.path}") @@ -177,41 +178,49 @@ class GlobalView extends Actor { - def findProcessForWrite(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], data: String, appID: ActorRef) ={ + def findProcessForWrite(hashedDataId: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], data: String, appID: ActorRef) ={ - log.debug("Process " + idProcess + " does NOT EXIST in the System") + log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") var previousN = hashID_2551 - for (n <- hashedProcesses) { - log.debug("hashProcess: " + n) - log.debug("processID: " + n._1) - if (n._1 > idProcess) { - log.debug("Process " + n + "is too high") - log.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) + if(hashedProcesses.size==1){ + storage.put((hashedDataId).toString, data) - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") - process ! ForwardWrite(idProcess, data, appID) - break + //Send back to Application + val process = context.actorSelection(s"${appID.path}") + process ! ReplyStoreAction("Write", myself, data) + + }else{ + for ((hash, process) <- hashedProcesses) { + log2.debug("hashProcess: " + hash) + log2.debug("process: " + process) + if (hash > hashedDataId) { + log2.debug("Process " + hash + "is too high") + log2.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) + + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") + process ! ForwardWrite(hashedDataId, data, appID) + break + } + else + previousN = hash } - else - previousN = n._1 } - } - def findProcessForRead(idProcess: Int, hashedProcesses: scala.collection.mutable.HashMap[Int, String], appID: ActorRef) ={ + def findProcessForRead(idProcess: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], appID: ActorRef) ={ - log.debug("Process " + idProcess + " does NOT EXIST in the System") + log2.debug("Process " + idProcess + " does NOT EXIST in the System") var previousN = hashID_2551 for (n <- hashedProcesses) { - log.debug("hashProcess: " + n) - log.debug("processID: " + n._1) + log2.debug("hashProcess: " + n) + log2.debug("processID: " + n._1) if (n._1 > idProcess) { - log.debug("Process " + n + "is too high") - log.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) + log2.debug("Process " + n + "is too high") + log2.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") process ! ForwardRead(idProcess, appID) diff --git a/src/main/scala/layers/InformationDissemination.scala b/src/main/scala/layers/InformationDissemination.scala index 53f34d0..9846aad 100644 --- a/src/main/scala/layers/InformationDissemination.scala +++ b/src/main/scala/layers/InformationDissemination.scala @@ -12,7 +12,7 @@ import scala.concurrent.ExecutionContext.Implicits.global class InformationDissemination extends Actor { - val log = Logger("scala.slick") + val log = Logger("phase1") //Messages Stats var totalSentMessages: Int = 0 diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index 03e4cca..4ade824 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -13,7 +13,7 @@ import scala.concurrent.ExecutionContext.Implicits.global class PartialView extends Actor { - val log = Logger("scala.slick") + val log = Logger("phase1") var activeView: List[String] = List.empty var passiveView: List[String] = List.empty From 7e548323fc9fbac3e69d1ca81476c53e3f56ea87 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Mon, 4 Dec 2017 15:19:58 +0000 Subject: [PATCH 17/26] Write and Read fully implemented and tested --- src/main/scala/app/Application.scala | 26 +--- src/main/scala/app/Messages.scala | 1 - src/main/scala/layers/GlobalView.scala | 173 ++++++++++++++----------- 3 files changed, 106 insertions(+), 94 deletions(-) diff --git a/src/main/scala/app/Application.scala b/src/main/scala/app/Application.scala index a1a7a63..e4be50f 100644 --- a/src/main/scala/app/Application.scala +++ b/src/main/scala/app/Application.scala @@ -7,20 +7,17 @@ import com.typesafe.scalalogging.Logger object Application extends App { val log = Logger("phase1") + val defaultProcess = "akka.tcp://AkkaSystem@127.0.0.1:2551" val config = ConfigFactory.load.getConfig("ApplicationConfig") val sys = ActorSystem("akkaSystem", config) val appActor = sys.actorOf(Props[appActor], "appActor") while (true) { - //println("A ler outro input...") + val line = scala.io.StdIn.readLine() - //println("Linha lida: " + line) var words: Array[String] = line.split("\\s") - //println("Lista de palavras lidas:") - //for(n<-words){ - //println(n) - //} + words(0) match { case "gv" if (words.length == 2) => showGV(words(1)) @@ -76,13 +73,13 @@ object Application extends App { } case Write(dataId, data) => { - val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") + val process = sys.actorSelection(s"${defaultProcess}/user/globalView") process ! Write(dataId, data) } - case Read(id) => { - val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") - process ! Read(id) + case Read(dataId) => { + val process = sys.actorSelection(s"${defaultProcess}/user/globalView") + process ! Read(dataId) } case MessagesStats(x) => { @@ -125,15 +122,6 @@ object Application extends App { println ("-------------------------------------------------------------") } - /*case writeStorage : Write => { - val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") - process ! Write(writeStorage.id, writeStorage.data) - } - - case readStorage : Read => { - val process = sys.actorSelection(s"${"akka.tcp://AkkaSystem@127.0.0.1:2551"}/user/globalView") - process ! Read(readStorage.id) - }*/ } } } \ No newline at end of file diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 4ca2661..abcda4a 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -60,7 +60,6 @@ case class GossipRequest(mid: Int) //Storage -//case class Write(id: String, data: List[Byte]) case class Write(dataId: String, data: String) case class Read(dataId: String) diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index c95d468..093e730 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -4,26 +4,18 @@ import akka.actor.{Actor, ActorRef} import app._ import com.typesafe.scalalogging.Logger -import scala.util.control.Breaks._ - - class GlobalView extends Actor { val log = Logger("phase1") val log2 = Logger("phase2") - val hashID_2551 : Int = 474 + val hashID_2551: Int = math.abs(("akka.tcp://AkkaSystem@127.0.0.1:2551").reverse.hashCode % 1000) //474 in localhost var globalView: List[String] = List.empty var myself: String = "" - - var id: Int = 0 - - //var storage = scala.collection.mutable.HashMap[String, List[Byte]]() var storage = scala.collection.mutable.HashMap[String, String]() - //var defaultData: List[Byte] = List.empty override def receive = { @@ -32,8 +24,8 @@ class GlobalView extends Actor { globalView = globalView :+ myself - id = math.abs(init.selfAddress.reverse.hashCode%1000) - println ("Unique Identifier: " + id) + id = math.abs(init.selfAddress.reverse.hashCode % 1000) + println("Unique Identifier: " + id) val process = context.actorSelection(s"${init.contactNode}/user/globalView") process ! ShowGV @@ -44,8 +36,8 @@ class GlobalView extends Actor { message.messageType match { case "add" => { if (!message.node.equals(myself)) - //log.debug("adding: " + message.node + " to global view") - globalView = globalView :+ message.node + //log.debug("adding: " + message.node + " to global view") + globalView = globalView :+ message.node } case "del" => { if (!message.node.equals(myself)) @@ -62,7 +54,7 @@ class GlobalView extends Actor { //Since all global views are up to date, on init //Gets contact node global view and copies it to is own case reply: ReplyShowView => { - for (n <- reply.nodes.filter(!_.equals(myself))){ + for (n <- reply.nodes.filter(!_.equals(myself))) { globalView = globalView :+ n } @@ -75,18 +67,18 @@ class GlobalView extends Actor { case write: Write => { - var hashedDataId = math.abs(write.dataId.reverse.hashCode%1000) + var hashedDataId = math.abs(write.dataId.reverse.hashCode % 1000) log2.debug("Received write with key: " + hashedDataId) var hashedProcesses = scala.collection.mutable.TreeMap[Int, String]() - for(n<-globalView){ - hashedProcesses.put(math.abs((n.reverse.hashCode%1000)), n) + for (n <- globalView) { + hashedProcesses.put(math.abs((n.reverse.hashCode % 1000)), n) } - if(hashedProcesses.contains(hashedDataId)) { + if (hashedProcesses.contains(hashedDataId)) { log2.debug("HashID " + hashedDataId + " exists") - if(!hashedDataId.equals(myself.reverse.hashCode%1000)){ + if (!hashedDataId.equals(myself.reverse.hashCode % 1000)) { log2.debug("Its not me tho...") log2.debug("Forwarding to HashID " + hashedDataId) val process = context.actorSelection(s"${hashedProcesses.get(hashedDataId).get}/user/globalView") @@ -110,42 +102,42 @@ class GlobalView extends Actor { case read: Read => { print("Received Read from application") - var idRead = math.abs(read.dataId.reverse.hashCode%1000) + var hashedDataId = math.abs(read.dataId.reverse.hashCode % 1000) var hashedProcesses = scala.collection.mutable.TreeMap[Int, String]() - for(n<-globalView){ - hashedProcesses.put(math.abs((n.reverse.hashCode%1000)), n) + for (n <- globalView) { + hashedProcesses.put(math.abs((n.reverse.hashCode % 1000)), n) } - hashedProcesses.toSeq.sortBy(_._1) - if(hashedProcesses.contains(idRead)) { - log2.debug("HashID " + idRead + " exists") - if(!idRead.equals(myself.reverse.hashCode%1000)) { + + if (hashedProcesses.contains(hashedDataId)) { + log2.debug("HashID " + hashedDataId + " exists") + if (!hashedDataId.equals(myself.reverse.hashCode % 1000)) { log2.debug("Its not me tho...") - if (storage.contains(idRead.toString)) { + if (storage.contains(hashedDataId.toString)) { log2.debug("But I have it stored!!") - storage.get(idRead.toString) - log2.debug("Read completed! Data: " + storage.get(idRead.toString)) - sender ! ReplyStoreAction("Read", myself, storage.get(idRead.toString).get) + storage.get(hashedDataId.toString) + log2.debug("Read completed! Data: " + storage.get(hashedDataId.toString)) + sender ! ReplyStoreAction("Read", myself, storage.get(hashedDataId.toString).get) } - else{ - findProcessForRead(idRead, hashedProcesses, sender) + else { + findProcessForRead(hashedDataId, hashedProcesses, sender) } } else { //ITS MEEE - log2.debug("I have the read! Data: " + storage.get(idRead.toString)) - storage.get(idRead.toString) - sender ! ReplyStoreAction("Read", myself, storage.get(idRead.toString).get) + log2.debug("I have the read! Data: " + storage.get(hashedDataId.toString)) + storage.get(hashedDataId.toString) + sender ! ReplyStoreAction("Read", myself, storage.get(hashedDataId.toString).get) } } else - findProcessForRead(idRead, hashedProcesses, sender) + findProcessForRead(hashedDataId, hashedProcesses, sender) } case forwardWrite: ForwardWrite => { - log2.debug("myself hashed: " + math.abs(myself.reverse.hashCode%1000) + " STORED ID: " + forwardWrite.hashedDataId + " with the data: " + forwardWrite.data) + log2.debug("myself hashed: " + math.abs(myself.reverse.hashCode % 1000) + " STORED ID: " + forwardWrite.hashedDataId + " with the data: " + forwardWrite.data) storage.put((forwardWrite.hashedDataId).toString, forwardWrite.data) //Send back to Application @@ -154,16 +146,16 @@ class GlobalView extends Actor { } case forwardRead: ForwardRead => { - - if(storage.contains(forwardRead.hashedDataId.toString)) { - log2.debug("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + forwardRead.hashedDataId + " with the data: " + storage.get(forwardRead.hashedDataId.toString)) + println("---ForwardRead---") + if (storage.contains(forwardRead.hashedDataId.toString)) { + log2.debug("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + forwardRead.hashedDataId + " with the data: " + storage.get(forwardRead.hashedDataId.toString).get) storage.get(forwardRead.hashedDataId.toString) //Send back to Application val process = context.actorSelection(s"${forwardRead.appID.path}") process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.hashedDataId.toString).get) } - else{ + else { log2.debug("The read with the id: " + forwardRead.hashedDataId + " does not exist in the System.") //Send back to Application @@ -177,58 +169,91 @@ class GlobalView extends Actor { // - - - - - - - - - - - - - - - - - - - - - - - - - def findProcessForWrite(hashedDataId: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], data: String, appID: ActorRef) ={ - + def findProcessForWrite(hashedDataId: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], data: String, appID: ActorRef) = { log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") - var previousN = hashID_2551 - - if(hashedProcesses.size==1){ - storage.put((hashedDataId).toString, data) + var previousN = hashID_2551 + var count = 1 + var break = false - //Send back to Application - val process = context.actorSelection(s"${appID.path}") - process ! ReplyStoreAction("Write", myself, data) + for ((hash, process) <- hashedProcesses) { - }else{ - for ((hash, process) <- hashedProcesses) { + if (!break) { log2.debug("hashProcess: " + hash) log2.debug("process: " + process) + if (hash > hashedDataId) { - log2.debug("Process " + hash + "is too high") - log2.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) + log2.debug("Process " + hash + " is too high") - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") - process ! ForwardWrite(hashedDataId, data, appID) - break + //write 300 qdo hashedProcesses = {450, 750, 900} tem que ir po 900 + if (hash == hashedProcesses.firstKey) { + val process = context.actorSelection(s"${hashedProcesses.last._2}/user/globalView") + process ! ForwardWrite(hashedDataId, data, appID) + break = true + } + else { + log2.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) + + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") + process ! ForwardWrite(hashedDataId, data, appID) + break = true + } } - else + + else { + if (count == hashedProcesses.size) { + val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/globalView") + process ! ForwardWrite(hashedDataId, data, appID) + break = true + } + count = count + 1 previousN = hash + } } } } - def findProcessForRead(idProcess: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], appID: ActorRef) ={ - log2.debug("Process " + idProcess + " does NOT EXIST in the System") + def findProcessForRead(hashedDataId: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], appID: ActorRef) = { + log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") - var previousN = hashID_2551 + var previousN = hashID_2551 + var count = 1 + var break = false - for (n <- hashedProcesses) { - log2.debug("hashProcess: " + n) - log2.debug("processID: " + n._1) - if (n._1 > idProcess) { - log2.debug("Process " + n + "is too high") - log2.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN)) + for ((hash, process) <- hashedProcesses) { - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") - process ! ForwardRead(idProcess, appID) - break + if (!break) { + log2.debug("hashProcess: " + hash) + log2.debug("processID: " + process) + if (hash > hashedDataId) { + log2.debug("Process " + hash + " is too high") + + //read 300 qdo hashedProcesses = {450, 750, 900} tem que ir po 900 + if (hash == hashedProcesses.firstKey) { + val process = context.actorSelection(s"${hashedProcesses.last._2}/user/globalView") + process ! ForwardRead(hashedDataId, appID) + break = true + } + else { + log2.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) + + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") + process ! ForwardRead(hashedDataId, appID) + break = true + } + } + + else { + if (count == hashedProcesses.size) { + val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/globalView") + process ! ForwardRead(hashedDataId, appID) + break = true + } + count = count + 1 + previousN = hash + } } - else - previousN = n._1 } - } } \ No newline at end of file From d685f391a2d1deada0bc482a796d75c19731371d Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Wed, 6 Dec 2017 02:06:41 +0000 Subject: [PATCH 18/26] New Class for Storage, Write and Read tested. --- src/main/scala/app/Messages.scala | 2 + src/main/scala/app/Process.scala | 5 +- src/main/scala/layers/GlobalView.scala | 70 ++++++------------------- src/main/scala/layers/Replication.scala | 17 ++++++ src/main/scala/layers/Storage.scala | 47 ++++++++++------- 5 files changed, 67 insertions(+), 74 deletions(-) create mode 100644 src/main/scala/layers/Replication.scala diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index abcda4a..9cb190f 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -60,6 +60,8 @@ case class GossipRequest(mid: Int) //Storage +case class InitStorage(selfAddress: String) + case class Write(dataId: String, data: String) case class Read(dataId: String) diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index d64688c..2911f4f 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -3,7 +3,7 @@ package app import akka.actor.{ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import com.typesafe.scalalogging.Logger -import layers.{GlobalView, InformationDissemination, PartialView} +import layers.{GlobalView, InformationDissemination, PartialView, Storage} //import replication._ @@ -23,7 +23,7 @@ object Process extends App { val globalView = sys.actorOf(Props[GlobalView], "globalView") val partialView = sys.actorOf(Props[PartialView], "partialView") val informationDissemination = sys.actorOf(Props[InformationDissemination], "informationDissemination") - + val storage = sys.actorOf(Props[Storage], "storage") /*val proposer = sys.actorOf(Props[Proposer], "proposer") val accepter = sys.actorOf(Props[Accepter], "accepter") @@ -38,6 +38,7 @@ object Process extends App { globalView ! InitGlobView(selfAddress, contactNode) partialView ! InitMessage(selfAddress, contactNode) informationDissemination ! InitGossip(selfAddress) + storage ! InitStorage(selfAddress) def configureRemote(): Config = { diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 093e730..825e7c7 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -15,7 +15,8 @@ class GlobalView extends Actor { var globalView: List[String] = List.empty var myself: String = "" var id: Int = 0 - var storage = scala.collection.mutable.HashMap[String, String]() + //var storage = scala.collection.mutable.HashMap[String, String]() + //var pending = scala.collection.mutable.Queue[] override def receive = { @@ -23,7 +24,6 @@ class GlobalView extends Actor { myself = init.selfAddress globalView = globalView :+ myself - id = math.abs(init.selfAddress.reverse.hashCode % 1000) println("Unique Identifier: " + id) @@ -81,16 +81,15 @@ class GlobalView extends Actor { if (!hashedDataId.equals(myself.reverse.hashCode % 1000)) { log2.debug("Its not me tho...") log2.debug("Forwarding to HashID " + hashedDataId) - val process = context.actorSelection(s"${hashedProcesses.get(hashedDataId).get}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.get(hashedDataId).get}/user/storage") process ! ForwardWrite(hashedDataId, write.data, sender) } else { log2.debug("And its me!!") log2.debug("Storing HashID " + hashedDataId.toString + " with the data: " + write.data) - storage.put(hashedDataId.toString, write.data) - //Send back to Application - sender ! ReplyStoreAction("Write", myself, write.data) + val process = context.actorSelection(s"${myself}/user/storage") + process ! ForwardWrite(hashedDataId, write.data, sender) } } @@ -113,56 +112,18 @@ class GlobalView extends Actor { log2.debug("HashID " + hashedDataId + " exists") if (!hashedDataId.equals(myself.reverse.hashCode % 1000)) { log2.debug("Its not me tho...") - - if (storage.contains(hashedDataId.toString)) { - log2.debug("But I have it stored!!") - storage.get(hashedDataId.toString) - log2.debug("Read completed! Data: " + storage.get(hashedDataId.toString)) - sender ! ReplyStoreAction("Read", myself, storage.get(hashedDataId.toString).get) - } - else { - findProcessForRead(hashedDataId, hashedProcesses, sender) - } - + findProcessForRead(hashedDataId, hashedProcesses, sender) } else { //ITS MEEE - log2.debug("I have the read! Data: " + storage.get(hashedDataId.toString)) - storage.get(hashedDataId.toString) - sender ! ReplyStoreAction("Read", myself, storage.get(hashedDataId.toString).get) - } + val process = context.actorSelection(s"${myself}/user/storage") + process ! ForwardRead(hashedDataId, sender) + } } else findProcessForRead(hashedDataId, hashedProcesses, sender) } - case forwardWrite: ForwardWrite => { - log2.debug("myself hashed: " + math.abs(myself.reverse.hashCode % 1000) + " STORED ID: " + forwardWrite.hashedDataId + " with the data: " + forwardWrite.data) - storage.put((forwardWrite.hashedDataId).toString, forwardWrite.data) - - //Send back to Application - val process = context.actorSelection(s"${forwardWrite.appID.path}") - process ! ReplyStoreAction("Write", myself, forwardWrite.data) - } - - case forwardRead: ForwardRead => { - println("---ForwardRead---") - if (storage.contains(forwardRead.hashedDataId.toString)) { - log2.debug("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + forwardRead.hashedDataId + " with the data: " + storage.get(forwardRead.hashedDataId.toString).get) - storage.get(forwardRead.hashedDataId.toString) - - //Send back to Application - val process = context.actorSelection(s"${forwardRead.appID.path}") - process ! ReplyStoreAction("Read", myself, storage.get(forwardRead.hashedDataId.toString).get) - } - else { - log2.debug("The read with the id: " + forwardRead.hashedDataId + " does not exist in the System.") - - //Send back to Application - val process = context.actorSelection(s"${forwardRead.appID.path}") - process ! ReplyStoreAction("Read", myself, "Read not found in the System!") - } - } } @@ -187,14 +148,15 @@ class GlobalView extends Actor { //write 300 qdo hashedProcesses = {450, 750, 900} tem que ir po 900 if (hash == hashedProcesses.firstKey) { - val process = context.actorSelection(s"${hashedProcesses.last._2}/user/globalView") + log2.debug("Forward Write to: " + hashedProcesses.last._2) + val process = context.actorSelection(s"${hashedProcesses.last._2}/user/storage") process ! ForwardWrite(hashedDataId, data, appID) break = true } else { log2.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/storage") process ! ForwardWrite(hashedDataId, data, appID) break = true } @@ -202,7 +164,7 @@ class GlobalView extends Actor { else { if (count == hashedProcesses.size) { - val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/storage") process ! ForwardWrite(hashedDataId, data, appID) break = true } @@ -231,14 +193,14 @@ class GlobalView extends Actor { //read 300 qdo hashedProcesses = {450, 750, 900} tem que ir po 900 if (hash == hashedProcesses.firstKey) { - val process = context.actorSelection(s"${hashedProcesses.last._2}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.last._2}/user/storage") process ! ForwardRead(hashedDataId, appID) break = true } else { log2.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/storage") process ! ForwardRead(hashedDataId, appID) break = true } @@ -246,7 +208,7 @@ class GlobalView extends Actor { else { if (count == hashedProcesses.size) { - val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/globalView") + val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/storage") process ! ForwardRead(hashedDataId, appID) break = true } diff --git a/src/main/scala/layers/Replication.scala b/src/main/scala/layers/Replication.scala new file mode 100644 index 0000000..a74270c --- /dev/null +++ b/src/main/scala/layers/Replication.scala @@ -0,0 +1,17 @@ +/* +package layers + +import akka.actor._ + +class Replication extends Actor{ + + var stateMachine = scala.collection.mutable.Map[String,String]() + + override def receive = { + + + + } + +} +*/ diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index 3a164cc..bacaaa2 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -1,4 +1,3 @@ -/* package layers import akka.actor.Actor @@ -6,38 +5,50 @@ import app._ class Storage extends Actor{ - var storage = scala.collection.mutable.HashMap[String, List[Byte]]() - var defaultData: List[Byte] = List.empty + val hashID_2551: Int = math.abs(("akka.tcp://AkkaSystem@127.0.0.1:2551").reverse.hashCode % 1000) //474 in localhost + var storage = scala.collection.mutable.HashMap[String, String]() + var pending = scala.collection.mutable.Queue[String]() + //var storage = scala.collection.mutable.HashMap[String, List[Byte]]() + //var defaultData: List[Byte] = List.empty + + var myself: String = "" override def receive = { case init: InitStorage => { myself = init.selfAddress - globalView = globalView :+ myself - - val process = context.actorSelection(s"${init.contactNode}/user/globalView") - process ! ShowGV } - case write: Write => { - - + case write: ForwardWrite => { + println("myself hashed: " + math.abs(myself.reverse.hashCode % 1000) + " STORED ID: " + write.hashedDataId + " with the data: " + write.data.toString) + storage.put((write.hashedDataId).toString, write.data) - - storage.put(write.id, write.data) + //Send back to Application + val process = context.actorSelection(s"${write.appID.path}") + process ! ReplyStoreAction("Write", myself, write.data) } - case read: Read => { + case read: ForwardRead => { + println("---ForwardRead---") + if (storage.contains(read.hashedDataId.toString)) { + println("Process hashID: " + math.abs(myself.reverse.hashCode % 1000) + " Got stored HashID: " + read.hashedDataId + " with the data: " + storage.get(read.hashedDataId.toString).get.toString) + storage.get(read.hashedDataId.toString) + + println("Read completed! Data: " + storage.get(read.hashedDataId.toString)) - if(storage.exists(_ == read.id)) { - storage.get(read.id) + //Send back to Application + val process = context.actorSelection(s"${read.appID.path}") + process ! ReplyStoreAction("Read", myself, storage.get(read.hashedDataId.toString).get) } - else - defaultData + else { + println("The read with the id: " + read.hashedDataId + " does not exist in the System.") + //Send back to Application + val process = context.actorSelection(s"${read.appID.path}") + process ! ReplyStoreAction("Read", myself, "Read not found in the System!") + } } } } -*/ From 47ccf80fb559599b3ebb6a20f537488507e3e1aa Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Wed, 6 Dec 2017 17:31:58 +0000 Subject: [PATCH 19/26] Initialization of Storage with list of Replicas --- src/main/scala/app/Messages.scala | 5 +- src/main/scala/app/Process.scala | 2 +- src/main/scala/layers/GlobalView.scala | 77 +++++++++++++++++++------ src/main/scala/layers/PartialView.scala | 7 ++- src/main/scala/layers/Replication.scala | 17 ------ src/main/scala/layers/Storage.scala | 21 ++++--- 6 files changed, 79 insertions(+), 50 deletions(-) delete mode 100644 src/main/scala/layers/Replication.scala diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index 9cb190f..fad71ac 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -1,6 +1,7 @@ package app import akka.actor.ActorRef +import scala.collection.mutable._ //pView case class InitMessage(selfAddress: String, contactNode: String) @@ -60,8 +61,6 @@ case class GossipRequest(mid: Int) //Storage -case class InitStorage(selfAddress: String) - case class Write(dataId: String, data: String) case class Read(dataId: String) @@ -72,7 +71,7 @@ case class ForwardRead(hashedDataId: Int, appID: ActorRef) //Replication -case class InitPaxos() +case class InitReplication(replicas: TreeMap[Int, String], selfAddress: String) case class AskSeqNum() diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index 2911f4f..1c9a165 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -38,7 +38,7 @@ object Process extends App { globalView ! InitGlobView(selfAddress, contactNode) partialView ! InitMessage(selfAddress, contactNode) informationDissemination ! InitGossip(selfAddress) - storage ! InitStorage(selfAddress) + def configureRemote(): Config = { diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 825e7c7..7af355d 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -3,20 +3,21 @@ package layers import akka.actor.{Actor, ActorRef} import app._ import com.typesafe.scalalogging.Logger +import scala.collection.mutable._ class GlobalView extends Actor { val log = Logger("phase1") val log2 = Logger("phase2") - + val N_REPLICAS = 3 val hashID_2551: Int = math.abs(("akka.tcp://AkkaSystem@127.0.0.1:2551").reverse.hashCode % 1000) //474 in localhost var globalView: List[String] = List.empty var myself: String = "" - var id: Int = 0 - //var storage = scala.collection.mutable.HashMap[String, String]() - //var pending = scala.collection.mutable.Queue[] + var myHashedId: Int = 0 + var hashedProcesses = TreeMap[Int, String]() + override def receive = { @@ -24,8 +25,8 @@ class GlobalView extends Actor { myself = init.selfAddress globalView = globalView :+ myself - id = math.abs(init.selfAddress.reverse.hashCode % 1000) - println("Unique Identifier: " + id) + myHashedId = math.abs(init.selfAddress.reverse.hashCode % 1000) + println("Unique Identifier: " + myHashedId) val process = context.actorSelection(s"${init.contactNode}/user/globalView") process ! ShowGV @@ -54,13 +55,32 @@ class GlobalView extends Actor { //Since all global views are up to date, on init //Gets contact node global view and copies it to is own case reply: ReplyShowView => { + for (n <- reply.nodes.filter(!_.equals(myself))) { globalView = globalView :+ n + } + if(globalView.size >= N_REPLICAS){ + for(p <- globalView){ + val process = context.actorSelection(s"${p}/user/globalView") + process ! InitReplication + } } } + case InitReplication => { + updateHashedProcesses(globalView) + + var replicas : TreeMap[Int, String] = findReplicas() + + val process = context.actorSelection(s"${myself}/user/storage") + process ! InitReplication(replicas, myself) + + } + + + // - - - - - - - - STORAGE - - - - - - - - @@ -70,11 +90,6 @@ class GlobalView extends Actor { var hashedDataId = math.abs(write.dataId.reverse.hashCode % 1000) log2.debug("Received write with key: " + hashedDataId) - var hashedProcesses = scala.collection.mutable.TreeMap[Int, String]() - for (n <- globalView) { - hashedProcesses.put(math.abs((n.reverse.hashCode % 1000)), n) - } - if (hashedProcesses.contains(hashedDataId)) { log2.debug("HashID " + hashedDataId + " exists") @@ -102,11 +117,6 @@ class GlobalView extends Actor { print("Received Read from application") var hashedDataId = math.abs(read.dataId.reverse.hashCode % 1000) - var hashedProcesses = scala.collection.mutable.TreeMap[Int, String]() - for (n <- globalView) { - hashedProcesses.put(math.abs((n.reverse.hashCode % 1000)), n) - } - if (hashedProcesses.contains(hashedDataId)) { log2.debug("HashID " + hashedDataId + " exists") @@ -129,8 +139,39 @@ class GlobalView extends Actor { // - - - - - - - - - - - - - - - - - - - - - - - + def updateHashedProcesses(globalView: List[String]) = { + for (n <- globalView) { + hashedProcesses.put(math.abs((n.reverse.hashCode % 1000)), n) + } + } + + def findReplicas() = { + + var replicas = TreeMap[Int, String]() + + var count = 0 + var it = hashedProcesses.iterator + var break = false + while(true && !break){ + var p = it.next() + + if(p._1 == myHashedId || count != 0){ + replicas.put(p._1, p._2) + count = count + 1 + } + + if(count == 3){ + break = true + } + + if(!it.hasNext){ + it = hashedProcesses.iterator + } + } + replicas + } - def findProcessForWrite(hashedDataId: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], data: String, appID: ActorRef) = { + def findProcessForWrite(hashedDataId: Int, hashedProcesses: TreeMap[Int, String], data: String, appID: ActorRef) = { log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") var previousN = hashID_2551 @@ -176,7 +217,7 @@ class GlobalView extends Actor { } - def findProcessForRead(hashedDataId: Int, hashedProcesses: scala.collection.mutable.TreeMap[Int, String], appID: ActorRef) = { + def findProcessForRead(hashedDataId: Int, hashedProcesses: TreeMap[Int, String], appID: ActorRef) = { log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") var previousN = hashID_2551 diff --git a/src/main/scala/layers/PartialView.scala b/src/main/scala/layers/PartialView.scala index 4ade824..b9dc31d 100644 --- a/src/main/scala/layers/PartialView.scala +++ b/src/main/scala/layers/PartialView.scala @@ -7,6 +7,7 @@ import app._ import com.typesafe.scalalogging.Logger import scala.util.Random +import scala.collection.mutable._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global @@ -22,9 +23,9 @@ class PartialView extends Actor { val PRWL = 3 val aViewSize = 4 val pViewSize = 30 - var aliveProcesses = scala.collection.mutable.Map[String, Double]() - var pseudoDead = scala.collection.mutable.Map[String, Double]() - var checkAlive = scala.collection.mutable.Map[String, Double]() + var aliveProcesses = Map[String, Double]() + var pseudoDead = Map[String, Double]() + var checkAlive = Map[String, Double]() override def receive = { diff --git a/src/main/scala/layers/Replication.scala b/src/main/scala/layers/Replication.scala deleted file mode 100644 index a74270c..0000000 --- a/src/main/scala/layers/Replication.scala +++ /dev/null @@ -1,17 +0,0 @@ -/* -package layers - -import akka.actor._ - -class Replication extends Actor{ - - var stateMachine = scala.collection.mutable.Map[String,String]() - - override def receive = { - - - - } - -} -*/ diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index bacaaa2..bbd731c 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -2,22 +2,27 @@ package layers import akka.actor.Actor import app._ +import scala.collection.mutable._ class Storage extends Actor{ - val hashID_2551: Int = math.abs(("akka.tcp://AkkaSystem@127.0.0.1:2551").reverse.hashCode % 1000) //474 in localhost - - var storage = scala.collection.mutable.HashMap[String, String]() - var pending = scala.collection.mutable.Queue[String]() - //var storage = scala.collection.mutable.HashMap[String, List[Byte]]() - //var defaultData: List[Byte] = List.empty - + var storage = HashMap[String, String]() + var pending = Queue[String]() + var replicas = TreeMap[Int, String]() var myself: String = "" override def receive = { - case init: InitStorage => { + case init: InitReplication => { myself = init.selfAddress + + replicas = init.replicas + + println("My replicas are: ") + for(r <- replicas){ + println(r) + } + println("- - - - - - - - - - - -") } case write: ForwardWrite => { From 30e675db17e3bb3c04918304ec3f6b69a86bad0a Mon Sep 17 00:00:00 2001 From: aanciaes Date: Wed, 6 Dec 2017 19:01:17 +0000 Subject: [PATCH 20/26] Starting to implement State Machine --- src/main/scala/app/Process.scala | 4 ---- src/main/scala/replication/StateMachine.scala | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/replication/StateMachine.scala diff --git a/src/main/scala/app/Process.scala b/src/main/scala/app/Process.scala index 1c9a165..8a1ba38 100644 --- a/src/main/scala/app/Process.scala +++ b/src/main/scala/app/Process.scala @@ -25,10 +25,6 @@ object Process extends App { val informationDissemination = sys.actorOf(Props[InformationDissemination], "informationDissemination") val storage = sys.actorOf(Props[Storage], "storage") - /*val proposer = sys.actorOf(Props[Proposer], "proposer") - val accepter = sys.actorOf(Props[Accepter], "accepter") - val learner = sys.actorOf(Props[Learner], "learner")*/ - var contactNode = "" if (args.length > 1) { contactNode = args(1) diff --git a/src/main/scala/replication/StateMachine.scala b/src/main/scala/replication/StateMachine.scala new file mode 100644 index 0000000..6706020 --- /dev/null +++ b/src/main/scala/replication/StateMachine.scala @@ -0,0 +1,15 @@ +package replication + +import scala.collection.mutable.TreeMap + +class StateMachine (bucket : Int) { + + var counter = 0 + var stateMachine = TreeMap[Int, Operation]() + stateMachine.put(1, Operation("", 1, "")) + +} + + + +case class Operation (op : String, key : Int, data : String) \ No newline at end of file From 8bbde177bd4f802cec2335bcf3223cb1448003bd Mon Sep 17 00:00:00 2001 From: aanciaes Date: Wed, 6 Dec 2017 19:13:17 +0000 Subject: [PATCH 21/26] State Machine --- src/main/scala/replication/StateMachine.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/scala/replication/StateMachine.scala b/src/main/scala/replication/StateMachine.scala index 6706020..d56f7b0 100644 --- a/src/main/scala/replication/StateMachine.scala +++ b/src/main/scala/replication/StateMachine.scala @@ -6,10 +6,12 @@ class StateMachine (bucket : Int) { var counter = 0 var stateMachine = TreeMap[Int, Operation]() - stateMachine.put(1, Operation("", 1, "")) -} + def write (index: Int, key: Int, data: String) = { + stateMachine.put(index, Operation("write", key, data)) + } +} case class Operation (op : String, key : Int, data : String) \ No newline at end of file From 03d806fa80620f552d6dbe7766e81b84c410fb33 Mon Sep 17 00:00:00 2001 From: aanciaes Date: Wed, 6 Dec 2017 19:25:09 +0000 Subject: [PATCH 22/26] Initializing State Machines --- src/main/scala/layers/Storage.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index bbd731c..18a3d28 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -2,6 +2,9 @@ package layers import akka.actor.Actor import app._ +import replication.StateMachine + +import scala.collection.mutable import scala.collection.mutable._ class Storage extends Actor{ @@ -9,6 +12,7 @@ class Storage extends Actor{ var storage = HashMap[String, String]() var pending = Queue[String]() var replicas = TreeMap[Int, String]() + var buckets = TreeMap [Int, StateMachine]() var myself: String = "" override def receive = { @@ -18,8 +22,12 @@ class Storage extends Actor{ replicas = init.replicas + for((hash, addr) <- replicas){ + buckets.put(hash, new StateMachine(hash)) + } + println("My replicas are: ") - for(r <- replicas){ + for(r <- buckets){ println(r) } println("- - - - - - - - - - - -") From df7616552abbd3d7a55c020b72f7969c44750d95 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Thu, 7 Dec 2017 00:18:46 +0000 Subject: [PATCH 23/26] Write on Replicas without PAXOS. --- src/main/scala/app/Messages.scala | 7 ++++-- src/main/scala/layers/GlobalView.scala | 10 ++++---- src/main/scala/layers/Storage.scala | 25 +++++++++++++++---- src/main/scala/replication/StateMachine.scala | 14 +++++++++-- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/src/main/scala/app/Messages.scala b/src/main/scala/app/Messages.scala index fad71ac..244aa70 100644 --- a/src/main/scala/app/Messages.scala +++ b/src/main/scala/app/Messages.scala @@ -69,9 +69,12 @@ case class ForwardWrite(hashedDataId: Int, data: String, appID: ActorRef) case class ForwardRead(hashedDataId: Int, appID: ActorRef) + + + //Replication -case class InitReplication(replicas: TreeMap[Int, String], selfAddress: String) +case class InitReplication(replicas: TreeMap[Int, String], selfAddress: String, myselfHashed: Int) case class AskSeqNum() @@ -89,7 +92,7 @@ case class Accept_OK(seqNum: Int, value: String) case class Decided(value: String) - +case class WriteOP(opCounter: Int, hashDataId: Int, data: String, leaderHash: Int) // Heartbeat diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 7af355d..27a2c2b 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -72,10 +72,10 @@ class GlobalView extends Actor { case InitReplication => { updateHashedProcesses(globalView) - var replicas : TreeMap[Int, String] = findReplicas() + val replicas : TreeMap[Int, String] = findReplicas() val process = context.actorSelection(s"${myself}/user/storage") - process ! InitReplication(replicas, myself) + process ! InitReplication(replicas, myself, myHashedId) } @@ -87,7 +87,7 @@ class GlobalView extends Actor { case write: Write => { - var hashedDataId = math.abs(write.dataId.reverse.hashCode % 1000) + val hashedDataId = math.abs(write.dataId.reverse.hashCode % 1000) log2.debug("Received write with key: " + hashedDataId) if (hashedProcesses.contains(hashedDataId)) { @@ -116,7 +116,7 @@ class GlobalView extends Actor { case read: Read => { print("Received Read from application") - var hashedDataId = math.abs(read.dataId.reverse.hashCode % 1000) + val hashedDataId = math.abs(read.dataId.reverse.hashCode % 1000) if (hashedProcesses.contains(hashedDataId)) { log2.debug("HashID " + hashedDataId + " exists") @@ -147,7 +147,7 @@ class GlobalView extends Actor { def findReplicas() = { - var replicas = TreeMap[Int, String]() + val replicas = TreeMap[Int, String]() var count = 0 var it = hashedProcesses.iterator diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index 18a3d28..78bbd89 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -4,7 +4,6 @@ import akka.actor.Actor import app._ import replication.StateMachine -import scala.collection.mutable import scala.collection.mutable._ class Storage extends Actor{ @@ -12,22 +11,23 @@ class Storage extends Actor{ var storage = HashMap[String, String]() var pending = Queue[String]() var replicas = TreeMap[Int, String]() - var buckets = TreeMap [Int, StateMachine]() + var stateMachines = TreeMap [Int, StateMachine]() var myself: String = "" + var myselfHashed: Int = 0 override def receive = { case init: InitReplication => { myself = init.selfAddress - + myselfHashed = init.myselfHashed replicas = init.replicas for((hash, addr) <- replicas){ - buckets.put(hash, new StateMachine(hash)) + stateMachines.put(hash, new StateMachine(hash, replicas)) } println("My replicas are: ") - for(r <- buckets){ + for(r <- stateMachines){ println(r) } println("- - - - - - - - - - - -") @@ -36,6 +36,13 @@ class Storage extends Actor{ case write: ForwardWrite => { println("myself hashed: " + math.abs(myself.reverse.hashCode % 1000) + " STORED ID: " + write.hashedDataId + " with the data: " + write.data.toString) + + val stateCounter = stateMachines.get(myselfHashed).get.getCounter() + for(r <- replicas){ + val process = context.actorSelection(s"${r._2}/user/storage") + process ! WriteOP(stateCounter, write.hashedDataId, write.data, myselfHashed) + } + storage.put((write.hashedDataId).toString, write.data) //Send back to Application @@ -63,5 +70,13 @@ class Storage extends Actor{ process ! ReplyStoreAction("Read", myself, "Read not found in the System!") } } + + + case writeOp: WriteOP => { + if(myselfHashed == writeOp.leaderHash){ + storage.put(writeOp.hashDataId.toString, writeOp.data) + } + stateMachines.get(myselfHashed).get.write(writeOp.opCounter, writeOp.hashDataId, writeOp.data) + } } } diff --git a/src/main/scala/replication/StateMachine.scala b/src/main/scala/replication/StateMachine.scala index d56f7b0..56621ae 100644 --- a/src/main/scala/replication/StateMachine.scala +++ b/src/main/scala/replication/StateMachine.scala @@ -2,14 +2,24 @@ package replication import scala.collection.mutable.TreeMap -class StateMachine (bucket : Int) { +class StateMachine (bucket : Int, setReplicas: TreeMap[Int, String]) { var counter = 0 var stateMachine = TreeMap[Int, Operation]() - + var replicas: TreeMap[Int, String] = setReplicas def write (index: Int, key: Int, data: String) = { + stateMachine.put(index, Operation("write", key, data)) + + } + + def getCounter(): Int ={ + counter + } + + def setCounter(newCounter: Int) = { + counter = newCounter } } From 986a663257c85364cbcaf07398440569e60d6871 Mon Sep 17 00:00:00 2001 From: aanciaes Date: Thu, 7 Dec 2017 01:27:39 +0000 Subject: [PATCH 24/26] Process now write on correct state machine - New method to statically find closest match for a key --- src/main/scala/app/utils.scala | 32 +++++++++ src/main/scala/layers/GlobalView.scala | 68 ++----------------- src/main/scala/layers/Storage.scala | 10 +-- src/main/scala/replication/StateMachine.scala | 2 +- 4 files changed, 42 insertions(+), 70 deletions(-) create mode 100644 src/main/scala/app/utils.scala diff --git a/src/main/scala/app/utils.scala b/src/main/scala/app/utils.scala new file mode 100644 index 0000000..dd85aad --- /dev/null +++ b/src/main/scala/app/utils.scala @@ -0,0 +1,32 @@ +package app + +import scala.collection.mutable._ + +object FindProcess { + + def matchKeys(hashedDataId: Int, map: TreeMap[Int, _]): Int = { + + val hashID_2551: Int = math.abs(("akka.tcp://AkkaSystem@127.0.0.1:2551").reverse.hashCode % 1000) //474 in localhost + var previousN = hashID_2551 + + var count = 1 + + for ((hash, _) <- map) { + if (hash > hashedDataId) { + + if (hash == map.firstKey) { + return map.lastKey + } else { + return previousN + } + } else { + if (count == map.size) { + return hash + } + count = count + 1 + previousN = hash + } + } + return 0 + } +} \ No newline at end of file diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 27a2c2b..166a7d1 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -83,33 +83,18 @@ class GlobalView extends Actor { - // - - - - - - - - STORAGE - - - - - - - - + // - - - - - - - - STORAGE - - - - - - - - // case write: Write => { val hashedDataId = math.abs(write.dataId.reverse.hashCode % 1000) log2.debug("Received write with key: " + hashedDataId) - if (hashedProcesses.contains(hashedDataId)) { - log2.debug("HashID " + hashedDataId + " exists") - - if (!hashedDataId.equals(myself.reverse.hashCode % 1000)) { - log2.debug("Its not me tho...") - log2.debug("Forwarding to HashID " + hashedDataId) - val process = context.actorSelection(s"${hashedProcesses.get(hashedDataId).get}/user/storage") - process ! ForwardWrite(hashedDataId, write.data, sender) - } - else { - log2.debug("And its me!!") - log2.debug("Storing HashID " + hashedDataId.toString + " with the data: " + write.data) + val processId = FindProcess.matchKeys(hashedDataId, hashedProcesses) - val process = context.actorSelection(s"${myself}/user/storage") - process ! ForwardWrite(hashedDataId, write.data, sender) - } - } - else - findProcessForWrite(hashedDataId, hashedProcesses, write.data, sender) + val process = context.actorSelection(s"${hashedProcesses.get(processId).get}/user/storage") + process ! ForwardWrite(hashedDataId, write.data, sender) } @@ -171,51 +156,6 @@ class GlobalView extends Actor { replicas } - def findProcessForWrite(hashedDataId: Int, hashedProcesses: TreeMap[Int, String], data: String, appID: ActorRef) = { - log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") - - var previousN = hashID_2551 - var count = 1 - var break = false - - for ((hash, process) <- hashedProcesses) { - - if (!break) { - log2.debug("hashProcess: " + hash) - log2.debug("process: " + process) - - if (hash > hashedDataId) { - log2.debug("Process " + hash + " is too high") - - //write 300 qdo hashedProcesses = {450, 750, 900} tem que ir po 900 - if (hash == hashedProcesses.firstKey) { - log2.debug("Forward Write to: " + hashedProcesses.last._2) - val process = context.actorSelection(s"${hashedProcesses.last._2}/user/storage") - process ! ForwardWrite(hashedDataId, data, appID) - break = true - } - else { - log2.debug("Forwarding WRITE to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) - - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/storage") - process ! ForwardWrite(hashedDataId, data, appID) - break = true - } - } - - else { - if (count == hashedProcesses.size) { - val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/storage") - process ! ForwardWrite(hashedDataId, data, appID) - break = true - } - count = count + 1 - previousN = hash - } - } - } - } - def findProcessForRead(hashedDataId: Int, hashedProcesses: TreeMap[Int, String], appID: ActorRef) = { log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index 78bbd89..653bbb8 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -27,14 +27,14 @@ class Storage extends Actor{ } println("My replicas are: ") - for(r <- stateMachines){ + for(r <- replicas){ println(r) } println("- - - - - - - - - - - -") } case write: ForwardWrite => { - + println ("Forward Write received") println("myself hashed: " + math.abs(myself.reverse.hashCode % 1000) + " STORED ID: " + write.hashedDataId + " with the data: " + write.data.toString) val stateCounter = stateMachines.get(myselfHashed).get.getCounter() @@ -43,8 +43,6 @@ class Storage extends Actor{ process ! WriteOP(stateCounter, write.hashedDataId, write.data, myselfHashed) } - storage.put((write.hashedDataId).toString, write.data) - //Send back to Application val process = context.actorSelection(s"${write.appID.path}") process ! ReplyStoreAction("Write", myself, write.data) @@ -73,10 +71,12 @@ class Storage extends Actor{ case writeOp: WriteOP => { + println("Write Op received") if(myselfHashed == writeOp.leaderHash){ storage.put(writeOp.hashDataId.toString, writeOp.data) } - stateMachines.get(myselfHashed).get.write(writeOp.opCounter, writeOp.hashDataId, writeOp.data) + val stateHash = FindProcess.matchKeys(writeOp.hashDataId, stateMachines) + stateMachines.get(stateHash).get.write(writeOp.opCounter, writeOp.hashDataId, writeOp.data) } } } diff --git a/src/main/scala/replication/StateMachine.scala b/src/main/scala/replication/StateMachine.scala index 56621ae..3c8f791 100644 --- a/src/main/scala/replication/StateMachine.scala +++ b/src/main/scala/replication/StateMachine.scala @@ -11,7 +11,7 @@ class StateMachine (bucket : Int, setReplicas: TreeMap[Int, String]) { def write (index: Int, key: Int, data: String) = { stateMachine.put(index, Operation("write", key, data)) - + println ("Writing on state machine bucket:" + bucket + " index:" + index + " with key-> " + key, " and data -> " + data) } def getCounter(): Int ={ From 518ba763160b034ec1ba8e984f14053cfa0a7fa7 Mon Sep 17 00:00:00 2001 From: Joao Reis Date: Thu, 7 Dec 2017 14:04:51 +0000 Subject: [PATCH 25/26] Counter of Operation in each state machine done. --- src/main/scala/layers/GlobalView.scala | 63 ++----------------- src/main/scala/layers/Storage.scala | 4 +- src/main/scala/replication/StateMachine.scala | 1 + 3 files changed, 8 insertions(+), 60 deletions(-) diff --git a/src/main/scala/layers/GlobalView.scala b/src/main/scala/layers/GlobalView.scala index 166a7d1..f813a4c 100644 --- a/src/main/scala/layers/GlobalView.scala +++ b/src/main/scala/layers/GlobalView.scala @@ -103,26 +103,16 @@ class GlobalView extends Actor { print("Received Read from application") val hashedDataId = math.abs(read.dataId.reverse.hashCode % 1000) - if (hashedProcesses.contains(hashedDataId)) { - log2.debug("HashID " + hashedDataId + " exists") - if (!hashedDataId.equals(myself.reverse.hashCode % 1000)) { - log2.debug("Its not me tho...") - findProcessForRead(hashedDataId, hashedProcesses, sender) - } - else { //ITS MEEE + val processId = FindProcess.matchKeys(hashedDataId, hashedProcesses) - val process = context.actorSelection(s"${myself}/user/storage") - process ! ForwardRead(hashedDataId, sender) - } - } - else - findProcessForRead(hashedDataId, hashedProcesses, sender) + val process = context.actorSelection(s"${hashedProcesses.get(processId).get}/user/storage") + process ! ForwardRead(hashedDataId, sender) } } - // - - - - - - - - - - - - - - - - - - - - - - - + // - - - - - - - - - - - - - - - - - - - - - - - // def updateHashedProcesses(globalView: List[String]) = { for (n <- globalView) { @@ -138,7 +128,7 @@ class GlobalView extends Actor { var it = hashedProcesses.iterator var break = false while(true && !break){ - var p = it.next() + val p = it.next() if(p._1 == myHashedId || count != 0){ replicas.put(p._1, p._2) @@ -156,47 +146,4 @@ class GlobalView extends Actor { replicas } - - def findProcessForRead(hashedDataId: Int, hashedProcesses: TreeMap[Int, String], appID: ActorRef) = { - log2.debug("Process " + hashedDataId + " does NOT EXIST in the System") - - var previousN = hashID_2551 - var count = 1 - var break = false - - for ((hash, process) <- hashedProcesses) { - - if (!break) { - log2.debug("hashProcess: " + hash) - log2.debug("processID: " + process) - if (hash > hashedDataId) { - log2.debug("Process " + hash + " is too high") - - //read 300 qdo hashedProcesses = {450, 750, 900} tem que ir po 900 - if (hash == hashedProcesses.firstKey) { - val process = context.actorSelection(s"${hashedProcesses.last._2}/user/storage") - process ! ForwardRead(hashedDataId, appID) - break = true - } - else { - log2.debug("Forwarding READ to: " + previousN + " with the following address: " + hashedProcesses.get(previousN).get) - - val process = context.actorSelection(s"${hashedProcesses.get(previousN).get}/user/storage") - process ! ForwardRead(hashedDataId, appID) - break = true - } - } - - else { - if (count == hashedProcesses.size) { - val process = context.actorSelection(s"${hashedProcesses.get(hash).get}/user/storage") - process ! ForwardRead(hashedDataId, appID) - break = true - } - count = count + 1 - previousN = hash - } - } - } - } } \ No newline at end of file diff --git a/src/main/scala/layers/Storage.scala b/src/main/scala/layers/Storage.scala index 653bbb8..b3f03d9 100644 --- a/src/main/scala/layers/Storage.scala +++ b/src/main/scala/layers/Storage.scala @@ -22,8 +22,8 @@ class Storage extends Actor{ myselfHashed = init.myselfHashed replicas = init.replicas - for((hash, addr) <- replicas){ - stateMachines.put(hash, new StateMachine(hash, replicas)) + for(r <- replicas){ + stateMachines.put(r._1, new StateMachine(r._1, replicas)) } println("My replicas are: ") diff --git a/src/main/scala/replication/StateMachine.scala b/src/main/scala/replication/StateMachine.scala index 3c8f791..e6ff222 100644 --- a/src/main/scala/replication/StateMachine.scala +++ b/src/main/scala/replication/StateMachine.scala @@ -12,6 +12,7 @@ class StateMachine (bucket : Int, setReplicas: TreeMap[Int, String]) { stateMachine.put(index, Operation("write", key, data)) println ("Writing on state machine bucket:" + bucket + " index:" + index + " with key-> " + key, " and data -> " + data) + counter += 1 } def getCounter(): Int ={ From 5627748789decd0bc81b49a34622a1a642af8943 Mon Sep 17 00:00:00 2001 From: aanciaes Date: Fri, 8 Dec 2017 00:43:44 +0000 Subject: [PATCH 26/26] small bug fixes --- src/main/scala/app/utils.scala | 4 +++- src/main/scala/replication/StateMachine.scala | 11 +++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/main/scala/app/utils.scala b/src/main/scala/app/utils.scala index dd85aad..0bca698 100644 --- a/src/main/scala/app/utils.scala +++ b/src/main/scala/app/utils.scala @@ -29,4 +29,6 @@ object FindProcess { } return 0 } -} \ No newline at end of file +} + +case class Operation (op : String, key : Int, data : String) \ No newline at end of file diff --git a/src/main/scala/replication/StateMachine.scala b/src/main/scala/replication/StateMachine.scala index e6ff222..9d90260 100644 --- a/src/main/scala/replication/StateMachine.scala +++ b/src/main/scala/replication/StateMachine.scala @@ -1,5 +1,7 @@ package replication +import app.Operation + import scala.collection.mutable.TreeMap class StateMachine (bucket : Int, setReplicas: TreeMap[Int, String]) { @@ -18,11 +20,4 @@ class StateMachine (bucket : Int, setReplicas: TreeMap[Int, String]) { def getCounter(): Int ={ counter } - - def setCounter(newCounter: Int) = { - counter = newCounter - } - -} - -case class Operation (op : String, key : Int, data : String) \ No newline at end of file +} \ No newline at end of file