Home Rxswift Operator Combinelatest
Post
Cancel

RxSwift Operator: combineLatest

class CombineLatestSink<Observer: ObserverType> : Sink<Observer> , CombineLatestProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Element type delivered to the downstream observer.
typealias Element = Observer.Element 

// Lock instance owned by the sink; the concrete sink's `run()` passes this same lock
// to every per-source observer it creates.
let _lock = RecursiveLock()

// Number of sources being combined.
private let _arity: Int
// How many sources have produced at least one value so far.
private var _numberOfValues = 0
// How many sources have completed so far.
private var _numberOfDone = 0
// `_hasValue[i]` becomes true once source `i` has delivered its first value.
private var _hasValue: [Bool]
// `_isDone[i]` becomes true once source `i` has completed.
private var _isDone: [Bool]

/// Creates a sink that tracks `arity` sources.
///
/// - Parameters:
///   - arity: Number of sources being combined.
///   - observer: Downstream observer, forwarded to the superclass initializer.
///   - cancel: Cancelable, forwarded to the superclass initializer.
init(arity: Int, observer: Observer, cancel: Cancelable) {
    // Every source starts out with no value and not completed.
    let allFalse = [Bool](repeating: false, count: arity)

    self._arity = arity
    self._hasValue = allFalse
    self._isDone = allFalse

    super.init(observer: observer, cancel: cancel)
}

/// Produces the combined element that `next(_:)` forwards downstream.
///
/// This base version only calls `rxAbstractMethod()`; concrete sinks override it.
/// - Throws: Whatever the overriding implementation throws.
func getResult() throws -> Element {
    rxAbstractMethod()
}

/// Called after the source at `index` has stored a new latest value.
///
/// Once every source has produced at least one value, a combined result is
/// computed and forwarded. Until then, the sink completes early if every
/// *other* source is already done.
///
/// - Parameter index: Position of the source that just emitted.
func next(_ index: Int) {
    // First value from this source: record it in the bookkeeping.
    if self._hasValue[index] == false {
        self._numberOfValues += 1
        self._hasValue[index] = true
    }

    guard self._numberOfValues == self._arity else {
        // Some source still has no value. If all the other sources are already
        // done, the missing value(s) are not expected to arrive any more, so
        // complete right away instead of waiting.
        let anotherSourceStillActive = (0 ..< self._arity).contains(where: { position in
            position != index && !self._isDone[position]
        })

        if !anotherSourceStillActive {
            self.forwardOn(.completed)
            self.dispose()
        }
        return
    }

    // Every source has a value: emit the combined result.
    do {
        let combined = try self.getResult()
        self.forwardOn(.next(combined))
    } catch let failure {
        // Computing the result failed: report the error and tear down.
        self.forwardOn(.error(failure))
        self.dispose()
    }
}

/// Forwards `error` downstream as an `.error` event and then disposes the sink.
///
/// - Parameter error: The error to forward.
func fail(_ error: Swift.Error) {
    self.forwardOn(.error(error))
    self.dispose()
}

/// Records that the source at `index` has completed.
///
/// Repeated calls for the same index are ignored. The sink forwards
/// `.completed` and disposes only once every source is done.
///
/// - Parameter index: Position of the source that completed.
func done(_ index: Int) {
    // Already counted this source; nothing more to do.
    guard !self._isDone[index] else { return }

    self._numberOfDone += 1
    self._isDone[index] = true

    // Complete only when all sources are done.
    guard self._numberOfDone == self._arity else { return }

    self.forwardOn(.completed)
    self.dispose()
}

final class CombineLatestObserver<Element> : ObserverType , LockOwnerType , SynchronizedOnType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Closure that stores a newly received element; called for every `.next` event.
typealias ValueSetter = (Element) -> Void
    
// The combining sink that is told about values, errors and completion.
private let _parent: CombineLatestProtocol

// Lock instance held by this observer.
// NOTE(review): presumably the lock `synchronizedOn` takes — confirm against SynchronizedOnType.
let _lock: RecursiveLock
// Position of the observed source; passed to the parent's `next`/`done`.
private let _index: Int
// Disposable that is disposed when this observer receives `.error` or `.completed`.
private let _this: Disposable
// Stores the latest element before the parent is notified.
private let _setLatestValue: ValueSetter

/// Entry point for incoming events; hands the event to `synchronizedOn(_:)`.
// NOTE(review): `synchronizedOn` is defined elsewhere; presumably it invokes
// `_synchronized_on(_:)` while holding `_lock` — confirm.
func on(_ event: Event<Element>) {
    self.synchronizedOn(event)
}

