Home rxswift asyncsubject
Post
Cancel

rxswift asyncsubject

AsyncSubject

An AsyncSubject emits the last value (and only the last value) emitted by the source Observable, and only after that source Observable completes.

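(If the source Observable does not emit any values, the AsyncSubject also completes without emitting any values.) If the source Observable terminates with an error, the AsyncSubject emits no value and simply passes the error along. Observers that subscribe after the subject has stopped receive the same final outcome.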

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/// Most recent element received through a `.next` event; `nil` until one arrives.
/// Read when a `.completed` event is processed to decide what observers receive.
private var _lastElement: Element?

// handle event
/// Feeds an event into the subject and notifies the affected observers.
///
/// State is updated by `_synchronized_on`, which returns the observers to
/// notify and the event they should receive. When that event is `.next`
/// (the buffered last element), a `.completed` is dispatched right after it.
///
/// - Parameter event: The incoming event from the source sequence.
public func on(_ event: Event<Element>) {
    #if DEBUG
        self._synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self._synchronizationTracker.unregister() }
    #endif
    let (targets, outcome) = self._synchronized_on(event)

    // Every outcome is delivered once to the returned observers.
    dispatch(targets, outcome)

    // A delivered element is always followed by completion.
    if case .next = outcome {
        dispatch(targets, .completed)
    }
}

/// Updates the subject's state for `event` while holding `_lock` and reports
/// what should be dispatched as a result.
///
/// - Parameter event: The incoming event.
/// - Returns: The observers to notify together with the event to send them.
///   An empty `Observers()` is returned when there is no one to notify
///   (the subject is already stopped, or the event was a `.next` that is
///   only buffered).
func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
    self._lock.lock()
    defer { self._lock.unlock() }

    // Already stopped: nothing further is forwarded.
    guard !self._isStopped else {
        return (Observers(), .completed)
    }

    switch event {
    case .next(let element):
        // Only remember the element; it is not sent yet.
        self._lastElement = element
        return (Observers(), .completed)

    case .error:
        // Record the error, detach all observers and hand them the error.
        self._stoppedEvent = event
        let subscribers = self._observers
        self._observers.removeAll()
        return (subscribers, event)

    case .completed:
        let subscribers = self._observers
        self._observers.removeAll()

        // No element was ever buffered: observers just get `.completed`.
        guard let finalElement = self._lastElement else {
            self._stoppedEvent = event
            return (subscribers, .completed)
        }

        // Otherwise the buffered element becomes the recorded stopped event
        // and is what the detached observers receive.
        self._stoppedEvent = .next(finalElement)
        return (subscribers, .next(finalElement))
    }
}

// subscribe
/// Subscribes `observer` to this subject.
///
/// Acquires `_lock` for the duration of the call and delegates the actual
/// work to `_synchronized_subscribe`.
///
/// - Parameter observer: The observer to subscribe.
/// - Returns: The disposable produced by `_synchronized_subscribe`.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    self._lock.lock()
    defer { self._lock.unlock() }

    return self._synchronized_subscribe(observer)
}

/// Registers `observer`, or replays the recorded outcome if the subject has
/// already stopped. Called by `subscribe` with `_lock` held.
///
/// - Parameter observer: The observer to subscribe.
/// - Returns: A `SubscriptionDisposable` keyed to the inserted observer while
///   the subject has no stopped event; otherwise the result of
///   `Disposables.create()`, since nothing was registered.
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    // No stopped event recorded: store the observer for later notification.
    guard let terminal = self._stoppedEvent else {
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

    // Stopped: replay the recorded event immediately
    // (`.next`, `.completed` or `.error`).
    observer.on(terminal)

    // A replayed element is followed by completion.
    if case .next = terminal {
        observer.on(.completed)
    }

    return Disposables.create()
}
This post is licensed under CC BY 4.0 by the author.