Zip
Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.
Code
1
2
3
4
5
6
7
8
/// Combines two observable sequences element by element through `resultSelector`.
///
/// - Parameters:
///   - source1: First sequence to zip.
///   - source2: Second sequence to zip.
///   - resultSelector: Maps one element from each source to a result element; may throw.
/// - Returns: An observable sequence backed by a `Zip2` producer over both sources.
public static func zip<O1: ObservableType, O2: ObservableType>
    (_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> Element)
    -> Observable<Element> {
    let first = source1.asObservable()
    let second = source2.asObservable()
    return Zip2(source1: first, source2: second, resultSelector: resultSelector)
}
final class ZipObserver : ObserverType, LockOwnerType, SynchronizedOnType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Closure type used to hand a received element over for storage.
typealias ValueSetter = (Element) -> Void
// Sink that is told about this observer's events; events are ignored while this is nil.
private var _parent: ZipSinkProtocol?
// NOTE(review): presumably the lock `synchronizedOn` acquires via LockOwnerType — confirm.
let _lock: RecursiveLock
// state
// Position of the observed source; passed back to the parent in `next(_:)` and `done(_:)`.
private let _index: Int
// Disposable that is disposed when an error or completion event arrives.
private let _this: Disposable
// Called with every `.next` value before the parent is notified.
private let _setNextValue: ValueSetter
/// Entry point for events coming from the observed source.
///
/// Hands the event to `synchronizedOn`.
/// NOTE(review): presumably that helper takes `_lock` and then calls
/// `_synchronized_on(_:)` — confirm against SynchronizedOnType.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}
/// Handles one event from the observed source.
///
/// Nothing happens when no parent is attached. Otherwise a terminal event first
/// disposes this observer's own subscription, and the event is then reported
/// to the parent sink.
func _synchronized_on(_ event: Event<Element>) {
    // Step 1: a terminal event releases our own subscription, but only
    // while a parent is still attached.
    if self._parent != nil {
        switch event {
        case .error, .completed:
            self._this.dispose()
        case .next:
            break
        }
    }

    // Step 2: `_parent` is deliberately read again after the dispose above.
    guard let sink = self._parent else { return }

    switch event {
    case .next(let element):
        // Store the element first so the sink sees it when it checks its buffers.
        self._setNextValue(element)
        sink.next(self._index)
    case .error(let failure):
        sink.fail(failure)
    case .completed:
        sink.done(self._index)
    }
}
class ZipSink<Observer: ObserverType> : Sink, ZipSinkProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Number of zipped sources; `next(_:)` checks the indices 0 ..< _arity.
let _arity: Int
// Lock owned by the sink; `ZipSink2_.run()` hands it to the per-source observers it creates.
let _lock = RecursiveLock()
// One completion flag per source, set in `done(_:)`.
private var _isDone: [Bool]
/// Creates a sink for `arity` sources, none of which is marked as done yet.
///
/// - Parameters:
///   - arity: Number of sources being zipped.
///   - observer: Forwarded unchanged to the superclass initializer.
///   - cancel: Forwarded unchanged to the superclass initializer.
init(arity: Int, observer: Observer, cancel: Cancelable) {
    self._arity = arity
    self._isDone = Array(repeating: false, count: arity)
    super.init(observer: observer, cancel: cancel)
}
/// Produces the element that `next(_:)` forwards downstream.
///
/// Abstract hook: this base implementation only calls `rxAbstractMethod()`;
/// `ZipSink2_` overrides it. `next(_:)` calls it only after `hasElements(_:)`
/// has returned true for every index.
func getResult() throws -> Element {
rxAbstractMethod()
}
/// Reports whether the source at `index` has an element available.
///
/// Abstract hook: this base implementation only calls `rxAbstractMethod()`;
/// `ZipSink2_` overrides it. `next(_:)` queries it for every index in 0 ..< _arity.
func hasElements(_ index: Int) -> Bool {
rxAbstractMethod()
}
/// Called after the source at `index` has delivered an element.
///
/// Once every source has an element available, one combined result is
/// computed and forwarded. If computing the result throws, the error is
/// forwarded and the sink is disposed.
func next(_ index: Int) {
    // Stops at the first source that has nothing available.
    let everySourceReady = (0 ..< self._arity).allSatisfy { self.hasElements($0) }
    guard everySourceReady else { return }

    do {
        // A result is sent only when all sources have a value (the original
        // author likens this to combineLatest in SwiftSignalKit).
        let combined = try self.getResult()
        self.forwardOn(.next(combined))
    } catch {
        // Catch, send `error`, then dispose.
        self.forwardOn(.error(error))
        self.dispose()
    }
}
/// Forwards `error` downstream as an `.error` event and then disposes the sink.
///
/// - Parameter error: The failure reported by one of the sources.
func fail(_ error: Swift.Error) {
    forwardOn(.error(error))
    dispose()
}
/// Marks the source at `index` as completed.
///
/// When every source has been marked, `.completed` is forwarded and the
/// sink is disposed; until then nothing is sent.
func done(_ index: Int) {
    self._isDone[index] = true

    // Stops at the first source that has not completed yet.
    guard self._isDone.allSatisfy({ $0 }) else { return }

    self.forwardOn(.completed)
    self.dispose()
}
final class ZipSink2_<E1, E2, Observer: ObserverType> : ZipSink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Element type of the downstream observer.
typealias Result = Observer.Element
typealias Parent = Zip2<E1, E2, Result>
// Producer that supplies both sources and the result selector.
let _parent: Parent
// Elements received from source 1 and source 2 that have not been combined yet.
// NOTE(review): `capacity: 2` is presumably an initial capacity, not a hard limit — confirm against Queue.
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
/// Creates a two-source sink.
///
/// - Parameters:
///   - parent: Producer whose sources and result selector this sink uses.
///   - observer: Forwarded to the superclass initializer.
///   - cancel: Forwarded to the superclass initializer.
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
// The arity is fixed at 2 because this sink handles exactly two sources.
super.init(arity: 2, observer: observer, cancel: cancel)
}
/// Reports whether the buffer for the source at `index` holds an element.
///
/// - Parameter index: 0 for the first source, 1 for the second.
/// - Returns: `true` when the matching queue is not empty.
///
/// Any other index is a programming error and traps with a message naming
/// the offending index.
override func hasElements(_ index: Int) -> Bool {
    switch index {
    case 0: return !self._values1.isEmpty
    case 1: return !self._values2.isEmpty
    default:
        // Report the unexpected index itself so the crash log is actionable.
        rxFatalError("Unhandled case \(index)")
    }
}
/// Subscribes to both sources and returns a disposable covering both subscriptions.
///
/// Each source gets its own `ZipObserver` that shares the sink's lock,
/// appends received elements to the matching queue, and reports to this sink.
func run() -> Disposable {
    let firstSubscription = SingleAssignmentDisposable()
    let secondSubscription = SingleAssignmentDisposable()

    let firstObserver = ZipObserver(
        lock: self._lock,
        parent: self,
        index: 0,
        setNextValue: { element in self._values1.enqueue(element) },
        this: firstSubscription
    )
    let secondObserver = ZipObserver(
        lock: self._lock,
        parent: self,
        index: 1,
        setNextValue: { element in self._values2.enqueue(element) },
        this: secondSubscription
    )

    // Subscribe one ZipObserver to each source to collect its elements;
    // source1 is subscribed before source2.
    let firstUpstream = self._parent.source1.subscribe(firstObserver)
    firstSubscription.setDisposable(firstUpstream)
    let secondUpstream = self._parent.source2.subscribe(secondObserver)
    secondSubscription.setDisposable(secondUpstream)

    return Disposables.create([
        firstSubscription,
        secondSubscription
    ])
}
/// Builds the result that will be sent downstream.
///
/// Removes the front element of each queue and passes the pair to the
/// parent's result selector. The force unwraps rely on the caller having
/// checked `hasElements(_:)` for both indices first, as `next(_:)` does.
///
/// - Throws: Whatever the result selector throws.
override func getResult() throws -> Result {
    let first = self._values1.dequeue()!
    let second = self._values2.dequeue()!
    return try self._parent._resultSelector(first, second)
}
final class Zip2<E1, E2, Result> : Producer
1
2
3
4
5
6
7
8
9
10
11
// Combines one element from each source into a result; may throw.
typealias ResultSelector = (E1, E2) throws -> Result
// First source; subscribed by `ZipSink2_.run()`.
let source1: Observable<E1>
// Second source; subscribed by `ZipSink2_.run()`.
let source2: Observable<E2>
// Selector applied by `ZipSink2_.getResult()` to each pair of elements.
let _resultSelector: ResultSelector
/// Creates a `ZipSink2_` for `observer`, starts it, and returns the sink with its subscription.
///
/// - Parameters:
///   - observer: Receiver of the zipped results.
///   - cancel: Passed through to the sink.
/// - Returns: The sink and the disposable returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
    let zipSink = ZipSink2_(parent: self, observer: observer, cancel: cancel)
    return (sink: zipSink, subscription: zipSink.run())
}