Scan
Applies an accumulator function over an observable sequence and returns each intermediate result. The specified seed value is used as the initial accumulator value.
1
2
3
4
5
6
7
/// Applies an accumulator function over the observable sequence and emits each intermediate result.
///
/// - Parameter seed: The value used as the initial accumulator value.
/// - Parameter accumulator: Called with the current accumulator value and the incoming element;
///   its return value becomes the new accumulator value. It may throw.
/// - Returns: An observable sequence of accumulated values.
public func scan<A>(_ seed: A, accumulator: @escaping (A, Element) throws -> A)
    -> Observable<A> {
    let upstream = self.asObservable()
    // Adapt the value-returning accumulator to the in-place (inout) form that `Scan` stores.
    return Scan(source: upstream, seed: seed) { state, next in
        state = try accumulator(state, next)
    }
}
final private class ScanSink<Element, Observer: ObserverType>: Sink, ObserverType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/// Type of the accumulated value; it is the element type of the downstream observer.
typealias Accumulate = Observer.Element
/// The `Scan` producer this sink belongs to.
typealias Parent = Scan<Element, Accumulate>
/// Producer whose `_accumulator` closure is invoked for every `.next` event.
private let _parent: Parent
/// Running accumulated value: mutated in place by the parent's accumulator and forwarded after each `.next`.
// NOTE(review): presumably initialised from the parent's seed in an initializer not shown here — confirm.
private var _accumulate: Accumulate
/// Handles one event coming from the source sequence.
///
/// - `.next`: runs the parent's accumulator on the running value in place, then forwards the
///   updated value. If the accumulator throws, the error is forwarded and the sink is disposed.
/// - `.error` / `.completed`: the terminal event is forwarded and the sink is disposed.
func on(_ event: Event<Element>) {
    switch event {
    case .next(let value):
        do {
            // The accumulator receives the running value as `inout` and updates it in place.
            try self._parent._accumulator(&self._accumulate, value)
        } catch {
            // A throwing accumulator terminates the sequence with that error.
            self.forwardOn(.error(error))
            self.dispose()
            return
        }
        // Emit the freshly updated intermediate result.
        self.forwardOn(.next(self._accumulate))
    case .error(let failure):
        self.forwardOn(.error(failure))
        self.dispose()
    case .completed:
        self.forwardOn(.completed)
        self.dispose()
    }
}
final private class Scan<Element, Accumulate>: Producer
类似于Swift标准库里的reduce
1
2
3
4
5
6
7
8
9
10
11
/// Closure that updates the running accumulated value in place for one incoming element; may throw.
typealias Accumulator = (inout Accumulate, Element) throws -> Void
/// Upstream sequence that the sink is subscribed to in `run`.
private let _source: Observable<Element>
/// Seed value for the accumulation.
// NOTE(review): presumably read by the sink as its initial accumulator value — the use is not visible here; confirm.
fileprivate let _seed: Accumulate
/// Accumulator closure; the sink invokes it for every `.next` element.
fileprivate let _accumulator: Accumulator
/// Creates a `ScanSink` for `observer` and subscribes it to the source sequence.
///
/// - Parameter observer: Receives the accumulated values.
/// - Parameter cancel: Passed through to the sink.
/// - Returns: The sink together with the subscription obtained from the source.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Accumulate {
    let scanSink = ScanSink(parent: self, observer: observer, cancel: cancel)
    // The sink is created first, then subscribed; tuple elements are evaluated left to right.
    return (sink: scanSink, subscription: self._source.subscribe(scanSink))
}