diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index 1759c771a5..fabe65cbc6 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -312,6 +312,16 @@ akka.http { } websocket { + # Maximum allowable time between two consecutive elements being received from client. + # If no data is received within receive-idle-timeout, the server message handler flow is failed with akka.stream.StreamIdleTimeoutException. + # Can be configured together with akka.http.client.server.websocket.periodic-keep-alive-max-idle to catch unexpected + # client disconnections (if client supports pong replies) + receive-idle-timeout = infinite + + # Maximum allowable time between two consecutive elements being send to client. + # If no data is sent within send-idle-timeout, server message handler flow is failed with akka.stream.StreamIdleTimeoutException. + send-idle-timeout = infinite + # periodic keep alive may be implemented using by sending Ping frames # upon which the other side is expected to reply with a Pong frame, # or by sending a Pong frame, which serves as unidirectional heartbeat. @@ -495,6 +505,16 @@ akka.http { #client-settings websocket { + # Maximum allowable time between two consecutive elements being received from server. + # If no data is received within receive-idle-timeout, the client flow is failed with akka.stream.StreamIdleTimeoutException. + # Can be configured together with akka.http.client.client.websocket.periodic-keep-alive-max-idle to catch unexpected + # network disconnections (if server supports pong replies) + receive-idle-timeout = infinite + + # Maximum allowable time between two consecutive elements being send to server. + # If no data is sent within send-idle-timeout, the client flow is failed with akka.stream.StreamIdleTimeoutException. + send-idle-timeout = infinite + # periodic keep alive may be implemented using by sending Ping frames # upon which the other side is expected to reply with a Pong frame, # or by sending a Pong frame, which serves as unidirectional heartbeat. diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocket.scala index 30ea875540..c2bbcf857b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebSocket.scala @@ -42,6 +42,7 @@ private[http] object WebSocket { masking(serverSide, websocketSettings.randomFactory) atop FrameLogger.logFramesIfEnabled(websocketSettings.logFrames) atop frameHandling(serverSide, closeTimeout, log) atop + idleTimeout(websocketSettings) atop periodicKeepAlive(websocketSettings) atop messageAPI(serverSide, closeTimeout) @@ -89,6 +90,21 @@ private[http] object WebSocket { private[this] final val PongFullFrame: FrameStart = FrameEvent.fullFrame(Opcode.Pong, None, ByteString.empty, fin = true) private[this] final val mkDirectAnswerPong = () => DirectAnswer(PongFullFrame) + /** The layer that transparently monitors data flow in both directions and fails the flow + * with [[akka.stream.StreamIdleTimeoutException]] if elements are not passed within configured time. + * */ + def idleTimeout(settings: WebSocketSettings): BidiFlow[FrameHandler.Output, FrameHandler.Output, FrameOutHandler.Input, FrameOutHandler.Input, NotUsed] = { + val receiveIdleTimeoutFlow = settings.receiveIdleTimeout match { + case receiveIdleTimeout: FiniteDuration => Flow[FrameHandler.Output].idleTimeout(receiveIdleTimeout) + case _ => Flow[FrameHandler.Output].map(identity) + } + val sendIdleTimeoutFlow = settings.sendIdleTimeout match { + case sendIdleTimeout: FiniteDuration => Flow[Input].idleTimeout(sendIdleTimeout) + case _ => Flow[Input].map(identity) + } + + BidiFlow.fromFlows(receiveIdleTimeoutFlow, sendIdleTimeoutFlow) + } /** * The layer that implements all low-level frame handling, like handling control frames, collecting messages * from frames, decoding text messages, close handling, etc. diff --git a/akka-http-core/src/main/scala/akka/http/impl/settings/WebSocketSettingsImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/settings/WebSocketSettingsImpl.scala index 730b05b216..4dad630c00 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/settings/WebSocketSettingsImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/settings/WebSocketSettingsImpl.scala @@ -18,6 +18,8 @@ import scala.concurrent.duration.Duration @InternalApi private[akka] final case class WebSocketSettingsImpl( randomFactory: () => Random, + receiveIdleTimeout: Duration, + sendIdleTimeout: Duration, periodicKeepAliveMode: String, periodicKeepAliveMaxIdle: Duration, periodicKeepAliveData: () => ByteString, @@ -57,6 +59,8 @@ private[akka] object WebSocketSettingsImpl { // on purpose not extending Setting val c = inner WebSocketSettingsImpl( Randoms.SecureRandomInstances, + c.getPotentiallyInfiniteDuration("receive-idle-timeout"), + c.getPotentiallyInfiniteDuration("send-idle-timeout"), c.getString("periodic-keep-alive-mode"), // mode could be extended to be a factory of pings, if we'd need control over the data field c.getPotentiallyInfiniteDuration("periodic-keep-alive-max-idle"), NoPeriodicKeepAliveData, diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/settings/WebSocketSettings.scala b/akka-http-core/src/main/scala/akka/http/javadsl/settings/WebSocketSettings.scala index 51323a7e4c..2806f0143c 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/settings/WebSocketSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/settings/WebSocketSettings.scala @@ -21,6 +21,8 @@ import scala.concurrent.duration.Duration @DoNotInherit trait WebSocketSettings { self: WebSocketSettingsImpl => def getRandomFactory: Supplier[Random] + def receiveIdleTimeout: Duration + def sendIdleTimeout: Duration def periodicKeepAliveMode: String def periodicKeepAliveMaxIdle: Duration /** diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/WebSocketSettings.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/WebSocketSettings.scala index 1bfc98a7f4..59c6d0b083 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/settings/WebSocketSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/settings/WebSocketSettings.scala @@ -18,6 +18,8 @@ abstract class WebSocketSettings extends akka.http.javadsl.settings.WebSocketSet override final val getRandomFactory: Supplier[Random] = new Supplier[Random] { override def get(): Random = self.randomFactory() } + override def receiveIdleTimeout: Duration + override def sendIdleTimeout: Duration override def periodicKeepAliveMode: String override def periodicKeepAliveMaxIdle: Duration /** diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala index 4bd83c2e19..9405ecafaa 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala @@ -4,8 +4,8 @@ package akka.http.impl.engine.ws -import scala.concurrent.{ Await, Promise } -import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.duration.{Duration, DurationInt} import akka.http.scaladsl.Http import akka.http.scaladsl.model.Uri.apply import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade @@ -25,6 +25,7 @@ import akka.util.ByteString import akka.stream.testkit.scaladsl.TestSink import akka.testkit._ +import java.util.concurrent.TimeUnit import scala.util.{ Failure, Success } class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer( @@ -244,6 +245,61 @@ class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer( resTry.failed.get.getMessage should ===("Connection failed.") } } + + "fail the materialized future if elements are not received within receive-idle-timeout" in { + val bindingFuture = Http().newServerAt("localhost", 0).bindSync({ + _.attribute(webSocketUpgrade).get.handleMessages(Flow.apply, None) + }) + val binding = Await.result(bindingFuture, 3.seconds.dilated) + val myPort = binding.localAddress.getPort + + import system.dispatcher + val future = Source(1 to 10).map(_ => TextMessage("dummy")) + .map(x => { + Await.result(Future { + Thread.sleep(200) + }, Duration.apply(3, TimeUnit.SECONDS)) + x + }) + .via(Http().webSocketClientFlow( + WebSocketRequest("ws://127.0.01:" + myPort), + settings = ClientConnectionSettings("akka.http.client.websocket.receive-idle-timeout = 1s")) + ).toMat(Sink.ignore)(Keep.right).run() + + whenReady(future.map(r => Success(r)).recover { case ex => Failure(ex) }) { result => + result.isFailure should ===(true) + } + } + + "fail the materialized future if elements are not send within send-idle-timeout" in { + import system.dispatcher + val bindingFuture = Http().newServerAt("localhost", 0).bindSync({ + _.attribute(webSocketUpgrade).get.handleMessagesWithSinkSource( + Sink.ignore, + Source(1 to 10).map(_ => TextMessage("dummy")) + .map(x => { + Await.result(Future { + Thread.sleep(200) + }, Duration.apply(3, TimeUnit.SECONDS)) + x + }) + ) + }) + val binding = Await.result(bindingFuture, 3.seconds.dilated) + val myPort = binding.localAddress.getPort + + val future = Source.maybe.via( + Http().webSocketClientFlow( + WebSocketRequest("ws://127.0.01:" + myPort), + settings = ClientConnectionSettings("akka.http.client.websocket.send-idle-timeout = 1s") + ) + ).toMat(Sink.ignore)(Keep.right).run() + + import system.dispatcher + whenReady(future.map(r => Success(r)).recover { case ex => Failure(ex) }) { result => + result.isFailure should ===(true) + } + } } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketServerReceiveIdleTimeoutSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketServerReceiveIdleTimeoutSpec.scala new file mode 100644 index 0000000000..834dbaa8ef --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketServerReceiveIdleTimeoutSpec.scala @@ -0,0 +1,57 @@ +package akka.http.impl.engine.ws + +import akka.Done +import akka.http.impl.util.AkkaSpecWithMaterializer +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade +import akka.http.scaladsl.model.Uri.apply +import akka.http.scaladsl.model.ws._ +import akka.stream.scaladsl._ +import akka.stream.testkit._ +import akka.testkit._ +import org.scalatest.concurrent.Eventually + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Promise} +import scala.util.{Failure, Success} + +class WebSocketServerReceiveIdleTimeoutSpec extends AkkaSpecWithMaterializer( + """ + akka { + stream.materializer.debug.fuzzing-mode=off + http.server.websocket.log-frames = on + http.client.websocket.log-frames = on + http.server.websocket.receive-idle-timeout = 1s + } + """) with Eventually { + + "A WebSocket server" must { + + "terminate the handler flow with an error when elements are not received within receive-idle-timeout" in Utils.assertAllStagesStopped { + import system.dispatcher + val handlerTermination = Promise[Done]() + val handler = Flow[Message].map(identity).watchTermination() { (_, terminationFuture) => + terminationFuture.onComplete { + case Success(_) => handlerTermination.trySuccess(Done) + case Failure(exception) => handlerTermination.tryFailure(exception) + } + } + + val bindingFuture = Http().newServerAt("localhost", 0) + .bindSync({ + _.attribute(webSocketUpgrade).get.handleMessages(handler.recover { case ex => { + handlerTermination.failure(ex) + TextMessage("dummy") + } + }, None) + }) + val binding = Await.result(bindingFuture, 3.seconds.dilated) + val myPort = binding.localAddress.getPort + + Source.maybe.via(Http().webSocketClientFlow(WebSocketRequest("ws://127.0.01:" + myPort))).to(Sink.ignore).run() + + handlerTermination.future.failed.futureValue + binding.unbind() + } + } +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketServerSendIdleTimeoutSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketServerSendIdleTimeoutSpec.scala new file mode 100644 index 0000000000..efe42ac3b6 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketServerSendIdleTimeoutSpec.scala @@ -0,0 +1,62 @@ +package akka.http.impl.engine.ws + +import akka.Done +import akka.http.impl.util.AkkaSpecWithMaterializer +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade +import akka.http.scaladsl.model.Uri.apply +import akka.http.scaladsl.model.ws._ +import akka.stream.scaladsl._ +import akka.stream.testkit._ +import akka.testkit._ +import org.scalatest.concurrent.Eventually + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.{Duration, DurationInt} +import scala.concurrent.{Await, Future, Promise} +import scala.util.{Failure, Success} + +class WebSocketServerSendIdleTimeoutSpec extends AkkaSpecWithMaterializer( + """ + akka { + stream.materializer.debug.fuzzing-mode=off + http.server.websocket.log-frames = on + http.client.websocket.log-frames = on + http.server.websocket.send-idle-timeout = 1s + } + """) with Eventually { + + "A WebSocket server" must { + + "terminate the handler flow with an error when elements are not sent within send-idle-timeout" in Utils.assertAllStagesStopped { + import system.dispatcher + val handlerTermination = Promise[Done]() + val handler = Flow[Message].map(identity).watchTermination() { (_, terminationFuture) => + terminationFuture.onComplete { + case Success(_) => handlerTermination.trySuccess(Done) + case Failure(exception) => handlerTermination.tryFailure(exception) + } + } + + val bindingFuture = Http().newServerAt("localhost", 0) + .bindSync({ + _.attribute(webSocketUpgrade).get.handleMessages(handler.recover { case ex => { + handlerTermination.failure(ex) + TextMessage("dummy") + } + }, None) + }) + val binding = Await.result(bindingFuture, 3.seconds.dilated) + val myPort = binding.localAddress.getPort + + Source(1 to 10).map(_ => { + Await.result(Future(Thread.sleep(200)), Duration(3, TimeUnit.SECONDS)) + TextMessage("dummy") + }) + .via(Http().webSocketClientFlow(WebSocketRequest("ws://127.0.01:" + myPort))).to(Sink.ignore).run() + + handlerTermination.future.failed.futureValue + binding.unbind() + } + } +}