TailRecursiveSink
enum TailRecursiveSinkCommand
1
2
case moveNext
case dispose
class TailRecursiveSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink, InvocableWithValueType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
typealias Value = TailRecursiveSinkCommand
typealias Element = Observer.Element
typealias SequenceGenerator = (generator: Sequence.Iterator, remaining: IntMax?)
var _generators: [SequenceGenerator] = []
var _isDisposed = false
var _subscription = SerialDisposable()
var _gate = AsyncLock<InvocableScheduledItem<TailRecursiveSink<Sequence, Observer>>>()
func run(_ sources: SequenceGenerator) -> Disposable {
self._generators.append(sources)
self.schedule(.moveNext)
return self._subscription
}
func invoke(_ command: TailRecursiveSinkCommand) {
switch command {
case .dispose:
self.disposeCommand()
case .moveNext:
self.moveNextCommand()
}
}
func schedule(_ command: TailRecursiveSinkCommand) {
self._gate.invoke(InvocableScheduledItem(invocable: self, state: command))
}
func done() {
self.forwardOn(.completed)
self.dispose()
}
func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
rxAbstractMethod()
}
private func moveNextCommand() {
var next: Observable<Element>?
repeat {
guard let (g, left) = self._generators.last else {
break
}
if self._isDisposed {
return
}
self._generators.removeLast()
var e = g
guard let nextCandidate = e.next()?.asObservable() else {
continue
}
// `left` is a hint of how many elements are left in generator.
// In case this is the last element, then there is no need to push
// that generator on stack.
//
// This is an optimization used to make sure in tail recursive case
// there is no memory leak in case this operator is used to generate non terminating
// sequence.
if let knownOriginalLeft = left {
// `- 1` because generator.next() has just been called
if knownOriginalLeft - 1 >= 1 {
self._generators.append((e, knownOriginalLeft - 1))
}
}
else {
self._generators.append((e, nil))
}
let nextGenerator = self.extract(nextCandidate)
if let nextGenerator = nextGenerator {
self._generators.append(nextGenerator)
#if DEBUG || TRACE_RESOURCES
if maxTailRecursiveSinkStackSize < self._generators.count {
maxTailRecursiveSinkStackSize = self._generators.count
}
#endif
}
else {
next = nextCandidate
}
} while next == nil
guard let existingNext = next else {
self.done()
return
}
let disposable = SingleAssignmentDisposable()
self._subscription.disposable = disposable
disposable.setDisposable(self.subscribeToNext(existingNext))
}
func subscribeToNext(_ source: Observable<Element>) -> Disposable {
rxAbstractMethod()
}
func disposeCommand() {
self._isDisposed = true
self._generators.removeAll(keepingCapacity: false)
}
override func dispose() {
super.dispose()
self._subscription.dispose()
self._gate.dispose()
self.schedule(.dispose)
}