Home Rxswift Connection Sharewhileconnected
Post
Cancel

Rxswift Connection Sharewhileconnected

每次订阅用独立的一条connection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Returns an observable that shares one subscription to the underlying source.
///
/// The implementation is chosen from the combination of `scope` and `replay`:
/// - `.forever` multicasts through a single subject instance
///   (`PublishSubject` when `replay` is 0, otherwise a `ReplaySubject`).
/// - `.whileConnected` uses the dedicated `ShareWhileConnected` /
///   `ShareReplay1WhileConnected` observables for `replay` 0 and 1, and a
///   subject factory (`makeSubject:`) for every other buffer size.
///
/// - Parameter replay: Replay buffer size. Defaults to 0.
/// - Parameter scope: Lifetime scope of the sharing subject. Defaults to `.whileConnected`.
/// - Returns: An observable sequence sharing a single subscription to the source.
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch (scope, replay) {
    case (.forever, 0):
        return self.multicast(PublishSubject()).refCount()
    case (.forever, _):
        return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
    case (.whileConnected, 0):
        return ShareWhileConnected(source: self.asObservable())
    case (.whileConnected, 1):
        return ShareReplay1WhileConnected(source: self.asObservable())
    case (.whileConnected, _):
        return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
    }
}

private final class ShareWhileConnectedConnection: ObserverType, SynchronizedUnsubscribeType

可以看到这个类有点特别:本身只是 Observer,但是却可以接受订阅,在处理事件时,把事件转发给它自己的所有订阅者;

它也对应唯一的connection,当它自己的observers为空,或者说没有订阅者时,释放connection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Bag of event handlers; `_synchronized_subscribe` inserts one handler per observer.
typealias Observers = AnyObserver<Element>.s // event handlers bag
// Key handed back by the bag on insertion and used later to remove that handler.
typealias DisposeKey = Observers.KeyType


// The shareable observable this connection belongs to.
typealias Parent = ShareWhileConnected<Element>
private let _parent: Parent
// Holds the subscription to the parent's source; assigned in `connect()`.
private let _subscription = SingleAssignmentDisposable()

// Lock taken around every read/write of `_disposed` and `_observers`.
private let _lock: RecursiveLock
// Set to true by `_synchronized_dispose()`; once set, `on(_:)` forwards nothing.
private var _disposed: Bool = false
// Handlers of the currently subscribed observers.
fileprivate var _observers = Observers()

/// Receives an event from the source and forwards it to the subscribed observers.
///
/// The set of recipients is computed while holding the lock; the actual
/// dispatch happens after the lock has been released.
final func on(_ event: Event<Element>) {
    let recipients: Observers
    do {
        self._lock.lock()
        defer { self._lock.unlock() }
        recipients = self._synchronized_on(event)
    }

    // Forward the event (note that this class is itself an observer).
    dispatch(recipients, event)
}

/// Decides which observers should receive `event`. Called with the lock held by `on(_:)`.
///
/// - Returns: An empty bag when already disposed; otherwise the current
///   observers. For `.error` / `.completed` the connection is disposed
///   after the observers have been captured.
final private func _synchronized_on(_ event: Event<Element>) -> Observers {
    guard !self._disposed else {
        return Observers()
    }

    // Capture the recipients before any terminal-event cleanup resets the bag.
    let snapshot = self._observers

    switch event {
    case .next:
        break
    case .error, .completed:
        self._synchronized_dispose()
    }

    return snapshot
}

/// Subscribes this connection (as an observer) to the parent's source and
/// stores the resulting disposable in `_subscription`.
final func connect() {
    let upstream = self._parent._source.subscribe(self)
    self._subscription.setDisposable(upstream)
}

// Subscribe: register the observer's handler in the handler bag.
/// - Parameter observer: Observer whose `on` handler is added to `_observers`.
/// - Returns: A disposable carrying the bag key for this registration.
final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    defer { self._lock.unlock() }

    let key = self._observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}


/// Marks this connection as disposed, detaches it from the parent if it is
/// still the parent's current connection, and replaces the handler bag with
/// an empty one.
final private func _synchronized_dispose() {
    self._disposed = true

    let owner = self._parent
    // Only clear the parent's slot when it still points at this connection.
    if owner._connection === self {
        owner._connection = nil
    }

    self._observers = Observers()
}

// Unsubscribe: remove the handler from the handler bag.
/// Removes the handler registered under `disposeKey`; when that leaves no
/// observers, the source subscription is disposed after the lock is released.
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
    let mustDisconnect: Bool
    do {
        self._lock.lock()
        defer { self._lock.unlock() }
        mustDisconnect = self._synchronized_unsubscribe(disposeKey)
    }

    guard mustDisconnect else { return }
    self._subscription.dispose()
}

/// Removes the handler for `disposeKey`. Called with the lock held.
///
/// - Returns: `true` when the removal left the bag empty and the connection
///   was disposed; `false` when the key was already gone or observers remain.
@inline(__always)
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
    // Key not present: this observer was already unsubscribed.
    guard self._observers.removeKey(disposeKey) != nil else {
        return false
    }

    // Other observers are still subscribed: keep the connection alive.
    guard self._observers.count == 0 else {
        return false
    }

    // Last observer gone: tear the connection down.
    self._synchronized_dispose()
    return true
}

final private class ShareWhileConnected: Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Connection type that receives source events and forwards them to subscribers.
fileprivate typealias Connection = ShareWhileConnectedConnection<Element>
// Upstream observable; a connection subscribes to it in its `connect()`.
fileprivate let _source: Observable<Element>
// Lock taken in `subscribe` and also passed to each connection this object creates.
private let _lock = RecursiveLock()
// Current connection, created lazily on subscribe; a connection sets this back to nil when it disposes.
fileprivate var _connection: Connection?


/// Subscribes `observer` through the current connection, creating the
/// connection if none exists.
///
/// The observer is registered while holding the lock. If the connection had
/// no observers before this registration, it is connected to the source
/// after the lock has been released.
///
/// - Returns: The disposable that unregisters `observer` from the connection.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    let connection: Connection
    let previousCount: Int
    let disposable: Disposable

    do {
        self._lock.lock()
        defer { self._lock.unlock() }

        connection = self._synchronized_subscribe(observer)
        // Read the count before registering, so 0 identifies the first subscriber.
        previousCount = connection._observers.count
        disposable = connection._synchronized_subscribe(observer)
    }

    // First subscriber: connect to the source.
    if previousCount == 0 {
        connection.connect()
    }

    return disposable
}


// Lazily creates `_connection`; while one exists, every subscription
// receives that same connection.
/// - Returns: The existing connection, or a newly created one that has been
///   stored in `_connection`.
@inline(__always)
private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
    if let existingConnection = self._connection {
        return existingConnection
    }

    let freshConnection = ShareWhileConnectedConnection<Element>(
        parent: self,
        lock: self._lock)
    self._connection = freshConnection
    return freshConnection
}
This post is licensed under CC BY 4.0 by the author.