Skip to content

Commit

Permalink
#52 tcp sockets finalized
Browse files Browse the repository at this point in the history
  • Loading branch information
akovari committed May 8, 2021
1 parent 19916e3 commit 62d8d60
Show file tree
Hide file tree
Showing 24 changed files with 196 additions and 122 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ language: generic
env:
- JAVA_HOME=$HOME/jdk PATH=$JAVA_HOME/bin:$HOME/bin:$PATH GRAALVM_VERSION="21.0.0" GRAALVM_JAVA_VERSION="11"
before_install:
- curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
- curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list | sudo tee /etc/apt/sources.list.d/microsoft.list
- sudo apt-get update
- sudo apt-get install -y powershell
- travis_retry wget --no-verbose -O $HOME/travis-wait-enhanced.tar.gz https://github.com/crazy-max/travis-wait-enhanced/releases/download/v1.2.0/travis-wait-enhanced_1.2.0_linux_x86_64.tar.gz
- mkdir -p $HOME/bin
- tar -zxf $HOME/travis-wait-enhanced.tar.gz -C $HOME/bin
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/echo-server/server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncore

class EchoHandler(asyncore.dispatcher_with_send):

class EchoHandler(asyncore.dispatcher_with_send):
def handle_read(self):
data = self.recv(8192)
if data:
self.send(data)


class EchoServer(asyncore.dispatcher):

def __init__(self, host, port):
Expand All @@ -20,5 +21,6 @@ def handle_accepted(self, sock, addr):
print('Incoming connection from %s' % repr(addr))
handler = EchoHandler(sock)


server = EchoServer('localhost', 5555)
asyncore.loop()
6 changes: 3 additions & 3 deletions benchmarks/echo-server/server.yona
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
with socket\Server::channel "127.0.0.1" 5555 as channel
with socket\tcp\Server::channel (:tcp, "127.0.0.1", 5555) as channel
(\ ->
with daemon socket\Server::accept channel as connection
socket\Connection::read_line connection |> socket\Connection::write connection
with daemon socket\tcp\Server::accept channel as connection
socket\tcp\Connection::read_line connection |> socket\tcp\Connection::write connection
end) |> infi
end
8 changes: 4 additions & 4 deletions examples/sockets/client.yona
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ let

keep_reading = \b -> if b == (ord 'o') then false else true # end on 'o'
in
with socket\Client::connect addr port as connection
with socket\tcp\Client::connect addr port as connection
do
_prog_name -| msg -| _ = System::args
socket\Connection::read_line connection
socket\tcp\Connection::read_line connection
IO::println "<- {msg}"
socket\Connection::write connection msg
socket\tcp\Connection::write connection msg
IO::print "-> "
socket\Connection::read_until connection keep_reading |> Seq::decode |> IO::println
socket\tcp\Connection::read_until connection keep_reading |> Seq::decode |> IO::println
end
end
9 changes: 4 additions & 5 deletions examples/sockets/server.yona
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ let
addr = "127.0.0.1"
port = 5555

accept = \channel -> do
with daemon socket\Server::accept channel as connection
accept = \channel ->
with daemon socket\tcp\Server::accept channel as connection
do
IO::println "connection accepted"
socket\Connection::write connection "welcome: "
socket\tcp\Connection::write connection "welcome: "
IO::println "-> welcome: "
IO::print "<- "
request = socket\Connection::read_line connection |> Seq::decode
Expand All @@ -17,10 +17,9 @@ let
IO::println "-> {request}"
end
end
end
in

with socket\Server::channel addr port as channel
with socket\tcp\Server::channel (:tcp, addr port) as channel
do
IO::println "listening on {addr}:{port}"
infi <| \-> accept channel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module socket\Connection exports read_line as
module socket\tcp\Connection exports read_line as
# read from socket, until LF (ord 10)
read_line connection = read_until connection (\b -> b != 10b)
end
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import yona.runtime.ContextManager;
import yona.runtime.NativeObject;
import yona.runtime.NativeObjectContextManager;
import yona.runtime.network.YonaConnection;
import yona.runtime.network.TCPConnection;

