Home Rxswift Tailrecursivesink
Post
Cancel

Rxswift Tailrecursivesink

TailRecursiveSink

enum TailRecursiveSinkCommand

1
2
case moveNext
case dispose

class TailRecursiveSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink, InvocableWithValueType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
typealias Value = TailRecursiveSinkCommand
typealias Element = Observer.Element

typealias SequenceGenerator = (generator: Sequence.Iterator, remaining: IntMax?)

var _generators: [SequenceGenerator] = []
var _isDisposed = false
var _subscription = SerialDisposable()

var _gate = AsyncLock<InvocableScheduledItem<TailRecursiveSink<Sequence, Observer>>>()

func run(_ sources: SequenceGenerator) -> Disposable {
    self._generators.append(sources)

    self.schedule(.moveNext)

    return self._subscription
}

func invoke(_ command: TailRecursiveSinkCommand) {
    switch command {
    case .dispose:
        self.disposeCommand()
    case .moveNext:
        self.moveNextCommand()
    }
}

func schedule(_ command: TailRecursiveSinkCommand) {
    self._gate.invoke(InvocableScheduledItem(invocable: self, state: command))
}

func done() {
    self.forwardOn(.completed)
    self.dispose()
}

func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
    rxAbstractMethod()
}

private func moveNextCommand() {
    var next: Observable<Element>?

    repeat {
        guard let (g, left) = self._generators.last else {
            break
        }

        if self._isDisposed {
            return
        }

        self._generators.removeLast()

        var e = g

        guard let nextCandidate = e.next()?.asObservable() else {
            continue
        }

        // `left` is a hint of how many elements are left in generator.
        // In case this is the last element, then there is no need to push
        // that generator on stack.
        //
        // This is an optimization used to make sure in tail recursive case
        // there is no memory leak in case this operator is used to generate non terminating
        // sequence.

        if let knownOriginalLeft = left {
            // `- 1` because generator.next() has just been called
            if knownOriginalLeft - 1 >= 1 {
                self._generators.append((e, knownOriginalLeft - 1))
            }
        }
        else {
            self._generators.append((e, nil))
        }

        let nextGenerator = self.extract(nextCandidate)

        if let nextGenerator = nextGenerator {
            self._generators.append(nextGenerator)
            #if DEBUG || TRACE_RESOURCES
                if maxTailRecursiveSinkStackSize < self._generators.count {
                    maxTailRecursiveSinkStackSize = self._generators.count
                }
            #endif
        }
        else {
            next = nextCandidate
        }
    } while next == nil

    guard let existingNext = next else {
        self.done()
        return
    }

    let disposable = SingleAssignmentDisposable()
    self._subscription.disposable = disposable
    disposable.setDisposable(self.subscribeToNext(existingNext))
}

func subscribeToNext(_ source: Observable<Element>) -> Disposable {
    rxAbstractMethod()
}

func disposeCommand() {
    self._isDisposed = true
    self._generators.removeAll(keepingCapacity: false)
}

override func dispose() {
    super.dispose()

    self._subscription.dispose()
    self._gate.dispose()

    self.schedule(.dispose)
}
This post is licensed under CC BY 4.0 by the author.