Home Rxswift Operator Retry
Post
Cancel

Rxswift Operator Retry

Retry

Repeats the source observable sequence until it successfully terminates.

struct InfiniteSequence : Sequence

1
2
3
4
5
6
7
8
9
10
typealias Iterator = AnyIterator<Element>

private let _repeatedValue: Element

func makeIterator() -> Iterator {
    let repeatedValue = _repeatedValue
    return AnyIterator {
        return repeatedValue
    }
}
1
2
3
4
5
6
7
8
public func retry() -> Observable<Element> {
    return CatchSequence(sources: InfiniteSequence(repeatedValue: self.asObservable()))
}

public func retry(_ maxAttemptCount: Int)
    -> Observable<Element> {
    return CatchSequence(sources: Swift.repeatElement(self.asObservable(), count: maxAttemptCount))
}

final private class CatchSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: TailRecursiveSink<Sequence, Observer>, ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
typealias Element = Observer.Element
typealias Parent = CatchSequence<Sequence>

private var _lastError: Swift.Error?

func on(_ event: Event<Element>) {
    switch event {
    case .next:
        self.forwardOn(event)
    case .error(let error):
        self._lastError = error
        self.schedule(.moveNext)
    case .completed:
        self.forwardOn(event)
        self.dispose()
    }
}

override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
    return source.subscribe(self)
}

override func done() {
    if let lastError = self._lastError {
        self.forwardOn(.error(lastError))
    }
    else {
        self.forwardOn(.completed)
    }

    self.dispose()
}

override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
    if let onError = observable as? CatchSequence<Sequence> {
        return (onError.sources.makeIterator(), nil)
    }
    else {
        return nil
    }
}

final private class CatchSequence<Sequence: Swift.Sequence>: Producer where Sequence.Element: ObservableConvertibleType

1
2
3
4
5
6
7
8
typealias Element = Sequence.Element.Element
let sources: Sequence

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = CatchSequenceSink<Sequence, Observer>(observer: observer, cancel: cancel)
    let subscription = sink.run((self.sources.makeIterator(), nil))
    return (sink: sink, subscription: subscription)
}
This post is licensed under CC BY 4.0 by the author.