/// Handles one event from the observed source and notifies the parent.
///
/// - Parameter event: The event received from the source.
func _synchronized_on(_ event: Event<Element>) {
    switch event {
    case let .next(element):
        // Store the latest value first, then let the parent decide whether to emit.
        self._setLatestValue(element)
        self._parent.next(self._index)
    case let .error(failure):
        // Dispose this observer's own disposable before reporting the failure.
        self._this.dispose()
        self._parent.fail(failure)
    case .completed:
        // Dispose this observer's own disposable before reporting completion.
        self._this.dispose()
        self._parent.done(self._index)
    }
}

final class CombineLatestSink2_<E1, E2, Observer: ObserverType> : CombineLatestSink<Observer>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Combined element type, i.e. what the downstream observer receives.
typealias Result = Observer.Element
// The producer that owns the two sources and the result selector.
typealias Parent = CombineLatest2<E1, E2, Result>

let _parent: Parent

// Latest element from each source. These start as nil and are assigned by the
// `setLatestValue` closures built in `run()`; `getResult()` reads them.
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil

/// Creates the two-source sink.
///
/// - Parameters:
///   - parent: Producer holding the sources and the result selector.
///   - observer: Downstream observer, forwarded to the superclass initializer.
///   - cancel: Cancelable, forwarded to the superclass initializer.
init(parent: Parent, observer: Observer, cancel: Cancelable) {
    self._parent = parent
    // Two sources are combined, so the base sink tracks an arity of 2.
    super.init(arity: 2, observer: observer, cancel: cancel)
}


/// Subscribes to both sources and returns a disposable that groups the two subscriptions.
///
/// - Returns: A disposable created from both per-source subscriptions.
func run() -> Disposable {
    // One single-assignment disposable per source. Each is also handed to the
    // matching observer, which disposes it on `.error` or `.completed`.
    let firstSubscription = SingleAssignmentDisposable()
    let secondSubscription = SingleAssignmentDisposable()

    // Each observer stores its source's latest element on this sink and reports
    // under its own index (0 or 1); both share the sink's lock.
    let firstObserver = CombineLatestObserver(
        lock: self._lock,
        parent: self,
        index: 0,
        setLatestValue: { (value: E1) -> Void in self._latestElement1 = value },
        this: firstSubscription
    )
    let secondObserver = CombineLatestObserver(
        lock: self._lock,
        parent: self,
        index: 1,
        setLatestValue: { (value: E2) -> Void in self._latestElement2 = value },
        this: secondSubscription
    )

    // Subscribe in source order: first source, then second.
    firstSubscription.setDisposable(self._parent._source1.subscribe(firstObserver))
    secondSubscription.setDisposable(self._parent._source2.subscribe(secondObserver))

    return Disposables.create([firstSubscription, secondSubscription])
}

/// Applies the parent's result selector to the latest element of each source.
///
/// - Returns: The value returned by `_resultSelector`.
/// - Throws: Rethrows any error thrown by `_resultSelector`.
override func getResult() throws-> Result {
    return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
}

final class CombineLatest2<E1, E2, Result> : Producer<Result>

1
2
3
4
5
6
7
8
9
10
11
12
// Function that turns one element from each source into a combined result; may throw.
typealias ResultSelector = (E1, E2) throws -> Result

// The two sequences being combined.
let _source1: Observable<E1>
let _source2: Observable<E2>

// Called by the sink's `getResult()` with the latest element of each source.
let _resultSelector: ResultSelector

/// Creates a two-source sink for `observer`, starts it, and returns both pieces.
///
/// - Parameters:
///   - observer: Downstream observer that receives the combined results.
///   - cancel: Cancelable handed to the sink.
/// - Returns: The sink together with the disposable returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
    let combiningSink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
    return (sink: combiningSink, subscription: combiningSink.run())
}
1
2
3
4
5
6
7
8
/// Builds a `CombineLatest3` from three sources and a result selector.
///
/// - Parameters:
///   - source1: First source; converted with `asObservable()`.
///   - source2: Second source; converted with `asObservable()`.
///   - source3: Third source; converted with `asObservable()`.
///   - resultSelector: Throwing function that maps one element from each source to an `Element`.
/// - Returns: The `CombineLatest3` instance, typed as `Observable<Element>`.
public static func combineLatest<O1: ObservableType, O2: ObservableType, O3: ObservableType>(
    _ source1: O1,
    _ source2: O2,
    _ source3: O3,
    resultSelector: @escaping (O1.Element, O2.Element, O3.Element) throws -> Element
) -> Observable<Element> {
    let first = source1.asObservable()
    let second = source2.asObservable()
    let third = source3.asObservable()

    return CombineLatest3(source1: first, source2: second, source3: third, resultSelector: resultSelector)
}
This post is licensed under CC BY 4.0 by the author.