Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #37 from aanciaes/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
aanciaes authored Dec 8, 2017
2 parents 40479e0 + 17b6cbb commit 460ca8b
Show file tree
Hide file tree
Showing 13 changed files with 622 additions and 50 deletions.
5 changes: 3 additions & 2 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
</encoder>
</appender>

<logger name="scala.slick" level="DEBUG"/>
<logger name="phase1" level="ERROR"/>
<logger name="phase2" level="DEBUG"/>

<root level="debug">
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
31 changes: 30 additions & 1 deletion src/main/scala/app/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@ import com.typesafe.scalalogging.Logger

object Application extends App {

val log = Logger("scala.slick")
val log = Logger("phase1")
val defaultProcess = "akka.tcp://[email protected]:2551"

val config = ConfigFactory.load.getConfig("ApplicationConfig")
val sys = ActorSystem("akkaSystem", config)
val appActor = sys.actorOf(Props[appActor], "appActor")

while (true) {

val line = scala.io.StdIn.readLine()
var words: Array[String] = line.split("\\s")


words(0) match {
case "gv" if (words.length == 2) => showGV(words(1))
case "pv" if (words.length == 2) => showPV(words(1))
case "ms" if (words.length == 2) => messagesStats(words(1))
case "write" if(words.length == 3) => write(words(1), words(2))
case "read" if(words.length == 2) => read(words(1))
//case "msall" if (words.length == 2) => messagesStatsAll()
case "clear" => {
for (i <- 1 to 20)
Expand All @@ -42,6 +47,13 @@ object Application extends App {
appActor ! MessagesStats(process)
}

def write(dataId: String, data: String) ={
appActor ! Write(dataId, data)
}

def read(dataId: String) ={
appActor ! Read(dataId)
}
//def messagesStatsAll() = {

//}
Expand All @@ -60,6 +72,16 @@ object Application extends App {
process ! ShowPV
}

case Write(dataId, data) => {
val process = sys.actorSelection(s"${defaultProcess}/user/globalView")
process ! Write(dataId, data)
}

case Read(dataId) => {
val process = sys.actorSelection(s"${defaultProcess}/user/globalView")
process ! Read(dataId)
}

case MessagesStats(x) => {
val process = sys.actorSelection(s"${x}/user/informationDissemination")
process ! MessagesStats
Expand All @@ -73,6 +95,12 @@ object Application extends App {
println ("-------------------------------------------------------------")
}

case replyStore: ReplyStoreAction => {
println ("-------------------------------------------------------------")
println (s"${replyStore.replyType} from ${replyStore.myself} with the DATA: ${replyStore.data}")
println ("-------------------------------------------------------------")
}

case stats : ReplyMessagesStats => {
println ("-------------------------------------------------------------")
println(s"Messages from ${sender.path.address.toString}")
Expand All @@ -93,6 +121,7 @@ object Application extends App {
println ()
println ("-------------------------------------------------------------")
}

}
}
}
64 changes: 63 additions & 1 deletion src/main/scala/app/Messages.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package app

import akka.actor.ActorRef
import scala.collection.mutable._

//pView
case class InitMessage(selfAddress: String, contactNode: String)

Expand All @@ -26,10 +29,13 @@ case class NotifyGlobalView(address: String)
case class ShowGV(address: String)



//Other

case class ReplyShowView(replyType: String, myself: String, nodes: List[String])

case class ReplyStoreAction(replyType: String, myself: String, data: String)


// Information Dissemination

Expand All @@ -52,11 +58,61 @@ case class AntiEntropy(knownMessages: List[Int])
case class GossipRequest(mid: Int)



//Storage

case class Write(dataId: String, data: String)

case class Read(dataId: String)

case class ForwardWrite(hashedDataId: Int, data: String, appID: ActorRef)

case class ForwardRead(hashedDataId: Int, appID: ActorRef)




//Replication

case class InitReplication(replicas: TreeMap[Int, String], selfAddress: String, myselfHashed: Int)

case class AskSeqNum()

case class ReplySeqNum(seqNum: Int)

case class Propose(value: String)

case class Prepare(seqNum: Int, value: String)

case class Prepare_OK(seqNum: Int, value: String)

case class Accept(seqNum: Int, value: String)

case class Accept_OK(seqNum: Int, value: String)

case class Decided(value: String)

case class WriteOP(opCounter: Int, hashDataId: Int, data: String, leaderHash: Int)


// Heartbeat

case class Heartbeat()

// Verify PseudoDead processes
case class IsAlive(p: String)

case class Check(from: String)

case class ReplyIsAlive(from: String)

case class AliveMessage(p: String)




//Application

case class MessagesStats(address: String)

case class ReplyMessagesStats(
Expand All @@ -74,4 +130,10 @@ case class ReplyMessagesStats(

antiEntropyReceived: Int,
antiEntropySent: Int
)
)






5 changes: 4 additions & 1 deletion src/main/scala/app/Process.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package app
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import com.typesafe.scalalogging.Logger
import layers.{GlobalView, InformationDissemination, PartialView}
import layers.{GlobalView, InformationDissemination, PartialView, Storage}
//import replication._


object Process extends App {
Expand All @@ -22,6 +23,7 @@ object Process extends App {
val globalView = sys.actorOf(Props[GlobalView], "globalView")
val partialView = sys.actorOf(Props[PartialView], "partialView")
val informationDissemination = sys.actorOf(Props[InformationDissemination], "informationDissemination")
val storage = sys.actorOf(Props[Storage], "storage")

var contactNode = ""
if (args.length > 1) {
Expand All @@ -33,6 +35,7 @@ object Process extends App {
partialView ! InitMessage(selfAddress, contactNode)
informationDissemination ! InitGossip(selfAddress)


def configureRemote(): Config = {

ConfigFactory.load.getConfig("Process").withValue("akka.remote.netty.tcp.port",
Expand Down
34 changes: 34 additions & 0 deletions src/main/scala/app/utils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package app

import scala.collection.mutable._

object FindProcess {

def matchKeys(hashedDataId: Int, map: TreeMap[Int, _]): Int = {

val hashID_2551: Int = math.abs(("akka.tcp://[email protected]:2551").reverse.hashCode % 1000) //474 in localhost
var previousN = hashID_2551

var count = 1

for ((hash, _) <- map) {
if (hash > hashedDataId) {

if (hash == map.firstKey) {
return map.lastKey
} else {
return previousN
}
} else {
if (count == map.size) {
return hash
}
count = count + 1
previousN = hash
}
}
return 0
}
}

case class Operation (op : String, key : Int, data : String)
108 changes: 103 additions & 5 deletions src/main/scala/layers/GlobalView.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
package layers

import akka.actor.Actor
import akka.actor.{Actor, ActorRef}
import app._
import com.typesafe.scalalogging.Logger
import scala.collection.mutable._


class GlobalView extends Actor {

val log = Logger("scala.slick")
val log = Logger("phase1")
val log2 = Logger("phase2")
val N_REPLICAS = 3
val hashID_2551: Int = math.abs(("akka.tcp://[email protected]:2551").reverse.hashCode % 1000) //474 in localhost

var globalView: List[String] = List.empty
var myself: String = ""
var myHashedId: Int = 0
var hashedProcesses = TreeMap[Int, String]()


override def receive = {

case init: InitGlobView => {
myself = init.selfAddress
globalView = globalView :+ myself

myHashedId = math.abs(init.selfAddress.reverse.hashCode % 1000)
println("Unique Identifier: " + myHashedId)

val process = context.actorSelection(s"${init.contactNode}/user/globalView")
process ! ShowGV
}

case message: BroadcastMessage => {
log.debug("Global view receive Broacast Message from: " + sender.path.address.toString)
//log.debug("Global view receive Broacast Message from: " + sender.path.address.toString)
message.messageType match {
case "add" => {
if (!message.node.equals(myself))
log.debug ("adding: " + message.node + " to global view")
//log.debug("adding: " + message.node + " to global view")
globalView = globalView :+ message.node
}
case "del" => {
Expand All @@ -44,8 +55,95 @@ class GlobalView extends Actor {
//Since all global views are up to date, on init
//Gets contact node global view and copies it to is own
case reply: ReplyShowView => {
for (n <- reply.nodes.filter(!_.equals(myself)))

for (n <- reply.nodes.filter(!_.equals(myself))) {
globalView = globalView :+ n
}

if(globalView.size >= N_REPLICAS){
for(p <- globalView){
val process = context.actorSelection(s"${p}/user/globalView")
process ! InitReplication
}
}
}


case InitReplication => {
updateHashedProcesses(globalView)

val replicas : TreeMap[Int, String] = findReplicas()

val process = context.actorSelection(s"${myself}/user/storage")
process ! InitReplication(replicas, myself, myHashedId)

}





// - - - - - - - - STORAGE - - - - - - - - //

case write: Write => {

val hashedDataId = math.abs(write.dataId.reverse.hashCode % 1000)
log2.debug("Received write with key: " + hashedDataId)

val processId = FindProcess.matchKeys(hashedDataId, hashedProcesses)


val process = context.actorSelection(s"${hashedProcesses.get(processId).get}/user/storage")
process ! ForwardWrite(hashedDataId, write.data, sender)
}


case read: Read => {

print("Received Read from application")
val hashedDataId = math.abs(read.dataId.reverse.hashCode % 1000)

val processId = FindProcess.matchKeys(hashedDataId, hashedProcesses)

val process = context.actorSelection(s"${hashedProcesses.get(processId).get}/user/storage")
process ! ForwardRead(hashedDataId, sender)
}

}


// - - - - - - - - - - - - - - - - - - - - - - - //

def updateHashedProcesses(globalView: List[String]) = {
for (n <- globalView) {
hashedProcesses.put(math.abs((n.reverse.hashCode % 1000)), n)
}
}

def findReplicas() = {

val replicas = TreeMap[Int, String]()

var count = 0
var it = hashedProcesses.iterator
var break = false
while(true && !break){
val p = it.next()

if(p._1 == myHashedId || count != 0){
replicas.put(p._1, p._2)
count = count + 1
}

if(count == 3){
break = true
}

if(!it.hasNext){
it = hashedProcesses.iterator
}
}
replicas
}

}
Loading

0 comments on commit 460ca8b

Please sign in to comment.