Home Rxswift Operator Interval
Post
Cancel

RxSwift Operator: Interval

Interval

Returns an observable sequence that produces a value after each period, using the specified scheduler to run timers and to send out observer messages.

1
2
3
4
5
6
7
8
/// Returns an observable sequence that produces a value after each period, using the
/// specified scheduler to run timers and to send out observer messages.
///
/// - Parameters:
///   - period: Handed to the underlying `Timer` as both its initial due time and its repeat period.
///   - scheduler: Scheduler handed to the underlying `Timer`.
/// - Returns: The `Timer` producer, exposed as an `Observable<Element>`.
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
    -> Observable<Element> {
    // The same value is used for the first delay and for every subsequent gap.
    let ticker: Observable<Element> = Timer(dueTime: period, period: period, scheduler: scheduler)
    return ticker
}

final private class TimerSink<Observer: ObserverType> : Sink where Observer.Element : RxAbstractInteger

1
2
3
4
5
6
7
8
9
10
11
/// The producer type this sink belongs to.
typealias Parent = Timer<Observer.Element>
/// The producer that owns this sink; `run()` reads its scheduler, due time, and period.
private let _parent: Parent
/// Held by `run()` around each forwarded tick.
private let _lock = RecursiveLock()

/// Starts the periodic schedule on the parent's scheduler.
///
/// The schedule is seeded with `0`, first fires after the parent's due time, and
/// repeats with the parent's period. Each firing forwards the current state as a
/// `.next` event and hands back the state incremented with wrapping addition.
///
/// - Returns: The disposable produced by `schedulePeriodic`.
func run() -> Disposable {
    let timer = self._parent
    let firstTick = 0 as Observer.Element
    // Force-unwrap is intentional: `Timer.run` only builds a `TimerSink` when `_period` is non-nil.
    let repeatEvery = timer._period!

    return timer._scheduler.schedulePeriodic(firstTick, startAfter: timer._dueTime, period: repeatEvery) { tick in
        // Hold the lock for the whole emission; `defer` releases it on exit.
        self._lock.lock()
        defer { self._lock.unlock() }

        self.forwardOn(.next(tick))
        // `&+` wraps on overflow instead of trapping.
        return tick &+ 1
    }
}

final private class TimerOneOffSink<Observer: ObserverType>: Sink where Observer.Element: RxAbstractInteger

1
2
3
4
5
6
7
8
9
10
11
/// The producer that owns this sink; `run()` reads its scheduler and due time.
private let _parent: Parent

/// Schedules a single action on the parent's scheduler after the parent's due time.
///
/// When the action runs it forwards `.next(0)`, then `.completed`, then disposes
/// this sink.
///
/// - Returns: The disposable produced by `scheduleRelative`.
func run() -> Disposable {
    let timer = self._parent

    // The sink is passed as the scheduled state but the action ignores that argument;
    // it reaches the sink through the `unowned` capture instead.
    return timer._scheduler.scheduleRelative(self, dueTime: timer._dueTime) { [unowned self] _ in
        self.forwardOn(.next(0))
        self.forwardOn(.completed)
        self.dispose()
        return Disposables.create()
    }
}

final private class Timer<Element: RxAbstractInteger>: Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Scheduler the sinks call `schedulePeriodic` / `scheduleRelative` on.
fileprivate let _scheduler: SchedulerType
/// Delay the sinks pass as `startAfter` (periodic) or `dueTime` (one-off).
fileprivate let _dueTime: RxTimeInterval
/// Repeat interval; when `nil`, `run` builds a `TimerOneOffSink` instead of a `TimerSink`.
fileprivate let _period: RxTimeInterval?

/// Creates the sink matching this timer's configuration and starts it.
///
/// A non-nil `_period` selects the repeating `TimerSink`; a nil `_period` selects
/// the single-shot `TimerOneOffSink`.
///
/// - Parameters:
///   - observer: Observer handed to the created sink.
///   - cancel: Cancelable handed to the created sink.
/// - Returns: The created sink together with the disposable returned by its `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    switch self._period {
    case .some:
        // Repeating timer.
        let periodic = TimerSink(parent: self, observer: observer, cancel: cancel)
        return (sink: periodic, subscription: periodic.run())
    case .none:
        // Single-shot timer.
        let oneOff = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
        return (sink: oneOff, subscription: oneOff.run())
    }
}
This post is licensed under CC BY 4.0 by the author.