Home Rxswift Operator Flatmaplatest
Post
Cancel

RxSwift Operator: flatMapLatest

FlatMapLatest

Projects each element of an observable sequence into a new sequence of observable sequences and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

1
2
3
4
/// Projects each element into a new observable sequence and emits values only
/// from the most recently produced inner sequence.
///
/// - Parameter selector: Transform applied to each source element; it returns the
///   inner sequence for that element and may throw.
/// - Returns: An observable of the inner sequences' elements, backed by a
///   `FlatMapLatest` producer wrapping this sequence.
public func flatMapLatest<Source: ObservableConvertibleType>(
    _ selector: @escaping (Element) throws -> Source
) -> Observable<Source.Element> {
    let upstream = asObservable()
    return FlatMapLatest(source: upstream, selector: selector)
}

final private class FlatMapLatest<SourceType, Source: ObservableConvertibleType>: Producer

1
2
3
4
5
6
7
8
9
/// Projection from one source element to the inner sequence to switch to; may throw.
typealias Selector = (SourceType) throws -> Source
/// Upstream sequence; `run` hands it to the sink to subscribe to.
private let _source: Observable<SourceType>
/// Projection closure; `run` passes it to the `MapSwitchSink` it creates.
private let _selector: Selector

/// Builds the sink that performs the map-and-switch for `observer` and starts it
/// on the stored source sequence.
///
/// - Parameters:
///   - observer: Downstream observer that receives the inner sequences' elements.
///   - cancel: Cancelable forwarded to the sink.
/// - Returns: The sink itself plus the disposable returned by starting it.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Source.Element {
    let switchSink = MapSwitchSink<SourceType, Source, Observer>(
        selector: _selector,
        observer: observer,
        cancel: cancel
    )
    return (sink: switchSink, subscription: switchSink.run(_source))
}

private class SwitchSink<SourceType, Source: ObservableConvertibleType, Observer: ObserverType>: Sink, ObserverType where Source.Element == Observer.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Holds the subscription to the outer (source) sequence; set in `run` and
// disposed when the outer sequence completes.
private let _subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
// Holds the subscription to the current inner sequence; `on(.next)` installs a
// fresh disposable here for every mapped element.
private let _innerSubscription: SerialDisposable = SerialDisposable()

// Taken around the state updates in `nextElementArrived` and around the
// `.error` / `.completed` handling in `on`.
let _lock = RecursiveLock()

// state
// Set to true when the outer sequence completes. Not read in the code visible
// here — presumably consulted by the inner observer (SwitchSinkIter); confirm.
fileprivate var _stopped = false
// Counter bumped (with wrapping add) for each successfully mapped element; the
// new value is passed as `id` to the inner observer created for that element.
fileprivate var _latest = 0
// True once at least one element has been mapped to an inner sequence.
fileprivate var _hasLatest = false

/// Subscribes this sink to the outer sequence and returns a disposable covering
/// both the outer subscription and the current inner subscription.
///
/// - Parameter source: The outer sequence whose elements are mapped to inner sequences.
/// - Returns: A composite of `_subscriptions` and `_innerSubscription`.
func run(_ source: Observable<SourceType>) -> Disposable {
    _subscriptions.setDisposable(source.subscribe(self))
    return Disposables.create(_subscriptions, _innerSubscription)
}

/// Override point: maps a source element to the inner sequence to switch to.
///
/// The base implementation only calls `rxAbstractMethod()` (presumably a trap for
/// un-overridden abstract methods — confirm); `MapSwitchSink` supplies the real mapping.
///
/// - Parameter element: The element received from the outer sequence.
/// - Returns: The inner sequence for `element`.
/// - Throws: Whatever the subclass's mapping throws.
func performMap(_ element: SourceType) throws -> Source {
    rxAbstractMethod()
}

/// Maps an outer element to its inner sequence and registers it as the latest one.
///
/// Runs entirely under `_lock`. On success, marks that an inner sequence exists and
/// advances the `_latest` counter. If the mapping throws, the error is forwarded
/// downstream, the sink is disposed, and `nil` is returned.
///
/// - Parameter element: The element received from the outer sequence.
/// - Returns: The new counter value paired with the inner observable, or `nil`
///   when the mapping failed.
@inline(__always)
final private func nextElementArrived(element: Element) -> (Int, Observable<Source.Element>)? {
    _lock.lock()
    defer { _lock.unlock() }

    let inner: Observable<Source.Element>
    do {
        inner = try performMap(element).asObservable()
    } catch {
        // Mapping failed: terminate the whole pipeline with that error.
        forwardOn(.error(error))
        dispose()
        return nil
    }

    _hasLatest = true
    // Wrapping add, so the counter never traps on overflow.
    _latest = _latest &+ 1
    return (_latest, inner)
}

/// Handles events coming from the outer sequence.
///
/// - `.next`: maps the element to an inner sequence and switches the inner
///   subscription over to it.
/// - `.error`: forwards the error downstream and disposes the sink.
/// - `.completed`: marks the sink as stopped, disposes the outer subscription, and
///   completes downstream right away only if no inner sequence was ever started.
func on(_ event: Event<Element>) {
    switch event {
    case .next(let element):
        guard let (id, inner) = nextElementArrived(element: element) else {
            // Mapping failed; the error has already been forwarded.
            return
        }

        let slot = SingleAssignmentDisposable()

        // Installing the new slot replaces (and thereby disposes) the previous
        // inner subscription before the new one is created.
        _innerSubscription.disposable = slot

        let iter = SwitchSinkIter(parent: self, id: id, _self: slot)
        // Subscribe to the new inner sequence and park its disposable in the slot.
        slot.setDisposable(inner.subscribe(iter))

    case .error(let error):
        _lock.lock()
        defer { _lock.unlock() }

        forwardOn(.error(error))
        dispose()

    case .completed:
        _lock.lock()
        defer { _lock.unlock() }

        // The outer sequence is finished: record that and release its subscription.
        _stopped = true
        _subscriptions.dispose()

        // With no inner sequence ever started there is nothing left that could
        // emit, so complete downstream immediately.
        if !_hasLatest {
            forwardOn(.completed)
            dispose()
        }
    }
}

final private class MapSwitchSink<SourceType, Source: ObservableConvertibleType, Observer: ObserverType>: SwitchSink<SourceType, Source, Observer> where Observer.Element == Source.Element

1
2
3
4
5
6
/// Projection from one source element to the inner sequence to switch to; may throw.
typealias Selector = (SourceType) throws -> Source
/// Projection closure invoked by `performMap` for every outer element.
private let _selector: Selector

/// Maps an outer element to its inner sequence by invoking the stored selector.
///
/// - Parameter element: The element received from the outer sequence.
/// - Returns: The inner sequence produced by the selector.
/// - Throws: Any error thrown by the selector.
override func performMap(_ element: SourceType) throws -> Source {
    return try _selector(element)
}
This post is licensed under CC BY 4.0 by the author.