diff --git a/.travis.yml b/.travis.yml index 76724fb..34fbb87 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,10 @@ language: generic env: - JAVA_HOME=$HOME/jdk PATH=$JAVA_HOME/bin:$HOME/bin:$PATH GRAALVM_VERSION="21.0.0" GRAALVM_JAVA_VERSION="11" before_install: + - curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add - + - curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list | sudo tee /etc/apt/sources.list.d/microsoft.list + - sudo apt-get update + - sudo apt-get install -y powershell - travis_retry wget --no-verbose -O $HOME/travis-wait-enhanced.tar.gz https://github.com/crazy-max/travis-wait-enhanced/releases/download/v1.2.0/travis-wait-enhanced_1.2.0_linux_x86_64.tar.gz - mkdir -p $HOME/bin - tar -zxf $HOME/travis-wait-enhanced.tar.gz -C $HOME/bin diff --git a/benchmarks/echo-server/server.py b/benchmarks/echo-server/server.py index a4e7271..2483fba 100644 --- a/benchmarks/echo-server/server.py +++ b/benchmarks/echo-server/server.py @@ -1,12 +1,13 @@ import asyncore -class EchoHandler(asyncore.dispatcher_with_send): +class EchoHandler(asyncore.dispatcher_with_send): def handle_read(self): data = self.recv(8192) if data: self.send(data) + class EchoServer(asyncore.dispatcher): def __init__(self, host, port): @@ -20,5 +21,6 @@ def handle_accepted(self, sock, addr): print('Incoming connection from %s' % repr(addr)) handler = EchoHandler(sock) + server = EchoServer('localhost', 5555) asyncore.loop() diff --git a/benchmarks/echo-server/server.yona b/benchmarks/echo-server/server.yona index 69f1987..9fb2a7b 100644 --- a/benchmarks/echo-server/server.yona +++ b/benchmarks/echo-server/server.yona @@ -1,6 +1,6 @@ -with socket\Server::channel "127.0.0.1" 5555 as channel +with socket\tcp\Server::channel (:tcp, "127.0.0.1", 5555) as channel (\ -> - with daemon socket\Server::accept channel as connection - socket\Connection::read_line connection |> socket\Connection::write connection + with daemon socket\tcp\Server::accept channel as connection + socket\tcp\Connection::read_line connection |> socket\tcp\Connection::write connection end) |> infi end diff --git a/examples/sockets/client.yona b/examples/sockets/client.yona index d0efe86..ebd2a6e 100644 --- a/examples/sockets/client.yona +++ b/examples/sockets/client.yona @@ -6,13 +6,13 @@ let keep_reading = \b -> if b == (ord 'o') then false else true # end on 'o' in -with socket\Client::connect addr port as connection +with socket\tcp\Client::connect addr port as connection do _prog_name -| msg -| _ = System::args - socket\Connection::read_line connection + socket\tcp\Connection::read_line connection IO::println "<- {msg}" - socket\Connection::write connection msg + socket\tcp\Connection::write connection msg IO::print "-> " - socket\Connection::read_until connection keep_reading |> Seq::decode |> IO::println + socket\tcp\Connection::read_until connection keep_reading |> Seq::decode |> IO::println end end diff --git a/examples/sockets/server.yona b/examples/sockets/server.yona index c74ca4a..033ca7a 100644 --- a/examples/sockets/server.yona +++ b/examples/sockets/server.yona @@ -4,11 +4,11 @@ let addr = "127.0.0.1" port = 5555 - accept = \channel -> do - with daemon socket\Server::accept channel as connection + accept = \channel -> + with daemon socket\tcp\Server::accept channel as connection do IO::println "connection accepted" - socket\Connection::write connection "welcome: " + socket\tcp\Connection::write connection "welcome: " IO::println "-> welcome: " IO::print "<- " request = socket\Connection::read_line connection |> Seq::decode @@ -17,10 +17,9 @@ let IO::println "-> {request}" end end - end in -with socket\Server::channel addr port as channel +with socket\tcp\Server::channel (:tcp, addr port) as channel do IO::println "listening on {addr}:{port}" infi <| \-> accept channel diff --git a/language/lib-yona/socket/Connection.yona b/language/lib-yona/socket/tcp/Connection.yona similarity index 69% rename from language/lib-yona/socket/Connection.yona rename to language/lib-yona/socket/tcp/Connection.yona index cc6da86..85ca739 100644 --- a/language/lib-yona/socket/Connection.yona +++ b/language/lib-yona/socket/tcp/Connection.yona @@ -1,4 +1,4 @@ -module socket\Connection exports read_line as +module socket\tcp\Connection exports read_line as # read from socket, until LF (ord 10) read_line connection = read_until connection (\b -> b != 10b) end diff --git a/language/src/main/java/yona/ast/builtin/modules/socket/ConnectionContextManager.java b/language/src/main/java/yona/ast/builtin/modules/socket/ConnectionContextManager.java index d63f697..ded59c3 100644 --- a/language/src/main/java/yona/ast/builtin/modules/socket/ConnectionContextManager.java +++ b/language/src/main/java/yona/ast/builtin/modules/socket/ConnectionContextManager.java @@ -5,14 +5,14 @@ import yona.runtime.ContextManager; import yona.runtime.NativeObject; import yona.runtime.NativeObjectContextManager; -import yona.runtime.network.YonaConnection; +import yona.runtime.network.TCPConnection; -final class ConnectionContextManager extends NativeObjectContextManager { - public ConnectionContextManager(YonaConnection yonaConnection, Context context) { - super("connection", context.lookupGlobalFunction("socket\\Connection", "run"), yonaConnection); +final class ConnectionContextManager extends NativeObjectContextManager { + public ConnectionContextManager(TCPConnection TCPConnection, Context context) { + super("connection", context.lookupGlobalFunction("socket\\Connection", "run"), TCPConnection); } public static ConnectionContextManager adapt(ContextManager contextManager, Context context, Node node) { - return new ConnectionContextManager(((NativeObject) contextManager.getData(node)).getValue(), context); + return new ConnectionContextManager(((NativeObject) contextManager.getData(node)).getValue(), context); } } diff --git a/language/src/main/java/yona/ast/builtin/modules/socket/SocketClientBuiltinModule.java b/language/src/main/java/yona/ast/builtin/modules/socket/SocketClientBuiltinModule.java index e12e1be..19d7e40 100644 --- a/language/src/main/java/yona/ast/builtin/modules/socket/SocketClientBuiltinModule.java +++ b/language/src/main/java/yona/ast/builtin/modules/socket/SocketClientBuiltinModule.java @@ -13,8 +13,8 @@ import yona.runtime.Seq; import yona.runtime.async.Promise; import yona.runtime.exceptions.BadArgException; -import yona.runtime.network.YonaClientChannel; -import yona.runtime.network.YonaConnection; +import yona.runtime.network.TCPClientChannel; +import yona.runtime.network.TCPConnection; import yona.runtime.stdlib.Builtins; import yona.runtime.stdlib.ExportedFunction; @@ -23,7 +23,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -@BuiltinModuleInfo(packageParts = {"socket"}, moduleName = "Client") +@BuiltinModuleInfo(packageParts = {"socket", "tcp"}, moduleName = "Client") public final class SocketClientBuiltinModule implements BuiltinModule { @NodeInfo(shortName = "connect") abstract static class ConnectBuiltin extends BuiltinNode { @@ -37,9 +37,9 @@ public Object connect(Seq hostname, long port, @CachedContext(YonaLanguage.class clientSocketChannel.configureBlocking(false); clientSocketChannel.connect(new InetSocketAddress(hostname.asJavaString(this), (int) port)); SelectionKey selectionKey = clientSocketChannel.register(context.socketSelector, SelectionKey.OP_CONNECT); - YonaClientChannel yonaClientChannel = new YonaClientChannel(context, clientSocketChannel, selectionKey, this, dispatch); - selectionKey.attach(yonaClientChannel); - Promise result = yonaClientChannel.yonaConnectionPromise.map((yonaConnection) -> new ConnectionContextManager((YonaConnection) yonaConnection, context), this); + TCPClientChannel TCPClientChannel = new TCPClientChannel(context, clientSocketChannel, selectionKey, this, dispatch); + selectionKey.attach(TCPClientChannel); + Promise result = TCPClientChannel.yonaConnectionPromise.map((yonaConnection) -> new ConnectionContextManager((TCPConnection) yonaConnection, context), this); context.socketSelector.wakeup(); return result; } catch (IOException e) { diff --git a/language/src/main/java/yona/ast/builtin/modules/socket/SocketConnectionBuiltinModule.java b/language/src/main/java/yona/ast/builtin/modules/socket/SocketConnectionBuiltinModule.java index 37b6fe2..cffa41a 100644 --- a/language/src/main/java/yona/ast/builtin/modules/socket/SocketConnectionBuiltinModule.java +++ b/language/src/main/java/yona/ast/builtin/modules/socket/SocketConnectionBuiltinModule.java @@ -17,14 +17,13 @@ import yona.ast.builtin.modules.BuiltinModuleInfo; import yona.runtime.*; import yona.runtime.async.Promise; -import yona.runtime.network.YonaConnection; +import yona.runtime.network.TCPConnection; import yona.runtime.stdlib.Builtins; import yona.runtime.stdlib.ExportedFunction; -import yona.runtime.threading.ExecutableFunction; import java.io.IOException; -@BuiltinModuleInfo(packageParts = {"socket"}, moduleName = "Connection") +@BuiltinModuleInfo(packageParts = {"socket", "tcp"}, moduleName = "Connection") public final class SocketConnectionBuiltinModule implements BuiltinModule { @NodeInfo(shortName = "run") abstract static class RunBuiltin extends BuiltinNode { @@ -58,8 +57,8 @@ public Object run(ContextManager contextManager, Function function, @CachedLi private static T closeConnection(ConnectionContextManager connectionContextManager, T result, Node node) { try { - YonaConnection yonaConnection = connectionContextManager.nativeData(node); - yonaConnection.selectionKey.channel().close(); + TCPConnection TCPConnection = connectionContextManager.nativeData(node); + TCPConnection.selectionKey.channel().close(); return result; } catch (IOException e) { throw new yona.runtime.exceptions.IOException(e, node); @@ -73,8 +72,8 @@ abstract static class CloseBuiltin extends BuiltinNode { public Object close(ContextManager contextManager, @CachedContext(YonaLanguage.class) Context context) { ConnectionContextManager connectionContextManager = ConnectionContextManager.adapt(contextManager, context, this); try { - YonaConnection yonaConnection = connectionContextManager.nativeData(this); - yonaConnection.selectionKey.channel().close(); + TCPConnection TCPConnection = connectionContextManager.nativeData(this); + TCPConnection.selectionKey.channel().close(); return Unit.INSTANCE; } catch (IOException e) { throw new yona.runtime.exceptions.IOException(e, this); @@ -87,9 +86,9 @@ abstract static class ReadUntilBuiltin extends BuiltinNode { @Specialization public Object readUntil(ContextManager contextManager, Function untilCallback, @CachedLibrary(limit = "3") InteropLibrary dispatch, @CachedContext(YonaLanguage.class) Context context) { ConnectionContextManager connectionContextManager = ConnectionContextManager.adapt(contextManager, context, this); - YonaConnection yonaConnection = connectionContextManager.nativeData(this); + TCPConnection TCPConnection = connectionContextManager.nativeData(this); Promise promise = new Promise(dispatch); - yonaConnection.readQueue.submit(new YonaConnection.ReadRequest(untilCallback, promise)); + TCPConnection.readQueue.submit(new TCPConnection.ReadRequest(untilCallback, promise)); context.socketSelector.wakeup(); return promise; } @@ -100,9 +99,9 @@ abstract static class WriteBuiltin extends BuiltinNode { @Specialization public Object write(ContextManager contextManager, Seq data, @CachedLibrary(limit = "3") InteropLibrary dispatch, @CachedContext(YonaLanguage.class) Context context) { ConnectionContextManager connectionContextManager = ConnectionContextManager.adapt(contextManager, context, this); - YonaConnection yonaConnection = connectionContextManager.nativeData(this); + TCPConnection TCPConnection = connectionContextManager.nativeData(this); Promise promise = new Promise(dispatch); - yonaConnection.writeQueue.submit(new YonaConnection.WriteRequest(data, promise)); + TCPConnection.writeQueue.submit(new TCPConnection.WriteRequest(data, promise)); context.socketSelector.wakeup(); return promise; } diff --git a/language/src/main/java/yona/ast/builtin/modules/socket/SocketServerBuiltinModule.java b/language/src/main/java/yona/ast/builtin/modules/socket/SocketServerBuiltinModule.java index fc41c17..1efc87d 100644 --- a/language/src/main/java/yona/ast/builtin/modules/socket/SocketServerBuiltinModule.java +++ b/language/src/main/java/yona/ast/builtin/modules/socket/SocketServerBuiltinModule.java @@ -10,6 +10,8 @@ import com.oracle.truffle.api.library.CachedLibrary; import com.oracle.truffle.api.nodes.Node; import com.oracle.truffle.api.nodes.NodeInfo; +import com.oracle.truffle.api.nodes.UnexpectedResultException; +import yona.TypesGen; import yona.YonaException; import yona.YonaLanguage; import yona.ast.builtin.BuiltinNode; @@ -18,26 +20,25 @@ import yona.runtime.*; import yona.runtime.async.Promise; import yona.runtime.exceptions.BadArgException; -import yona.runtime.network.YonaConnection; -import yona.runtime.network.YonaServerChannel; +import yona.runtime.network.TCPConnection; +import yona.runtime.network.TCPServerChannel; import yona.runtime.stdlib.Builtins; import yona.runtime.stdlib.ExportedFunction; -import yona.runtime.threading.ExecutableFunction; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; -@BuiltinModuleInfo(packageParts = {"socket"}, moduleName = "Server") +@BuiltinModuleInfo(packageParts = {"socket", "tcp"}, moduleName = "Server") public final class SocketServerBuiltinModule implements BuiltinModule { - private static final class ChannelContextManager extends NativeObjectContextManager { - public ChannelContextManager(YonaServerChannel yonaServerChannel, Context context) { - super("server_channel", context.lookupGlobalFunction("socket\\Server", "run"), yonaServerChannel); + private static final class ChannelContextManager extends NativeObjectContextManager { + public ChannelContextManager(TCPServerChannel TCPServerChannel, Context context) { + super("server_channel", context.lookupGlobalFunction("socket\\Server", "run"), TCPServerChannel); } public static ChannelContextManager adapt(ContextManager contextManager, Context context, Node node) { - return new ChannelContextManager(((NativeObject) contextManager.getData(node)).getValue(), context); + return new ChannelContextManager(((NativeObject) contextManager.getData(node)).getValue(), context); } } @@ -73,8 +74,8 @@ public Object run(ContextManager contextManager, Function function, @CachedLi private static T closeChannel(ChannelContextManager connectionContextManager, T result, Node node) { try { - YonaServerChannel yonaServerChannel = connectionContextManager.nativeData(node); - yonaServerChannel.serverSocketChannel.close(); + TCPServerChannel TCPServerChannel = connectionContextManager.nativeData(node); + TCPServerChannel.serverSocketChannel.close(); return result; } catch (IOException e) { throw new yona.runtime.exceptions.IOException(e, node); @@ -85,7 +86,54 @@ private static T closeChannel(ChannelContextManager connectionContextManager @NodeInfo(shortName = "channel") abstract static class ChannelBuiltin extends BuiltinNode { @Specialization - public Object channel(Seq hostname, long port, @CachedContext(YonaLanguage.class) Context context, @CachedLibrary(limit = "3") InteropLibrary dispatch) { + public Object channel(Tuple args, @CachedContext(YonaLanguage.class) Context context, @CachedLibrary(limit = "3") InteropLibrary dispatch) { + Object unwrappedArgs = args.unwrapPromises(this); + if (unwrappedArgs instanceof Promise unwrappedArgsPromise) { + return unwrappedArgsPromise.map((resultArgs) -> createSocket((Object[]) resultArgs, context, dispatch), this); + } else if (unwrappedArgs instanceof Object[]) { + return createSocket((Object[]) unwrappedArgs, context, dispatch); + } else { + throw YonaException.typeError(this, unwrappedArgs); + } + } + + private ChannelContextManager createSocket(Object[] args, Context context, InteropLibrary dispatch) { + if (args.length != 3) { + throw new BadArgException("socket\tcp::Channel expects triple of a type, host and port. Type must be :tcp", this); + } + + Symbol socketType; + try { + socketType = TypesGen.expectSymbol(args[0]); + } catch (UnexpectedResultException e) { + throw new BadArgException("Expected symbol, got " + args[0], e, this); + } + + switch (socketType.asString()) { + case "tcp": + Seq hostname; + long port; + try { + hostname = TypesGen.expectSeq(args[1]); + } catch (UnexpectedResultException e) { + throw new BadArgException("Expected string, got: " + args[1], e, this); + } + try { + port = TypesGen.expectLong(args[2]); + } catch (UnexpectedResultException e) { + throw new BadArgException("Expected integer, got: " + args[2], e, this); + } + + return tcpSocket(hostname, port, context, dispatch); + +// case "unix": +// // TODO implement + default: + throw new BadArgException("Unsupported socket type: " + socketType, this); + } + } + + private ChannelContextManager tcpSocket(Seq hostname, long port, Context context, InteropLibrary dispatch) { if (port < 0L || port >= 65535L) { throw new BadArgException("port must be between 0 and 65535", this); } @@ -94,7 +142,7 @@ public Object channel(Seq hostname, long port, @CachedContext(YonaLanguage.class serverSocketChannel.bind(new InetSocketAddress(hostname.asJavaString(this), (int) port)); serverSocketChannel.configureBlocking(false); SelectionKey selectionKey = serverSocketChannel.register(context.socketSelector, SelectionKey.OP_ACCEPT); - YonaServerChannel result = new YonaServerChannel(context, serverSocketChannel, selectionKey, this, dispatch); + TCPServerChannel result = new TCPServerChannel(context, serverSocketChannel, selectionKey, this, dispatch); selectionKey.attach(result); context.socketSelector.wakeup(); return new ChannelContextManager(result, context); @@ -109,11 +157,11 @@ abstract static class AcceptBuiltin extends BuiltinNode { @Specialization public Object accept(ContextManager contextManager, @CachedContext(YonaLanguage.class) Context context, @CachedLibrary(limit = "3") InteropLibrary dispatch) { ChannelContextManager connectionContextManager = ChannelContextManager.adapt(contextManager, context, this); - YonaServerChannel yonaServerChannel = connectionContextManager.nativeData(this); + TCPServerChannel TCPServerChannel = connectionContextManager.nativeData(this); Promise promise = new Promise(dispatch); - yonaServerChannel.connectionPromises.submit(promise); + TCPServerChannel.connectionPromises.submit(promise); context.socketSelector.wakeup(); - return promise.map(yonaConnection -> new ConnectionContextManager((YonaConnection) yonaConnection, context), this); + return promise.map(yonaConnection -> new ConnectionContextManager((TCPConnection) yonaConnection, context), this); } } diff --git a/language/src/main/java/yona/ast/expression/WithExpression.java b/language/src/main/java/yona/ast/expression/WithExpression.java index 90379ff..17b9f13 100644 --- a/language/src/main/java/yona/ast/expression/WithExpression.java +++ b/language/src/main/java/yona/ast/expression/WithExpression.java @@ -120,6 +120,7 @@ private Object executeResultNode(final VirtualFrame frame, final String name, fi } } else { try { + // TODO if isDaemon, resultValue should be wrapped into promise and the code from the branch above should apply to it Object result = library.execute(wrapFunction, contextManager, TypesGen.expectFunction(resultValue)); if (result instanceof Promise resultPromise) { shouldCleanup = false; diff --git a/language/src/main/java/yona/runtime/Context.java b/language/src/main/java/yona/runtime/Context.java index d2bdb1e..7a5b70f 100644 --- a/language/src/main/java/yona/runtime/Context.java +++ b/language/src/main/java/yona/runtime/Context.java @@ -502,11 +502,6 @@ public Function lookupGlobalFunction(String fqn, String function) { @CompilerDirectives.TruffleBoundary public void dispose() { // LOGGER.fine("Threading shutting down"); - try { - socketSelector.close(); - } catch (IOException e) { - e.printStackTrace(); - } threading.dispose(); ioExecutor.shutdown(); assert ioExecutor.shutdownNow().isEmpty(); diff --git a/language/src/main/java/yona/runtime/network/NIOQueue.java b/language/src/main/java/yona/runtime/network/NIOQueue.java index 5a6d53e..f4b7b87 100644 --- a/language/src/main/java/yona/runtime/network/NIOQueue.java +++ b/language/src/main/java/yona/runtime/network/NIOQueue.java @@ -11,7 +11,6 @@ public final class NIOQueue { public final SingleConsumer consumer; public final T[] items; - private final Class itemElementType; public NIOQueue(Class itemsType, int maxLength) { this.queue = new MultiProducerSingleConsumerCursors(maxLength); diff --git a/language/src/main/java/yona/runtime/network/NIOSelectorThread.java b/language/src/main/java/yona/runtime/network/NIOSelectorThread.java index d8e759b..8a172fc 100644 --- a/language/src/main/java/yona/runtime/network/NIOSelectorThread.java +++ b/language/src/main/java/yona/runtime/network/NIOSelectorThread.java @@ -3,26 +3,24 @@ import com.oracle.truffle.api.interop.ArityException; import com.oracle.truffle.api.interop.UnsupportedMessageException; import com.oracle.truffle.api.interop.UnsupportedTypeException; +import yona.TypesGen; import yona.YonaException; import yona.runtime.Context; import yona.runtime.Seq; -import yona.runtime.Unit; import yona.runtime.async.Promise; import yona.runtime.exceptions.BadArgException; -import yona.runtime.threading.ExecutableFunction; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; +import java.nio.channels.*; public final class NIOSelectorThread extends Thread { protected static final int SOCKET_READ_BUFFER_SIZE = 128; private final Context context; + private volatile boolean closing = false; + public NIOSelectorThread(Context context) { this.context = context; this.setName("yona-nio-selector"); @@ -30,25 +28,24 @@ public NIOSelectorThread(Context context) { @Override public void run() { - while (context.socketSelector.isOpen()) { + while (!closing && context.socketSelector.isOpen()) { try { if (context.socketSelector.select() >= 0) { for (SelectionKey key : context.socketSelector.selectedKeys()) { if (key.isValid()) { if (key.isAcceptable()) { - // a connection was accepted by a ServerSocketChannel. - accept((YonaServerChannel) key.attachment(), key); + accept((TCPServerChannel) key.attachment(), key); } else if (key.isConnectable()) { - connect((YonaClientChannel) key.attachment(), key); + connect((TCPClientChannel) key.attachment(), key); } else { + SelectableChannel selectableChannel = key.channel(); + if (key.isReadable()) { - // a channel is ready for reading - read((YonaConnection) key.attachment(), (SocketChannel) key.channel(), key); + read((TCPConnection) key.attachment(), (SocketChannel) selectableChannel, key); } if (key.isValid() && key.isWritable()) { - // a channel is ready for writing - write((YonaConnection) key.attachment(), (SocketChannel) key.channel(), key); + write((TCPConnection) key.attachment(), (SocketChannel) selectableChannel, key); } } } @@ -59,10 +56,21 @@ public void run() { e.printStackTrace(); } } + + if (closing) { + try { + context.socketSelector.close(); + } catch (IOException ignored) { + } + } + } + + public void close() { + this.closing = true; } // accept client connection - private void accept(YonaServerChannel yonaServerChannel, SelectionKey acceptKey) throws IOException, InterruptedException { + private void accept(TCPServerChannel TCPServerChannel, SelectionKey acceptKey) throws IOException, InterruptedException { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) acceptKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); @@ -88,17 +96,21 @@ private void accept(YonaServerChannel yonaServerChannel, SelectionKey acceptKey) } // connect to the server - private void connect(YonaClientChannel yonaClientChannel, SelectionKey acceptKey) throws IOException, InterruptedException { - SocketChannel socketChannel = (SocketChannel) acceptKey.channel(); - socketChannel.configureBlocking(false); - if (!yonaClientChannel.yonaConnectionPromise.isFulfilled()) { - while (socketChannel.isConnectionPending()) { - socketChannel.finishConnect(); + private void connect(TCPClientChannel TCPClientChannel, SelectionKey acceptKey) throws IOException, InterruptedException { + try { + SocketChannel socketChannel = (SocketChannel) acceptKey.channel(); + socketChannel.configureBlocking(false); + if (!TCPClientChannel.yonaConnectionPromise.isFulfilled()) { + while (socketChannel.isConnectionPending()) { + socketChannel.finishConnect(); + } + SelectionKey readKey = socketChannel.register(context.socketSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); + TCPConnection TCPConnection = new TCPConnection(readKey, TCPClientChannel.dispatch, TCPClientChannel.context, TCPClientChannel.node); + TCPClientChannel.yonaConnectionPromise.fulfil(TCPConnection, TCPConnection.node); + readKey.attach(TCPConnection); } - SelectionKey readKey = socketChannel.register(context.socketSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); - YonaConnection yonaConnection = new YonaConnection(readKey, yonaClientChannel.dispatch, yonaClientChannel.context, yonaClientChannel.node); - yonaClientChannel.yonaConnectionPromise.fulfil(yonaConnection, yonaConnection.node); - readKey.attach(yonaConnection); + } catch (IOException e) { + TCPClientChannel.yonaConnectionPromise.fulfil(new yona.runtime.exceptions.IOException(e, TCPClientChannel.node), TCPClientChannel.node); } } diff --git a/language/src/main/java/yona/runtime/network/YonaClientChannel.java b/language/src/main/java/yona/runtime/network/TCPClientChannel.java similarity index 80% rename from language/src/main/java/yona/runtime/network/YonaClientChannel.java rename to language/src/main/java/yona/runtime/network/TCPClientChannel.java index 16c2baa..50258bf 100644 --- a/language/src/main/java/yona/runtime/network/YonaClientChannel.java +++ b/language/src/main/java/yona/runtime/network/TCPClientChannel.java @@ -8,7 +8,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -public class YonaClientChannel { +public class TCPClientChannel { public final Context context; public final SocketChannel clientSocketChannel; public final Node node; @@ -16,7 +16,7 @@ public class YonaClientChannel { public final SelectionKey selectionKey; public final InteropLibrary dispatch; - public YonaClientChannel(Context context, SocketChannel clientSocketChannel, SelectionKey selectionKey, Node node, InteropLibrary dispatch) { + public TCPClientChannel(Context context, SocketChannel clientSocketChannel, SelectionKey selectionKey, Node node, InteropLibrary dispatch) { this.context = context; this.clientSocketChannel = clientSocketChannel; this.selectionKey = selectionKey; diff --git a/language/src/main/java/yona/runtime/network/YonaConnection.java b/language/src/main/java/yona/runtime/network/TCPConnection.java similarity index 88% rename from language/src/main/java/yona/runtime/network/YonaConnection.java rename to language/src/main/java/yona/runtime/network/TCPConnection.java index 4e7f125..73f2d42 100644 --- a/language/src/main/java/yona/runtime/network/YonaConnection.java +++ b/language/src/main/java/yona/runtime/network/TCPConnection.java @@ -9,7 +9,7 @@ import java.nio.channels.SelectionKey; -public class YonaConnection { +public class TCPConnection { private final static int MAX_RW_QUEUE_LENGTH = 16; public final SelectionKey selectionKey; @@ -20,7 +20,7 @@ public class YonaConnection { public final NIOQueue writeQueue; public final NIOQueue readQueue; - public YonaConnection(SelectionKey selectionKey, InteropLibrary dispatch, Context context, Node node) { + public TCPConnection(SelectionKey selectionKey, InteropLibrary dispatch, Context context, Node node) { this.selectionKey = selectionKey; this.dispatch = dispatch; this.context = context; diff --git a/language/src/main/java/yona/runtime/network/YonaServerChannel.java b/language/src/main/java/yona/runtime/network/TCPServerChannel.java similarity index 70% rename from language/src/main/java/yona/runtime/network/YonaServerChannel.java rename to language/src/main/java/yona/runtime/network/TCPServerChannel.java index e68f96e..675eb04 100644 --- a/language/src/main/java/yona/runtime/network/YonaServerChannel.java +++ b/language/src/main/java/yona/runtime/network/TCPServerChannel.java @@ -6,19 +6,19 @@ import yona.runtime.async.Promise; import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; +import java.nio.channels.spi.AbstractSelectableChannel; -public class YonaServerChannel { +public class TCPServerChannel { public static final int MAX_CONNECTIONS = 1024; public final Context context; - public final ServerSocketChannel serverSocketChannel; + public final AbstractSelectableChannel serverSocketChannel; public final Node node; public final SelectionKey selectionKey; public final NIOQueue connectionPromises; public final InteropLibrary dispatch; - public YonaServerChannel(Context context, ServerSocketChannel serverSocketChannel, SelectionKey selectionKey, Node node, InteropLibrary dispatch) { + public TCPServerChannel(Context context, AbstractSelectableChannel serverSocketChannel, SelectionKey selectionKey, Node node, InteropLibrary dispatch) { this.context = context; this.serverSocketChannel = serverSocketChannel; this.selectionKey = selectionKey; diff --git a/language/src/main/java/yona/runtime/threading/Threading.java b/language/src/main/java/yona/runtime/threading/Threading.java index 3cc17fb..36c9a9d 100644 --- a/language/src/main/java/yona/runtime/threading/Threading.java +++ b/language/src/main/java/yona/runtime/threading/Threading.java @@ -161,6 +161,7 @@ public void dispose() { } } try { + NIOSelectorThread.close(); NIOSelectorThread.interrupt(); NIOSelectorThread.join(); } catch (InterruptedException e) { diff --git a/language/src/test/java/yona/ErrorsTest.java b/language/src/test/java/yona/ErrorsTest.java index 1ec598f..f5c4304 100644 --- a/language/src/test/java/yona/ErrorsTest.java +++ b/language/src/test/java/yona/ErrorsTest.java @@ -377,7 +377,7 @@ public void stringToFloatBadFormatTest() { } @Test - public void badRegexpOptions() { + public void badRegexpOptionsTest() { assertThrows(PolyglotException.class, () -> { try { context.eval(YonaLanguage.ID, "Regexp::compile \"(a|(b))c\" {:unknown}"); @@ -396,4 +396,16 @@ public void incompleteSourceTest() { assertTrue(ex.isIncompleteSource()); } } + + @Test + public void socketClientConnectionRefusedTest() { + assertThrows(PolyglotException.class, () -> { + try { + context.eval(YonaLanguage.ID, "socket\\tcp\\Client::connect \"localhost\" 6666"); + } catch (PolyglotException ex) { + assertEquals("java.net.ConnectException: Connection refused", ex.getMessage()); + throw ex; + } + }); + } } diff --git a/language/src/test/java/yona/SocketTest.java b/language/src/test/java/yona/SocketTest.java index 66fbd58..b03e60e 100644 --- a/language/src/test/java/yona/SocketTest.java +++ b/language/src/test/java/yona/SocketTest.java @@ -1,6 +1,5 @@ package yona; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -10,9 +9,8 @@ @ExtendWith(LoggingExtension.class) public class SocketTest { @Test - @Tag("slow") public void echoServerTest() throws IOException, InterruptedException { - int result = new ProcessBuilder("bash", "-l", "run.sh").directory(new File("../tests/echo-server")).start().waitFor(); + int result = new ProcessBuilder("pwsh", "-l", "run.ps1").directory(new File("../tests/echo-server")).start().waitFor(); assert 0 == result; } } diff --git a/tests/echo-server/client.yona b/tests/echo-server/client.yona index 83b224e..46757cf 100644 --- a/tests/echo-server/client.yona +++ b/tests/echo-server/client.yona @@ -4,9 +4,9 @@ let keep_reading = \b -> if b == (ord 'o') then false else true # end on 'o' in -with socket\Client::connect "127.0.0.1" (int port) as connection +with socket\tcp\Client::connect "127.0.0.1" (int port) as connection do - socket\Connection::write connection "hello" - resp = Seq::decode <| socket\Connection::read_until connection keep_reading + socket\tcp\Connection::write connection "hello" + socket\tcp\Connection::read_until connection keep_reading end end diff --git a/tests/echo-server/run.ps1 b/tests/echo-server/run.ps1 new file mode 100755 index 0000000..544bf8b --- /dev/null +++ b/tests/echo-server/run.ps1 @@ -0,0 +1,26 @@ +#!/usr/bin/env pwsh + +param ( + [Int16] $Port = 5555, + [Int16] $Repetitions = 5 +) + +Start-Job -ScriptBlock {../../yona -f ./server.yona $args[0] $args[1]} -Name YonaServer -ArgumentList $Port,$Repetitions + +Start-Sleep -Seconds 1 + +1..$Repetitions | ForEach-Object { Start-Job -ScriptBlock {../../yona -f ./client.yona $args[0] } -Name “YonaClient$_” -ArgumentList $Port } + +Get-Job -Name YonaClient* | Wait-Job | Receive-Job +Get-Job -Name YonaServer | Wait-Job | Receive-Job + +$ServerState = $(Get-Job YonaServer).State +$ClientState = $(Get-Job -Name YonaClient*).State | Select-Object -Unique + +if ($ServerState -ne 'Completed') { + throw "Server has not completed successfuly: $ServerState" +} + +if ($ClientState -ne 'Completed') { + throw "Clients has not completed successfuly: $ClientState" +} diff --git a/tests/echo-server/run.sh b/tests/echo-server/run.sh deleted file mode 100644 index c300dc9..0000000 --- a/tests/echo-server/run.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env bash - -PORT=55555 -REPETITIONS=50 - -../../yona -f server.yona $PORT $REPETITIONS > server.output 2> server.output & -pids[0]=$! - -for i in $(seq 1 $REPETITIONS); do - ../../yona -f client.yona $PORT > client.output 2> client.output & pids[${i}]=$! -done - - -# wait for all pids -for pid in ${pids[*]}; do - wait $pid -done - -[ -s server.output ] && exit 1 -[ -s client.output ] && exit 1 - -rm server.output client.output diff --git a/tests/echo-server/server.yona b/tests/echo-server/server.yona index f808122..836c7b4 100644 --- a/tests/echo-server/server.yona +++ b/tests/echo-server/server.yona @@ -2,12 +2,12 @@ let _ -| port -| repetitions -| _ = System::args accept = \channel -> do - with daemon socket\Server::accept channel as connection - socket\Connection::read_line connection |> socket\Connection::write connection + with daemon socket\tcp\Server::accept channel as connection + socket\tcp\Connection::read_line connection |> socket\tcp\Connection::write connection end end in -with socket\Server::channel "127.0.0.1" (int port) as channel +with socket\tcp\Server::channel (:tcp, "127.0.0.1", int port) as channel (int repetitions) `times` (\-> accept channel) end