Home Rxswift Operator Zip
Post
Cancel

RxSwift Operator: Zip

Zip

Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.

Code

1
2
3
4
5
6
7
8
/// Merges two observable sequences into one by applying `resultSelector` whenever both
/// sequences have produced an element at a corresponding index.
///
/// - Parameters:
///   - source1: First source; converted with `asObservable()`.
///   - source2: Second source; converted with `asObservable()`.
///   - resultSelector: Combines one element from each source into a result element; may throw.
/// - Returns: A `Zip2` producer wrapping both sources and the selector.
public static func zip<O1: ObservableType, O2: ObservableType>
    (_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> Element)
    -> Observable<Element> {
    let first = source1.asObservable()
    let second = source2.asObservable()
    return Zip2(source1: first, source2: second, resultSelector: resultSelector)
}

final class ZipObserver : ObserverType, LockOwnerType, SynchronizedOnType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Closure type that receives one element and returns nothing.
typealias ValueSetter = (Element) -> Void

// Sink that is told about this observer's events; while it is nil,
// `_synchronized_on` ignores everything it receives.
private var _parent: ZipSinkProtocol?

// Lock exposed by this observer.
// NOTE(review): presumably taken by `synchronizedOn` before `_synchronized_on` runs — confirm against SynchronizedOnType.
let _lock: RecursiveLock

// state
// Index handed to `parent.next(_:)` / `parent.done(_:)` to identify this observer's source.
private let _index: Int
// Disposed when an `.error` or `.completed` event arrives while `_parent` is non-nil.
private let _this: Disposable
// Called with each `.next` value before the parent is notified.
private let _setNextValue: ValueSetter


/// Entry point for events delivered to this observer.
///
/// Simply hands the event to `synchronizedOn(_:)`.
/// NOTE(review): that helper is expected to call `_synchronized_on(_:)`, presumably under `_lock` — confirm.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}

/// Handles one event from the observed source.
///
/// Does nothing when `_parent` is nil. Otherwise:
/// - `.next`: stores the value through `_setNextValue`, then calls `parent.next(_index)`.
/// - `.error`: disposes `_this`, then calls `parent.fail(_:)` with the error.
/// - `.completed`: disposes `_this`, then calls `parent.done(_index)`.
func _synchronized_on(_ event: Event<Element>) {
    // Phase 1: a terminal event releases this observer's own disposable first.
    if self._parent != nil {
        switch event {
        case .error, .completed:
            self._this.dispose()
        case .next:
            break
        }
    }

    // Phase 2: `_parent` is read again here, after the disposal above, before notifying it.
    guard let sink = self._parent else {
        return
    }

    switch event {
    case .next(let element):
        self._setNextValue(element)
        sink.next(self._index)
    case .error(let failure):
        sink.fail(failure)
    case .completed:
        sink.done(self._index)
    }
}

class ZipSink<Observer: ObserverType> : Sink, ZipSinkProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Number of sources being zipped; `next(_:)` probes indices 0 ..< _arity
// and `_isDone` is created with this many entries.
let _arity: Int

// Lock owned by the sink; `ZipSink2_.run()` passes it to every ZipObserver it creates.
let _lock = RecursiveLock()

// Per-source completion flags; `done(_:)` sets the entry for the completed index.
private var _isDone: [Bool]

/// Creates a sink for `arity` sources, with no source marked as completed yet.
///
/// - Parameters:
///   - arity: Number of sources taking part in the zip.
///   - observer: Forwarded to the superclass initializer.
///   - cancel: Forwarded to the superclass initializer.
init(arity: Int, observer: Observer, cancel: Cancelable) {
    self._arity = arity
    self._isDone = Array(repeating: false, count: arity)

    super.init(observer: observer, cancel: cancel)
}

/// Abstract hook that produces the next combined element to forward.
///
/// `next(_:)` calls this only after `hasElements(_:)` has returned true for every index.
/// The base body only calls `rxAbstractMethod()`; `ZipSink2_` supplies an override.
/// - Throws: Whatever the overriding implementation throws; `next(_:)` forwards it as an `.error` event.
func getResult() throws -> Element {
    // NOTE(review): presumably traps if a subclass forgets to override — confirm against rxAbstractMethod.
    rxAbstractMethod()
}

/// Abstract hook that reports whether the source at `index` has a value available.
///
/// `next(_:)` queries this for indices 0 ..< `_arity`.
/// The base body only calls `rxAbstractMethod()`; `ZipSink2_` supplies an override.
func hasElements(_ index: Int) -> Bool {
    // NOTE(review): presumably traps if a subclass forgets to override — confirm against rxAbstractMethod.
    rxAbstractMethod()
}

