/// Base sink shared by the combine-latest sinks.
///
/// Keeps per-source bookkeeping (which sources have produced a value, which have
/// completed) for `arity` sources and decides when to emit a combined element,
/// complete, or fail. Subclasses override `getResult()` to build the combined element.
class CombineLatestSink<Observer: ObserverType> : Sink , CombineLatestProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/// Element type delivered to the downstream observer.
typealias Element = Observer.Element
// Lock that subclasses hand to each per-source observer (see `run()` in `CombineLatestSink2_`).
let _lock = RecursiveLock()
// Number of sources being combined.
private let _arity: Int
// How many distinct sources have produced at least one value so far.
private var _numberOfValues = 0
// How many distinct sources have completed so far.
private var _numberOfDone = 0
// `_hasValue[i]` becomes true once source `i` has produced a value.
private var _hasValue: [Bool]
// `_isDone[i]` becomes true once source `i` has completed.
private var _isDone: [Bool]
/// Creates a sink that combines `arity` sources.
///
/// - Parameters:
///   - arity: Number of sources; sizes the per-source bookkeeping arrays.
///   - observer: Passed through to the superclass initializer.
///   - cancel: Passed through to the superclass initializer.
init(arity: Int, observer: Observer, cancel: Cancelable) {
    _arity = arity
    // Initially no source has produced a value and none has completed.
    _hasValue = Array(repeating: false, count: arity)
    _isDone = Array(repeating: false, count: arity)
    super.init(observer: observer, cancel: cancel)
}
/// Builds the combined element from the latest values of all sources.
///
/// The base implementation only calls `rxAbstractMethod()`; subclasses are
/// expected to override it.
///
/// - Returns: The combined element.
/// - Throws: Whatever the overriding implementation throws.
func getResult() throws -> Element { rxAbstractMethod() }
/// Handles a new value from the source at `index`.
///
/// Emits a combined element once every source has produced a value. Otherwise,
/// completes early when no other source can still deliver its first value.
///
/// - Parameter index: Position of the source that just produced a value.
func next(_ index: Int) {
    // Count this source the first time it delivers a value.
    if !_hasValue[index] {
        _hasValue[index] = true
        _numberOfValues += 1
    }

    if _numberOfValues == _arity {
        // Every source has a value: compute and send the combined result.
        do {
            let combined = try getResult()
            forwardOn(.next(combined))
        } catch {
            forwardOn(.error(error))
            dispose()
        }
    } else {
        // Some source still has no value. If every other source is already done,
        // the set of values can never become complete, so complete right away.
        let everyOtherSourceDone = (0 ..< _arity).allSatisfy { $0 == index || _isDone[$0] }
        if everyOtherSourceDone {
            forwardOn(.completed)
            dispose()
        }
    }
}
/// Forwards `error` downstream as an `.error` event and then disposes the sink.
///
/// - Parameter error: The failure reported by one of the sources.
func fail(_ error: Swift.Error) {
    forwardOn(.error(error))
    dispose()
}
/// Records that the source at `index` has completed.
///
/// Repeated calls for the same index are ignored. The sink forwards `.completed`
/// and disposes only once every source has completed.
///
/// - Parameter index: Position of the source that completed.
func done(_ index: Int) {
    guard !_isDone[index] else { return }

    _isDone[index] = true
    _numberOfDone += 1

    // Complete only after all sources are done.
    guard _numberOfDone == _arity else { return }
    forwardOn(.completed)
    dispose()
}
/// Observer attached to a single source of a combine-latest operation.
///
/// Stores each incoming value through `_setLatestValue` and reports next / error /
/// completed to its parent together with the source's index.
final class CombineLatestObserver : ObserverType , LockOwnerType , SynchronizedOnType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/// Closure that receives an incoming element so it can be stored as the latest value.
typealias ValueSetter = (Element) -> Void
// Receives the next / fail / done notifications for this observer's source.
private let _parent: CombineLatestProtocol
// NOTE(review): presumably the lock `synchronizedOn` takes before calling `_synchronized_on` — confirm against SynchronizedOnType.
let _lock: RecursiveLock
// Position of this observer's source; passed to the parent on next and done.
private let _index: Int
// Disposed when this observer's source errors or completes.
private let _this: Disposable
// Called with every `.next` value before the parent is notified.
private let _setLatestValue: ValueSetter
/// Entry point for events from the source; hands the event to `synchronizedOn(_:)`.
///
/// - Parameter event: The event emitted by the observed source.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}
/// Processes one event from the source and notifies the parent.
///
/// - Parameter event: The event to handle.
func _synchronized_on(_ event: Event<Element>) {
    switch event {
    case let .next(element):
        // Store the newest value first, then tell the parent this source produced one.
        _setLatestValue(element)
        _parent.next(_index)
    case let .error(failure):
        _this.dispose()
        _parent.fail(failure)
    case .completed:
        _this.dispose()
        _parent.done(_index)
    }
}
/// Combine-latest sink for two sources with element types `E1` and `E2`.
///
/// Subscribes to both sources of its parent `CombineLatest2`, remembers the latest
/// element of each, and combines them with the parent's result selector.
final class CombineLatestSink2_<E1, E2, Observer: ObserverType> : CombineLatestSink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/// Type of the combined element delivered downstream.
typealias Result = Observer.Element
/// The producer type that supplies the two sources and the result selector.
typealias Parent = CombineLatest2<E1, E2, Result>
// Producer this sink was created for; read for `_source1`, `_source2` and `_resultSelector`.
let _parent: Parent
// Latest element received from each source; nil until that source emits.
// They are read in `getResult()`, which the base `next(_:)` calls only after
// every source has reported a value.
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
/// Creates a two-source sink for `parent`.
///
/// - Parameters:
///   - parent: Producer providing the two sources and the result selector.
///   - observer: Passed through to the superclass initializer.
///   - cancel: Passed through to the superclass initializer.
init(parent: Parent, observer: Observer, cancel: Cancelable) {
    _parent = parent
    // Two sources are combined, so the base class tracks an arity of 2.
    super.init(arity: 2, observer: observer, cancel: cancel)
}
/// Subscribes to both sources of the parent producer.
///
/// Each source gets its own `CombineLatestObserver` that shares this sink's lock,
/// writes into the matching `_latestElementN` slot, and owns its subscription.
///
/// - Returns: A disposable that groups both source subscriptions.
func run() -> Disposable {
    let firstSubscription = SingleAssignmentDisposable()
    let secondSubscription = SingleAssignmentDisposable()

    // Both observers are created before either source is subscribed.
    let firstObserver = CombineLatestObserver(
        lock: self._lock,
        parent: self,
        index: 0,
        setLatestValue: { (latest: E1) -> Void in self._latestElement1 = latest },
        this: firstSubscription
    )
    let secondObserver = CombineLatestObserver(
        lock: self._lock,
        parent: self,
        index: 1,
        setLatestValue: { (latest: E2) -> Void in self._latestElement2 = latest },
        this: secondSubscription
    )

    firstSubscription.setDisposable(_parent._source1.subscribe(firstObserver))
    secondSubscription.setDisposable(_parent._source2.subscribe(secondObserver))

    return Disposables.create([firstSubscription, secondSubscription])
}
/// Combines the latest element of each source using the parent's result selector.
///
/// - Returns: The value produced by the result selector.
/// - Throws: Any error thrown by the result selector.
override func getResult() throws -> Result {
    let combine = _parent._resultSelector
    return try combine(_latestElement1, _latestElement2)
}
/// Producer for combining two observable sequences.
///
/// Holds the two sources and the result selector; `run(_:cancel:)` creates a
/// `CombineLatestSink2_` that performs the subscriptions.
final class CombineLatest2<E1, E2, Result> : Producer
1
2
3
4
5
6
7
8
9
10
11
12
/// Function that turns one element from each source into a combined result; may throw.
typealias ResultSelector = (E1, E2) throws -> Result
// First source sequence; subscribed to by the sink in its `run()`.
let _source1: Observable<E1>
// Second source sequence; subscribed to by the sink in its `run()`.
let _source2: Observable<E2>
// Selector the sink calls from `getResult()` with the latest element of each source.
let _resultSelector: ResultSelector
/// Creates the two-source sink for `observer` and starts it.
///
/// - Parameters:
///   - observer: Observer that receives the combined elements.
///   - cancel: Cancelable handed to the sink.
/// - Returns: The sink together with the subscription returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
    let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
    return (sink: sink, subscription: sink.run())
}
1
2
3
4
5
6
7
8
/// Builds an observable that combines three sources through `resultSelector`.
///
/// Each source is converted with `asObservable()` and handed, together with the
/// selector, to `CombineLatest3`.
/// NOTE(review): `CombineLatest3` is declared outside this view; presumably the
/// three-source counterpart of `CombineLatest2` — confirm.
///
/// - Parameters:
///   - source1: First source sequence.
///   - source2: Second source sequence.
///   - source3: Third source sequence.
///   - resultSelector: Function that maps one element from each source to the result element; may throw.
/// - Returns: The `CombineLatest3` instance, typed as `Observable<Element>`.
public static func combineLatest<O1: ObservableType, O2: ObservableType, O3: ObservableType>
    (_ source1: O1, _ source2: O2, _ source3: O3, resultSelector: @escaping (O1.Element, O2.Element, O3.Element) throws -> Element)
    -> Observable<Element> {
    let first = source1.asObservable()
    let second = source2.asObservable()
    let third = source3.asObservable()
    return CombineLatest3(source1: first, source2: second, source3: third, resultSelector: resultSelector)
}