Retry
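Repeats the source observable sequence until it successfully terminates: each time the source fails with an error, it is subscribed to again instead of the error being passed on.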
struct InfiniteSequence : Sequence
1
2
3
4
5
6
7
8
9
10
typealias Iterator = AnyIterator<Element>
private let _repeatedValue: Element
/// Creates an iterator that yields the stored value on every call to `next()`
/// and never returns `nil`.
///
/// - returns: A type-erased iterator over the repeated value.
func makeIterator() -> Iterator {
    // Copy the value into a local so the closure captures the value, not `self`.
    let value = _repeatedValue
    return AnyIterator { value }
}
1
2
3
4
5
6
7
8
/// Repeats the source observable sequence until it successfully terminates.
///
/// The source is wrapped in an `InfiniteSequence`, so the number of attempts is
/// not capped here.
///
/// - returns: An observable sequence backed by a `CatchSequence` over endless copies of the source.
public func retry() -> Observable<Element> {
    let endlessSources = InfiniteSequence(repeatedValue: self.asObservable())
    return CatchSequence(sources: endlessSources)
}
/// Repeats the source observable sequence on error, using a bounded number of attempts.
///
/// The source is repeated `maxAttemptCount` times in the sequence handed to
/// `CatchSequence`, so the count includes the first attempt, not only the retries.
///
/// - parameter maxAttemptCount: How many copies of the source are placed in the sequence.
/// - returns: An observable sequence backed by a `CatchSequence` over those copies.
public func retry(_ maxAttemptCount: Int) -> Observable<Element> {
    let boundedSources = Swift.repeatElement(self.asObservable(), count: maxAttemptCount)
    return CatchSequence(sources: boundedSources)
}
final private class CatchSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: TailRecursiveSink<Sequence, Observer>, ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
typealias Element = Observer.Element
typealias Parent = CatchSequence<Sequence>
private var _lastError: Swift.Error?
/// Handles an event coming from the currently subscribed source.
///
/// - parameter event: The event emitted by the current source.
func on(_ event: Event<Element>) {
    switch event {
    case .error(let failure):
        // The error is not forwarded: remember it and ask the base sink to move on
        // (`.moveNext`). `done()` reports the remembered error later if needed.
        _lastError = failure
        schedule(.moveNext)
    case .completed:
        // A successful completion ends the whole sequence.
        forwardOn(event)
        dispose()
    case .next:
        // Elements pass straight through to the observer.
        forwardOn(event)
    }
}
/// Subscribes this sink to the given source so that its events arrive in `on(_:)`.
///
/// - parameter source: The observable to subscribe to.
/// - returns: The disposable returned by the subscription.
override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
    let subscription = source.subscribe(self)
    return subscription
}
/// Emits the terminal event and disposes the sink.
///
/// Forwards the most recently stored error when there is one, otherwise `.completed`.
/// NOTE(review): presumably invoked by the base sink once there are no more
/// sources to try — confirm against `TailRecursiveSink`.
override func done() {
    let terminalEvent: Event<Element>
    if let failure = _lastError {
        terminalEvent = .error(failure)
    } else {
        terminalEvent = .completed
    }
    forwardOn(terminalEvent)
    dispose()
}
/// Returns a fresh generator over the sources of `observable` when it is itself a
/// `CatchSequence` of the same sequence type; otherwise returns `nil`.
///
/// - parameter observable: The observable to inspect.
/// - returns: A generator tuple (iterator plus `nil` as second component), or `nil`
///   when `observable` is not a matching `CatchSequence`.
override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
    guard let nested = observable as? CatchSequence<Sequence> else {
        return nil
    }
    return (nested.sources.makeIterator(), nil)
}
final private class CatchSequence<Sequence: Swift.Sequence>: Producer where Sequence.Element: ObservableConvertibleType
1
2
3
4
5
6
7
8
typealias Element = Sequence.Element.Element
let sources: Sequence
/// Creates a `CatchSequenceSink` for the observer and starts it on a fresh iterator
/// over `sources`.
///
/// - parameter observer: The observer that receives the forwarded events.
/// - parameter cancel: The cancelable handed to the sink.
/// - returns: The sink together with the subscription produced by running it.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let catchSink = CatchSequenceSink<Sequence, Observer>(observer: observer, cancel: cancel)
    // The sink is created before it is run; the second tuple component is passed as `nil`.
    return (sink: catchSink, subscription: catchSink.run((sources.makeIterator(), nil)))
}