/// Called by an observer after it has stored a new value.
///
/// Forwards one combined element when every source has a value available;
/// otherwise does nothing. `index` is not consulted here.
func next(_ index: Int) {
    // Stops probing at the first source that has nothing available.
    let everySourceReady = (0 ..< self._arity).allSatisfy { self.hasElements($0) }
    guard everySourceReady else {
        return
    }

    do {
        // Like combineLatest in SwiftSignalKit: a result is sent only once every source has a value.
        let combined = try self.getResult()
        self.forwardOn(.next(combined))
    } catch {
        // Producing the result failed: forward the error, then dispose.
        self.forwardOn(.error(error))
        self.dispose()
    }
}

/// Forwards `error` as an `.error` event, then disposes this sink.
func fail(_ error: Swift.Error) {
    forwardOn(.error(error))
    dispose()
}

/// Marks the source at `index` as completed.
///
/// Once every source is marked as completed, forwards `.completed` and disposes this sink.
func done(_ index: Int) {
    _isDone[index] = true

    // Keep waiting while any source is still running.
    guard _isDone.allSatisfy({ $0 }) else {
        return
    }

    forwardOn(.completed)
    dispose()
}

final class ZipSink2_<E1, E2, Observer: ObserverType> : ZipSink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Element type delivered to the downstream observer.
typealias Result = Observer.Element
// The Zip2 producer that holds both sources and the result selector.
typealias Parent = Zip2<E1, E2, Result>

// Producer this sink was created for; read in `run()` and `getResult()`.
let _parent: Parent

// Pending values from source 1 and source 2: filled by the observers created in `run()`,
// drained one at a time by `getResult()`.
// NOTE(review): `capacity: 2` looks like an initial capacity, not a hard limit — confirm against Queue.
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)

/// Creates the two-source sink.
///
/// - Parameters:
///   - parent: The `Zip2` producer supplying the sources and result selector.
///   - observer: Forwarded to the superclass initializer.
///   - cancel: Forwarded to the superclass initializer.
init(parent: Parent, observer: Observer, cancel: Cancelable) {
    _parent = parent
    // Arity is fixed at 2 for this sink.
    super.init(arity: 2, observer: observer, cancel: cancel)
}

/// Reports whether the queue for the given source holds at least one value.
///
/// - Parameter index: 0 for the first source, 1 for the second; any other value is a fatal error.
override func hasElements(_ index: Int) -> Bool {
    if index == 0 {
        return !self._values1.isEmpty
    }
    if index == 1 {
        return !self._values2.isEmpty
    }

    rxFatalError("Unhandled case (Function)")
}

/// Subscribes to both sources and returns a disposable covering the two subscriptions.
///
/// Each source gets its own `ZipObserver`, which shares this sink's lock and
/// enqueues incoming values into the matching queue.
func run() -> Disposable {
    let firstSubscription = SingleAssignmentDisposable()
    let secondSubscription = SingleAssignmentDisposable()

    let firstObserver = ZipObserver(
        lock: self._lock,
        parent: self,
        index: 0,
        setNextValue: { self._values1.enqueue($0) },
        this: firstSubscription
    )
    let secondObserver = ZipObserver(
        lock: self._lock,
        parent: self,
        index: 1,
        setNextValue: { self._values2.enqueue($0) },
        this: secondSubscription
    )

    // Subscribe each ZipObserver to its source so it can collect values.
    firstSubscription.setDisposable(self._parent.source1.subscribe(firstObserver))
    secondSubscription.setDisposable(self._parent.source2.subscribe(secondObserver))

    return Disposables.create([firstSubscription, secondSubscription])
}

/// Builds the final result to send: removes one value from each queue and combines them
/// with the parent's result selector.
///
/// The force-unwraps rely on the caller having checked `hasElements(_:)` for both indices.
/// - Throws: Whatever the result selector throws.
override func getResult() throws -> Result {
    let first = self._values1.dequeue()!
    let second = self._values2.dequeue()!
    return try self._parent._resultSelector(first, second)
}

final class Zip2<E1, E2, Result> : Producer

1
2
3
4
5
6
7
8
9
10
11
// Throwing closure that combines one element from each source into a result.
typealias ResultSelector = (E1, E2) throws -> Result
// The two sequences being zipped; `ZipSink2_.run()` subscribes to both.
let source1: Observable<E1>
let source2: Observable<E2>

// Selector invoked by `ZipSink2_.getResult()` on one value from each source.
let _resultSelector: ResultSelector

/// Creates a `ZipSink2_` for `observer`, starts it, and returns the sink with its subscription.
///
/// - Parameters:
///   - observer: Receiver of the zipped elements.
///   - cancel: Passed through to the sink's initializer.
/// - Returns: The sink and the disposable produced by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
    let zipSink = ZipSink2_(parent: self, observer: observer, cancel: cancel)
    return (sink: zipSink, subscription: zipSink.run())
}
This post is licensed under CC BY 4.0 by the author.