ReplaySubject
接口类
- 线程安全
-
订阅时会把存储的值发送给当前的订阅者,也就是replay
createWithBufferSize
createUnbounded
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/// Base class of the replay subjects.
///
/// `on(_:)` and `unsubscribe(_:)` are abstract at this level (they call `rxAbstractMethod()`).
/// Instances are obtained through `create(bufferSize:)` or `createUnbounded()`, which
/// return one of the private subclasses.
public class ReplaySubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , Disposable {
    public typealias SubjectObserverType = ReplaySubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType

    // Lock taken around accesses to the state below.
    fileprivate let _lock = RecursiveLock()

    // state
    fileprivate var _isDisposed = false
    fileprivate var _isStopped = false
    // Setting this also refreshes `_isStopped` (true whenever the value is non-nil).
    fileprivate var _stoppedEvent: Event<Element>? = nil {
        didSet {
            self._isStopped = self._stoppedEvent != nil
        }
    }
    fileprivate var _observers = Observers()

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        self._lock.lock(); defer { self._lock.unlock() }
        return self._observers.count > 0
    }

    /// Whether a stopped event has been recorded (mirrors `_isStopped`, read without locking).
    final var isStopped: Bool {
        return self._isStopped
    }

    /// Abstract at this level: calls `rxAbstractMethod()`.
    func unsubscribe(_ key: DisposeKey) {
        rxAbstractMethod()
    }

    /// Notifies all subscribed observers about next event.
    ///
    /// Abstract at this level: calls `rxAbstractMethod()`.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    /// Returns observer interface for subject.
    public func asObserver() -> SubjectObserverType {
        return self
    }

    /// Unsubscribe all observers and release resources.
    ///
    /// The body is empty at this level.
    public func dispose() {
    }

    /// Creates new instance of `ReplaySubject` that replays at most `bufferSize` last elements of sequence.
    ///
    /// A `bufferSize` of exactly 1 yields `ReplayOne`; any other value yields `ReplayMany`.
    ///
    /// - parameter bufferSize: Maximal number of elements to replay to observer after subscription.
    /// - returns: New instance of replay subject.
    public static func create(bufferSize: Int) -> ReplaySubject<Element> {
        guard bufferSize != 1 else {
            // replay one
            return ReplayOne()
        }
        // replay many
        return ReplayMany(bufferSize: bufferSize)
    }

    /// Creates a new instance of `ReplaySubject` that buffers all the elements of a sequence.
    /// To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable'
    /// number of elements.
    public static func createUnbounded() -> ReplaySubject<Element> {
        return ReplayAll()
    }

    #if TRACE_RESOURCES
        override init() {
            _ = Resources.incrementTotal()
        }

        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
}
ReplayBufferBase
abstract methods: trim
, addValueToBuffer
, replayBuffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/// Shared subscribe / emit / dispose logic for the buffering replay subjects.
///
/// Subclasses supply the storage policy by overriding `trim()`,
/// `addValueToBuffer(_:)` and `replayBuffer(_:)`, which are abstract here.
private class ReplayBufferBase<Element>
    : ReplaySubject<Element>
    , SynchronizedUnsubscribeType {

    // MARK: Abstract buffer hooks

    /// Abstract: called after a value is buffered and when a stop event is recorded.
    func trim() {
        rxAbstractMethod()
    }

    /// Abstract: called with each `.next` element before `trim()`.
    func addValueToBuffer(_ value: Element) {
        rxAbstractMethod()
    }

    /// Abstract: called on subscribe, before any stop event is delivered to `observer`.
    func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        rxAbstractMethod()
    }

    // MARK: Emitting

    /// Updates state under the lock, then dispatches `event` to the observers
    /// returned by `_synchronized_on(_:)`.
    override func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        let recipients = self._synchronized_on(event)
        dispatch(recipients, event)
    }

    /// Applies `event` to the subject state while holding `_lock`.
    ///
    /// - returns: The observers that should receive `event`; an empty collection
    ///   when the subject is already disposed or stopped.
    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }

        if self._isDisposed || self._isStopped {
            return Observers()
        }

        switch event {
        case .next(let element):
            // buffer value, then let the subclass enforce its size policy
            self.addValueToBuffer(element)
            self.trim()
            return self._observers
        case .error, .completed:
            // record the stop event (this also flips `_isStopped` via `didSet`)
            self._stoppedEvent = event
            self.trim()
            // hand back the current observers and forget them
            let recipients = self._observers
            self._observers.removeAll()
            return recipients
        }
    }

    // MARK: Subscribing

    /// Subscribes `observer` while holding `_lock`.
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock(); defer { self._lock.unlock() }
        return self._synchronized_subscribe(observer)
    }

    /// Subscription logic; `subscribe(_:)` calls this with `_lock` held.
    ///
    /// - A disposed subject sends `.error(RxError.disposed(object: self))` and returns.
    /// - Otherwise the buffer is replayed; if a stop event was recorded it is sent
    ///   and the observer is not registered.
    /// - Otherwise the observer is registered and a `SubscriptionDisposable` is returned.
    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        guard !self._isDisposed else {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }

        // replay buffer on subscribe
        let erased = observer.asObserver()
        self.replayBuffer(erased)

        if let terminal = self._stoppedEvent {
            observer.on(terminal)
            return Disposables.create()
        }

        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

    // MARK: Unsubscribing

    /// Removes the observer registered under `disposeKey` while holding `_lock`.
    func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
        self._lock.lock(); defer { self._lock.unlock() }
        self._synchronized_unsubscribe(disposeKey)
    }

    /// Removal logic; does nothing once the subject is disposed.
    func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
        guard !self._isDisposed else {
            return
        }
        _ = self._observers.removeKey(disposeKey)
    }

    // MARK: Disposing

    /// Calls the superclass `dispose()` and then disposes under the lock.
    override func dispose() {
        super.dispose()
        self.synchronizedDispose()
    }

    /// Runs `_synchronized_dispose()` while holding `_lock`.
    func synchronizedDispose() {
        self._lock.lock(); defer { self._lock.unlock() }
        self._synchronized_dispose()
    }

    /// Marks the subject disposed and drops all observers.
    func _synchronized_dispose() {
        self._isDisposed = true
        self._observers.removeAll()
    }
}
ReplayOne
Always buffer one value.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/// Replay subject that keeps only the most recently buffered value.
private final class ReplayOne<Element>: ReplayBufferBase<Element> {
    // The latest buffered element; `nil` until one is buffered and after dispose.
    private var _value: Element?

