FlatMapLatest
Projects each element of an observable sequence into a new sequence of observable sequences and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
1
2
3
4
public func flatMapLatest<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
-> Observable<Source.Element> {
return FlatMapLatest(source: self.asObservable(), selector: selector)
}
final private class FlatMapLatest<SourceType, Source: ObservableConvertibleType>: Producer
1
2
3
4
5
6
7
8
9
typealias Selector = (SourceType) throws -> Source
private let _source: Observable<SourceType>
private let _selector: Selector
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Source.Element {
let sink = MapSwitchSink<SourceType, Source, Observer>(selector: self._selector, observer: observer, cancel: cancel)
let subscription = sink.run(self._source)
return (sink: sink, subscription: subscription)
}
private class SwitchSink<SourceType, Source: ObservableConvertibleType, Observer: ObserverType>: Sink, ObserverType where Source.Element == Observer.Element
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private let _subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
private let _innerSubscription: SerialDisposable = SerialDisposable()
let _lock = RecursiveLock()
// state
fileprivate var _stopped = false
fileprivate var _latest = 0
fileprivate var _hasLatest = false
func run(_ source: Observable<SourceType>) -> Disposable {
let subscription = source.subscribe(self)
self._subscriptions.setDisposable(subscription)
return Disposables.create(_subscriptions, _innerSubscription)
}
func performMap(_ element: SourceType) throws -> Source {
rxAbstractMethod()
}
@inline(__always)
final private func nextElementArrived(element: Element) -> (Int, Observable<Source.Element>)? {
self._lock.lock(); defer { self._lock.unlock() } // {
do {
let observable = try self.performMap(element).asObservable()
self._hasLatest = true
self._latest = self._latest &+ 1
return (self._latest, observable)
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
return nil
// }
}
func on(_ event: Event<Element>) {
switch event {
case .next(let element):
if let (latest, observable) = self.nextElementArrived(element: element) {
let d = SingleAssignmentDisposable()
// dispose the last inner subscription
self._innerSubscription.disposable = d
let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
// create a new inner subscription
let disposable = observable.subscribe(observer)
d.setDisposable(disposable)
}
case .error(let error):
self._lock.lock(); defer { self._lock.unlock() }
self.forwardOn(.error(error))
self.dispose()
case .completed:
self._lock.lock(); defer { self._lock.unlock() }
// mark as stopped then dispose the outer subscription
self._stopped = true
self._subscriptions.dispose()
if !self._hasLatest {// does not hasLatest (i.e. there's no inner subscription yet)
// forward `complete` and dispose
self.forwardOn(.completed)
self.dispose()
}
}
}
final private class MapSwitchSink<SourceType, Source: ObservableConvertibleType, Observer: ObserverType>: SwitchSink<SourceType, Source, Observer> where Observer.Element == Source.Element
1
2
3
4
5
6
typealias Selector = (SourceType) throws -> Source
private let _selector: Selector
override func performMap(_ element: SourceType) throws -> Source {
return try self._selector(element)
}