K means clustering #21

Open · wants to merge 4 commits into base: master
@@ -0,0 +1,145 @@
Contributor: Could you change the algorithm name to a real name, like KClusteringSchedulingAlgorithmExample.java?

/**

* Copyright 2012-2013 University Of Southern California
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.workflowsim.examples.scheduling;

import java.io.File;
import java.util.Calendar;
import java.util.List;
import org.cloudbus.cloudsim.Log;
import org.cloudbus.cloudsim.core.CloudSim;
import org.workflowsim.CondorVM;
import org.workflowsim.WorkflowDatacenter;
import org.workflowsim.Job;
import org.workflowsim.WorkflowEngine;
import org.workflowsim.WorkflowPlanner;
import org.workflowsim.examples.planning.DHEFTPlanningAlgorithmExample1;
import org.workflowsim.utils.ClusteringParameters;
import org.workflowsim.utils.OverheadParameters;
import org.workflowsim.utils.Parameters;
import org.workflowsim.utils.ReplicaCatalog;

/**
* This example demonstrates the data-aware scheduling algorithm. It extends
* DHEFTPlanningAlgorithmExample1 in order to reuse its createDatacenter,
* createVM, and printJobList helpers.
*
* @author Weiwei Chen
* @since WorkflowSim Toolkit 1.1
* @date Nov 9, 2013
*/
Contributor: Change the author to your name and email.

public class DataAwareSchedulingAlgorithmExample2 extends DHEFTPlanningAlgorithmExample1 {

////////////////////////// STATIC METHODS ///////////////////////
/**
* Creates main() to run this example. This example has only one datacenter
* and one storage.
*/
public static void main(String[] args) {


try {
// First step: Initialize the WorkflowSim package.

/**
* Note that the actual number of VMs created may be smaller than vmNum
* if the data center or the host doesn't have sufficient resources.
* Take care.
*/
int vmNum = 5; // number of VMs
/**
* Change this to the real physical path in your working environment
*/
String daxPath = "/Users/chenweiwei/Work/WorkflowSim-1.0/config/dax/Montage_100.xml";

File daxFile = new File(daxPath);
if (!daxFile.exists()) {
Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!");
return;
}

/**
* Use the data-aware scheduling algorithm. No planning algorithm is used
* here (pln_method is INVALID), so the scheduler makes all allocation
* decisions at runtime.
*/
Parameters.SchedulingAlgorithm sch_method = Parameters.SchedulingAlgorithm.DATA;
Parameters.PlanningAlgorithm pln_method = Parameters.PlanningAlgorithm.INVALID;
ReplicaCatalog.FileSystem file_system = ReplicaCatalog.FileSystem.RANDOM;

/**
* No overheads
*/
OverheadParameters op = new OverheadParameters(0, null, null, null, null, 0);

/**
* No Clustering
*/
ClusteringParameters.ClusteringMethod method = ClusteringParameters.ClusteringMethod.NONE;
ClusteringParameters cp = new ClusteringParameters(0, 0, method, null);

/**
* Initialize static parameters
*/
Parameters.init(vmNum, daxPath, null,
null, op, cp, sch_method, pln_method,
null, 0);
ReplicaCatalog.init(file_system);

// These parameters should be set up before creating any entities.
int num_user = 1; // number of grid users
Calendar calendar = Calendar.getInstance();
boolean trace_flag = false; // trace events

// Initialize the CloudSim library
CloudSim.init(num_user, calendar, trace_flag);

WorkflowDatacenter datacenter0 = createDatacenter("Datacenter_0");

/**
* Create a WorkflowPlanner with one scheduler.
*/
WorkflowPlanner wfPlanner = new WorkflowPlanner("planner_0", 1);
/**
* Create a WorkflowEngine.
*/
WorkflowEngine wfEngine = wfPlanner.getWorkflowEngine();
/**
* Create a list of VMs. The userId of a VM is basically the id of
* the scheduler that controls it.
*/
List<CondorVM> vmlist0 = createVM(wfEngine.getSchedulerId(0), Parameters.getVmNum());

/**
* Submits this list of vms to this WorkflowEngine.
*/
wfEngine.submitVmList(vmlist0, 0);

/**
* Binds the data centers with the scheduler.
*/
wfEngine.bindSchedulerDatacenter(datacenter0.getId(), 0);

CloudSim.startSimulation();

List<Job> outputList0 = wfEngine.getJobsReceivedList();

CloudSim.stopSimulation();

printJobList(outputList0);

} catch (Exception e) {
Log.printLine("The simulation has been terminated due to an unexpected error");
}
}
}
166 changes: 107 additions & 59 deletions sources/org/workflowsim/WorkflowDatacenter.java
@@ -17,6 +17,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.cloudbus.cloudsim.Cloudlet;
import org.cloudbus.cloudsim.CloudletScheduler;
import org.cloudbus.cloudsim.Consts;
@@ -57,6 +58,16 @@ public WorkflowDatacenter(String name,

@Override
protected void processOtherEvent(SimEvent ev) {
switch (ev.getTag()) {
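// A FILE_STAGE_OUT event indicates that a replication transfer scheduled
// in register() has completed; record the new replica location.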
case WorkflowSimTags.FILE_STAGE_OUT:
FileStageOutMessage message = (FileStageOutMessage) ev.getData();
ReplicaCatalog.addStorageList(message.getFileName(),
Integer.toString(message.getDestinationVm()));
break;
default:
break;
}

}

/**
@@ -110,12 +121,12 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
int userId = cl.getUserId();
int vmId = cl.getVmId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
CondorVM vm = (CondorVM)host.getVm(vmId, userId);
CondorVM vm = (CondorVM) host.getVm(vmId, userId);

switch (Parameters.getCostModel()) {
case DATACENTER:
// process this Cloudlet to this CloudResource
cl.setResourceParameter(getId(), getCharacteristics().getCostPerSecond(),
cl.setResourceParameter(getId(), getCharacteristics().getCostPerSecond(),
getCharacteristics().getCostPerBw());
break;
case VM:
@@ -222,7 +233,7 @@ private void stageInFile2FileSystem(Cloudlet cl) {
* name)
*/
case LOCAL:

case RANDOM:
ReplicaCatalog.addStorageList(file.getName(), this.getName());
/**
* Is it not really needed currently but it is left for
@@ -287,11 +298,14 @@ protected double processDataStageIn(List<File> requiredFiles, Cloudlet cl) throw
File file = iter.next();
//The input file is not an output File
if (isRealInputFile(requiredFiles, file)) {
double maxBwth = 0.0;
List siteList = ReplicaCatalog.getStorageList(file.getName());
if (siteList.isEmpty()) {
throw new Exception(file.getName() + " does not exist");
}
int vmId = cl.getVmId();
int userId = cl.getUserId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
Vm vm = host.getVm(vmId, userId);
switch (ReplicaCatalog.getFileSystem()) {
case SHARED:
//stage-in job
@@ -311,53 +325,12 @@ protected double processDataStageIn(List<File> requiredFiles, Cloudlet cl) throw
}
break;
case LOCAL:
time += calculateDataTransferDelay(file, userId, vmId, vm);
ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
break;
Contributor: You could remove 328-330 and merge this case with RANDOM, since it's the same logic?
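For reference, the merge suggested above could use a switch fall-through so both file systems share one body (a sketch against this diff, not committed code):

    case LOCAL:
    case RANDOM:
        // Identical staging logic for both file systems: charge the
        // transfer delay, then record the replica on the target VM.
        time += calculateDataTransferDelay(file, userId, vmId, vm);
        ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
        break;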


int vmId = cl.getVmId();
int userId = cl.getUserId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
Vm vm = host.getVm(vmId, userId);

boolean requiredFileStagein = true;

for (Iterator it = siteList.iterator(); it.hasNext();) {
//site is where one replica of this data is located at
String site = (String) it.next();
if (site.equals(this.getName())) {
continue;
}
/**
* This file is already in the local vm and thus it
* is no need to transfer
*/
if (site.equals(Integer.toString(vmId))) {
requiredFileStagein = false;
break;
}
double bwth;
if (site.equals(Parameters.SOURCE)) {
//transfers from the source to the VM is limited to the VM bw only
bwth = vm.getBw();
//bwth = dcStorage.getBaseBandwidth();
} else {
//transfers between two VMs is limited to both VMs
bwth = Math.min(vm.getBw(), getVmAllocationPolicy().getHost(Integer.parseInt(site), userId).getVm(Integer.parseInt(site), userId).getBw());
//bwth = dcStorage.getBandwidth(Integer.parseInt(site), vmId);
}
if (bwth > maxBwth) {
maxBwth = bwth;
}
}
if (requiredFileStagein && maxBwth > 0.0) {
time += file.getSize() / Consts.MILLION * 8 / maxBwth;
}

/**
* For the case when storage is too small it is not
* handled here
*/
//We should add but since CondorVm has a small capability it often fails
//We currently don't use this storage to do anything meaningful. It is left for future.
//condorVm.addLocalFile(file);
case RANDOM:
time += calculateDataTransferDelay(file, userId, vmId, vm);
ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
break;
}
@@ -419,14 +392,55 @@ protected void checkCloudletCompletion() {
}
}
}

private double calculateDataTransferDelay(File file, int userId, int vmId, Vm vm) {
List siteList = ReplicaCatalog.getStorageList(file.getName());
boolean requiredFileStagein = true;
double time = 0.0;
double maxBwth = 0.0;
for (Iterator it = siteList.iterator(); it.hasNext();) {
//site is where one replica of this data is located at
String site = (String) it.next();
if (site.equals(this.getName())) {
continue;
}
/**
* This file is already in the local VM, so there is no need to
* transfer it
*/
if (site.equals(Integer.toString(vmId))) {
requiredFileStagein = false;
break;
}
double bwth;
if (site.equals(Parameters.SOURCE)) {
//transfers from the source to the VM is limited to the VM bw only
bwth = vm.getBw();
//bwth = dcStorage.getBaseBandwidth();
} else {
//transfers between two VMs is limited to both VMs
bwth = Math.min(vm.getBw(),
getVmAllocationPolicy().getHost(Integer.parseInt(site), userId)
.getVm(Integer.parseInt(site), userId).getBw());
//bwth = dcStorage.getBandwidth(Integer.parseInt(site), vmId);
}
if (bwth > maxBwth) {
maxBwth = bwth;
}
}
if (requiredFileStagein && maxBwth > 0.0) {
time = file.getSize() / (double) Consts.MILLION * 8 / maxBwth;
}
return time;
}
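As a sanity check on the formula above, assuming file sizes are in bytes and bandwidths in Mbit/s (units implied by the code, not stated in this PR), a transfer works out as:

    // Hypothetical numbers, for illustration only:
    double size = 100_000_000;   // a 100 MB file, in bytes (assumed unit)
    double bw = 1000.0;          // bottleneck bandwidth, in Mbit/s (assumed unit)
    double t = size / (double) Consts.MILLION * 8 / bw; // 100 * 8 / 1000 = 0.8 s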

/*
* Register a file to the storage if it is an output file
* @param cl the job to be processed
* @pre $none
* @post $none
*/

private void register(Cloudlet cl) {
Task tl = (Task) cl;
List fList = tl.getFileList();
@@ -435,23 +449,57 @@ private void register(Cloudlet cl) {
if (file.getType() == FileType.OUTPUT.value)//output file
{

int vmId = cl.getVmId();
int userId = cl.getUserId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
CondorVM vm = (CondorVM) host.getVm(vmId, userId);
switch (ReplicaCatalog.getFileSystem()) {
case SHARED:
ReplicaCatalog.addStorageList(file.getName(), this.getName());
break;
case LOCAL:
int vmId = cl.getVmId();
int userId = cl.getUserId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
/**
* Left here for future work
*/
CondorVM vm = (CondorVM) host.getVm(vmId, userId);

ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
break;
case RANDOM:
ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
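// Under RANDOM, proactively replicate this output file to roughly
// factor * vmNum randomly chosen VMs; note the random draw below may
// pick the source VM itself or repeat a destination.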
Random random = new Random(System.currentTimeMillis());
double factor = 0.1;
int vm2copy = (int) ((double) Parameters.getVmNum() * factor);
for (int i = 0; i < vm2copy; i++) {
int destination = (int) (random.nextDouble() * (double) Parameters.getVmNum());
FileStageOutMessage message = new FileStageOutMessage(destination, vmId, file.getName());
double delay = calculateDataTransferDelay(file, userId, vmId, vm);
send(this.getId(), delay, WorkflowSimTags.FILE_STAGE_OUT, message);
}
break;
}
}
}
}

private class FileStageOutMessage {
Contributor: Maybe move it out to be a public class? It might be useful in other classes.

private int dest;
private int src;
private String name;

public FileStageOutMessage(
int dest, int src, String name) {
this.dest = dest;
this.src = src;
this.name = name;
}

public int getDestinationVm() {
return this.dest;
}

public int getSourceVm() {
return this.src;
}

public String getFileName() {
return this.name;
}
}
}
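Following up on the review comment above, a minimal sketch of what the extracted public class might look like (package placement assumed, not part of this PR):

    package org.workflowsim;

    /**
     * Immutable message describing a file stage-out transfer between two VMs.
     * Sketch only: extracted from WorkflowDatacenter per the review suggestion.
     */
    public class FileStageOutMessage {

        private final int dest;
        private final int src;
        private final String name;

        public FileStageOutMessage(int dest, int src, String name) {
            this.dest = dest;
            this.src = src;
            this.name = name;
        }

        public int getDestinationVm() {
            return dest;
        }

        public int getSourceVm() {
            return src;
        }

        public String getFileName() {
            return name;
        }
    }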