Home Rxswift Operator Flatmap
Post
Cancel

Rxswift Operator Flatmap

FlatMap

Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.

1
2
3
4
public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
    -> Observable<Source.Element> {
        return FlatMap(source: self.asObservable(), selector: selector)
}

final private class FlatMap<SourceElement, SourceSequence: ObservableConvertibleType>: Producer

1
2
3
4
5
6
7
8
9
10
11
typealias Selector = (SourceElement) throws -> SourceSequence
private let _source: Observable<SourceElement>
private let _selector: Selector

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
    let sink = FlatMapSink(selector: self._selector, observer: observer, cancel: cancel)
    let subscription = sink.run(self._source)
    return (sink: sink, subscription: subscription)
}


private final class MergeSinkIter<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : ObserverType where Observer.Element == SourceSequence.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
typealias Parent = MergeSink<SourceElement, SourceSequence, Observer>
typealias DisposeKey = CompositeDisposable.DisposeKey

private let _parent: Parent
private let _disposeKey: DisposeKey

func on(_ event: Event<Element>) {
    self._parent._lock.lock(); defer { self._parent._lock.unlock() } // lock {
        switch event {
        case .next(let value):
            self._parent.forwardOn(.next(value))
        case .error(let error):
            self._parent.forwardOn(.error(error))
            self._parent.dispose()
        case .completed:
            self._parent._group.remove(for: self._disposeKey)
            self._parent._activeCount -= 1
            self._parent.checkCompleted()
        }
    // }
}

private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>: Sink, ObserverType where Observer.Element == SourceSequence.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
typealias ResultType = Observer.Element
typealias Element = SourceElement

let _lock = RecursiveLock()

var subscribeNext: Bool {
    return true
}

// state
let _group = CompositeDisposable()
let _sourceSubscription = SingleAssignmentDisposable()

var _activeCount = 0
var _stopped = false

func performMap(_ element: SourceElement) throws -> SourceSequence {
    rxAbstractMethod()
}


@inline(__always)
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
    self._lock.lock(); defer { self._lock.unlock() } // {
        if !self.subscribeNext {
            return nil
        }

        do {
            let value = try self.performMap(element)
            self._activeCount += 1
            return value
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return nil
        }
    // }
}

func on(_ event: Event<SourceElement>) {
    switch event {
    case .next(let element):
        if let value = self.nextElementArrived(element: element) {
          // subscribe inner
          // subscribe next as observable
            self.subscribeInner(value.asObservable())
        }
    case .error(let error):
        self._lock.lock(); defer { self._lock.unlock() }
        self.forwardOn(.error(error))
        self.dispose()
    case .completed:
        self._lock.lock(); defer { self._lock.unlock() }
      // mark stopped then dispose(跟source解除订阅关系,这里不会再收到订阅) the checkComplete
        self._stopped = true
        self._sourceSubscription.dispose()
        self.checkCompleted()
    }
}

func subscribeInner(_ source: Observable<Observer.Element>) {
    let iterDisposable = SingleAssignmentDisposable()
    if let disposeKey = self._group.insert(iterDisposable) {
        let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
        let subscription = source.subscribe(iter)
        iterDisposable.setDisposable(subscription)
    }
}


func run(_ sources: [Observable<Observer.Element>]) -> Disposable {
    self._activeCount += sources.count

    for source in sources {
        self.subscribeInner(source)
    }

    self._stopped = true

    self.checkCompleted()

    return self._group
}

func run(_ source: Observable<SourceElement>) -> Disposable {
    _ = self._group.insert(self._sourceSubscription)

    let subscription = source.subscribe(self)
    self._sourceSubscription.setDisposable(subscription)

    return self._group
}

@inline(__always)
func checkCompleted() {
    if self._stopped && self._activeCount == 0 {
      // forward `complte` and then dispose(跟所包装的observer解除订阅关系)
      // 这个条件意味着,就算source已经解除订阅,只要还有内部订阅在(activeCount > 0),还是可以给所包装的Observer发送事件
        self.forwardOn(.completed)
        self.dispose()
    }
}

private final class FlatMapSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : MergeSink<SourceElement, SourceSequence, Observer> where Observer.Element == SourceSequence.Element

1
2
3
4
5
6
typealias Selector = (SourceElement) throws -> SourceSequence
private let _selector: Selector

override func performMap(_ element: SourceElement) throws -> SourceSequence {
    return try self._selector(element)
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
		let disposeBag = DisposeBag()
		let obvb1 = PublishSubject<String>()
		let obvb2 = PublishSubject<String>()
		let obvb = BehaviorSubject(value: obvb1);
		let obvbb = obvb.flatMap { $0 }
		obvbb.subscribe(onNext: {
			debugPrint("event: \($0)")
		}, onCompleted: {
			debugPrint("completed")
		}).disposed(by: disposeBag)
				
		obvb1.onNext("abc")
		obvb1.onNext("def")
		
		obvb.onNext(obvb2)
		obvb.onCompleted() // 根订阅已经结束,但是不影响下面继续发送事件
		obvb1.onNext("ghi")
		obvb1.onNext("bbq")
		
		obvb2.onNext("kkk")
		obvb2.onNext("jjj")
		obvb1.onCompleted()
		obvb2.onCompleted()
This post is licensed under CC BY 4.0 by the author.