Skip to content

Commit

Permalink
Merge pull request #9 from fogbow/develop
Browse files Browse the repository at this point in the history
Alteração do nioWorkers para o valor 7. Melhoria de desempenho do
reverse tunnel
  • Loading branch information
giovannifs committed Mar 21, 2016
2 parents 3c11cbc + 0dd9a6d commit 74b0e88
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 26 deletions.
6 changes: 4 additions & 2 deletions reverse-tunnel.conf.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
tunnel_port=2222
tunnel_port_range=2224:2242
tunnel_host=0.0.0.0
http_port=2223
external_port_range=20000:30000
host_key_path=hostkey.ser
idle_token_timeout=86400
idle_token_timeout=86400
ports_per_ssh_server=5
check_ssh_servers_interval=120
16 changes: 12 additions & 4 deletions src/main/java/org/fogbowcloud/ssh/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,35 @@ public static void main(String[] args) throws IOException {
FileInputStream input = new FileInputStream(args[0]);
properties.load(input);

String tunnelPort = properties.getProperty("tunnel_port");
String tunnelPortRange = properties.getProperty("tunnel_port_range");
String[] tunnelPortRangeSplit = tunnelPortRange.split(":");
String tunnelHost = properties.getProperty("tunnel_host");
String httpPort = properties.getProperty("http_port");
String externalPortRange = properties.getProperty("external_port_range");
String[] externalRangeSplit = externalPortRange.split(":");
String externalHostKeyPath = properties.getProperty("host_key_path");
String idleTokenTimeoutStr = properties.getProperty("idle_token_timeout");
String portsPerShhServer = properties.getProperty("ports_per_ssh_server");
Long idleTokenTimeout = null;
if (idleTokenTimeoutStr != null) {
idleTokenTimeout = Long.parseLong(idleTokenTimeoutStr) * 1000;
}

String checkSSHServersIntervalStr = properties.getProperty("check_ssh_servers_interval");
int checkSSHServersInterval = Integer.parseInt(checkSSHServersIntervalStr);

TunnelHttpServer tunnelHttpServer = new TunnelHttpServer(
Integer.parseInt(httpPort),
tunnelHost,
Integer.parseInt(tunnelPort),
Integer.parseInt(tunnelPortRangeSplit[0]),
Integer.parseInt(tunnelPortRangeSplit[1]),
Integer.parseInt(externalRangeSplit[0]),
Integer.parseInt(externalRangeSplit[1]),
idleTokenTimeout,
externalHostKeyPath);
externalHostKeyPath,
Integer.parseInt(portsPerShhServer), checkSSHServersInterval);
tunnelHttpServer.start();

}

}
}
224 changes: 208 additions & 16 deletions src/main/java/org/fogbowcloud/ssh/TunnelHttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,91 @@
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;
import java.security.KeyPair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.apache.sshd.common.util.Base64;
import org.json.JSONObject;

import fi.iki.elonen.NanoHTTPD;
import fi.iki.elonen.NanoHTTPD.Response.Status;

