AsyncSubject
An AsyncSubject emits the last value (and only the last value) emitted by the source Observable, and only after that source Observable completes.
(If the source Observable does not emit any values, the AsyncSubject also completes without emitting any values.)
核心代码
/// The element stored by the most recent `.next` event handled in `_synchronized_on`,
/// or `nil` when no element has been stored. When a `.completed` event arrives and this
/// is non-nil, its value is what gets sent to the observers.
private var _lastElement: Element?
// handle event
/// Handles an incoming event and forwards the resulting notification(s).
///
/// The event is first resolved by `_synchronized_on`, which returns the observers
/// to notify together with the event that should actually be delivered. That
/// resolved event is dispatched to those observers; when it is a `.next`, a
/// `.completed` is dispatched to the same observers immediately afterwards.
///
/// In DEBUG builds the call is registered with `_synchronizationTracker` for its
/// whole duration.
///
/// - Parameter event: The event received from upstream.
public func on(_ event: Event<Element>) {
    #if DEBUG
        self._synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self._synchronizationTracker.unregister() }
    #endif

    let (targets, resolved) = self._synchronized_on(event)

    // Every outcome begins by delivering the resolved event itself.
    dispatch(targets, resolved)

    // A resolved `.next` is always followed by `.completed`.
    if case .next = resolved {
        dispatch(targets, .completed)
    }
}
/// Updates the subject's state for `event` while holding `_lock`, and decides
/// what should be delivered and to whom.
///
/// - Parameter event: The event received from upstream.
/// - Returns: The observers to notify and the event to send them.
///   - Already stopped, or a `.next`: a freshly created `Observers()` paired with `.completed`.
///   - `.error`: the current observers paired with the error event.
///   - `.completed`: the current observers paired with `.next(lastElement)` when an
///     element has been stored, otherwise with `.completed`.
func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
    self._lock.lock()
    defer { self._lock.unlock() }

    guard !self._isStopped else {
        return (Observers(), .completed)
    }

    switch event {
    case .next(let latest):
        // Only remember the element; nothing is handed to the current observers.
        self._lastElement = latest
        return (Observers(), .completed)

    case .error:
        // Record the error as the stopped event, then hand it to everyone subscribed so far.
        self._stoppedEvent = event
        let drained = self._observers
        self._observers.removeAll()
        return (drained, event)

    case .completed:
        let drained = self._observers
        self._observers.removeAll()

        // Deliver the stored element if there is one, otherwise the plain completion.
        let terminal: Event<Element>
        if let stored = self._lastElement {
            terminal = .next(stored)
        } else {
            terminal = event
        }

        // The delivered event is also recorded as the stopped event.
        self._stoppedEvent = terminal
        return (drained, terminal)
    }
}
// subscribe
/// Subscribes `observer` to this subject.
///
/// Acquires `_lock` for the duration of the call and delegates the actual work
/// to `_synchronized_subscribe`.
///
/// - Parameter observer: The observer to subscribe.
/// - Returns: The disposable produced by `_synchronized_subscribe`.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    defer { self._lock.unlock() }

    let subscription = self._synchronized_subscribe(observer)
    return subscription
}
/// Performs the subscription; `subscribe` calls this while holding `_lock`.
///
/// - If no stopped event has been recorded, `observer.on` is inserted into
///   `_observers` and a `SubscriptionDisposable` for the resulting key is returned.
/// - Otherwise the recorded stopped event is sent to `observer` right away
///   (followed by `.completed` when that event is a `.next`), and the result of
///   `Disposables.create()` is returned.
///
/// - Parameter observer: The observer to subscribe.
/// - Returns: A disposable for the subscription.
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    guard let terminal = self._stoppedEvent else {
        // No stopped event yet: register the observer for later delivery.
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

    // A stopped event exists: replay it to the new observer.
    observer.on(terminal)

    // A recorded `.next` must be followed by `.completed`.
    if case .next = terminal {
        observer.on(.completed)
    }

    return Disposables.create()
}