Home Rxswift Operator Publish
Post
Cancel

Rxswift Operator Publish

Publish

Returns a connectable observable sequence that shares a single subscription to the underlying sequence.

This operator is a specialization of multicast using a PublishSubject.

public protocol ConnectableObservableType : ObservableType

1
2
// Connects the observable wrapper to its source. All subscribed observers will receive values from the underlying observable sequence as long as the connection is established.
func connect() -> Disposable

public class ConnectableObservable: Observable, ConnectableObservableType

1
2
3
public func connect() -> Disposable {
    rxAbstractMethod()
}
1
2
3
4
5
6
7
8
public func publish() -> ConnectableObservable<Element> {
    return self.multicast { PublishSubject() }
}

public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
    -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
    return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}

final private class Connection<Subject: SubjectType>: ObserverType, Disposable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typealias Element = Subject.Observer.Element
private var _lock: RecursiveLock

private var _parent: ConnectableObservableAdapter<Subject>?
private var _subscription : Disposable?
private var _subjectObserver: Subject.Observer

func on(_ event: Event<Subject.Observer.Element>) {
    if isFlagSet(self._disposed, 1) {
        return
    }
    if event.isStopEvent {
        self.dispose()
    }
    self._subjectObserver.on(event)
}

func dispose() {
    _lock.lock(); defer { _lock.unlock() } // {
    fetchOr(self._disposed, 1)
    guard let parent = _parent else {
        return
    }

    if parent._connection === self {
        parent._connection = nil
        parent._subject = nil
    }
    self._parent = nil

    self._subscription?.dispose()
    self._subscription = nil
    // }
}

final private class ConnectableObservableAdapter<Subject: SubjectType>: ConnectableObservable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
typealias ConnectionType = Connection<Subject>

private let _source: Observable<Subject.Observer.Element>
private let _makeSubject: () -> Subject

fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?

override func connect() -> Disposable {
    return self._lock.calculateLocked {
        if let connection = self._connection {
            return connection
        }

        let singleAssignmentDisposable = SingleAssignmentDisposable()
        let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
        self._connection = connection
      // connection对source产生订阅
      // 而connection会将event转发给内部的subject
        let subscription = self._source.subscribe(connection)
        singleAssignmentDisposable.setDisposable(subscription)
        return connection
    }
}

private var lazySubject: Subject {
    if let subject = self._subject {
        return subject
    }

    let subject = self._makeSubject()
    self._subject = subject
    return subject
}

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
  // observer内部的subject产生订阅
    return self.lazySubject.subscribe(observer)
}

所以事件流为:source -> connection -> subject -> observer

调用connection才能产生connection对source的订阅,事件才会从source流向connection

This post is licensed under CC BY 4.0 by the author.