Publish
Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
This operator is a specialization of multicast
using a PublishSubject
.
public protocol ConnectableObservableType : ObservableType
1
2
| // Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established.
func connect() -> Disposable
|
public class ConnectableObservable: Observable, ConnectableObservableType
1
2
3
| public func connect() -> Disposable {
rxAbstractMethod()
}
|
1
2
3
4
5
6
7
8
| public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
|
final private class Connection<Subject: SubjectType>: ObserverType, Disposable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| typealias Element = Subject.Observer.Element
private var _lock: RecursiveLock
private var _parent: ConnectableObservableAdapter<Subject>?
private var _subscription : Disposable?
private var _subjectObserver: Subject.Observer
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
func dispose() {
_lock.lock(); defer { _lock.unlock() } // {
fetchOr(self._disposed, 1)
guard let parent = _parent else {
return
}
if parent._connection === self {
parent._connection = nil
parent._subject = nil
}
self._parent = nil
self._subscription?.dispose()
self._subscription = nil
// }
}
|
final private class ConnectableObservableAdapter<Subject: SubjectType>: ConnectableObservable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| typealias ConnectionType = Connection<Subject>
private let _source: Observable<Subject.Observer.Element>
private let _makeSubject: () -> Subject
fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
// connection对source产生订阅
// 而connection会将event转发给内部的subject
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
private var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
// observer内部的subject产生订阅
return self.lazySubject.subscribe(observer)
}
|
所以事件流为:source -> connection -> subject -> observer
调用connection才能产生connection对source的订阅,事件才会从source流向connection