1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Builds a shared version of this sequence, choosing the implementation from
/// the combination of `scope` and `replay`.
///
/// - `.forever`: multicasts through one subject instance (a `PublishSubject` when
///   `replay` is 0, otherwise a `ReplaySubject` with `bufferSize: replay`) and
///   applies `refCount()`.
/// - `.whileConnected`: uses the dedicated `ShareWhileConnected` (replay 0) or
///   `ShareReplay1WhileConnected` (replay 1) operators; any other `replay` value
///   multicasts through a subject *factory* (`makeSubject:`) and applies `refCount()`.
///
/// - Parameter replay: Number of elements to replay; passed as the replay subject's buffer size.
/// - Parameter scope: Lifetime scope that selects between the two families above.
/// - Returns: The shared observable sequence.
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch (scope, replay) {
    case (.forever, 0):
        return self.multicast(PublishSubject()).refCount()
    case (.forever, _):
        return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
    case (.whileConnected, 0):
        return ShareWhileConnected(source: self.asObservable())
    case (.whileConnected, 1):
        return ShareReplay1WhileConnected(source: self.asObservable())
    case (.whileConnected, _):
        return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
    }
}
private final class ShareReplay1WhileConnectedConnection: ObserverType, SynchronizedUnsubscribeType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Collection type that stores the subscribed observers' `on` handlers.
typealias Observers = AnyObserver<Element>.s
// Key type of that collection; a key is obtained on insert and used to remove the handler later.
typealias DisposeKey = Observers.KeyType
// The sharing observable that created this connection.
typealias Parent = ShareReplay1WhileConnected<Element>
private let _parent: Parent
// Holds the subscription of this connection to the parent's source (set in `connect()`).
private let _subscription = SingleAssignmentDisposable()
// Guards the mutable state below.
// NOTE(review): the parent passes its own lock when creating the connection, so this is
// presumably the same lock instance — confirm against the initializer.
private let _lock: RecursiveLock
// Set once the connection has been torn down; later events are then ignored.
private var _disposed: Bool = false
// Handlers of the currently subscribed observers.
fileprivate var _observers = Observers()
// Most recent `.next` element, replayed to observers that subscribe afterwards.
private var _element: Element?
/// Receives an event from the source and forwards it to the current observers.
///
/// The set of receivers is computed while holding `_lock`; the actual
/// dispatch happens only after the lock has been released.
final func on(_ event: Event<Element>) {
    let receivers: Observers = {
        self._lock.lock()
        defer { self._lock.unlock() }
        return self._synchronized_on(event)
    }()
    dispatch(receivers, event)
}
/// Updates internal state for `event` and returns the observers that should receive it.
/// Invoked by `on(_:)` while `_lock` is held.
///
/// - Returns: An empty collection if the connection is already disposed; otherwise the
///   observers registered at the time of the event.
final private func _synchronized_on(_ event: Event<Element>) -> Observers {
    guard !self._disposed else {
        return Observers()
    }

    switch event {
    case .next(let latest):
        // Remember the element so it can be replayed to later subscribers.
        self._element = latest
        return self._observers
    case .error, .completed:
        // Terminal event: grab the receivers first, because disposing resets `_observers`.
        let receivers = self._observers
        self._synchronized_dispose()
        return receivers
    }
}
/// Subscribes this connection (as an observer) to the parent's source and
/// stores the resulting disposable in `_subscription`.
final func connect() {
    let upstream = self._parent._source.subscribe(self)
    self._subscription.setDisposable(upstream)
}
/// Registers `observer` on this connection.
///
/// If an element has already been received, it is delivered to the observer
/// first; the observer's handler is then added to `_observers`. The whole
/// operation runs with `_lock` held.
///
/// - Returns: A disposable that carries this connection and the key under which the
///   handler was stored.
final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    defer { self._lock.unlock() }

    // Replay the most recent element, if there is one, before registering.
    if let cached = self._element {
        observer.on(.next(cached))
    }

    let key = self._observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}
/// Tears down this connection's state: marks it disposed, detaches it from the
/// parent, and resets the observer collection.
/// The callers visible in this file invoke it while `_lock` is held.
final private func _synchronized_dispose() {
self._disposed = true
if self._parent._connection === self {
// Only clear the parent's reference if it still points at this connection,
// so a different connection stored there is left alone.
self._parent._connection = nil
}
// Drop all registered observer handlers.
self._observers = Observers()
}
/// Unsubscribes the observer identified by `disposeKey`.
///
/// The handler is removed from the observer collection under `_lock`. If that
/// removal left the connection without observers, the subscription to the
/// source is disposed after the lock has been released.
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
    self._lock.lock()
    let lastObserverLeft = self._synchronized_unsubscribe(disposeKey)
    self._lock.unlock()

    guard lastObserverLeft else {
        return
    }
    self._subscription.dispose()
}
/// Removes the handler stored under `disposeKey`. Invoked with `_lock` held.
///
/// - Returns: `true` only when a handler was actually removed and no observers
///   remain afterwards (in which case the connection state is also disposed);
///   `false` otherwise.
@inline(__always)
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
    // Nothing stored under this key: it was already unsubscribed.
    guard self._observers.removeKey(disposeKey) != nil else {
        return false
    }

    // Other observers are still subscribed, so keep the connection alive.
    guard self._observers.count == 0 else {
        return false
    }

    // That was the last observer: tear the connection down.
    self._synchronized_dispose()
    return true
}
final private class ShareReplay1WhileConnected: Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
fileprivate typealias Connection = ShareReplay1WhileConnectedConnection<Element>
// Sequence that a connection subscribes to when it connects.
fileprivate let _source: Observable<Element>
// Guards `_connection`; also handed to each connection created by this object.
private let _lock = RecursiveLock()
// The current connection, if any. Created on demand when an observer subscribes and
// set back to nil by the connection itself when it is disposed.
fileprivate var _connection: Connection?
/// Subscribes `observer` through the shared connection.
///
/// Under `_lock`, the current connection is fetched (or created), its observer
/// count is sampled, and the observer is registered on it. If the count was
/// zero before registration, the connection is connected to the source after
/// the lock has been released.
///
/// - Returns: The disposable produced by registering the observer on the connection.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    let sharedConnection = self._synchronized_subscribe(observer)
    // Sampled before this observer is registered, so it reflects earlier subscribers only.
    let isFirstObserver = sharedConnection._observers.count == 0
    let registration = sharedConnection._synchronized_subscribe(observer)
    self._lock.unlock()

    if isFirstObserver {
        // No one was subscribed yet: start the subscription to the source.
        sharedConnection.connect()
    }

    return registration
}
/// Returns the connection to subscribe on, creating and storing one lazily
/// if none exists. As long as `_connection` stays set, every subscription
/// receives the same connection instance.
@inline(__always)
private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
    if let current = self._connection {
        return current
    }

    // No connection yet: create one that shares this object's lock, and remember it.
    let created = ShareReplay1WhileConnectedConnection<Element>(
        parent: self,
        lock: self._lock)
    self._connection = created
    return created
}