TakeUntil
Returns the elements from the source observable sequence until the other observable sequence produces an element.
1
2
3
4
/// Returns the elements from the source observable sequence until the other
/// observable sequence produces an element.
///
/// - Parameter other: Observable sequence whose events end the forwarding of
///   the source sequence's elements.
/// - Returns: An observable sequence built from a `TakeUntil` producer that
///   pairs this sequence with `other`.
public func takeUntil<Source: ObservableType>(_ other: Source) -> Observable<Element> {
    let upstream = self.asObservable()
    let terminator = other.asObservable()
    return TakeUntil(source: upstream, other: terminator)
}
final private class TakeUntilSink<Other, Observer: ObserverType>: Sink, LockOwnerType, ObserverType, SynchronizedOnType
1
2
3
4
/// Element type handled by this sink; taken from the downstream observer's `Element`.
typealias Element = Observer.Element
/// The `TakeUntil` producer type this sink is created for.
typealias Parent = TakeUntil<Element, Other>
/// Producer whose `_source` and `_other` sequences are subscribed to in `run()`.
private let _parent: Parent
/// Lock owned by this sink. `TakeUntilSinkOther._lock` returns this same instance,
/// so both observers share one lock (presumably taken by `synchronizedOn` — confirm
/// against `SynchronizedOnType`).
let _lock = RecursiveLock()
final private class TakeUntilSinkOther<Other, Observer: ObserverType>: ObserverType, LockOwnerType, SynchronizedOnType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// The sink type this observer reports back to.
typealias Parent = TakeUntilSink<Other, Observer>
/// This observer receives the elements of the `other` sequence.
typealias Element = Other
/// Sink that is completed or failed from `_synchronized_on` when the other sequence emits or errors.
private let _parent: Parent
/// Exposes the parent sink's lock instead of owning a separate one.
var _lock: RecursiveLock {
return self._parent._lock
}
/// Holder for the subscription to the other sequence. It is filled in by
/// `TakeUntilSink.run()` and disposed when the other sequence completes.
fileprivate let _subscription = SingleAssignmentDisposable()
/// Receives an event from the other sequence and hands it to `synchronizedOn`.
///
/// NOTE(review): `synchronizedOn` is presumably the `SynchronizedOnType` helper
/// that ends up calling `_synchronized_on` — confirm against its definition.
///
/// - Parameter event: Event emitted by the other sequence.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}
/// Translates an event of the other sequence into an action on the parent sink.
///
/// - `.completed`: only the subscription to the other sequence is disposed;
///   the parent sink is left untouched.
/// - `.error`: the error is forwarded through the parent sink, which is then disposed.
/// - `.next`: the parent sink is sent `.completed` and then disposed; the
///   element's value itself is ignored.
///
/// - Parameter event: Event emitted by the other sequence.
func _synchronized_on(_ event: Event<Element>) {
    let sink = _parent

    switch event {
    case .completed:
        _subscription.dispose()
    case .error(let failure):
        sink.forwardOn(.error(failure))
        sink.dispose()
    case .next:
        // Any element from the other sequence ends the output.
        sink.forwardOn(.completed)
        sink.dispose()
    }
}
final private class TakeUntil<Element, Other>: Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// Sequence that `TakeUntilSink.run()` subscribes the sink itself to.
fileprivate let _source: Observable<Element>
/// Sequence that `TakeUntilSink.run()` subscribes a `TakeUntilSinkOther` observer to.
fileprivate let _other: Observable<Other>
/// Creates a `TakeUntilSink` for `observer` and starts it.
///
/// - Parameters:
///   - observer: Observer passed on to the newly created sink.
///   - cancel: Cancelable passed on to the newly created sink.
/// - Returns: The sink together with the disposable returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let takeUntilSink = TakeUntilSink(parent: self, observer: observer, cancel: cancel)
    return (sink: takeUntilSink, subscription: takeUntilSink.run())
}
/// Receives an event from the source sequence and hands it to `synchronizedOn`.
///
/// NOTE(review): `synchronizedOn` is presumably the `SynchronizedOnType` helper
/// that ends up calling `_synchronized_on` — confirm against its definition.
///
/// - Parameter event: Event emitted by the source sequence.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}
/// Forwards every source event unchanged and disposes this sink after a
/// terminal (`.error` or `.completed`) event.
///
/// - Parameter event: Event emitted by the source sequence.
func _synchronized_on(_ event: Event<Element>) {
    // Every case forwards the event first; only the terminal ones dispose.
    forwardOn(event)

    switch event {
    case .next:
        break
    case .error, .completed:
        dispose()
    }
}
/// Subscribes to both sequences of the parent producer.
///
/// - Returns: A disposable combining the source subscription with the holder
///   of the other sequence's subscription.
func run() -> Disposable {
    // The other sequence is subscribed before the source; keep this order.
    let stopObserver = TakeUntilSinkOther(parent: self)
    let stopSubscription = _parent._other.subscribe(stopObserver)
    // Store the subscription on the observer, which disposes it itself on `.completed`.
    stopObserver._subscription.setDisposable(stopSubscription)

    let upstreamSubscription = _parent._source.subscribe(self)
    return Disposables.create(upstreamSubscription, stopObserver._subscription)
}