以下源码来自RxSwift
Sink在RxSwift的作用其实是对observer(对时间处理函数的封装)以及cancel(对dispose side effect的封装)的包装
既可以接受处理转发过来的事件(还是由内部的observer处理),也可以直接转发事件给内部的observer
Sink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Sink<Observer: ObserverType> : Disposable {
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
private let _disposed = AtomicInt(0)
#if DEBUG
private let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
// 处理事件,交给observer
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
// 产生SinkForward(一个转发事件的observer, 这里会转发回当前sink交由observer处理)
final func forwarder() -> SinkForward<Observer> {
return SinkForward(forward: self)
}
final var disposed: Bool {
return isFlagSet(self._disposed, 1)
}
// dispose: 交给cancel
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}