Home Rxswift Operator Skipuntil
Post
Cancel

RxSwift Operator: skipUntil

SkipUntil

Returns the elements from the source observable sequence that are emitted after the other observable sequence produces an element.

1
2
3
4
/// Returns the elements from the source observable sequence that are emitted
/// after the other observable sequence produces an element.
///
/// - Parameter other: Observable sequence whose first `next` element switches on forwarding of source elements.
/// - Returns: An observable sequence backed by a `SkipUntil` producer built from both sequences.
public func skipUntil<Source: ObservableType>(_ other: Source)
    -> Observable<Element> {
    let source = self.asObservable()
    let trigger = other.asObservable()
    return SkipUntil(source: source, other: trigger)
}

final private class SkipUntilSinkOther<Other, Observer: ObserverType>: ObserverType, LockOwnerType, SynchronizedOnType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// The sink this observer reports back to.
typealias Parent = SkipUntilSink<Other, Observer>
// This observer consumes elements of the `other` sequence.
typealias Element = Other

private let _parent: Parent

// Hands out the parent's lock instead of owning one, so this observer and
// `SkipUntilSink` both expose the same `RecursiveLock` instance.
var _lock: RecursiveLock {
    return self._parent._lock
}

// Receives the subscription to the `other` sequence in `SkipUntilSink.run()`;
// disposed by `_synchronized_on` after a `next` or `completed` event.
let _subscription = SingleAssignmentDisposable()

/// Observer entry point for the `other` sequence: passes every event to `synchronizedOn`.
/// NOTE(review): `synchronizedOn` is defined outside this snippet; presumably it
/// acquires `_lock` and then calls `_synchronized_on` — confirm against `SynchronizedOnType`.
func on(_ event: Event<Element>) {
    self.synchronizedOn(event)
}

/// Handles one event coming from the `other` sequence.
///
/// - `next`: sets the parent's `_forwardElements` flag, then disposes `_subscription`.
/// - `error`: forwards the error through the parent sink and disposes the parent.
/// - `completed`: disposes `_subscription` only; `_forwardElements` is left as it was.
func _synchronized_on(_ event: Event<Element>) {
    let sink = self._parent

    switch event {
    case .error(let failure):
        // an error from `other` is forwarded through the parent, which is then disposed
        sink.forwardOn(.error(failure))
        sink.dispose()
    case .completed:
        // completion without a prior `next` never sets the flag
        self._subscription.dispose()
    case .next:
        // mark parent to forward elements on receiving a `next`
        sink._forwardElements = true
        self._subscription.dispose()
    }
}

final private class SkipUntilSink<Other, Observer: ObserverType>: Sink, ObserverType, LockOwnerType, SynchronizedOnType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Element type of the observer this sink forwards to.
typealias Element = Observer.Element 
// The `SkipUntil` producer that creates this sink in its `run(_:cancel:)`.
typealias Parent = SkipUntil<Element, Other>

// Also returned by `SkipUntilSinkOther._lock`, so both observers share this instance.
let _lock = RecursiveLock()
private let _parent: Parent
// Starts out false; `SkipUntilSinkOther` sets it to true when `other` emits a `next`.
fileprivate var _forwardElements = false
// Receives the subscription to the source sequence in `run()`.
private let _sourceSubscription = SingleAssignmentDisposable()

/// Observer entry point for the source sequence: passes every event to `synchronizedOn`.
/// NOTE(review): `synchronizedOn` is defined outside this snippet; presumably it
/// acquires `_lock` and then calls `_synchronized_on` — confirm against `SynchronizedOnType`.
func on(_ event: Event<Element>) {
    self.synchronizedOn(event)
}

/// Handles one event coming from the source sequence.
///
/// - `next`: forwarded only while `_forwardElements` is true; otherwise dropped.
/// - `error`: always forwarded, then the sink is disposed.
/// - `completed`: forwarded only while `_forwardElements` is true; the sink is
///   disposed in either case.
func _synchronized_on(_ event: Event<Element>) {
    switch event {
    case .error:
        // errors pass through regardless of the flag
        self.forwardOn(event)
        self.dispose()
    case .next:
        // still skipping: drop the element
        guard self._forwardElements else { return }
        self.forwardOn(event)
    case .completed:
        let shouldForward = self._forwardElements
        if shouldForward {
            self.forwardOn(event)
        }
        // disposal happens whether or not the completion was forwarded
        self.dispose()
    }
}

/// Subscribes to both sequences and returns a disposable covering both subscriptions.
///
/// - Returns: A disposable created from `_sourceSubscription` and the other observer's `_subscription`.
func run() -> Disposable {
    // The source is subscribed first, with this sink as its observer.
    let sourceSubscription = self._parent._source.subscribe(self)
    // A separate observer watches `other` and reports back to this sink.
    let otherObserver = SkipUntilSinkOther(parent: self)
    let otherSubscription = self._parent._other.subscribe(otherObserver)
    // The subscriptions are stored only after both `subscribe` calls have returned.
    self._sourceSubscription.setDisposable(sourceSubscription)
    otherObserver._subscription.setDisposable(otherSubscription)

    return Disposables.create(_sourceSubscription, otherObserver._subscription)
}

final private class SkipUntil<Element, Other>: Producer

1
2
3
4
5
6
7
8
// Sequence whose elements are forwarded once forwarding has been switched on.
fileprivate let _source: Observable<Element>
// Sequence whose first `next` switches forwarding on (see `SkipUntilSinkOther`).
fileprivate let _other: Observable<Other>

/// Creates a `SkipUntilSink` for `observer` and starts it.
///
/// - Parameters:
///   - observer: Observer handed to the new sink.
///   - cancel: Cancelable handed to the new sink.
/// - Returns: The sink together with the disposable returned by its `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let skipUntilSink = SkipUntilSink(parent: self, observer: observer, cancel: cancel)
    return (sink: skipUntilSink, subscription: skipUntilSink.run())
}
This post is licensed under CC BY 4.0 by the author.