文中源码来自RxSwift
Observer 即事件接收者、事件处理者、订阅者
protocol
ObserverType
1
2
3
4
5
6
7
8
9
10
11
12
public protocol ObserverType {
    /// Type of the values carried by the events this observer accepts.
    associatedtype Element

    /// Legacy spelling of `Element`; deprecated in favor of `Element`.
    @available(*, deprecated, renamed: "Element")
    typealias E = Element

    /// Delivers one sequence event to this observer.
    ///
    /// - parameter event: The event being delivered.
    func on(_ event: Event<Element>)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/// Shorthand helpers that wrap a value, an error, or completion in the
/// matching `Event` case and pass it to `on(_:)`.
extension ObserverType {
    /// Shorthand for `on(.next(element))`.
    ///
    /// - parameter element: The next element to deliver to the observer.
    public func onNext(_ element: Element) {
        on(Event<Element>.next(element))
    }

    /// Shorthand for `on(.error(error))`.
    ///
    /// - parameter error: The `Swift.Error` to deliver to the observer.
    public func onError(_ error: Swift.Error) {
        on(Event<Element>.error(error))
    }

    /// Shorthand for `on(.completed)`.
    public func onCompleted() {
        on(Event<Element>.completed)
    }
}
Map to AnyObserver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
extension ObserverType {
    /// Wraps this observer in an `AnyObserver`, hiding its concrete type.
    ///
    /// - returns: A type-erased observer built from `self`.
    public func asObserver() -> AnyObserver<Element> {
        let erased: AnyObserver<Element> = AnyObserver(self)
        return erased
    }

    /// Builds an observer of `Result` events on top of this observer of `Element` events.
    ///
    /// Every event received by the returned observer is passed through
    /// `Event.map(transform)` and the mapped event is then sent to `self`.
    ///
    /// - parameter transform: Converts a `Result` value into an `Element`; may throw.
    /// - returns: An observer that maps incoming events and hands them to `self`.
    public func mapObserver<Result>(_ transform: @escaping (Result) throws -> Element) -> AnyObserver<Result> {
        return AnyObserver<Result> { (incoming: Event<Result>) in
            let mapped: Event<Element> = incoming.map(transform)
            self.on(mapped)
        }
    }
}
SinkForward:
ObserverType
把事件转发给 _forward: Sink
Sink 是对 observer 和 cancel 的包装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// An observer that relays every event it receives to the observer held by a `Sink`.
///
/// After relaying an `.error` or `.completed` event it also disposes the
/// sink's `_cancel`.
final class SinkForward<Observer: ObserverType>: ObserverType {
    typealias Element = Observer.Element

    /// The sink whose `_observer` receives the events and whose `_cancel`
    /// is disposed after a terminal event.
    private let _forward: Sink<Observer>

    /// - parameter forward: The sink to relay events to.
    init(forward: Sink<Observer>) {
        self._forward = forward
    }

    /// Relays `event` to the sink's observer, then disposes the sink's
    /// `_cancel` if the event was `.error` or `.completed`.
    ///
    /// - parameter event: The event to relay.
    final func on(_ event: Event<Element>) {
        let sink = self._forward

        // Every event is delivered first, whatever its kind.
        sink._observer.on(event)

        // Only terminal events trigger disposal; `.next` needs nothing further.
        switch event {
        case .next:
            break
        case .error, .completed:
            sink._cancel.dispose()
        }
    }
}