Home Rxswift Operator Switchlatest
Post
Cancel

RxSwift Operator: SwitchLatest

SwitchLatest

Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

/// Observer attached to a single inner sequence of a `SwitchSink`.
///
/// It uses the parent's lock as its own and forwards an event to the parent only while its
/// `_id` still equals the parent's `_latest`; events from an outdated inner sequence are dropped.
final private class SwitchSinkIter<SourceType, Source: ObservableConvertibleType, Observer: ObserverType> : ObserverType, LockOwnerType, SynchronizedOnType where Source.Element == Observer.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/// Element type of the events this observer receives from the inner sequence.
typealias Element = Source.Element
/// The sink type this observer reports to.
typealias Parent = SwitchSink<SourceType, Source, Observer>

/// Owning sink: supplies the lock and receives the forwarded events.
private let _parent: Parent
/// Identifier compared against `_parent._latest` to decide whether this observer is still current.
private let _id: Int
/// Disposed as soon as the inner sequence emits `.error` or `.completed`.
private let _self: Disposable

/// The lock used by this observer; it is the parent sink's lock, not a lock of its own.
var _lock: RecursiveLock {
    get {
        return self._parent._lock
    }
}

/// Receives an event from the inner sequence and hands it to `synchronizedOn`.
///
/// NOTE(review): `synchronizedOn` is not defined in this excerpt; presumably it acquires `_lock`
/// and then calls `_synchronized_on(_:)` — confirm against `SynchronizedOnType`.
func on(_ event: Event<Element>) {
    self.synchronizedOn(event)
}

/// Handles one event coming from the inner sequence.
///
/// A terminal event (`.error` / `.completed`) always disposes this observer's own disposable.
/// After that, the event is processed only if this observer is still the latest one:
/// - `.next` is forwarded to the parent.
/// - `.error` is forwarded and the parent is disposed.
/// - `.completed` clears the parent's `_hasLatest`; it is forwarded (and the parent disposed)
///   only when the parent is already marked `_stopped`.
func _synchronized_on(_ event: Event<Element>) {
    let sink = self._parent

    // Work out whether the inner sequence has just terminated.
    let isTerminal: Bool
    switch event {
    case .next:
        isTerminal = false
    case .error, .completed:
        isTerminal = true
    }
    if isTerminal {
        self._self.dispose()
    }

    // Drop events from an inner sequence that is no longer the latest.
    guard sink._latest == self._id else {
        return
    }

    switch event {
    case .next:
        sink.forwardOn(event)
    case .error:
        sink.forwardOn(event)
        sink.dispose()
    case .completed:
        sink._hasLatest = false
        // Completion is passed on only once the parent has been marked as stopped.
        guard sink._stopped else {
            return
        }
        sink.forwardOn(event)
        sink.dispose()
    }
}

/// Sink that observes the outer source, maps every element to an inner sequence with
/// `performMap`, and subscribes a `SwitchSinkIter` to that inner sequence.
///
/// The superclass generic argument is restored here: the excerpt read `Sink , ObserverType`,
/// which has lost the `<Observer>` argument of `Sink`.
private class SwitchSink<SourceType, Source: ObservableConvertibleType, Observer: ObserverType>: Sink<Observer>, ObserverType where Source.Element == Observer.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/// Element type of the outer source sequence.
typealias Element = SourceType

/// Holds the subscription to the outer source sequence.
private let _subscriptions = SingleAssignmentDisposable()
/// Holds the disposable of the most recently subscribed inner sequence.
private let _innerSubscription = SerialDisposable()

/// Lock taken around state changes; `SwitchSinkIter` observers expose this same lock as theirs.
let _lock = RecursiveLock()

// state
/// Set to `true` when the outer source sequence completes.
fileprivate var _stopped: Bool = false
/// Identifier of the newest inner sequence; incremented for every mapped outer element.
fileprivate var _latest: Int = 0
/// `true` from the arrival of a new inner sequence until that latest inner sequence completes.
fileprivate var _hasLatest: Bool = false


/// Subscribes this sink to the outer `source` sequence.
///
/// - Parameter source: The sequence whose elements are mapped to inner sequences.
/// - Returns: A disposable combining the outer subscription and the inner-subscription holder.
func run(_ source: Observable<SourceType>) -> Disposable {
    // The outer subscription is kept in `_subscriptions` so `.completed` can dispose it later.
    self._subscriptions.setDisposable(source.subscribe(self))
    return Disposables.create(self._subscriptions, self._innerSubscription)
}

/// Maps an element of the outer sequence to the inner sequence that should be observed.
///
/// This base implementation only calls `rxAbstractMethod()`; subclasses such as
/// `SwitchIdentitySink` override it.
/// NOTE(review): `rxAbstractMethod()` is defined elsewhere; presumably it traps — confirm.
///
/// - Parameter element: The element received from the outer sequence.
/// - Returns: The inner sequence produced for `element`.
/// - Throws: Whatever error the overriding mapping reports.
func performMap(_ element: SourceType) throws -> Source {
    rxAbstractMethod()
}

