Extending Combine with a custom ShareReplay operator
January 29, 2020
#swift
#combine
Combine is a functional reactive programming framework introduced by Apple at WWDC 2019. It was most likely inspired by ReactiveX and its implementations such as RxSwift and RxJava. There is even a cheatsheet that compares Combine and RxSwift abstractions, so you can find similar core components and operators. It turns out that some of the operators defined in ReactiveX have no counterpart in Combine.
In today’s article, I will share a way to extend the Combine framework with a custom ReplaySubject and ShareReplay operator.
ShareReplay operator
The ShareReplay operator shares a single subscription to the upstream publisher and replays the items emitted by that publisher to every new subscriber. It can be described with the following marble diagram:
This operator is useful when you’re doing an expensive operation like a network request. Most of the time there is no need to perform it for each subscriber. A better solution is to execute it once, then cache and multicast the results.
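To make the problem concrete, here is a minimal sketch of what happens without sharing. The `Counter` class and the `expensive` publisher are hypothetical stand-ins for a real network request; they are not part of the original code.

```swift
import Combine

// Hypothetical helper that counts how often the "expensive" work runs.
final class Counter { var value = 0 }
let executions = Counter()

// Deferred creates a fresh Future for every subscriber,
// so the work inside the Future runs once per subscription.
let expensive = Deferred {
    Future<Int, Never> { promise in
        executions.value += 1
        promise(.success(42))
    }
}

let first = expensive.sink { _ in }
let second = expensive.sink { _ in }

print(executions.value) // the work ran once for each subscriber
```

Two subscribers mean two executions here, which is exactly the duplicated work a shared, replaying subscription is meant to avoid.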
Defining the ShareReplay operator
Let’s start by defining the ShareReplay operator. This can be done by creating an extension of the Publisher protocol:
import Combine
import Foundation

extension Publisher {
    /// Provides a subject that shares a single subscription to the upstream publisher and
    /// replays at most `bufferSize` items emitted by that publisher
    /// - Parameter bufferSize: limits the number of items that can be replayed
    func shareReplay(_ bufferSize: Int) -> AnyPublisher<Output, Failure> {
        // ReplaySubject's initializer takes the buffer size as an unlabeled argument
        return multicast(subject: ReplaySubject(bufferSize)).autoconnect().eraseToAnyPublisher()
    }
}
As you can see above, we are using the multicast operator with a ReplaySubject instance. The subject delivers elements to multiple downstream subscribers. The multicast operator returns a connectable publisher, and autoconnect() connects to it automatically as soon as the first subscriber attaches, so there is no need to call connect() by hand.
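The sketch below shows multicast and autoconnect() on their own, using the built-in PassthroughSubject instead of a replaying subject. The `Counter` and `Recorder` helpers and the `source` publisher are assumptions made for illustration only.

```swift
import Combine

// Hypothetical helpers for observing what happens.
final class Counter { var value = 0 }
final class Recorder { var values = [String]() }

let upstreamSubscriptions = Counter()
let recorder = Recorder()
let source = PassthroughSubject<Int, Never>()

let shared = source
    .handleEvents(receiveSubscription: { _ in upstreamSubscriptions.value += 1 })
    .multicast(subject: PassthroughSubject<Int, Never>())
    .autoconnect() // connects when the first subscriber attaches

let a = shared.sink { recorder.values.append("a: \($0)") }
let b = shared.sink { recorder.values.append("b: \($0)") }
source.send(1)

// A late subscriber: PassthroughSubject does not replay, so it never sees 1.
let c = shared.sink { recorder.values.append("c: \($0)") }

print(upstreamSubscriptions.value)  // a single upstream subscription is shared
print(recorder.values.sorted())     // only "a" and "b" received the value
```

Both early subscribers share one upstream subscription, but the late subscriber misses the value. That gap is what a replaying subject fills.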
It is important to note that any data flow within the Combine framework involves three main components:
- Publisher - describes how values and errors are produced over time.
- Subscriber - registers on a Publisher to receive values, including completion.
- Subscription - controls the flow of data from a Publisher to a Subscriber.
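As a rough illustration of how the three components interact, the following sketch uses a hypothetical `CollectingSubscriber` that asks its subscription for only two values from a built-in sequence publisher.

```swift
import Combine

// Hypothetical subscriber that requests exactly two values.
final class CollectingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    var received = [Int]()

    func receive(subscription: Subscription) {
        // The Subscription controls the flow: request at most two values.
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // no additional demand beyond the initial request
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = CollectingSubscriber()
[1, 2, 3, 4].publisher.subscribe(subscriber) // the Publisher produces the values

print(subscriber.received) // only the two requested values arrive
```

The publisher has four values available, yet the subscriber receives only the two it asked for through its subscription.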
In our case we need to define the ReplaySubject and ReplaySubjectSubscription classes, which implement the Subject and Subscription protocols respectively.
Creating the ReplaySubject
Now we need to create the ReplaySubject class. It takes the bufferSize via its initializer to limit the number of items to replay. We’ll use the buffer field to cache output values, the subscriptions and completion fields to track attached subscribers and the terminal event, and an NSRecursiveLock instance to make the class thread-safe.
final class ReplaySubject<Output, Failure: Error>: Subject {
    private var buffer = [Output]()
    private let bufferSize: Int
    // Stored properties must be declared in the class body; extensions cannot contain them.
    private var subscriptions = [ReplaySubjectSubscription<Output, Failure>]()
    private var completion: Subscribers.Completion<Failure>?
    private let lock = NSRecursiveLock()

    init(_ bufferSize: Int = 0) {
        self.bufferSize = bufferSize
    }
}
A Subject is a publisher that allows outside callers to publish elements. To satisfy the Publisher protocol requirements we should implement the receive function, which is called to attach a Subscriber to the Publisher.
extension ReplaySubject {
    func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Failure == Failure, Downstream.Input == Output {
        lock.lock(); defer { lock.unlock() } // ➊
        let subscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber))
        subscriber.receive(subscription: subscription) // ➋
        subscriptions.append(subscription)
        subscription.replay(buffer, completion: completion) // ➌
    }
}
➊ Protects the critical section of code with a lock.
➋ Creates the ReplaySubjectSubscription instance and sends it to the subscriber.
➌ Replays the buffered values and completion event for the current subscription.
Now it is time to define the functions required by the Subject protocol. They request values from a new upstream subscription and propagate values and completion events to the subscribers:
extension ReplaySubject {
    /// Establishes demand for a new upstream subscription
    func send(subscription: Subscription) {
        lock.lock(); defer { lock.unlock() }
        subscription.request(.unlimited)
    }

    /// Caches a value and sends it to the subscribers.
    func send(_ value: Output) {
        lock.lock(); defer { lock.unlock() }
        buffer.append(value)
        // Keep only the most recent `bufferSize` values
        buffer = buffer.suffix(bufferSize)
        subscriptions.forEach { $0.receive(value) }
    }

    /// Stores a completion event and sends it to the subscribers.
    func send(completion: Subscribers.Completion<Failure>) {
        lock.lock(); defer { lock.unlock() }
        self.completion = completion
        subscriptions.forEach { subscription in subscription.receive(completion: completion) }
    }
}
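The buffer handling in send(_:) can be looked at in isolation. The sketch below uses a hypothetical free function, `appendAndTrim`, that mirrors the append-then-suffix step; it is not part of the original code.

```swift
// Hypothetical helper mirroring the buffer logic of send(_:):
// append the new value, then keep only the most recent `limit` values.
func appendAndTrim<T>(_ value: T, to buffer: inout [T], limit: Int) {
    buffer.append(value)
    buffer = Array(buffer.suffix(limit))
}

var replayBuffer = [Int]()
for value in 1...3 {
    appendAndTrim(value, to: &replayBuffer, limit: 2)
}
print(replayBuffer) // the two most recent values

// With the default limit of 0, nothing is kept for replay.
var emptyBuffer = [Int]()
appendAndTrim(1, to: &emptyBuffer, limit: 0)
print(emptyBuffer)
```

Note that suffix(_:) requires a non-negative length, so a negative buffer size would trap at runtime.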
Implementing a custom Subscription
Next we should implement the ReplaySubjectSubscription class, which conforms to the Subscription protocol. It can be defined as:
final class ReplaySubjectSubscription<Output, Failure: Error>: Subscription {
    private let downstream: AnySubscriber<Output, Failure>
    private var isCompleted = false
    private var demand: Subscribers.Demand = .none

    init(downstream: AnySubscriber<Output, Failure>) {
        self.downstream = downstream
    }

    // Accumulates the demand requested by the downstream subscriber
    func request(_ newDemand: Subscribers.Demand) {
        demand += newDemand
    }

    func cancel() {
        isCompleted = true
    }

    // Forwards a value only while the subscription is active and demand remains
    func receive(_ value: Output) {
        guard !isCompleted, demand > 0 else { return }
        demand += downstream.receive(value)
        demand -= 1
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        guard !isCompleted else { return }
        isCompleted = true
        downstream.receive(completion: completion)
    }

    // Delivers the cached values, followed by the completion event if there is one
    func replay(_ values: [Output], completion: Subscribers.Completion<Failure>?) {
        guard !isCompleted else { return }
        values.forEach { value in receive(value) }
        if let completion = completion { receive(completion: completion) }
    }
}
As you can see, the implementation is quite straightforward. Most of the functions send values and completion events to the downstream subscriber. We use the isCompleted flag to check whether the subscription has finished or been cancelled. Apart from that, we keep track of the subscriber’s demand, which indicates how many more elements the subscriber expects to receive.
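The demand bookkeeping relies on the arithmetic that Subscribers.Demand provides. Here is a small sketch of the same steps performed by request(_:) and receive(_:), with made-up variable names and values chosen only for illustration.

```swift
import Combine

// Start with no demand, as a fresh subscription does.
var pending: Subscribers.Demand = .none

pending += .max(2)   // the subscriber requests two values
let hasDemand = pending > 0

pending += .none     // additional demand returned by the downstream after a value
pending -= 1         // one value has been delivered

print(pending == .max(1)) // one more value may be delivered

// Unlimited demand stays unlimited no matter how many values are delivered.
var unbounded: Subscribers.Demand = .unlimited
unbounded -= 1
print(unbounded == .unlimited)
```

This is why the subscription does not need a special case for subscribers such as sink that request unlimited demand.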
Using the ShareReplay operator in practice
Just like that, we’ve extended the Combine framework with a custom ShareReplay operator. As I mentioned earlier, we can use it to cache the result of a network request and multicast it to multiple subscribers:
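// Assumed minimal model for illustration; only a subset of the payload's fields is decoded.
struct User: Decodable {
    let id: Int
    let name: String
}

func getUsers(_ url: URL) -> AnyPublisher<Result<[User], Error>, Never> {
    return URLSession.shared
        .dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: [User].self, decoder: JSONDecoder())
        // Spell out the Result type so the compiler does not have to infer it through the chain
        .map { users in Result<[User], Error>.success(users) }
        .catch { error in Just(.failure(error)) }
        .subscribe(on: DispatchQueue(label: "networking"))
        .receive(on: RunLoop.main)
        .shareReplay(1)
        .eraseToAnyPublisher()
}

let users = getUsers(URL(string: "https://jsonplaceholder.typicode.com/users")!)
// Both subscribers share one network request; keep the returned cancellables alive
let sub1 = users.sink(receiveValue: { value in
    print("subscriber1: \(value)\n")
})
let sub2 = users.sink(receiveValue: { value in
    print("subscriber2: \(value)\n")
})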
Conclusion
In this article we’ve learned how to define a custom ShareReplay operator, which can help reduce duplicated work and improve the user experience. If you’d like to dive deeper into Combine, I’d suggest having a look at:
- Using Combine - an intermediate to advanced book that goes into detail on how to use the Combine framework provided by Apple.
- OpenCombine - Open-source implementation of Apple’s Combine framework.
You can find the source code of everything described in this blog post on GitHub. Feel free to play around and reach out to me on Twitter if you have any questions, suggestions or feedback.
Thanks for reading!