ObservableConvertibleType
可以转换为Observerable
1
2
3
4
5
6
7
8
9
10
11
12
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype Element
@available(*, deprecated, renamed: "Element")
typealias E = Element
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<Element>
}
ObservableType
可以接受订阅,订阅的结果为disposable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public protocol ObservableType: ObservableConvertibleType {
/**
Subscribes `observer` to receive events for this sequence.
### Grammar
**Next\* (Error | Completed)?**
* sequences can produce zero or more elements so zero or more `Next` events can be sent to `observer`
* once an `Error` or `Completed` event is sent, the sequence terminates and can't produce any other elements
It is possible that events are sent from different threads, but no two events can be sent concurrently to
`observer`.
### Resource Management
When sequence sends `Complete` or `Error` event all internal resources that compute sequence elements
will be freed.
To cancel production of sequence elements and free resources immediately, call `dispose` on returned
subscription.
- returns: Subscription for `observer` that can be used to cancel production of sequence elements and free resources.
*/
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
1
2
3
4
5
6
7
8
9
10
11
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<Element> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
return Observable.create { o in
return self.subscribe(o)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
extension ObservableType {
// MARK: create
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
extension ObservableType {
/**
Subscribes an event handler to an observable sequence.
- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (Event<Element>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
/**
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.
- parameter onNext: Action to invoke for each element in the observable sequence.
- parameter onError: Action to invoke upon errored termination of the observable sequence.
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
gracefully completed, errored, or if the generation is canceled by disposing subscription).
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
注:文中源码来自RxSwift