Interval
Returns an observable sequence that produces a value after each period, using the specified scheduler to run timers and to send out observer messages.
1
2
3
4
5
6
7
8
/// Creates an observable sequence that produces a value after each `period`.
///
/// The same `period` is handed to the underlying `Timer` both as its due time
/// (delay before the first value) and as its repeat period.
///
/// - Parameter period: Time between produced values; also the initial delay.
/// - Parameter scheduler: Scheduler passed to the underlying `Timer`.
/// - Returns: An observable sequence backed by `Timer`.
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
    return Timer(dueTime: period, period: period, scheduler: scheduler)
}
final private class TimerSink<Observer: ObserverType> : Sink where Observer.Element : RxAbstractInteger
1
2
3
4
5
6
7
8
9
10
11
/// The timer producer type, specialised to the observer's element type.
typealias Parent = Timer<Observer.Element>
/// Supplies the scheduler, due time and period that `run()` reads.
private let _parent: Parent
/// Held while `run()` forwards each `.next` event.
private let _lock = RecursiveLock()
/// Schedules the periodic work on the parent's scheduler and returns the
/// disposable produced by that scheduling call.
///
/// The state starts at zero. On every invocation the current state is forwarded
/// as a `.next` event and the next state is computed with wrapping addition (`&+`).
func run() -> Disposable {
    let timer = self._parent
    let firstValue = 0 as Observer.Element
    // Force-unwrap: in the visible code `Timer.run` builds this sink only when `_period` is non-nil.
    return timer._scheduler.schedulePeriodic(firstValue, startAfter: timer._dueTime, period: timer._period!) { counter in
        // Keep the lock held for the duration of the forward.
        self._lock.lock()
        defer { self._lock.unlock() }
        self.forwardOn(.next(counter))
        return counter &+ 1
    }
}
final private class TimerOneOffSink<Observer: ObserverType>: Sink where Observer.Element: RxAbstractInteger
1
2
3
4
5
6
7
8
9
10
11
/// Supplies the scheduler and due time that `run()` reads.
private let _parent: Parent
/// Schedules a single action on the parent's scheduler after the parent's due time
/// and returns the disposable produced by that scheduling call.
///
/// When the action fires it forwards `.next(0)`, then `.completed`, and finally
/// disposes this sink.
func run() -> Disposable {
    let scheduler = self._parent._scheduler
    let dueTime = self._parent._dueTime
    // `self` is passed as the scheduled state; the action ignores that argument
    // and reaches the sink through the unowned capture instead.
    return scheduler.scheduleRelative(self, dueTime: dueTime) { [unowned self] _ -> Disposable in
        self.forwardOn(.next(0))
        self.forwardOn(.completed)
        self.dispose()
        return Disposables.create()
    }
}
final private class Timer<Element: RxAbstractInteger>: Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Scheduler the sinks use to schedule their work.
fileprivate let _scheduler: SchedulerType
/// Delay handed to the scheduler as the start/due time of the first action.
fileprivate let _dueTime: RxTimeInterval
/// Repeat period; `nil` makes `run` create a one-off sink instead of a periodic one.
fileprivate let _period: RxTimeInterval?
/// Creates the sink that matches this timer's configuration, starts it, and
/// returns it together with the disposable its `run()` produced.
///
/// - Parameter observer: Observer handed to the created sink.
/// - Parameter cancel: Cancelable handed to the created sink.
/// - Returns: The created sink and the subscription returned by the sink's `run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    // No period configured: use the one-off sink.
    guard self._period != nil else {
        let oneOff = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
        let subscription = oneOff.run()
        return (sink: oneOff, subscription: subscription)
    }

    // A period is configured: use the periodic sink.
    let periodic = TimerSink(parent: self, observer: observer, cancel: cancel)
    let subscription = periodic.run()
    return (sink: periodic, subscription: subscription)
}