    override init() {
        super.init()
    }

    /// Nothing to trim: each new value overwrites the previous one.
    override func trim() {
    }

    /// Overwrites the stored value with `value`.
    override func addValueToBuffer(_ value: Element) {
        self._value = value
    }

    /// Sends the stored value, if any, to `observer` as a `.next` event.
    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        guard let latest = self._value else {
            return
        }
        observer.on(.next(latest))
    }

    /// Performs the base dispose and then clears the stored value.
    override func _synchronized_dispose() {
        super._synchronized_dispose()
        self._value = nil
    }
}
ReplayManyBase
Values are queued to a Queue data structure.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/// Replay subject base that stores buffered values in a `Queue`.
///
/// Does not override `trim()`; subclasses decide the trimming policy.
private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
    fileprivate var _queue: Queue<Element>

    /// - parameter queueSize: The queue is created with capacity `queueSize + 1`.
    init(queueSize: Int) {
        self._queue = Queue(capacity: queueSize + 1)
    }

    /// Enqueues `value`.
    override func addValueToBuffer(_ value: Element) {
        self._queue.enqueue(value)
    }

    /// Sends every queued value to `observer` as a `.next` event, in queue iteration order.
    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self._queue.forEach { buffered in
            observer.on(.next(buffered))
        }
    }

    /// Performs the base dispose and then replaces the queue with a fresh zero-capacity one.
    override func _synchronized_dispose() {
        super._synchronized_dispose()
        self._queue = Queue(capacity: 0)
    }
}
ReplayMany
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Replay subject that trims its queue down to `bufferSize` elements.
private final class ReplayMany<Element>: ReplayManyBase<Element> {
    // Maximum number of elements left in the queue after `trim()`.
    private let _bufferSize: Int

    /// - parameter bufferSize: Trim threshold; also forwarded as the queue size.
    init(bufferSize: Int) {
        self._bufferSize = bufferSize

        super.init(queueSize: bufferSize)
    }

    /// Dequeues elements for as long as the queue holds more than `_bufferSize` of them.
    override func trim() {
        while self._bufferSize < self._queue.count {
            _ = self._queue.dequeue()
        }
    }
}
ReplayAll
1
2
3
4
5
6
7
8
9
10
/// Replay subject that never trims its queue, so every buffered value is kept.
private final class ReplayAll<Element>: ReplayManyBase<Element> {
    /// Starts from `queueSize: 0`.
    init() {
        super.init(queueSize: 0)
    }

    /// Intentionally empty: nothing is ever removed from the queue.
    override func trim() {
    }
}
注:文中源码来自RxSwift