Skip to content

Commit

Permalink
core: implement #2419
Browse files Browse the repository at this point in the history
* Add receiveIdleTimeout and sendIdleTimeout config parameters to websocket client and server
* Add idleTimeout stack layer
* Add integration tests
  • Loading branch information
gawronA committed Jul 28, 2023
1 parent f54a8de commit 37a8df2
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 2 deletions.
20 changes: 20 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ akka.http {
}

websocket {
# Maximum allowable time between two consecutive elements being received from client.
# If no data is received within receive-idle-timeout, the server message handler flow is failed with akka.stream.StreamIdleTimeoutException.
# Can be configured together with akka.http.client.server.websocket.periodic-keep-alive-max-idle to catch unexpected
# client disconnections (if client supports pong replies)
receive-idle-timeout = infinite

# Maximum allowable time between two consecutive elements being send to client.
# If no data is sent within send-idle-timeout, server message handler flow is failed with akka.stream.StreamIdleTimeoutException.
send-idle-timeout = infinite

# periodic keep alive may be implemented using by sending Ping frames
# upon which the other side is expected to reply with a Pong frame,
# or by sending a Pong frame, which serves as unidirectional heartbeat.
Expand Down Expand Up @@ -495,6 +505,16 @@ akka.http {

#client-settings
websocket {
# Maximum allowable time between two consecutive elements being received from server.
# If no data is received within receive-idle-timeout, the client flow is failed with akka.stream.StreamIdleTimeoutException.
# Can be configured together with akka.http.client.client.websocket.periodic-keep-alive-max-idle to catch unexpected
# network disconnections (if server supports pong replies)
receive-idle-timeout = infinite

# Maximum allowable time between two consecutive elements being send to server.
# If no data is sent within send-idle-timeout, the client flow is failed with akka.stream.StreamIdleTimeoutException.
send-idle-timeout = infinite

# periodic keep alive may be implemented using by sending Ping frames
# upon which the other side is expected to reply with a Pong frame,
# or by sending a Pong frame, which serves as unidirectional heartbeat.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ private[http] object WebSocket {
masking(serverSide, websocketSettings.randomFactory) atop
FrameLogger.logFramesIfEnabled(websocketSettings.logFrames) atop
frameHandling(serverSide, closeTimeout, log) atop
idleTimeout(websocketSettings) atop
periodicKeepAlive(websocketSettings) atop
messageAPI(serverSide, closeTimeout)

Expand Down Expand Up @@ -89,6 +90,21 @@ private[http] object WebSocket {
private[this] final val PongFullFrame: FrameStart = FrameEvent.fullFrame(Opcode.Pong, None, ByteString.empty, fin = true)
private[this] final val mkDirectAnswerPong = () => DirectAnswer(PongFullFrame)

/** The layer that transparently monitors data flow in both directions and fails the flow
* with [[akka.stream.StreamIdleTimeoutException]] if elements are not passed within configured time.
* */
def idleTimeout(settings: WebSocketSettings): BidiFlow[FrameHandler.Output, FrameHandler.Output, FrameOutHandler.Input, FrameOutHandler.Input, NotUsed] = {
val receiveIdleTimeoutFlow = settings.receiveIdleTimeout match {
case receiveIdleTimeout: FiniteDuration => Flow[FrameHandler.Output].idleTimeout(receiveIdleTimeout)
case _ => Flow[FrameHandler.Output].map(identity)
}
val sendIdleTimeoutFlow = settings.sendIdleTimeout match {
case sendIdleTimeout: FiniteDuration => Flow[Input].idleTimeout(sendIdleTimeout)
case _ => Flow[Input].map(identity)
}

BidiFlow.fromFlows(receiveIdleTimeoutFlow, sendIdleTimeoutFlow)
}
/**
* The layer that implements all low-level frame handling, like handling control frames, collecting messages
* from frames, decoding text messages, close handling, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import scala.concurrent.duration.Duration
@InternalApi
private[akka] final case class WebSocketSettingsImpl(
randomFactory: () => Random,
receiveIdleTimeout: Duration,
sendIdleTimeout: Duration,
periodicKeepAliveMode: String,
periodicKeepAliveMaxIdle: Duration,
periodicKeepAliveData: () => ByteString,
Expand Down Expand Up @@ -57,6 +59,8 @@ private[akka] object WebSocketSettingsImpl { // on purpose not extending Setting
val c = inner
WebSocketSettingsImpl(
Randoms.SecureRandomInstances,
c.getPotentiallyInfiniteDuration("receive-idle-timeout"),
c.getPotentiallyInfiniteDuration("send-idle-timeout"),
c.getString("periodic-keep-alive-mode"), // mode could be extended to be a factory of pings, if we'd need control over the data field
c.getPotentiallyInfiniteDuration("periodic-keep-alive-max-idle"),
NoPeriodicKeepAliveData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import scala.concurrent.duration.Duration
@DoNotInherit
trait WebSocketSettings { self: WebSocketSettingsImpl =>
def getRandomFactory: Supplier[Random]
def receiveIdleTimeout: Duration
def sendIdleTimeout: Duration
def periodicKeepAliveMode: String
def periodicKeepAliveMaxIdle: Duration
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ abstract class WebSocketSettings extends akka.http.javadsl.settings.WebSocketSet
override final val getRandomFactory: Supplier[Random] = new Supplier[Random] {
override def get(): Random = self.randomFactory()
}
override def receiveIdleTimeout: Duration
override def sendIdleTimeout: Duration
override def periodicKeepAliveMode: String
override def periodicKeepAliveMaxIdle: Duration
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package akka.http.impl.engine.ws

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.{Duration, DurationInt}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade
Expand All @@ -25,6 +25,7 @@ import akka.util.ByteString
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit._

import java.util.concurrent.TimeUnit
import scala.util.{ Failure, Success }

class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer(
Expand Down Expand Up @@ -244,6 +245,61 @@ class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer(
resTry.failed.get.getMessage should ===("Connection failed.")
}
}

"fail the materialized future if elements are not received within receive-idle-timeout" in {
val bindingFuture = Http().newServerAt("localhost", 0).bindSync({
_.attribute(webSocketUpgrade).get.handleMessages(Flow.apply, None)
})
val binding = Await.result(bindingFuture, 3.seconds.dilated)
val myPort = binding.localAddress.getPort

import system.dispatcher
val future = Source(1 to 10).map(_ => TextMessage("dummy"))
.map(x => {
Await.result(Future {
Thread.sleep(200)
}, Duration.apply(3, TimeUnit.SECONDS))
x
})
.via(Http().webSocketClientFlow(
WebSocketRequest("ws://127.0.01:" + myPort),
settings = ClientConnectionSettings("akka.http.client.websocket.receive-idle-timeout = 1s"))
).toMat(Sink.ignore)(Keep.right).run()

whenReady(future.map(r => Success(r)).recover { case ex => Failure(ex) }) { result =>
result.isFailure should ===(true)
}
}

"fail the materialized future if elements are not send within send-idle-timeout" in {
import system.dispatcher
val bindingFuture = Http().newServerAt("localhost", 0).bindSync({
_.attribute(webSocketUpgrade).get.handleMessagesWithSinkSource(
Sink.ignore,
Source(1 to 10).map(_ => TextMessage("dummy"))
.map(x => {
Await.result(Future {
Thread.sleep(200)
}, Duration.apply(3, TimeUnit.SECONDS))
x
})
)
})
val binding = Await.result(bindingFuture, 3.seconds.dilated)
val myPort = binding.localAddress.getPort

val future = Source.maybe.via(
Http().webSocketClientFlow(
WebSocketRequest("ws://127.0.01:" + myPort),
settings = ClientConnectionSettings("akka.http.client.websocket.send-idle-timeout = 1s")
)
).toMat(Sink.ignore)(Keep.right).run()

import system.dispatcher
whenReady(future.map(r => Success(r)).recover { case ex => Failure(ex) }) { result =>
result.isFailure should ===(true)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package akka.http.impl.engine.ws

import akka.Done
import akka.http.impl.util.AkkaSpecWithMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.model.ws._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.testkit._
import org.scalatest.concurrent.Eventually

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Promise}
import scala.util.{Failure, Success}

class WebSocketServerReceiveIdleTimeoutSpec extends AkkaSpecWithMaterializer(
"""
akka {
stream.materializer.debug.fuzzing-mode=off
http.server.websocket.log-frames = on
http.client.websocket.log-frames = on
http.server.websocket.receive-idle-timeout = 1s
}
""") with Eventually {

"A WebSocket server" must {

"terminate the handler flow with an error when elements are not received within receive-idle-timeout" in Utils.assertAllStagesStopped {
import system.dispatcher
val handlerTermination = Promise[Done]()
val handler = Flow[Message].map(identity).watchTermination() { (_, terminationFuture) =>
terminationFuture.onComplete {
case Success(_) => handlerTermination.trySuccess(Done)
case Failure(exception) => handlerTermination.tryFailure(exception)
}
}

val bindingFuture = Http().newServerAt("localhost", 0)
.bindSync({
_.attribute(webSocketUpgrade).get.handleMessages(handler.recover { case ex => {
handlerTermination.failure(ex)
TextMessage("dummy")
}
}, None)
})
val binding = Await.result(bindingFuture, 3.seconds.dilated)
val myPort = binding.localAddress.getPort

Source.maybe.via(Http().webSocketClientFlow(WebSocketRequest("ws://127.0.01:" + myPort))).to(Sink.ignore).run()

handlerTermination.future.failed.futureValue
binding.unbind()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package akka.http.impl.engine.ws

import akka.Done
import akka.http.impl.util.AkkaSpecWithMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.model.ws._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.testkit._
import org.scalatest.concurrent.Eventually

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}

class WebSocketServerSendIdleTimeoutSpec extends AkkaSpecWithMaterializer(
"""
akka {
stream.materializer.debug.fuzzing-mode=off
http.server.websocket.log-frames = on
http.client.websocket.log-frames = on
http.server.websocket.send-idle-timeout = 1s
}
""") with Eventually {

"A WebSocket server" must {

"terminate the handler flow with an error when elements are not sent within send-idle-timeout" in Utils.assertAllStagesStopped {
import system.dispatcher
val handlerTermination = Promise[Done]()
val handler = Flow[Message].map(identity).watchTermination() { (_, terminationFuture) =>
terminationFuture.onComplete {
case Success(_) => handlerTermination.trySuccess(Done)
case Failure(exception) => handlerTermination.tryFailure(exception)
}
}

val bindingFuture = Http().newServerAt("localhost", 0)
.bindSync({
_.attribute(webSocketUpgrade).get.handleMessages(handler.recover { case ex => {
handlerTermination.failure(ex)
TextMessage("dummy")
}
}, None)
})
val binding = Await.result(bindingFuture, 3.seconds.dilated)
val myPort = binding.localAddress.getPort

Source(1 to 10).map(_ => {
Await.result(Future(Thread.sleep(200)), Duration(3, TimeUnit.SECONDS))
TextMessage("dummy")
})
.via(Http().webSocketClientFlow(WebSocketRequest("ws://127.0.01:" + myPort))).to(Sink.ignore).run()

handlerTermination.future.failed.futureValue
binding.unbind()
}
}
}

0 comments on commit 37a8df2

Please sign in to comment.