/// Registers a newly arrived outer element while holding `_lock`.
///
/// - Parameter element: The element received from the outer sequence.
/// - Returns: The new `_latest` identifier together with the inner observable, or `nil` when
///   `performMap` threw; in that case the error has been forwarded and the sink disposed.
@inline(__always)
final private func nextElementArrived(element: Element) -> (Int, Observable<Source.Element>)? {
    self._lock.lock()
    defer { self._lock.unlock() }

    let inner: Observable<Source.Element>
    do {
        inner = try self.performMap(element).asObservable()
    }
    catch {
        // Mapping failed: report the error downstream and tear the sink down.
        self.forwardOn(.error(error))
        self.dispose()
        return nil
    }

    // Mark the new inner sequence as the current one; `&+` wraps instead of trapping on overflow.
    self._hasLatest = true
    self._latest = self._latest &+ 1
    return (self._latest, inner)
}

/// Handles events from the outer source sequence.
///
/// - `.next`: asks `nextElementArrived` for the new inner sequence and its identifier, then
///   subscribes a `SwitchSinkIter` to it.
/// - `.error`: forwards the error and disposes the sink while holding `_lock`.
/// - `.completed`: marks the sink as stopped and disposes the outer subscription; completion is
///   forwarded immediately only when `_hasLatest` is `false`.
func on(_ event: Event<Element>) {
    switch event {
    case .next(let element):
        if let (latest, observable) = self.nextElementArrived(element: element) {
            let d = SingleAssignmentDisposable()
            // `d` is installed before subscribing, so the inner subscription is tracked from the start.
            // NOTE(review): assigning to a `SerialDisposable` presumably disposes the previous
            // inner subscription — confirm against its documentation.
            self._innerSubscription.disposable = d
            // Create a `SwitchSinkIter` tagged with `latest` and subscribe it to the new inner sequence.
            let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
            let disposable = observable.subscribe(observer)
            d.setDisposable(disposable)
        }
    case .error(let error):
        self._lock.lock(); defer { self._lock.unlock() }
        self.forwardOn(.error(error))
        self.dispose()
    case .completed:
        self._lock.lock(); defer { self._lock.unlock() }
        self._stopped = true

        self._subscriptions.dispose()

        if !self._hasLatest {
            // No inner sequence is currently active (none has arrived yet, or the latest one
            // has already completed): forward `completed`, then dispose.
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

/// `SwitchSink` specialization for sources whose elements are already the inner sequences:
/// its `performMap` returns the element unchanged.
final private class SwitchIdentitySink<Source: ObservableConvertibleType, Observer: ObserverType>: SwitchSink<Source, Source, Observer> where Observer.Element == Source.Element

1
2
3
/// Identity mapping: the outer element is itself the inner sequence, so it is returned as is.
///
/// - Parameter element: The inner sequence emitted by the outer sequence.
/// - Returns: `element`, unchanged.
override func performMap(_ element: Source) throws -> Source {
    return element
}

/// Producer behind `switchLatest()`: on subscription it runs a `SwitchIdentitySink` over `_source`.
///
/// The superclass generic argument is restored here: the excerpt read `Producer` without
/// `<Source.Element>`, although the `run` override constrains `Observer.Element == Source.Element`.
final private class Switch<Source: ObservableConvertibleType>: Producer<Source.Element>

1
2
3
4
5
6
7
/// The outer sequence whose elements are themselves sequences; handed to the sink in `run`.
private let _source: Observable<Source>

/// Creates the sink for `observer` and starts it on `_source`.
///
/// - Parameters:
///   - observer: The downstream observer that receives the switched values.
///   - cancel: Cancelable passed through to the sink.
/// - Returns: The sink together with the subscription returned by running it.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Source.Element {
    let identitySink = SwitchIdentitySink<Source, Observer>(observer: observer, cancel: cancel)
    return (sink: identitySink, subscription: identitySink.run(self._source))
}

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
let disposeBag = DisposeBag()

// Two inner sequences, each created with an initial value.
let ballSubject = BehaviorSubject(value: "⚽️")
let fruitSubject = BehaviorSubject(value: "🍎")

// The outer sequence emits inner sequences and starts with `ballSubject`.
let outerSubject = BehaviorSubject(value: ballSubject)

// `switchLatest()` produces values only from the most recent inner sequence.
let latestValues = outerSubject.asObservable().switchLatest()
latestValues
    .subscribe(onNext: { value in print(value) })
    .disposed(by: disposeBag)

// `ballSubject` is the latest inner sequence at this point.
ballSubject.onNext("🏈")
ballSubject.onNext("🏀")

// Switch over to `fruitSubject`; `ballSubject` is no longer the latest inner sequence.
outerSubject.onNext(fruitSubject)

// Emitted on the outdated inner sequence, so it is not expected to be printed.
ballSubject.onNext("⚾️")

fruitSubject.onNext("🍐")
This post is licensed under CC BY 4.0 by the author.