Home Rxswift Connection Sharereplay1connected.md
Post
Cancel

Rxswift Connection Sharereplay1connected.md

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Builds a shared version of this sequence, choosing the implementation from
/// the combination of `scope` and `replay`.
///
/// - `.forever`: a single subject instance is handed to `multicast` and wrapped
///   in `refCount()`; a `PublishSubject` when `replay` is 0, otherwise a
///   `ReplaySubject` created with `bufferSize: replay`.
/// - `.whileConnected`: `ShareWhileConnected` for `replay == 0`,
///   `ShareReplay1WhileConnected` for `replay == 1`, and for any other value
///   `multicast(makeSubject:)` with a closure that creates a `ReplaySubject`
///   with `bufferSize: replay`, wrapped in `refCount()`.
///
/// - Parameters:
///   - replay: Buffer size used to select/configure the replay behaviour. Defaults to 0.
///   - scope: Lifetime scope that selects the sharing strategy. Defaults to `.whileConnected`.
/// - Returns: The shared observable sequence.
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch (scope, replay) {
    case (.forever, 0):
        return self.multicast(PublishSubject()).refCount()
    case (.forever, _):
        return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
    case (.whileConnected, 0):
        return ShareWhileConnected(source: self.asObservable())
    case (.whileConnected, 1):
        return ShareReplay1WhileConnected(source: self.asObservable())
    case (.whileConnected, _):
        return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
    }
}

private final class ShareReplay1WhileConnectedConnection<Element>: ObserverType, SynchronizedUnsubscribeType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Collection type that stores the event handlers of the subscribed observers.
typealias Observers = AnyObserver<Element>.s
// Key handed back when a handler is inserted into `Observers`; used to remove that handler later.
typealias DisposeKey = Observers.KeyType

// The shared observable that created this connection.
typealias Parent = ShareReplay1WhileConnected<Element>
private let _parent: Parent
// Holds the subscription to the parent's source; assigned in `connect()` and
// disposed once the last observer unsubscribes.
private let _subscription = SingleAssignmentDisposable()

// Lock taken around every access to the mutable state below.
// NOTE(review): presumably the same lock instance the parent passes in when it
// creates the connection — the initializer is not visible here; confirm.
private let _lock: RecursiveLock
// Set to true by `_synchronized_dispose()`; once set, incoming events are ignored.
private var _disposed: Bool = false
// Handlers of the currently subscribed observers.
fileprivate var _observers = Observers()
// Most recent `.next` element, replayed to observers that subscribe later.
private var _element: Element?

/// Receives an event from the source and forwards it to the observers.
///
/// The observer collection is captured while the lock is held; the actual
/// delivery happens only after the lock has been released.
final func on(_ event: Event<Element>) {
    let recipients = { () -> Observers in
        self._lock.lock(); defer { self._lock.unlock() }
        return self._synchronized_on(event)
    }()

    dispatch(recipients, event)
}


/// Updates the connection state for `event` and returns the observers that
/// should receive it. Invoked by `on(_:)` while the lock is held.
///
/// - Returns: An empty collection if the connection is already disposed;
///   otherwise the observers registered at the time of the call.
final private func _synchronized_on(_ event: Event<Element>) -> Observers {
    guard !self._disposed else {
        return Observers()
    }

    switch event {
    case .next(let latest):
        // Cache the value so it can be replayed to later subscribers.
        self._element = latest
        return self._observers
    case .error, .completed:
        // Terminal event: snapshot the observers first, because disposing
        // replaces the stored collection with an empty one.
        let recipients = self._observers
        self._synchronized_dispose()
        return recipients
    }
}

/// Subscribes this connection (as an observer) to the parent's source and
/// stores the resulting disposable in `_subscription`.
final func connect() {
    let upstream = self._parent._source.subscribe(self)
    self._subscription.setDisposable(upstream)
}

/// Registers `observer` with this connection.
///
/// If an element has already been cached, it is sent to the observer first,
/// before the observer's handler is added to the collection. The whole
/// operation runs with the lock held.
///
/// - Parameter observer: The observer to register.
/// - Returns: A disposable bound to this connection and the key under which
///   the observer's handler was inserted.
final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    defer { self._lock.unlock() }

    // Replay the most recent element, if any, right at subscription time.
    if let cached = self._element {
        observer.on(.next(cached))
    }

    let key = self._observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}


/// Tears down this connection's state: flags it as disposed, detaches it from
/// the parent and drops all registered observers.
/// Both call sites in this class invoke it while the lock is held.
final private func _synchronized_dispose() {
    self._disposed = true

    // Clear the parent's slot only if it still refers to this very connection.
    let owner = self._parent
    if owner._connection === self {
        owner._connection = nil
    }

    // Replace the observer collection with an empty one.
    self._observers = Observers()
}

/// Unsubscribes the observer registered under `disposeKey` by removing its
/// handler from the observer collection.
///
/// The removal happens under the lock. If that removal left the connection
/// without observers, the subscription to the source is disposed after the
/// lock has been released.
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
    self._lock.lock()
    let removedLastObserver = self._synchronized_unsubscribe(disposeKey)
    self._lock.unlock()

    guard removedLastObserver else { return }
    self._subscription.dispose()
}


/// Removes the handler stored under `disposeKey`. Called with the lock held.
///
/// - Returns: `true` when the removed handler was the last one, in which case
///   the connection has been disposed and the caller should dispose the source
///   subscription; `false` if the key was not present or observers remain.
@inline(__always)
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
    // Nothing stored under this key: it was already unsubscribed.
    guard self._observers.removeKey(disposeKey) != nil else {
        return false
    }

    // Other observers are still subscribed; keep the connection alive.
    guard self._observers.count == 0 else {
        return false
    }

    self._synchronized_dispose()
    return true
}

final private class ShareReplay1WhileConnected<Element>: Observable<Element>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Shorthand for the connection type that performs the actual sharing.
fileprivate typealias Connection = ShareReplay1WhileConnectedConnection<Element>

// Upstream sequence; a connection subscribes to it in its `connect()` method.
fileprivate let _source: Observable<Element>

// Guards `_connection`; the same lock is handed to each connection created here.
private let _lock = RecursiveLock()

// Current connection. Created on demand when an observer subscribes and reset
// to nil by the connection itself when it is disposed.
fileprivate var _connection: Connection?

/// Subscribes `observer` through the shared connection.
///
/// Under the lock, the current connection is fetched (or created) and the
/// observer is registered with it. Whether the connection had no observers
/// before this registration is recorded; if so, the connection is connected to
/// the source after the lock has been released.
///
/// - Parameter observer: The observer to subscribe.
/// - Returns: The disposable produced by registering the observer with the connection.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()

    let sharedConnection = self._synchronized_subscribe(observer)
    // Must be sampled before the observer below is added.
    let hadNoObservers = sharedConnection._observers.count == 0
    let registration = sharedConnection._synchronized_subscribe(observer)

    self._lock.unlock()

    // First observer on this connection: start the upstream subscription.
    if hadNoObservers {
        sharedConnection.connect()
    }

    return registration
}

/// Returns the current connection, creating and storing one on demand.
///
/// Because `_connection` is created lazily and then reused, every subscription
/// made while it is set goes through the same connection instance.
@inline(__always)
private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
    if let current = self._connection {
        return current
    }

    let created = ShareReplay1WhileConnectedConnection<Element>(
        parent: self,
        lock: self._lock)
    self._connection = created
    return created
}
This post is licensed under CC BY 4.0 by the author.