SkipUntil
Returns the elements from the source observable sequence that are emitted after the other observable sequence produces an element.
1
2
3
4
/// Returns the elements of this sequence that are emitted after `other` has produced an element.
///
/// - parameter other: Sequence whose first element starts the forwarding of this sequence's elements.
/// - returns: An observable sequence backed by a `SkipUntil` producer built from both sequences.
public func skipUntil<Source: ObservableType>(_ other: Source)
-> Observable<Element> {
    let upstream = self.asObservable()
    let trigger = other.asObservable()
    return SkipUntil(source: upstream, other: trigger)
}
/// Observer attached to the `other` sequence of a `skipUntil`.
/// On the first `next` from `other` it sets the parent sink's `_forwardElements`
/// flag; an error from `other` is forwarded through the parent sink.
final private class SkipUntilSinkOther<Other, Observer: ObserverType>: ObserverType, LockOwnerType, SynchronizedOnType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// The sink this observer reports to.
typealias Parent = SkipUntilSink<Other, Observer>
// Events handled here carry elements of the `other` sequence.
typealias Element = Other
// Parent sink; its `_forwardElements` flag is written from `_synchronized_on`.
private let _parent: Parent
// Exposes the parent's lock rather than owning a separate one
// (presumably so both observers synchronize on the same lock — confirm in `SynchronizedOnType`).
var _lock: RecursiveLock {
return self._parent._lock
}
// Subscription to `other`; set by the parent sink's `run()` and disposed from `_synchronized_on`.
let _subscription = SingleAssignmentDisposable()
/// Receives an event from the `other` sequence and hands it to `synchronizedOn`.
///
/// - parameter event: Event emitted by `other`.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}
/// Reacts to an event from the `other` sequence.
///
/// - `.next`: sets the parent's `_forwardElements` flag and disposes the subscription to `other`.
/// - `.error`: forwards the error through the parent sink and disposes the parent.
/// - `.completed`: disposes the subscription to `other` without touching the flag.
///
/// - parameter event: Event emitted by `other`.
func _synchronized_on(_ event: Event<Element>) {
    switch event {
    case .next:
        // The trigger fired: open the gate on the parent, then drop the
        // subscription to `other`.
        _parent._forwardElements = true
        _subscription.dispose()
    case let .error(failure):
        // Re-wrapped in a new `.error` because this observer's `Element` is
        // `Other`, while the parent forwards events of its own element type.
        _parent.forwardOn(.error(failure))
        _parent.dispose()
    case .completed:
        _subscription.dispose()
    }
}
/// Sink that observes the source sequence of a `skipUntil`.
/// Source elements are forwarded only while `_forwardElements` is `true`;
/// that flag is set by the `SkipUntilSinkOther` observer created in `run()`.
final private class SkipUntilSink<Other, Observer: ObserverType>: Sink, ObserverType, LockOwnerType, SynchronizedOnType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Element type of the downstream observer, which is also the source's element type.
typealias Element = Observer.Element
// The producer that supplies `_source` and `_other`.
typealias Parent = SkipUntil<Element, Other>
// Lock owned by this sink; `SkipUntilSinkOther` returns this same instance from its `_lock`.
let _lock = RecursiveLock()
// Producer whose `_source` and `_other` are subscribed to in `run()`.
private let _parent: Parent
// Gate for source elements: starts closed, opened by `SkipUntilSinkOther` on a `next` from `other`.
// `fileprivate` so that class can write it.
fileprivate var _forwardElements = false
// Holds the subscription to the source sequence; set in `run()`.
private let _sourceSubscription = SingleAssignmentDisposable()
/// Receives an event from the source sequence and hands it to `synchronizedOn`.
///
/// - parameter event: Event emitted by the source sequence.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}
/// Reacts to an event from the source sequence.
///
/// - `.next`: forwarded only when `_forwardElements` is set; otherwise dropped.
/// - `.error`: always forwarded, then the sink is disposed.
/// - `.completed`: forwarded only when `_forwardElements` is set; the sink is disposed either way.
///
/// - parameter event: Event emitted by the source sequence.
func _synchronized_on(_ event: Event<Element>) {
    switch event {
    case .next:
        // Gate still closed: skip this element.
        guard _forwardElements else { return }
        forwardOn(event)
    case .error:
        forwardOn(event)
        dispose()
    case .completed:
        // Completion is passed downstream only once the gate is open,
        // but the sink is torn down regardless.
        if _forwardElements {
            forwardOn(event)
        }
        dispose()
    }
}
/// Subscribes this sink to the source sequence and a new `SkipUntilSinkOther` to `other`.
///
/// The source is subscribed before `other`, and both subscriptions are stored
/// in their single-assignment holders only after both `subscribe` calls return.
///
/// - returns: A disposable combining the source and `other` subscription holders.
func run() -> Disposable {
    // Keep this exact order: source first, then the trigger sequence.
    let sourceDisposable = _parent._source.subscribe(self)
    let triggerObserver = SkipUntilSinkOther(parent: self)
    let triggerDisposable = _parent._other.subscribe(triggerObserver)

    _sourceSubscription.setDisposable(sourceDisposable)
    triggerObserver._subscription.setDisposable(triggerDisposable)

    return Disposables.create(_sourceSubscription, triggerObserver._subscription)
}
/// Producer behind `skipUntil`: holds the source and `other` sequences and,
/// in `run`, creates a `SkipUntilSink` for the subscribing observer.
final private class SkipUntil<Element, Other>: Producer
1
2
3
4
5
6
7
8
// Sequence whose elements are forwarded once the gate opens; subscribed to in `SkipUntilSink.run()`.
fileprivate let _source: Observable<Element>
// Sequence whose first element opens the gate; subscribed to in `SkipUntilSink.run()`.
fileprivate let _other: Observable<Other>
/// Creates a `SkipUntilSink` for `observer` and starts it.
///
/// - parameter observer: Downstream observer that receives the forwarded events.
/// - parameter cancel: Cancelable passed through to the sink.
/// - returns: The sink together with the disposable returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let skipSink = SkipUntilSink(parent: self, observer: observer, cancel: cancel)
    return (sink: skipSink, subscription: skipSink.run())
}