From cbc49d1270336965cf79ab71b3dd36c1b32cbd2c Mon Sep 17 00:00:00 2001 From: Oleh Hudeichuk Date: Fri, 27 Mar 2020 09:52:13 +0200 Subject: [PATCH] fix race conditions --- ActionCableSwift.podspec | 4 +- Package.swift | 2 +- Sources/ActionCableSwift/ACChannel.swift | 90 +++++++++---------- .../ActionCableSwift/ActionCableSwift.swift | 85 ++++++++++++------ Sources/ActionCableSwift/Helpers.swift | 85 +++++++++++------- 5 files changed, 157 insertions(+), 109 deletions(-) diff --git a/ActionCableSwift.podspec b/ActionCableSwift.podspec index 3f35248..f6d1b2d 100644 --- a/ActionCableSwift.podspec +++ b/ActionCableSwift.podspec @@ -9,7 +9,7 @@ Pod::Spec.new do |s| s.name = 'ActionCableSwift' s.module_name = 'ActionCableSwift' - s.version = '0.2.1' + s.version = '0.3.0' s.summary = '🏰 Action Cable Swift is a client library being released for Action Cable Rails 5 which makes it easy to add real-time features to your app.' s.swift_version = '5.1' @@ -32,4 +32,4 @@ Pod::Spec.new do |s| s.source_files = 'Sources/**/*' s.frameworks = 'Foundation' s.dependency 'SwiftExtensionsPack', '~> 0.2.9' -end \ No newline at end of file +end diff --git a/Package.swift b/Package.swift index 335f81d..c2f3cc8 100644 --- a/Package.swift +++ b/Package.swift @@ -19,7 +19,7 @@ let package = Package( .target( name: "ActionCableSwift", dependencies: [ - "SwiftExtensionsPack" + .product(name: "SwiftExtensionsPack", package: "SwiftExtensionsPack") ]), .testTarget( name: "ActionCableSwiftTests", diff --git a/Sources/ActionCableSwift/ACChannel.swift b/Sources/ActionCableSwift/ACChannel.swift index ed637af..5f27ed6 100644 --- a/Sources/ActionCableSwift/ACChannel.swift +++ b/Sources/ActionCableSwift/ACChannel.swift @@ -67,6 +67,8 @@ public class ACChannel { self.options = options ?? ACChannelOptions() setupAutoSubscribe() setupOnTextCallbacks() + setupOnCancelledCallbacks() + setupOnDisconnectCallbacks() } public func subscribe() throws { @@ -108,87 +110,77 @@ public class ACChannel { private func setupAutoSubscribe() { if options.autoSubscribe { if client?.isConnected ?? false { try? subscribe() } - client?.addOnConnected { [weak self] (headers) in + self.client?.addOnConnected { [weak self] (headers) in guard let self = self else { return } - try? self.subscribe() + self.channelSerialQueue.async { + try? self.subscribe() + } } } } private func setupOnDisconnectCallbacks() { - client?.addOnDisconnected { [weak self] (reason) in + client?.addOnDisconnected { [weak self] (reason) in guard let self = self else { return } - self.isSubscribed = false + self.channelSerialQueue.async { + self.isSubscribed = false + self.executeCallback(callbacks: self.onUnsubscribe) + } } } private func setupOnCancelledCallbacks() { client?.addOnCancelled { [weak self] in guard let self = self else { return } - self.isSubscribed = false + self.channelSerialQueue.async { + self.isSubscribed = false + self.executeCallback(callbacks: self.onUnsubscribe) + } } } private func setupOnTextCallbacks() { client?.addOnText { [weak self] (text) in guard let self = self else { return } - let message = ACSerializer.responseFrom(stringData: text) - switch message.type { - case .confirmSubscription: - self.isSubscribed = true - self.executeCallback(callbacks: self.onSubscribe, message: message) - self.flushBuffer() - case .rejectSubscription: - self.isSubscribed = false - self.executeCallback(callbacks: self.onRejectSubscription, message: message) - case .cancelSubscription: - self.isSubscribed = false - self.executeCallback(callbacks: self.onUnsubscribe, message: message) - case .message: - self.executeCallback(callbacks: self.onMessage, message: message) - case .ping: - self.executeCallback(callbacks: self.onPing) - default: break + self.channelSerialQueue.async { + let message = ACSerializer.responseFrom(stringData: text) + switch message.type { + case .confirmSubscription: + self.isSubscribed = true + self.executeCallback(callbacks: self.onSubscribe, message: message) + self.flushBuffer() + case .rejectSubscription: + self.isSubscribed = false + self.executeCallback(callbacks: self.onRejectSubscription, message: message) + case .cancelSubscription: + self.isSubscribed = false + self.executeCallback(callbacks: self.onUnsubscribe, message: message) + case .message: + self.executeCallback(callbacks: self.onMessage, message: message) + case .ping: + self.client?.pingRoundWatcher.ping() + self.executeCallback(callbacks: self.onPing) + default: break + } } } - - client?.addOnDisconnected { [weak self] (reason) in - guard let self = self else { return } - self.isSubscribed = false - self.executeCallback(callbacks: self.onUnsubscribe) - } - - client?.addOnCancelled { [weak self] in - guard let self = self else { return } - self.isSubscribed = false - self.executeCallback(callbacks: self.onUnsubscribe) - } } private func executeCallback(callbacks: [ACResponseCallback], message: ACMessage) { - channelSerialQueue.async { [weak self] in - guard let self = self else { return } - for closure in callbacks { - closure(self, message) - } + for closure in callbacks { + closure(self, message) } } private func executeCallback(callbacks: [ACResponseCallbackWithOptionalMessage]) { - channelSerialQueue.async { [weak self] in - guard let self = self else { return } - for closure in callbacks { - closure(self, nil) - } + for closure in callbacks { + closure(self, nil) } } private func flushBuffer() { - channelSerialQueue.async { [weak self] in - guard let self = self else { return } - while let closure = self.actionsBuffer.popLast() { - closure() - } + while let closure = self.actionsBuffer.popLast() { + closure() } } } diff --git a/Sources/ActionCableSwift/ActionCableSwift.swift b/Sources/ActionCableSwift/ActionCableSwift.swift index ca74cdd..39a32aa 100644 --- a/Sources/ActionCableSwift/ActionCableSwift.swift +++ b/Sources/ActionCableSwift/ActionCableSwift.swift @@ -8,9 +8,11 @@ public final class ACClient { public var headers: [String: String]? public let pingRoundWatcher = PingRoundWatcher() public var options: ACClientOptions - + private var channels: [String: ACChannel] = [:] private let clientConcurrentQueue = DispatchQueue(label: "com.ACClient.Conccurent", attributes: .concurrent) + private let isConnectedLock: NSLock = .init() + private let sendLock: NSLock = .init() /// callbacks private var onConnected: [((_ headers: [String: String]?) -> Void)] = [] @@ -21,6 +23,20 @@ public final class ACClient { private var onPing: [(() -> Void)] = [] private var onPong: [(() -> Void)] = [] + public init(ws: ACWebSocketProtocol, + headers: [String: String]? = nil, + options: ACClientOptions? = nil + ) { + self.ws = ws + self.headers = headers + self.options = options ?? ACClientOptions() + setupWSCallbacks() + pingRoundWatcher.client = self + if self.options.reconnect { + self.pingRoundWatcher.start() + } + } + public func addOnConnected(_ handler: @escaping (_ headers: [String: String]?) -> Void) { onConnected.append(handler) } @@ -49,42 +65,36 @@ public final class ACClient { onPong.append(handler) } - public init(ws: ACWebSocketProtocol, - headers: [String: String]? = nil, - options: ACClientOptions? = nil - ) { - self.ws = ws - self.headers = headers - self.options = options ?? ACClientOptions() - setupWSCallbacks() - } - subscript(name: String) -> ACChannel? { channels[name] } public func connect() { + isConnectedLock.lock() ws.connect(headers: headers) - if self.options.reconnect { - self.pingRoundWatcher.client = self - self.pingRoundWatcher.start() - } + isConnectedLock.unlock() } public func disconnect() { + isConnectedLock.lock() ws.disconnect() + isConnectedLock.unlock() } public func send(text: String, _ completion: (() -> Void)? = nil) { + sendLock.lock() ws.send(text: text) { completion?() } + sendLock.unlock() } public func send(data: Data, _ completion: (() -> Void)? = nil) { + sendLock.lock() ws.send(data: data) { completion?() } + sendLock.unlock() } @discardableResult @@ -96,27 +106,30 @@ public final class ACClient { private func setupWSCallbacks() { ws.onConnected = { [weak self] headers in guard let self = self else { return } - self.isConnected = true + self.setIsConnected(to: true) self.clientConcurrentQueue.async { [headers] in - for closure in self.onConnected { + let closures = self.onConnected + for closure in closures { closure(headers) } } } ws.onDisconnected = { [weak self] reason in guard let self = self else { return } - self.isConnected = false + self.setIsConnected(to: false) self.clientConcurrentQueue.async { [reason] in - for closure in self.onDisconnected { + let closures = self.onDisconnected + for closure in closures { closure(reason) } } } ws.onCancelled = { [weak self] in guard let self = self else { return } - self.isConnected = false + self.setIsConnected(to: false) self.clientConcurrentQueue.async { - for closure in self.onCancelled { + let closures = self.onCancelled + for closure in closures { closure() } } @@ -124,7 +137,8 @@ public final class ACClient { ws.onText = { [weak self] text in guard let self = self else { return } self.clientConcurrentQueue.async { [text] in - for closure in self.onText { + let closures = self.onText + for closure in closures { closure(text) } } @@ -132,7 +146,8 @@ public final class ACClient { ws.onBinary = { [weak self] data in guard let self = self else { return } self.clientConcurrentQueue.async { [data] in - for closure in self.onBinary { + let closures = self.onBinary + for closure in closures { closure(data) } } @@ -140,20 +155,40 @@ public final class ACClient { ws.onPing = { [weak self] in guard let self = self else { return } self.clientConcurrentQueue.async { - for closure in self.onPing { + let closures = self.onPing + for closure in closures { closure() } } } ws.onPong = { [weak self] in guard let self = self else { return } + let closures = self.onPong self.clientConcurrentQueue.async { - for closure in self.onPong { + for closure in closures { closure() } } } } + + func setIsConnected(to: Bool) { + isConnectedLock.lock() + isConnected = to + isConnectedLock.unlock() + } + + func getIsConnected() -> Bool { + isConnectedLock.lock() + let result = isConnected + isConnectedLock.unlock() + + return result + } + + deinit { + pingRoundWatcher.setFinish(to: true) + } } diff --git a/Sources/ActionCableSwift/Helpers.swift b/Sources/ActionCableSwift/Helpers.swift index 3f98b04..0c80662 100644 --- a/Sources/ActionCableSwift/Helpers.swift +++ b/Sources/ActionCableSwift/Helpers.swift @@ -20,7 +20,7 @@ public struct ACClientOptions { public var debug = false #endif - public var reconnect: Bool = true + public var reconnect: Bool = false public init() {} @@ -124,13 +124,20 @@ public struct ACMessage { public final class PingRoundWatcher { - var lastTimePoint: Int64 = 0 - var pingInterval: UInt32 = 3 - var cheksRange: Int64 = 9 - var difference: Int64 = 0 + weak var client: ACClient? - private var started = false - private let lock = NSLock() + var pingLimit: Int64 = 6 + var finish: Bool = false + var checksDelay: Float32 { + get { Float32(_checksDelay) / 1_000_000 } + set { _checksDelay = UInt32(newValue * 1_000_000) } + } + private var _checksDelay: UInt32 = 500_000 + private var lastTimePoint: Int64 = 0 + private var started: Bool = false + private let lock: NSLock = .init() + private let startInfoLock: NSLock = .init() + init(client: ACClient? = nil) { self.client = client @@ -138,63 +145,77 @@ public final class PingRoundWatcher { func start() { if isStarted() { return } - updateLastPoint() Thread { [weak self] in guard let self = self else { return } self.setStarted(to: true) while true { - sleep(self.pingInterval) - if self.client?.isConnected ?? false { - self.updateLastPoint() - } else if !self.checkDifference() { - self.updateLastPoint() - self.client?.isConnected = false + if self.finish { return } + + if !self.isConnected() { self.client?.disconnect() + usleep(200_000) self.client?.connect() + usleep(self._checksDelay) + self.updateLastPoint() + continue + } + if self.isWorks() { + usleep(self._checksDelay) + continue + } else { + self.lock.lock() + self.client?.setIsConnected(to: false) + self.lock.unlock() + usleep(self._checksDelay) } } }.start() } - public func getDifference() -> Int64 { - setDifference() + public func ping() { + updateLastPoint() + } + + private func updateLastPoint() { lock.lock() - let diff = Date().toSeconds() - lastTimePoint + lastTimePoint = Date().toSeconds() lock.unlock() - - return diff } public func isStarted() -> Bool { - lock.lock() - let result = started - lock.unlock() + startInfoLock.lock() + let result: Bool = started + startInfoLock.unlock() return result } - public func setStarted(to: Bool) { - lock.lock() + private func setStarted(to: Bool) { + startInfoLock.lock() started = to - lock.unlock() + startInfoLock.unlock() } - - private func checkDifference() -> Bool { - return getDifference() <= cheksRange + private func isConnected() -> Bool { + self.client?.getIsConnected() ?? false } - private func setDifference() { + public func setFinish(to: Bool) { lock.lock() - difference = Date().toSeconds() - lastTimePoint + finish = to lock.unlock() } - private func updateLastPoint() { + private func isWorks() -> Bool { lock.lock() - lastTimePoint = Date().toSeconds() + let result: Bool = !self.isOldPing() lock.unlock() + return result + } + + private func isOldPing() -> Bool { + (Date().toSeconds() - lastTimePoint) >= pingLimit } }