Home Rxswift Operator Merge
Post
Cancel

Rxswift Operator Merge

Merge

Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence.

Code

1
2
3
public func merge() -> Observable<Element.Element> {
    return Merge(source: self.asObservable())
}

private final class MergeSinkIter<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : ObserverType where Observer.Element == SourceSequence.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typealias Parent = MergeSink<SourceElement, SourceSequence, Observer>
private let _parent: Parent
private let _disposeKey: DisposeKey

func on(_ event: Event<Element>) {
    self._parent._lock.lock(); defer { self._parent._lock.unlock() } // lock {
        switch event {
        case .next(let value):
            self._parent.forwardOn(.next(value))
        case .error(let error):
          // dispose parent when error
          // 也就是说只要有其中一个信号报error, combine得到的整个observerable就dispose
            self._parent.forwardOn(.error(error))
            self._parent.dispose()
        case .completed:
            self._parent._group.remove(for: self._disposeKey)
            self._parent._activeCount -= 1
          // minus activeCount by one when complete
            self._parent.checkCompleted()
        }
    // }
}

private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : Sink, ObserverType where Observer.Element == SourceSequence.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
let _group = CompositeDisposable()
let _sourceSubscription = SingleAssignmentDisposable()

var subscribeNext: Bool {
    return true
}

// map sourceElement to SourceSequence(Obsservable)
func performMap(_ element: SourceElement) throws -> SourceSequence {
    rxAbstractMethod()
}

func run(_ sources: [Observable<Observer.Element>]) -> Disposable {
    self._activeCount += sources.count

  // subscribe all sources 
    for source in sources {
        self.subscribeInner(source)
    }

  // mark stopped and checkComplete
    self._stopped = true

    self.checkCompleted()

    return self._group
}

func run(_ source: Observable<SourceElement>) -> Disposable {
    _ = self._group.insert(self._sourceSubscription)
// 监听source
    let subscription = source.subscribe(self)
    self._sourceSubscription.setDisposable(subscription)

    return self._group
}

@inline(__always)
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
    self._lock.lock(); defer { self._lock.unlock() } // {
        if !self.subscribeNext {
          // return nil if not subscirbeNext
            return nil
        }

        do {
            let value = try self.performMap(element)
            self._activeCount += 1
          // return mapped value and increase activeCount by one
            return value
        }
        catch let e {
          // catch map error and forward `error`
            self.forwardOn(.error(e))
            self.dispose()
            return nil
        }
    // }
}

func on(_ event: Event<SourceElement>) {
    switch event {
    case .next(let element):
        if let value = self.nextElementArrived(element: element) {
          // subscribe (new element as observable)
            self.subscribeInner(value.asObservable())
        }
    case .error(let error):
        self._lock.lock(); defer { self._lock.unlock() }
      // send `error` and `dispose`
        self.forwardOn(.error(error))
        self.dispose()
    case .completed:
        self._lock.lock(); defer { self._lock.unlock() }
      // mark stopped and checkComplete
        self._stopped = true
        self._sourceSubscription.dispose()
        self.checkCompleted()
    }
}

private final class MergeBasicSink<Source: ObservableConvertibleType, Observer: ObserverType> : MergeSink<Source, Source, Observer> where Observer.Element == Source.Element

1
2
3
override func performMap(_ element: Source) throws -> Source {
    return element
}

final class Merge<SourceSequence: ObservableConvertibleType> : Producer

1
2
3
4
5
6
7
private let _source: Observable<SourceSequence>

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
    let sink = MergeBasicSink<SourceSequence, Observer>(observer: observer, cancel: cancel)
    let subscription = sink.run(self._source)
    return (sink: sink, subscription: subscription)
}

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")

subject1.onNext("🅱️")

subject2.onNext("①")

subject2.onNext("②")

subject1.onNext("🆎")

subject2.onNext("③")
This post is licensed under CC BY 4.0 by the author.