public class TunnelHttpServer extends NanoHTTPD {

private TunnelServer tunneling;

//private TunnelServer tunneling;
private static final int SSH_SERVER_VERIFICATION_TIME = 300;
private static final Logger LOGGER = Logger.getLogger(TunnelHttpServer.class);

private Map<Integer, TunnelServer> tunnelServers = new ConcurrentHashMap<Integer, TunnelServer>();

private String hostKeyPath;
private KeyPair kp;

private int lowerPort;
private int higherPort;
private String sshTunnelHost;
private int lowerSshTunnelPort;
private int higherSshTunnelPort;
private Long idleTokenTimeout;
private int checkSSHServersInterval;

private int portsPerShhServer;

private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

public TunnelHttpServer(int httpPort, String sshTunnelHost, int sshTunnelPort,
int lowerPort, int higherPort, Long idleTokenTimeout, String hostKeyPath) {
public TunnelHttpServer(int httpPort, String sshTunnelHost, int lowerSshTunnelPort, int higherSshTunnelPort,
int lowerPort, int higherPort, Long idleTokenTimeout, String hostKeyPath, int portsPerShhServer, int checkSSHServersInterval) {
super(httpPort);
this.hostKeyPath = hostKeyPath;

this.lowerPort = lowerPort;
this.higherPort = higherPort;
this.sshTunnelHost = sshTunnelHost;
this.lowerSshTunnelPort = lowerSshTunnelPort;
this.higherSshTunnelPort = higherSshTunnelPort;
this.idleTokenTimeout = idleTokenTimeout;
this.portsPerShhServer = portsPerShhServer;
this.checkSSHServersInterval = checkSSHServersInterval == 0 ? SSH_SERVER_VERIFICATION_TIME : checkSSHServersInterval;

try {
this.tunneling = new TunnelServer(sshTunnelHost, sshTunnelPort,
lowerPort, higherPort, idleTokenTimeout, hostKeyPath);
this.tunneling.start();

this.createNewTunnelServer();

executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {

List<TunnelServer> tunnelsToRemove = new ArrayList<TunnelServer>();

for(Entry<Integer, TunnelServer> entry : tunnelServers.entrySet()){
if(entry.getValue().getActiveTokensNumber() <= 0){
tunnelsToRemove.add(entry.getValue());
}
}

for(TunnelServer tunneling : tunnelsToRemove){
try {
removeTunnelServer(tunneling);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}

}
}, this.checkSSHServersInterval, this.checkSSHServersInterval, TimeUnit.SECONDS);

} catch (IOException e) {
e.printStackTrace();
LOGGER.error(e.getMessage(), e);
}
}

Expand All @@ -50,24 +111,66 @@ public Response serve(IHTTPSession session) {

if (method.equals(Method.GET)) {
if (splitUri.length == 4 && splitUri[3].equals("all")) {
Map<String, Integer> ports = this.tunneling.getPortByPrefix(tokenId);
Map<String, Integer> ports = new HashMap<String, Integer>();
for(TunnelServer tunneling : tunnelServers.values()){
ports.putAll(tunneling.getPortByPrefix(tokenId));
}
return new NanoHTTPD.Response(new JSONObject(ports).toString());
} else {
Integer port = this.tunneling.getPort(tokenId);
Integer port = this.getPortByTokenId(tokenId);
if (port == null) {
return new NanoHTTPD.Response(Status.NOT_FOUND,
MIME_PLAINTEXT, "404 Port Not Found");
}
return new NanoHTTPD.Response(port.toString());
}
}

if (method.equals(Method.POST)) {
Integer port = this.tunneling.createPort(tokenId);
if (port == null) {
return new NanoHTTPD.Response(Status.INTERNAL_ERROR, MIME_PLAINTEXT, "");

//TODO verify if the request can request new port. (ports quota per instance.)
Integer instancePort = null ;
Integer sshServerPort = null ;

if(tunnelServers.values() != null && !tunnelServers.values().isEmpty()){
for(TunnelServer tunneling : tunnelServers.values()){
instancePort = tunneling.createPort(tokenId);
if(instancePort != null){
sshServerPort = tunneling.getSshTunnelPort();
break;
}
}
}

if (instancePort == null) {
try {
TunnelServer tunneling = this.createNewTunnelServer();
if(tunneling != null){
instancePort = tunneling.createPort(tokenId);
sshServerPort = tunneling.getSshTunnelPort();
}
} catch (IOException e) {
return new NanoHTTPD.Response(Status.INTERNAL_ERROR, MIME_PLAINTEXT, "Error while creating shh server to handle new port.");
}
}

if (instancePort == null) {
return new NanoHTTPD.Response(Status.FORBIDDEN, MIME_PLAINTEXT, "Token [" + tokenId + "] didn't get any port. All ssh servers are busy.");
}
//Return format: instancePort:sshTunnelServerPort (int:int)
return new NanoHTTPD.Response(instancePort.toString()+":"+sshServerPort.toString());
}

if (method.equals(Method.DELETE)) {

if (splitUri.length == 4) {
String portNumber = splitUri[3];
if(Utils.isNumber(portNumber)){
if(this.releaseInstancePort(tokenId, Integer.parseInt(portNumber))){
return new NanoHTTPD.Response(Status.OK, MIME_PLAINTEXT, "OK");
}
}
}
return new NanoHTTPD.Response(port.toString());
return new NanoHTTPD.Response(Status.METHOD_NOT_ALLOWED, MIME_PLAINTEXT, "Token can not delete this port");
}

return new NanoHTTPD.Response(Status.METHOD_NOT_ALLOWED, MIME_PLAINTEXT, "");
Expand Down Expand Up @@ -105,5 +208,94 @@ public Response serve(IHTTPSession session) {

return new NanoHTTPD.Response(Status.METHOD_NOT_ALLOWED, MIME_PLAINTEXT, "");
}

private TunnelServer createNewTunnelServer() throws IOException{

//Setting available ports to this tunnel server
int initialPort = 0;
int endPort = 0;
int sshTunnelPort = 0;

Set<Integer> usedInitialPorts = new HashSet<Integer>();
for (TunnelServer tunnelServer : tunnelServers.values()) {
usedInitialPorts.add(new Integer(tunnelServer.getLowerPort()));
}

}
for(int port = lowerPort; port < higherPort; port+=portsPerShhServer){
if(!usedInitialPorts.contains(new Integer(port))){
initialPort = port;
break;
}
}

if(initialPort == 0){
return null;
}

endPort = initialPort+(portsPerShhServer-1);
if(endPort > higherPort){
endPort = higherPort;
}

//Setting the port that this tunnel Server listening to manage connections requests.
for(int port = lowerSshTunnelPort ; port <= higherSshTunnelPort ; port++){
if(!tunnelServers.containsKey(new Integer(port))){
sshTunnelPort = port;
break;
}
}

if(sshTunnelPort == 0){
return null;
}

TunnelServer tunneling = new TunnelServer(sshTunnelHost, sshTunnelPort,
initialPort, endPort, idleTokenTimeout, hostKeyPath);

tunnelServers.put(new Integer(sshTunnelPort), tunneling);
tunneling.start();

return tunneling;
}

private Integer getPortByTokenId(String tokenId){
for(TunnelServer tunneling : tunnelServers.values()){
if(tunneling.getPort(tokenId) != null){
return tunneling.getPort(tokenId);
}
}
return null;
}

//TODO: Create new method to validate if the requester have available quota to request new port.



private boolean releaseInstancePort(String tokenId, Integer port){
for(TunnelServer tunneling : tunnelServers.values()){

Integer actualPort = tunneling.getAllPorts().get(tokenId);

if( actualPort != null && (actualPort.compareTo(port)== 0) ){
tunneling.releasePort(port);
if(tunneling.getActiveTokensNumber() == 0){
try {
this.removeTunnelServer(tunneling);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return true;
}
}
return false;
}

private void removeTunnelServer(TunnelServer tunneling) throws InterruptedException{
if(tunneling != null){
tunneling.stop();
LOGGER.warn("Removing ssh server with port: "+tunneling.getSshTunnelPort());
tunnelServers.remove(tunneling.getSshTunnelPort());
}
}
}
Loading

0 comments on commit 74b0e88

Please sign in to comment.