Skip to content

Commit

Permalink
Add shareReplay operator
Browse files Browse the repository at this point in the history
  • Loading branch information
serhiybutz committed Feb 5, 2020
1 parent 31462b0 commit eb2b8b5
Show file tree
Hide file tree
Showing 17 changed files with 4,621 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.swiftpm
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ This repo contains XCombine, a Swift module, developed on top of the Combine fra
- [Zip Operator](#zip-operator)
- [General Structure](#general-structure)
- [Example of Usage](#example-of-usage)
- [ShareReplay Operator](#sharereplay-operator)
- [Example of Usage](#example-of-usage-sharereplay)
- [License](#license)

## Installation
Expand Down Expand Up @@ -97,8 +99,46 @@ events.send("foo")
events.send("bar")
```

## ShareReplay Operator

XCombine offers the `shareReplay` operator that is not yet present in the Combine framework. It returns a publisher that shares a single subscription to the upstream. It also buffers a given number of latest incoming stream values and immediately upon subscription replays them. XCombine's `shareReplay` operator features:

* autoconnect (reference counting) mechanism

* a circular buffer for caching-related optimization

* a fine grained back pressure handling

### <a name="example-of-usage-sharereplay"></a>Example of Usage

The following snippet is the updated example from the [blog post][combine-sharereplay-operator] on developing your own `shareReplay` operator. It demonstrates the use of XCombine's `shareReplay` operator.

```swift
import Combine
import XCombine

let measurements = PassthroughSubject<Int, Never>()

let diagramDataSource = measurements
    .share(replay: 3)

let subscriber1 = diagramDataSource
    .sink(
        receiveCompletion: { completion in
            print("Subscriber 1:", completion)
        },
        receiveValue: { temperature in
            print("Subscriber 1:", temperature)
        }
    )

measurements.send(100)
measurements.send(110)
```

## License

This project is licensed under the MIT license.

[combine-insight-into-zip-operator]: https://sergebouts.github.io/combine-insight-into-zip-operator/
[combine-sharereplay-operator]: https://sergebouts.github.io/combine-sharereplay-operator/
131 changes: 131 additions & 0 deletions Sources/XCombine/CircularBuffer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//
// CircularBuffer.swift
// XCombine
//
// Created by Serge Bouts on 10/12/19.
// Copyright © 2019 Serge Bouts. All rights reserved.
//

import Foundation

/// Circular buffer errors.
enum CircularBufferError: Error, Equatable {
case invalidCapacity
case overflow
case isEmpty
case outOfRange
}

/// A circular buffer implementation.
///
/// See also: [Circular buffer](https://en.wikipedia.org/wiki/Circular_buffer)
struct CircularBuffer<Element> {
// MARK: - Properties

private var data: [Element]

private var head: Int = 0

private let lock = NSLock()

// MARK: - Initialization

/// Creates an instance with the buffer of `capacity` elements size.
///
/// - Parameter capacity: The buffer's capacity.
/// - Throws: `CircularBufferError.invalidCapacity` if the capacity value is wrong.
public init(capacity: Int) throws {
guard capacity > 0 else { throw CircularBufferError.invalidCapacity }

self.capacity = capacity
self.data = []
// `Int.max` capacity value is a special case, for which we don't reserve capacity at all.
if capacity < Int.max {
data.reserveCapacity(capacity)
}
}

// MARK: - API

/// The buffer's capacity.
private(set) var capacity: Int

/// The buffer's current size.
private(set) var count = 0

/// Returns the index'th element if the index is not out of range;
/// returns `nil` otherwise.
subscript(safe index: Int) -> Element? {
lock.lock()
defer { lock.unlock() }

guard index >= 0 && index < count else { return nil }

let index = (head + index) % capacity

return data[index]
}

/// Returns the index'th element if the index is correct;
/// throws otherwise.
///
/// - Parameter index: The element's index.
/// - Throws: `CircularBufferError.outOfRange` if the index is out of range.
/// - Returns: An element if the index is correct.
func get(at index: Int) throws -> Element {
guard let result = self[safe: index] else { throw CircularBufferError.outOfRange }
return result
}

/// Appends an element at the end of the buffer if the buffer is not full;
/// throws otherwise.
///
/// - Parameter element: The element to append.
/// - Throws: `CircularBufferError.overflow` if the buffer if full.
mutating func append(_ element: Element) throws {
lock.lock()
defer { lock.unlock() }

guard !isFull else { throw CircularBufferError.overflow }

if data.count < capacity {
data.append(element)
} else {
data[(head + count) % capacity] = element
}

count += 1
}

/// Removes the first element from the buffer if the buffer is not empty;
/// throws otherwise.
///
/// - Throws: `CircularBufferError.isEmpty` if the buffer is empty.
mutating func removeFirst() throws {
lock.lock()
defer { lock.unlock() }

guard count > 0 else { throw CircularBufferError.isEmpty }

head = (head + 1) % capacity
count -= 1
}

/// Returns `true` if the buffer if empty;
/// `false` otherwise.
var isEmpty: Bool {
count == 0
}

/// Returns `true` if the buffer if full;
/// `false` otherwise.
var isFull: Bool {
assert(count <= capacity)
return count == capacity
}

/// Returns the number of elements, that can yet be appended.
var freeSpace: Int {
return capacity - count
}
}
Loading

0 comments on commit eb2b8b5

Please sign in to comment.