SwitchLatest
Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
final private class SwitchSinkIter<SourceType, Source: ObservableConvertibleType, Observer: ObserverType> : ObserverType, LockOwnerType, SynchronizedOnType where Source.Element == Observer.Element
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
typealias Element = Source.Element
typealias Parent = SwitchSink<SourceType, Source, Observer>
private let _parent: Parent
private let _id: Int
private let _self: Disposable
var _lock: RecursiveLock {
return self._parent._lock
}
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next: break
case .error, .completed:
self._self.dispose()
}
if self._parent._latest != self._id {
return
}
switch event {
case .next:
self._parent.forwardOn(event)
case .error:
self._parent.forwardOn(event)
self._parent.dispose()
case .completed:
self._parent._hasLatest = false
if self._parent._stopped {
self._parent.forwardOn(event)
self._parent.dispose()
}
}
}
private class SwitchSink<SourceType, Source: ObservableConvertibleType, Observer: ObserverType>: Sink , ObserverType where Source.Element == Observer.Element
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
typealias Element = SourceType
private let _subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
private let _innerSubscription: SerialDisposable = SerialDisposable()
let _lock = RecursiveLock()
// state
fileprivate var _stopped = false
fileprivate var _latest = 0
fileprivate var _hasLatest = false
func run(_ source: Observable<SourceType>) -> Disposable {
// subscribe the source
let subscription = source.subscribe(self)
self._subscriptions.setDisposable(subscription)
return Disposables.create(_subscriptions, _innerSubscription)
}
func performMap(_ element: SourceType) throws -> Source {
rxAbstractMethod()
}
@inline(__always)
final private func nextElementArrived(element: Element) -> (Int, Observable<Source.Element>)? {
self._lock.lock(); defer { self._lock.unlock() } // {
do {
let observable = try self.performMap(element).asObservable()
self._hasLatest = true
self._latest = self._latest &+ 1
return (self._latest, observable)
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
return nil
// }
}
// handle events
func on(_ event: Event<Element>) {
switch event {
case .next(let element):
if let (latest, observable) = self.nextElementArrived(element: element) {
let d = SingleAssignmentDisposable()
self._innerSubscription.disposable = d
// create a `SwitchSinkIter` observer to subscribe the new element as observable subscribe
let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
let disposable = observable.subscribe(observer)
d.setDisposable(disposable)
}
case .error(let error):
self._lock.lock(); defer { self._lock.unlock() }
self.forwardOn(.error(error))
self.dispose()
case .completed:
self._lock.lock(); defer { self._lock.unlock() }
self._stopped = true
self._subscriptions.dispose()
if !self._hasLatest {
// 没有latest, 即没有发送收到过next, 而是直接收到complete, 则发送`completed` then `dispose`
self.forwardOn(.completed)
self.dispose()
}
}
}
final private class SwitchIdentitySink<Source: ObservableConvertibleType, Observer: ObserverType>: SwitchSink<Source, Source, Observer> where Observer.Element == Source.Element
1
2
3
override func performMap(_ element: Source) throws -> Source {
return element
}
final private class Switch<Source: ObservableConvertibleType>: Producer
1
2
3
4
5
6
7
private let _source: Observable<Source>
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Source.Element {
let sink = SwitchIdentitySink<Source, Observer>(observer: observer, cancel: cancel)
let subscription = sink.run(self._source)
return (sink: sink, subscription: subscription)
}
Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🏈")
subject1.onNext("🏀")
subjectsSubject.onNext(subject2)
subject1.onNext("⚾️")
subject2.onNext("🍐")