[wip] Fixing memory leaks #1170

Draft: wants to merge 3 commits into base: main
Changes from all commits
@@ -165,6 +165,10 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
self.printAllCapturedLogs()
}

self.lock.withLockVoid {
self._logCaptures = []
}

for node in self._nodes {
node.log.warning("======================== TEST TEAR DOWN: SHUTDOWN ========================")
try! await node.shutdown().wait()
@@ -173,7 +177,6 @@
self.lock.withLockVoid {
self._nodes = []
self._testKits = []
self._logCaptures = []
}

if self.inspectDetectActorLeaks {
@@ -187,7 +190,10 @@
}

public func testKit(_ system: ClusterSystem) -> ActorTestKit {
guard let idx = self._nodes.firstIndex(where: { s in s.cluster.node == system.cluster.node }) else {
let idx = self.lock.withLock {
self._nodes.firstIndex(where: { s in s.cluster.node == system.cluster.node })
}
guard let idx else {
fatalError("Must only call with system that was spawned using `setUpNode()`, was: \(system)")
}

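The tear-down change above clears `_logCaptures` under the lock before the nodes shut down, and `testKit(_:)` now reads `_nodes` inside `withLock` instead of touching it directly. A minimal standalone sketch of that read-under-lock shape, with illustrative types and a hand-rolled lock helper rather than the test case's own lock:

```swift
import Foundation

// Illustrative registry, not the PR's test case: shared mutable state is only
// touched inside `withLock`, and lookup results are returned out of the
// critical section (mirroring the `testKit(_:)` change above).
final class NodeRegistry {
    private let lock = NSLock()
    private var nodes: [String] = []

    // Small stand-in for the lock helper used in the diff.
    @discardableResult
    private func withLock<T>(_ body: () -> T) -> T {
        lock.lock()
        defer { lock.unlock() }
        return body()
    }

    func add(_ node: String) {
        withLock { nodes.append(node) }
    }

    func index(of node: String) -> Int? {
        withLock { nodes.firstIndex(of: node) }
    }

    func removeAll() {
        withLock { nodes.removeAll() }
    }
}
```
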
25 changes: 15 additions & 10 deletions Sources/DistributedActorsTestKit/LogCapture.swift
@@ -35,7 +35,10 @@ public final class LogCapture {

public func logger(label: String) -> Logger {
self.captureLabel = label
return Logger(label: "LogCapture(\(label))", LogCaptureLogHandler(label: label, self))
let handler = LogCaptureLogHandler(label: label, settings: self.settings) { [weak self] log in
self?.append(log)
}
return Logger(label: "LogCapture(\(label))", handler)
}

func append(_ log: CapturedLogMessage) {
@@ -197,17 +200,19 @@ public struct CapturedLogMessage {

struct LogCaptureLogHandler: LogHandler {
let label: String
let capture: LogCapture
let settings: LogCapture.Settings
let append: (CapturedLogMessage) -> Void

init(label: String, _ capture: LogCapture) {
init(label: String, settings: LogCapture.Settings, append: @escaping (CapturedLogMessage) -> Void) {
self.label = label
self.capture = capture
self.settings = settings
self.append = append
}

public func log(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, file: String, function: String, line: UInt) {
let actorPath = self.metadata["actor/path"].map { "\($0)" } ?? ""

guard self.capture.settings.filterActorPaths.contains(where: { path in
guard self.settings.filterActorPaths.contains(where: { path in
if (path == "") { // TODO(swift): rdar://98691039 String.starts(with:) has a bug when given an empty string, so we have to avoid it
return true
}
@@ -216,13 +221,13 @@ struct LogCaptureLogHandler: LogHandler {
}) else {
return // ignore this actor's logs, it was filtered out
}
guard !self.capture.settings.excludeActorPaths.contains(actorPath) else {
guard !self.settings.excludeActorPaths.contains(actorPath) else {
return // actor was excluded explicitly
}
guard self.capture.settings.grep.isEmpty || self.capture.settings.grep.contains(where: { "\(message)".contains($0) }) else {
guard self.settings.grep.isEmpty || self.settings.grep.contains(where: { "\(message)".contains($0) }) else {
return // log was included explicitly
}
guard !self.capture.settings.excludeGrep.contains(where: { "\(message)".contains($0) }) else {
guard !self.settings.excludeGrep.contains(where: { "\(message)".contains($0) }) else {
return // log was excluded explicitly
}

@@ -231,7 +236,7 @@
_metadata.merge(metadata ?? [:], uniquingKeysWith: { _, r in r })
_metadata["label"] = "\(self.label)"

self.capture.append(CapturedLogMessage(date: date, level: level, message: message, metadata: _metadata, file: file, function: function, line: line))
self.append(CapturedLogMessage(date: date, level: level, message: message, metadata: _metadata, file: file, function: function, line: line))
}

public subscript(metadataKey metadataKey: String) -> Logger.Metadata.Value? {
@@ -247,7 +252,7 @@

public var logLevel: Logger.Level {
get {
self.capture.settings.minimumLogLevel
self.settings.minimumLogLevel
}
set {
// ignore, we always collect all logs
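
In the `LogCapture` change above, the handler no longer holds the capture object itself: it keeps a copy of the settings plus an `append` closure that captures the capture weakly, so handing a handler to a `Logger` no longer extends the capture's lifetime. A reduced sketch of that shape, with illustrative types rather than the PR's:

```swift
// Illustrative stand-ins, not the PR's types.
final class CaptureSink {
    private var lines: [String] = []
    func append(_ line: String) { lines.append(line) }
}

struct SinkHandler {
    // The handler owns only a closure; the sink is captured weakly inside it,
    // so handing handlers out no longer keeps the sink alive.
    let append: (String) -> Void
}

extension CaptureSink {
    func makeHandler() -> SinkHandler {
        SinkHandler(append: { [weak self] line in
            self?.append(line)   // becomes a no-op once the sink is deallocated
        })
    }
}
```
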
4 changes: 2 additions & 2 deletions Sources/DistributedActorsTestKit/TestProbes.swift
@@ -853,8 +853,8 @@ private distributed actor _TestProbeInternal: LifecycleWatch {
private var onEnqueue: ((Item) -> Void)?

init() {
self.items = AsyncStream { continuation in
self.onEnqueue = { item in
self.items = AsyncStream { [weak self] continuation in
self?.onEnqueue = { item in
continuation.yield(item)
}
}
4 changes: 2 additions & 2 deletions Sources/DistributedCluster/ActorRefProvider.swift
@@ -50,13 +50,13 @@ internal struct RemoteActorRefProvider: _ActorRefProvider {
private let localNode: Cluster.Node
private let localProvider: LocalActorRefProvider

let cluster: ClusterShell
weak var cluster: ClusterShell?
// TODO: should cache perhaps also associations to inject them eagerly to actor refs?

// TODO: restructure it somehow, perhaps we dont need the full abstraction like this
init(
settings: ClusterSystemSettings,
cluster: ClusterShell,
cluster: ClusterShell?,
localProvider: LocalActorRefProvider
) {
precondition(settings.enabled, "Remote actor provider should only be used when clustering is enabled")
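
The `RemoteActorRefProvider` change above turns its strong `cluster` reference into a `weak var`, which breaks any retain cycle in which the shell (directly or through the system) also keeps the provider alive. A reduced model of why a weak back-reference fixes that kind of leak, with illustrative types, not the PR's:

```swift
final class Shell {
    var provider: Provider?
    deinit { print("Shell deinit") }
}

final class Provider {
    weak var shell: Shell?          // weak back-reference breaks the cycle
    init(shell: Shell) { self.shell = shell }
    deinit { print("Provider deinit") }
}

func demo() {
    var shell: Shell? = Shell()
    shell?.provider = Provider(shell: shell!)
    shell = nil                     // both deinit; with a strong back-reference both would leak
}
```
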
10 changes: 7 additions & 3 deletions Sources/DistributedCluster/Cluster/ClusterControl.swift
@@ -29,7 +29,11 @@ public struct ClusterControl {
///
/// This sequence begins with a snapshot of the current cluster state and continues with events representing changes
/// since the snapshot.
public let events: ClusterEventStream
public var events: ClusterEventStream {
self._events
}

internal var _events: ClusterEventStream
Comment on lines +32 to +36 (Contributor Author):

Don't know why I've added this 🫠 revert back


/// Offers a snapshot of membership, which may be used to perform ad-hoc tests against the membership.
/// Note that this view may be immediately outdated after checking if, if e.g. a membership change is just being processed.
@@ -67,14 +71,14 @@
}
}

private let cluster: ClusterShell?
private weak var cluster: ClusterShell?
internal let ref: ClusterShell.Ref

init(_ settings: ClusterSystemSettings, cluster: ClusterShell?, clusterRef: ClusterShell.Ref, eventStream: ClusterEventStream) {
self.settings = settings
self.cluster = cluster
self.ref = clusterRef
self.events = eventStream
self._events = eventStream

var initialMembership: Cluster.Membership = .empty
_ = initialMembership.join(settings.bindNode)
6 changes: 5 additions & 1 deletion Sources/DistributedCluster/Cluster/ClusterEventStream.swift
@@ -23,7 +23,7 @@ import Logging
public struct ClusterEventStream: AsyncSequence {
public typealias Element = Cluster.Event

private let actor: ClusterEventStreamActor?
private var actor: ClusterEventStreamActor?

internal init(_ system: ClusterSystem, customName: String? = nil) {
let props = ClusterEventStreamActor.props(customName: customName)
@@ -37,6 +37,10 @@
self.actor = nil
}

internal mutating func clean() {
self.actor = nil
}

nonisolated func subscribe(_ ref: _ActorRef<Cluster.Event>, file: String = #filePath, line: UInt = #line) {
Task {
await self._subscribe(ref, file: file, line: line)
27 changes: 18 additions & 9 deletions Sources/DistributedCluster/Cluster/ClusterShell.swift
@@ -120,7 +120,7 @@ internal class ClusterShell {

// 2) Ensure the failure detector knows about this node
Task {
await self._swimShell.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
await self._swimShell?.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): rename once https://github.com/apple/swift/pull/42098 is implemented
__secretlyKnownToBeLocal.monitor(node: associated.handshake.remoteNode)
}
}
@@ -154,7 +154,7 @@
// it's gossip will also propagate the information through the cluster
traceLog_Remote(system.cluster.node, "Finish terminate association [\(remoteNode)]: Notifying SWIM, .confirmDead")
system.log.warning("Confirm .dead to underlying SWIM, node: \(reflecting: remoteNode)")
self._swimShell.confirmDead(node: remoteNode)
self._swimShell?.confirmDead(node: remoteNode)

// it is important that we first check the contains; as otherwise we'd re-add a .down member for what was already removed (!)
if state.membership.contains(remoteNode) {
@@ -244,9 +244,9 @@
return pool
}

internal private(set) var _swimShell: SWIMActor!
internal private(set) var _swimShell: SWIMActor?

private var clusterEvents: ClusterEventStream!
private var clusterEvents: ClusterEventStream?

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Cluster Shell, reference used for issuing commands to the cluster
@@ -364,6 +364,11 @@
.init()
.supervision(strategy: .escalate) // always escalate failures, if this actor fails we're in big trouble -> terminate the system
._asWellKnown

deinit {
self.clusterEvents?.clean()
self._swimShell?.clean()
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
@@ -429,10 +434,14 @@ extension ClusterShell {
}
)

guard let events = self.clusterEvents else {
throw ClusterSystemError(.shuttingDown(""))
}

var state = ClusterShellState(
settings: self.settings,
channel: chan,
events: self.clusterEvents,
events: events,
gossiperControl: gossiperControl,
log: context.log
)
@@ -452,7 +461,7 @@
}

private func publish(_ event: Cluster.Event) {
self.publish(event, to: self.clusterEvents)
self.clusterEvents.map { self.publish(event, to: $0) }
}

private func publish(_ event: Cluster.Event, to eventStream: ClusterEventStream) {
@@ -633,7 +642,7 @@ extension ClusterShell {

Task { [eventsToPublish] in
for event in eventsToPublish {
await self.clusterEvents.publish(event)
await self.clusterEvents?.publish(event)
}
}

@@ -663,7 +672,7 @@

func tryConfirmDeadToSWIM(_ context: _ActorContext<Message>, _ state: ClusterShellState, change: Cluster.MembershipChange) {
if change.status.isAtLeast(.down) {
self._swimShell.confirmDead(node: change.member.node)
self._swimShell?.confirmDead(node: change.member.node)
}
}

@@ -1271,7 +1280,7 @@ extension ClusterShell {
}

// whenever we down a node we must ensure to confirm it to swim, so it won't keep monitoring it forever needlessly
self._swimShell.confirmDead(node: memberToDown.node)
self._swimShell?.confirmDead(node: memberToDown.node)

if memberToDown.node == state.selfNode {
// ==== ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -54,20 +54,20 @@ internal actor DistributedNodeDeathWatcher {
// initialized

let events = actorSystem.cluster.events
self.eventListenerTask = Task {
self.eventListenerTask = Task { [weak self] in
for try await event in events {
switch event {
case .membershipChange(let change):
self.membershipChanged(change)
await self?.membershipChanged(change)
case .snapshot(let membership):
let diff = Cluster.Membership._diff(from: .empty, to: membership)
for change in diff.changes {
self.membershipChanged(change)
await self?.membershipChanged(change)
}
case .leadershipChange, .reachabilityChange:
break // ignore those, they don't affect downing
case ._PLEASE_DO_NOT_EXHAUSTIVELY_MATCH_THIS_ENUM_NEW_CASES_MIGHT_BE_ADDED_IN_THE_FUTURE:
self.log.error("Received Cluster.Event [\(event)]. This should not happen, please file an issue.")
self?.log.error("Received Cluster.Event [\(event)]. This should not happen, please file an issue.")
}
}
}
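
`DistributedNodeDeathWatcher` above now creates its event-listener `Task` with `[weak self]` and re-resolves the weak reference on every iteration, so the long-running task no longer keeps the actor alive for as long as the cluster event stream exists. A standalone sketch of the same shape with illustrative types; an `AsyncStream<Int>` stands in for the cluster event sequence:

```swift
actor Watcher {
    private var listenerTask: Task<Void, Never>?

    func start(events: AsyncStream<Int>) {
        // The task captures the actor weakly; each iteration re-checks the
        // reference, so once the actor is deallocated the loop does no work.
        self.listenerTask = Task { [weak self] in
            for await event in events {
                await self?.handle(event)
            }
        }
    }

    private func handle(_ event: Int) {
        print("handled event \(event)")
    }

    deinit {
        listenerTask?.cancel()
    }
}
```
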
@@ -92,17 +92,17 @@ internal distributed actor DowningStrategyShell {
init(_ strategy: DowningStrategy, system: ActorSystem) async {
self.strategy = strategy
self.actorSystem = system
let events = system.cluster.events
self.eventsListeningTask = Task { [weak self] in
try await self?.whenLocal { myself in
for await event in system.cluster.events {
for await event in events {
try await self?.whenLocal { myself in
try myself.receiveClusterEvent(event)
}
}
}
}

deinit {
print("\(Self.self) DEINIT")
self.eventsListeningTask?.cancel()
// self.eventsListeningTask = nil
}
@@ -259,20 +259,22 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist,
"\(Self.self) expects to be on well known path: /system/receptionist, but was: \(self.id.fullDescription)"
) // TODO(distributed): remove when we remove paths entirely

self.eventsListeningTask = Task { [weak self, system] in
try await self?.whenLocal { __secretlyKnownToBeLocal in
for try await event in system.cluster.events {
__secretlyKnownToBeLocal.onClusterEvent(event: event)
let events = system.cluster.events
self.eventsListeningTask = Task { [weak self] in
for try await event in events {
await self?.whenLocal { myself in
myself.onClusterEvent(event: event)
}
}
}

let interval = system.settings.receptionist.ackPullReplicationIntervalSlow
// === timers ------------------
// periodically gossip to other receptionists with the last seqNr we've seen,
// and if it happens to be outdated by then this will cause a push from that node.
self.slowACKReplicationTimerTask = Task { [weak self] in
await self?.whenLocal { myself in
for await _ in AsyncTimerSequence.repeating(every: system.settings.receptionist.ackPullReplicationIntervalSlow, clock: .continuous) {
for await _ in AsyncTimerSequence.repeating(every: interval, clock: .continuous) {
await self?.whenLocal { myself in
myself.periodicAckTick()
}
}
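
In the receptionist change above, `system.cluster.events` and the ACK replication interval are bound to locals before the tasks are created, and the old `[weak self, system]` capture goes away, so the task closures capture only those small values instead of retaining `system` for the lifetime of the loops. A reduced sketch of that capture hygiene, with hypothetical types, not the PR's:

```swift
// Hypothetical stand-ins: the point is what the task closure captures,
// not the API shape.
final class EventSource {
    let events = AsyncStream<Int> { continuation in continuation.finish() }
    let ackInterval: Duration = .seconds(1)
}

actor Receptionist {
    private var listener: Task<Void, Never>?

    func start(listeningTo source: EventSource) {
        // Bind only what the loop needs; the closure then captures two small
        // values instead of retaining `source` for as long as the loop runs.
        let events = source.events
        let interval = source.ackInterval
        self.listener = Task { [weak self] in
            for await event in events {
                await self?.onEvent(event, ackInterval: interval)
            }
        }
    }

    private func onEvent(_ event: Int, ackInterval: Duration) {
        // react to the event; `ackInterval` would drive periodic replication here
    }

    deinit { listener?.cancel() }
}
```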