Home rxswift take-until
Post
Cancel

rxswift take-until

TakeUntil

Returns the elements from the source observable sequence until the other observable sequence produces an element.

1
2
3
4
/// Returns the elements from the source observable sequence until the other
/// observable sequence produces an element.
///
/// - Parameter other: Sequence whose first element stops the forwarding of source elements.
/// - Returns: An observable sequence backed by a `TakeUntil` producer built from both sequences.
public func takeUntil<Source: ObservableType>(_ other: Source)
    -> Observable<Element> {
    let upstream = self.asObservable()
    let trigger = other.asObservable()
    return TakeUntil(source: upstream, other: trigger)
}

/// Sink that observes the source sequence and forwards its events to the downstream observer.
/// `Sink` is generic over the observer type, so the superclass must be spelled `Sink<Observer>`.
final private class TakeUntilSink<Other, Observer: ObserverType>: Sink<Observer>, LockOwnerType, ObserverType, SynchronizedOnType

1
2
3
4
/// Element type of the downstream observer; events of this type are forwarded by the sink.
typealias Element = Observer.Element 
/// The `TakeUntil` producer type that builds this sink.
typealias Parent = TakeUntil<Element, Other>
/// Producer reference; `run()` reads `_source` and `_other` from it.
private let _parent: Parent
/// Recursive lock; `TakeUntilSinkOther._lock` returns this same instance.
let _lock = RecursiveLock()

/// Observer attached to the `other` sequence by `TakeUntilSink.run()`.
/// When `other` emits an element it completes the parent sink and disposes it.
final private class TakeUntilSinkOther<Other, Observer: ObserverType>: ObserverType, LockOwnerType, SynchronizedOnType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// The sink observing the source sequence; this observer terminates it.
typealias Parent = TakeUntilSink<Other, Observer>
/// This observer receives the elements of the `other` sequence.
typealias Element = Other

/// Owning sink, used to forward terminal events downstream and to dispose the pipeline.
private let _parent: Parent

/// Subscription to the `other` sequence; assigned once by `TakeUntilSink.run()`.
fileprivate let _subscription = SingleAssignmentDisposable()

/// Lock borrowed from the parent sink, so both observers synchronize on the same lock.
var _lock: RecursiveLock {
    get {
        return _parent._lock
    }
}

/// Entry point for events coming from the `other` sequence.
///
/// Hands the event to `synchronizedOn(_:)`, which is not shown here.
/// NOTE(review): `synchronizedOn` is expected to call `_synchronized_on(_:)`
/// while holding `_lock` — confirm against `SynchronizedOnType`.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}

/// Handles an event from the `other` sequence.
///
/// - `.next`: the trigger fired, so the parent sink is completed and disposed.
/// - `.error`: the error is forwarded through the parent sink, which is then disposed.
/// - `.completed`: `other` finished without emitting; only the subscription to
///   `other` is disposed and the parent sink is left untouched.
func _synchronized_on(_ event: Event<Element>) {
    // Terminal event to push downstream through the parent sink.
    let terminal: Event<Observer.Element>

    switch event {
    case .completed:
        // Nothing to forward: release the subscription to `other` and stop here.
        _subscription.dispose()
        return
    case .next:
        // The element's value is irrelevant; its arrival alone ends the sequence.
        terminal = .completed
    case .error(let failure):
        terminal = .error(failure)
    }

    _parent.forwardOn(terminal)
    _parent.dispose()
}

/// Producer behind `takeUntil(_:)`: stores the source and `other` sequences and builds a
/// `TakeUntilSink` in `run(_:cancel:)`.
/// `Producer` is generic over the element type, so the superclass must be spelled `Producer<Element>`.
final private class TakeUntil<Element, Other>: Producer<Element>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// Sequence whose events are forwarded; subscribed to in `TakeUntilSink.run()`.
fileprivate let _source: Observable<Element>
/// Trigger sequence; `TakeUntilSink.run()` subscribes a `TakeUntilSinkOther` to it.
fileprivate let _other: Observable<Other>

/// Builds a `TakeUntilSink` for `observer` and starts it.
///
/// - Parameters:
///   - observer: Downstream observer that receives the forwarded events.
///   - cancel: Cancelable handed to the sink's initializer.
/// - Returns: The sink together with the disposable returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let takeUntilSink = TakeUntilSink(parent: self, observer: observer, cancel: cancel)
    return (sink: takeUntilSink, subscription: takeUntilSink.run())
}

/// Entry point for events coming from the source sequence.
///
/// Hands the event to `synchronizedOn(_:)`, which is not shown here.
/// NOTE(review): `synchronizedOn` is expected to call `_synchronized_on(_:)`
/// while holding `_lock` — confirm against `SynchronizedOnType`.
func on(_ event: Event<Element>) {
    synchronizedOn(event)
}

/// Handles an event from the source sequence.
///
/// Every event is forwarded downstream unchanged; a terminal event
/// (`.error` or `.completed`) additionally disposes the sink.
func _synchronized_on(_ event: Event<Element>) {
    forwardOn(event)

    switch event {
    case .next:
        // Regular element: keep the sink alive.
        break
    case .error, .completed:
        dispose()
    }
}

/// Subscribes to both sequences and returns a disposable covering both subscriptions.
///
/// - Returns: A composite disposable made of the source subscription and the
///   single-assignment disposable that holds the subscription to `other`.
func run() -> Disposable {
    // The trigger is attached before the source.
    // NOTE(review): presumably so that an `other` which emits during subscription
    // ends the sequence before any source element is forwarded — confirm.
    let triggerObserver = TakeUntilSinkOther(parent: self)
    triggerObserver._subscription.setDisposable(_parent._other.subscribe(triggerObserver))

    let upstreamSubscription = _parent._source.subscribe(self)

    return Disposables.create(upstreamSubscription, triggerObserver._subscription)
}
This post is licensed under CC BY 4.0 by the author.