每次订阅用独立的一条connection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Builds a shared observable sequence, choosing the implementation from `scope` and `replay`.
///
/// - Parameters:
///   - replay: Replay buffer size. `0` selects the non-replaying variant; any other
///     value is forwarded to `ReplaySubject.create(bufferSize:)`, except for
///     `.whileConnected` with `1`, which uses `ShareReplay1WhileConnected`.
///   - scope: `.forever` multicasts through a single subject instance created here;
///     `.whileConnected` uses `ShareWhileConnected` / `ShareReplay1WhileConnected`
///     or, for larger buffers, multicasts through a subject factory (`makeSubject:`).
/// - Returns: The shared `Observable<Element>`.
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch (scope, replay) {
    case (.forever, 0):
        return self.multicast(PublishSubject()).refCount()
    case (.forever, _):
        return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
    case (.whileConnected, 0):
        return ShareWhileConnected(source: self.asObservable())
    case (.whileConnected, 1):
        return ShareReplay1WhileConnected(source: self.asObservable())
    case (.whileConnected, _):
        return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
    }
}
private final class ShareWhileConnectedConnection: ObserverType, SynchronizedUnsubscribeType
可以看到这个类有点特别:它本身只是 Observer,但是却可以接受订阅;在处理事件时,它会把事件转发给自己的所有订阅者。
它也对应唯一的 connection:当它自己的 observers 为空,或者说没有订阅者时,就释放这个 connection。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Bag of event handlers; `_synchronized_subscribe` inserts one handler (`observer.on`) per subscriber.
typealias Observers = AnyObserver<Element>.s // event handlers bag
// Key returned when a handler is inserted and later used to remove it again.
typealias DisposeKey = Observers.KeyType
typealias Parent = ShareWhileConnected<Element>
// The sharing observable that owns this connection: `connect()` subscribes to its `_source`,
// and `_synchronized_dispose()` clears its `_connection` when that still points at `self`.
private let _parent: Parent
// Holds the subscription to the parent's source; set in `connect()` and disposed
// once the last observer has unsubscribed.
private let _subscription = SingleAssignmentDisposable()
// Taken around every read/write of `_observers` and `_disposed` in this class.
// NOTE(review): the parent passes its own `_lock` when constructing the connection, so both
// presumably share one lock instance — the initializer is not shown here, confirm.
private let _lock: RecursiveLock
// Set by `_synchronized_dispose()`; once true, `_synchronized_on` returns an empty bag.
private var _disposed: Bool = false
// Handlers of the currently subscribed observers. `fileprivate` because the parent's
// `subscribe` reads its `count` to decide whether to connect.
fileprivate var _observers = Observers()
/// Receives an event from the source and forwards it to the subscribed observers.
///
/// The set of receivers is computed while `_lock` is held; the actual dispatch
/// happens after the lock has been released.
///
/// - Parameter event: The event delivered by the source.
final func on(_ event: Event<Element>) {
    let receivers: Observers
    do {
        // Scope the lock to the snapshot only; `defer` releases it at the end of this block.
        self._lock.lock(); defer { self._lock.unlock() }
        receivers = self._synchronized_on(event)
    }
    // Forward the event (this class is itself an observer of the source).
    dispatch(receivers, event)
}
/// Chooses which observers should receive `event`.
///
/// Called from `on(_:)` while `_lock` is held.
///
/// - Parameter event: The event about to be dispatched.
/// - Returns: An empty bag if this connection is already disposed; otherwise the
///   observers registered at the time of the call. For `.error` / `.completed`
///   the connection is also disposed after the observers have been captured.
final private func _synchronized_on(_ event: Event<Element>) -> Observers {
    guard !self._disposed else {
        return Observers()
    }

    // Capture the receivers first: disposing replaces `_observers` with an empty bag.
    let receivers = self._observers
    switch event {
    case .next:
        break
    case .error, .completed:
        self._synchronized_dispose()
    }
    return receivers
}
/// Subscribes this connection (as an observer) to the parent's source and
/// stores the resulting disposable in `_subscription`.
final func connect() {
    let sourceSubscription = self._parent._source.subscribe(self)
    self._subscription.setDisposable(sourceSubscription)
}
// subscribe
// Adds the observer's handler to the handler bag.
/// Registers `observer` with this connection.
///
/// - Parameter observer: The observer whose `on` handler is inserted into `_observers`.
/// - Returns: A `SubscriptionDisposable` built from this connection and the
///   key under which the handler was inserted.
final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    let key = self._observers.insert(observer.on)
    let handle = SubscriptionDisposable(owner: self, key: key)
    self._lock.unlock()
    return handle
}
/// Marks this connection as disposed, detaches it from the parent and drops all observers.
///
/// Both call sites visible in this class (`_synchronized_on` and
/// `_synchronized_unsubscribe`) run while `_lock` is held.
final private func _synchronized_dispose() {
self._disposed = true
// Only clear the parent's reference if it still points at this connection.
if self._parent._connection === self {
self._parent._connection = nil
}
// Replace the bag with an empty one so no handler is retained by this connection.
self._observers = Observers()
}
// unsubscribe
// Removes the handler from the handler bag.
/// Unregisters the observer identified by `disposeKey` and, if it was the last
/// one, disposes the subscription to the source.
///
/// The bag is modified while `_lock` is held; `_subscription` is disposed after
/// the lock has been released.
///
/// - Parameter disposeKey: Key under which the observer's handler was inserted.
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
    let removedLastObserver: Bool
    do {
        self._lock.lock(); defer { self._lock.unlock() }
        removedLastObserver = self._synchronized_unsubscribe(disposeKey)
    }

    guard removedLastObserver else { return }
    self._subscription.dispose()
}
/// Removes the handler stored under `disposeKey`.
///
/// Called from `synchronizedUnsubscribe(_:)` while `_lock` is held.
///
/// - Parameter disposeKey: Key under which the observer's handler was inserted.
/// - Returns: `true` when the removed handler was the last one, in which case the
///   connection has been disposed and the caller should dispose the source
///   subscription; `false` when the key was not present or observers remain.
@inline(__always)
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
    // Nothing stored under this key: already unsubscribed.
    guard self._observers.removeKey(disposeKey) != nil else {
        return false
    }
    // Other observers are still subscribed, so keep the connection alive.
    guard self._observers.count == 0 else {
        return false
    }
    // No observers left: tear the connection down.
    self._synchronized_dispose()
    return true
}
final private class ShareWhileConnected: Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
fileprivate typealias Connection = ShareWhileConnectedConnection<Element>
// The source sequence; the connection subscribes to it in `connect()`.
fileprivate let _source: Observable<Element>
// Held by `subscribe` while the connection is looked up / created and the observer is
// registered; also passed to every connection this object creates.
private let _lock = RecursiveLock()
// The current connection, if any. Created lazily on subscribe and reset to nil by the
// connection itself when it is disposed.
fileprivate var _connection: Connection?
/// Subscribes `observer` through the current connection, creating one if needed.
///
/// While `_lock` is held, the connection is obtained and the observer is
/// registered with it. If the connection had no observers before this call,
/// it is connected to the source after the lock has been released.
///
/// - Parameter observer: The observer to register.
/// - Returns: The disposable that unregisters `observer` from the connection.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    let sharedConnection = self._synchronized_subscribe(observer)
    // Sample the observer count *before* registering, to detect the first subscriber.
    let isFirstSubscriber = sharedConnection._observers.count == 0
    let subscription = sharedConnection._synchronized_subscribe(observer)
    self._lock.unlock()

    if isFirstSubscriber {
        // connect
        sharedConnection.connect()
    }
    return subscription
}
// Lazily loads `_connection`,
// so every subscription made while a connection exists goes through that same connection.
/// Returns the existing connection, or creates, stores and returns a new one.
///
/// Called from `subscribe(_:)` while `_lock` is held.
///
/// - Parameter observer: Not used in the body; it only fixes the generic `Observer` type.
/// - Returns: The connection stored in `_connection` after this call.
@inline(__always)
private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
    if let current = self._connection {
        return current
    }

    let created = ShareWhileConnectedConnection<Element>(
        parent: self,
        lock: self._lock)
    self._connection = created
    return created
}