final class ConnectionContextManager extends NativeObjectContextManager<YonaConnection> {
public ConnectionContextManager(YonaConnection yonaConnection, Context context) {
super("connection", context.lookupGlobalFunction("socket\\Connection", "run"), yonaConnection);
final class ConnectionContextManager extends NativeObjectContextManager<TCPConnection> {
public ConnectionContextManager(TCPConnection TCPConnection, Context context) {
super("connection", context.lookupGlobalFunction("socket\\Connection", "run"), TCPConnection);
}

public static ConnectionContextManager adapt(ContextManager<?> contextManager, Context context, Node node) {
return new ConnectionContextManager(((NativeObject<YonaConnection>) contextManager.getData(node)).getValue(), context);
return new ConnectionContextManager(((NativeObject<TCPConnection>) contextManager.getData(node)).getValue(), context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import yona.runtime.Seq;
import yona.runtime.async.Promise;
import yona.runtime.exceptions.BadArgException;
import yona.runtime.network.YonaClientChannel;
import yona.runtime.network.YonaConnection;
import yona.runtime.network.TCPClientChannel;
import yona.runtime.network.TCPConnection;
import yona.runtime.stdlib.Builtins;
import yona.runtime.stdlib.ExportedFunction;

Expand All @@ -23,7 +23,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

@BuiltinModuleInfo(packageParts = {"socket"}, moduleName = "Client")
@BuiltinModuleInfo(packageParts = {"socket", "tcp"}, moduleName = "Client")
public final class SocketClientBuiltinModule implements BuiltinModule {
@NodeInfo(shortName = "connect")
abstract static class ConnectBuiltin extends BuiltinNode {
Expand All @@ -37,9 +37,9 @@ public Object connect(Seq hostname, long port, @CachedContext(YonaLanguage.class
clientSocketChannel.configureBlocking(false);
clientSocketChannel.connect(new InetSocketAddress(hostname.asJavaString(this), (int) port));
SelectionKey selectionKey = clientSocketChannel.register(context.socketSelector, SelectionKey.OP_CONNECT);
YonaClientChannel yonaClientChannel = new YonaClientChannel(context, clientSocketChannel, selectionKey, this, dispatch);
selectionKey.attach(yonaClientChannel);
Promise result = yonaClientChannel.yonaConnectionPromise.map((yonaConnection) -> new ConnectionContextManager((YonaConnection) yonaConnection, context), this);
TCPClientChannel TCPClientChannel = new TCPClientChannel(context, clientSocketChannel, selectionKey, this, dispatch);
selectionKey.attach(TCPClientChannel);
Promise result = TCPClientChannel.yonaConnectionPromise.map((yonaConnection) -> new ConnectionContextManager((TCPConnection) yonaConnection, context), this);
context.socketSelector.wakeup();
return result;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
import yona.ast.builtin.modules.BuiltinModuleInfo;
import yona.runtime.*;
import yona.runtime.async.Promise;
import yona.runtime.network.YonaConnection;
import yona.runtime.network.TCPConnection;
import yona.runtime.stdlib.Builtins;
import yona.runtime.stdlib.ExportedFunction;
import yona.runtime.threading.ExecutableFunction;

import java.io.IOException;

@BuiltinModuleInfo(packageParts = {"socket"}, moduleName = "Connection")
@BuiltinModuleInfo(packageParts = {"socket", "tcp"}, moduleName = "Connection")
public final class SocketConnectionBuiltinModule implements BuiltinModule {
@NodeInfo(shortName = "run")
abstract static class RunBuiltin extends BuiltinNode {
Expand Down Expand Up @@ -58,8 +57,8 @@ public Object run(ContextManager<?> contextManager, Function function, @CachedLi

private static <T> T closeConnection(ConnectionContextManager connectionContextManager, T result, Node node) {
try {
YonaConnection yonaConnection = connectionContextManager.nativeData(node);
yonaConnection.selectionKey.channel().close();
TCPConnection TCPConnection = connectionContextManager.nativeData(node);
TCPConnection.selectionKey.channel().close();
return result;
} catch (IOException e) {
throw new yona.runtime.exceptions.IOException(e, node);
Expand All @@ -73,8 +72,8 @@ abstract static class CloseBuiltin extends BuiltinNode {
public Object close(ContextManager<?> contextManager, @CachedContext(YonaLanguage.class) Context context) {
ConnectionContextManager connectionContextManager = ConnectionContextManager.adapt(contextManager, context, this);
try {
YonaConnection yonaConnection = connectionContextManager.nativeData(this);
yonaConnection.selectionKey.channel().close();
TCPConnection TCPConnection = connectionContextManager.nativeData(this);
TCPConnection.selectionKey.channel().close();
return Unit.INSTANCE;
} catch (IOException e) {
throw new yona.runtime.exceptions.IOException(e, this);
Expand All @@ -87,9 +86,9 @@ abstract static class ReadUntilBuiltin extends BuiltinNode {
@Specialization
public Object readUntil(ContextManager<?> contextManager, Function untilCallback, @CachedLibrary(limit = "3") InteropLibrary dispatch, @CachedContext(YonaLanguage.class) Context context) {
ConnectionContextManager connectionContextManager = ConnectionContextManager.adapt(contextManager, context, this);
YonaConnection yonaConnection = connectionContextManager.nativeData(this);
TCPConnection TCPConnection = connectionContextManager.nativeData(this);
Promise promise = new Promise(dispatch);
yonaConnection.readQueue.submit(new YonaConnection.ReadRequest(untilCallback, promise));
TCPConnection.readQueue.submit(new TCPConnection.ReadRequest(untilCallback, promise));
context.socketSelector.wakeup();
return promise;
}
Expand All @@ -100,9 +99,9 @@ abstract static class WriteBuiltin extends BuiltinNode {
@Specialization
public Object write(ContextManager<?> contextManager, Seq data, @CachedLibrary(limit = "3") InteropLibrary dispatch, @CachedContext(YonaLanguage.class) Context context) {
ConnectionContextManager connectionContextManager = ConnectionContextManager.adapt(contextManager, context, this);
YonaConnection yonaConnection = connectionContextManager.nativeData(this);
TCPConnection TCPConnection = connectionContextManager.nativeData(this);
Promise promise = new Promise(dispatch);
yonaConnection.writeQueue.submit(new YonaConnection.WriteRequest(data, promise));
TCPConnection.writeQueue.submit(new TCPConnection.WriteRequest(data, promise));
context.socketSelector.wakeup();
return promise;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.nodes.NodeInfo;
import com.oracle.truffle.api.nodes.UnexpectedResultException;
import yona.TypesGen;
import yona.YonaException;
import yona.YonaLanguage;
import yona.ast.builtin.BuiltinNode;
Expand All @@ -18,26 +20,25 @@
import yona.runtime.*;
import yona.runtime.async.Promise;
import yona.runtime.exceptions.BadArgException;
import yona.runtime.network.YonaConnection;
import yona.runtime.network.YonaServerChannel;
import yona.runtime.network.TCPConnection;
import yona.runtime.network.TCPServerChannel;
import yona.runtime.stdlib.Builtins;
import yona.runtime.stdlib.ExportedFunction;
import yona.runtime.threading.ExecutableFunction;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;

@BuiltinModuleInfo(packageParts = {"socket"}, moduleName = "Server")
@BuiltinModuleInfo(packageParts = {"socket", "tcp"}, moduleName = "Server")
public final class SocketServerBuiltinModule implements BuiltinModule {
private static final class ChannelContextManager extends NativeObjectContextManager<YonaServerChannel> {
public ChannelContextManager(YonaServerChannel yonaServerChannel, Context context) {
super("server_channel", context.lookupGlobalFunction("socket\\Server", "run"), yonaServerChannel);
private static final class ChannelContextManager extends NativeObjectContextManager<TCPServerChannel> {
public ChannelContextManager(TCPServerChannel TCPServerChannel, Context context) {
super("server_channel", context.lookupGlobalFunction("socket\\Server", "run"), TCPServerChannel);
}

public static ChannelContextManager adapt(ContextManager<?> contextManager, Context context, Node node) {
return new ChannelContextManager(((NativeObject<YonaServerChannel>) contextManager.getData(node)).getValue(), context);
return new ChannelContextManager(((NativeObject<TCPServerChannel>) contextManager.getData(node)).getValue(), context);
}
}

Expand Down Expand Up @@ -73,8 +74,8 @@ public Object run(ContextManager<?> contextManager, Function function, @CachedLi

private static <T> T closeChannel(ChannelContextManager connectionContextManager, T result, Node node) {
try {
YonaServerChannel yonaServerChannel = connectionContextManager.nativeData(node);
yonaServerChannel.serverSocketChannel.close();
TCPServerChannel TCPServerChannel = connectionContextManager.nativeData(node);
TCPServerChannel.serverSocketChannel.close();
return result;
} catch (IOException e) {
throw new yona.runtime.exceptions.IOException(e, node);
Expand All @@ -85,7 +86,54 @@ private static <T> T closeChannel(ChannelContextManager connectionContextManager
@NodeInfo(shortName = "channel")
abstract static class ChannelBuiltin extends BuiltinNode {
@Specialization
public Object channel(Seq hostname, long port, @CachedContext(YonaLanguage.class) Context context, @CachedLibrary(limit = "3") InteropLibrary dispatch) {
public Object channel(Tuple args, @CachedContext(YonaLanguage.class) Context context, @CachedLibrary(limit = "3") InteropLibrary dispatch) {
Object unwrappedArgs = args.unwrapPromises(this);
if (unwrappedArgs instanceof Promise unwrappedArgsPromise) {
return unwrappedArgsPromise.map((resultArgs) -> createSocket((Object[]) resultArgs, context, dispatch), this);
} else if (unwrappedArgs instanceof Object[]) {
return createSocket((Object[]) unwrappedArgs, context, dispatch);
} else {
throw YonaException.typeError(this, unwrappedArgs);
}
}

private ChannelContextManager createSocket(Object[] args, Context context, InteropLibrary dispatch) {
if (args.length != 3) {
throw new BadArgException("socket\tcp::Channel expects triple of a type, host and port. Type must be :tcp", this);
}

Symbol socketType;
try {
socketType = TypesGen.expectSymbol(args[0]);
} catch (UnexpectedResultException e) {
throw new BadArgException("Expected symbol, got " + args[0], e, this);
}

switch (socketType.asString()) {
case "tcp":
Seq hostname;
long port;
try {
hostname = TypesGen.expectSeq(args[1]);
} catch (UnexpectedResultException e) {
throw new BadArgException("Expected string, got: " + args[1], e, this);
}
try {
port = TypesGen.expectLong(args[2]);
} catch (UnexpectedResultException e) {
throw new BadArgException("Expected integer, got: " + args[2], e, this);
}

return tcpSocket(hostname, port, context, dispatch);

// case "unix":
// // TODO implement
default:
throw new BadArgException("Unsupported socket type: " + socketType, this);
}
}

private ChannelContextManager tcpSocket(Seq hostname, long port, Context context, InteropLibrary dispatch) {
if (port < 0L || port >= 65535L) {
throw new BadArgException("port must be between 0 and 65535", this);
}
Expand All @@ -94,7 +142,7 @@ public Object channel(Seq hostname, long port, @CachedContext(YonaLanguage.class
serverSocketChannel.bind(new InetSocketAddress(hostname.asJavaString(this), (int) port));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(context.socketSelector, SelectionKey.OP_ACCEPT);
YonaServerChannel result = new YonaServerChannel(context, serverSocketChannel, selectionKey, this, dispatch);
TCPServerChannel result = new TCPServerChannel(context, serverSocketChannel, selectionKey, this, dispatch);
selectionKey.attach(result);
context.socketSelector.wakeup();
return new ChannelContextManager(result, context);
Expand All @@ -109,11 +157,11 @@ abstract static class AcceptBuiltin extends BuiltinNode {
@Specialization
public Object accept(ContextManager<?> contextManager, @CachedContext(YonaLanguage.class) Context context, @CachedLibrary(limit = "3") InteropLibrary dispatch) {
ChannelContextManager connectionContextManager = ChannelContextManager.adapt(contextManager, context, this);
YonaServerChannel yonaServerChannel = connectionContextManager.nativeData(this);
TCPServerChannel TCPServerChannel = connectionContextManager.nativeData(this);
Promise promise = new Promise(dispatch);
yonaServerChannel.connectionPromises.submit(promise);
TCPServerChannel.connectionPromises.submit(promise);
context.socketSelector.wakeup();
return promise.map(yonaConnection -> new ConnectionContextManager((YonaConnection) yonaConnection, context), this);
return promise.map(yonaConnection -> new ConnectionContextManager((TCPConnection) yonaConnection, context), this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ private Object executeResultNode(final VirtualFrame frame, final String name, fi
}
} else {
try {
// TODO if isDaemon, resultValue should be wrapped into promise and the code from the branch above should apply to it
Object result = library.execute(wrapFunction, contextManager, TypesGen.expectFunction(resultValue));
if (result instanceof Promise resultPromise) {
shouldCleanup = false;
Expand Down
5 changes: 0 additions & 5 deletions language/src/main/java/yona/runtime/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,6 @@ public Function lookupGlobalFunction(String fqn, String function) {
@CompilerDirectives.TruffleBoundary
public void dispose() {
// LOGGER.fine("Threading shutting down");
try {
socketSelector.close();
} catch (IOException e) {
e.printStackTrace();
}
threading.dispose();
ioExecutor.shutdown();
assert ioExecutor.shutdownNow().isEmpty();
Expand Down
1 change: 0 additions & 1 deletion language/src/main/java/yona/runtime/network/NIOQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ public final class NIOQueue<T> {
public final SingleConsumer consumer;

public final T[] items;
private final Class<T> itemElementType;

public NIOQueue(Class<T> itemsType, int maxLength) {
this.queue = new MultiProducerSingleConsumerCursors(maxLength);
Expand Down
Loading

0 comments on commit 62d8d60

Please sign in to comment.