Home rxswift shared sequence
Post
Cancel

rxswift shared sequence

public protocol SharingStrategyProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 Scheduler on which all sequence events will be delivered.
*/
static var scheduler: SchedulerType { get }

/**
 Computation resources sharing strategy for multiple sequence observers.

 E.g. one can choose `share(replay:scope:)` as the sequence event sharing
 strategy, but can also do something more exotic, like implementing promises
 or lazy loading chains.

 - parameter source: Observable sequence to which the sharing strategy is applied.
 - returns: An observable sequence of the same element type with the strategy applied.
*/
static func share<Element>(_ source: Observable<Element>) -> Observable<Element>
1
2
3
4
5
6
/**
 Sharing strategy whose scheduler comes from `SharingScheduler.make()` and whose
 sharing is done with `share(replay: 1, scope: .whileConnected)`.
*/
public struct DriverSharingStrategy: SharingStrategyProtocol {
    /// Scheduler produced by `SharingScheduler.make()` on every access.
    public static var scheduler: SchedulerType {
        let sharingScheduler: SchedulerType = SharingScheduler.make()
        return sharingScheduler
    }

    /**
     Applies the sharing behavior of this strategy to `source`.

     - parameter source: Observable sequence to share.
     - returns: `source` shared with a replay buffer of one element and `.whileConnected` scope.
    */
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        let replayingLatest = source.share(replay: 1, scope: .whileConnected)
        return replayingLatest
    }
}

public protocol SharedSequenceConvertibleType : ObservableConvertibleType

1
2
3
/// Sharing strategy (a `SharingStrategyProtocol` conformer) associated with the conforming type.
associatedtype SharingStrategy: SharingStrategyProtocol

/**
 Converts `self` to a `SharedSequence` parameterized by this type's `SharingStrategy`.

 - returns: A `SharedSequence` with the same `SharingStrategy` and `Element`.
*/
func asSharedSequence() -> SharedSequence<SharingStrategy, Element>
1
2
3
4
5
extension SharedSequenceConvertibleType {
    /**
     Exposes this value as a plain `Observable` by converting it to a
     `SharedSequence` first and then asking that sequence for its observable.

     - returns: The observable obtained from `asSharedSequence()`.
    */
    public func asObservable() -> Observable<Element> {
        let sharedSequence = asSharedSequence()
        return sharedSequence.asObservable()
    }
}

public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/// Observable that backs this shared sequence.
let _source: Observable<Element>

/// Wraps `source` after passing it through `SharingStrategy.share(_:)`.
init(_ source: Observable<Element>) {
    _source = SharingStrategy.share(source)
}

/// Wraps `raw` unchanged; `SharingStrategy.share(_:)` is not applied here.
init(raw: Observable<Element>) {
    _source = raw
}

/**
 - returns: The stored backing observable.
*/
public func asObservable() -> Observable<Element> {
    let backing = _source
    return backing
}

/**
 - returns: This value itself, since it already is a `SharedSequence`.
*/
public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
    return self
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
extension SharedSequence {

    /**
    Creates a sequence with no elements.

    The wrapped `Observable.empty()` has `subscribeOn(SharingStrategy.scheduler)` applied to it.

    - returns: A shared sequence with no elements.
    */
    public static func empty() -> SharedSequence<SharingStrategy, Element> {
        let completedOnly = Observable<Element>.empty()
            .subscribeOn(SharingStrategy.scheduler)
        return SharedSequence(raw: completedOnly)
    }

    /**
    Creates a non-terminating sequence, which can be used to denote an infinite duration.

    - returns: A shared sequence wrapping `Observable.never()`.
    */
    public static func never() -> SharedSequence<SharingStrategy, Element> {
        let silent = Observable<Element>.never()
        return SharedSequence(raw: silent)
    }

    /**
    Creates a sequence that contains a single element.

    The wrapped `Observable.just(_:)` has `subscribeOn(SharingStrategy.scheduler)` applied to it.

    - parameter element: Single element in the resulting sequence.
    - returns: A shared sequence containing the single specified element.
    */
    public static func just(_ element: Element) -> SharedSequence<SharingStrategy, Element> {
        let single = Observable<Element>.just(element)
            .subscribeOn(SharingStrategy.scheduler)
        return SharedSequence(raw: single)
    }

    /**
     Creates a sequence backed by `Observable.deferred`, whose closure calls the given factory
     and uses the produced sequence's observable.

     The deferred observable is wrapped through the sharing initializer, so
     `SharingStrategy.share(_:)` is applied to it.

     - parameter observableFactory: Factory function producing the sequence to subscribe to.
     - returns: A shared sequence whose source is obtained from `observableFactory`.
     */
    public static func deferred(_ observableFactory: @escaping () -> SharedSequence<SharingStrategy, Element>)
        -> SharedSequence<SharingStrategy, Element> {
        let lazySource = Observable<Element>.deferred {
            observableFactory().asObservable()
        }
        return SharedSequence(lazySource)
    }

    /**
    Creates a sequence from a variable number of elements.

    - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)

    - parameter elements: Elements to generate.
    - returns: A shared sequence whose elements are pulled from the given arguments.
    */
    public static func of(_ elements: Element ...) -> SharedSequence<SharingStrategy, Element> {
        return SharedSequence(
            raw: Observable<Element>.from(elements, scheduler: SharingStrategy.scheduler)
        )
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
extension SharedSequence {

    /**
     Converts an array to a shared sequence, using `SharingStrategy.scheduler`.

     - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)

     - parameter array: Elements of the resulting sequence.
     - returns: A shared sequence whose elements are pulled from the given array.
     */
    public static func from(_ array: [Element]) -> SharedSequence<SharingStrategy, Element> {
        return SharedSequence(
            raw: Observable<Element>.from(array, scheduler: SharingStrategy.scheduler)
        )
    }

    /**
     Converts a Swift sequence to a shared sequence, using `SharingStrategy.scheduler`.

     - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)

     - parameter sequence: Sequence whose elements make up the resulting sequence.
     - returns: A shared sequence whose elements are pulled from the given sequence.
     */
    public static func from<Sequence: Swift.Sequence>(_ sequence: Sequence) -> SharedSequence<SharingStrategy, Element> where Sequence.Element == Element {
        return SharedSequence(
            raw: Observable<Element>.from(sequence, scheduler: SharingStrategy.scheduler)
        )
    }

    /**
     Converts an optional to a shared sequence, using `SharingStrategy.scheduler`.

     - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)

     - parameter optional: Optional element in the resulting sequence.

     - returns: A shared sequence containing the wrapped value, or no value when the optional is `nil`.
     */
    public static func from(optional: Element?) -> SharedSequence<SharingStrategy, Element> {
        return SharedSequence(
            raw: Observable<Element>.from(optional: optional, scheduler: SharingStrategy.scheduler)
        )
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
extension SharedSequence where Element : RxAbstractInteger {
    /**
     Creates a sequence that produces a value after each period, with timers run on `SharingStrategy.scheduler`.

     The underlying interval observable is wrapped through the sharing initializer, so
     `SharingStrategy.share(_:)` is applied to it.

     - seealso: [interval operator on reactivex.io](http://reactivex.io/documentation/operators/interval.html)

     - parameter period: Period for producing the values in the resulting sequence.
     - returns: A shared sequence that produces a value after each period.
     */
    public static func interval(_ period: RxTimeInterval)
        -> SharedSequence<SharingStrategy, Element> {
        let ticks = Observable<Element>.interval(period, scheduler: SharingStrategy.scheduler)
        return SharedSequence(ticks)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
extension SharedSequence where Element: RxAbstractInteger {
    /**
     Creates a sequence that produces a value once the initial due time has elapsed and then
     periodically, with timers run on `SharingStrategy.scheduler`.

     The underlying timer observable is wrapped through the sharing initializer, so
     `SharingStrategy.share(_:)` is applied to it.

     - seealso: [timer operator on reactivex.io](http://reactivex.io/documentation/operators/timer.html)

     - parameter dueTime: Relative time at which to produce the first value.
     - parameter period: Period to produce subsequent values.
     - returns: A shared sequence that produces a value after due time has elapsed and then each period.
     */
    public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval)
        -> SharedSequence<SharingStrategy, Element> {
        let ticks = Observable<Element>.timer(
            dueTime,
            period: period,
            scheduler: SharingStrategy.scheduler
        )
        return SharedSequence(ticks)
    }
}
This post is licensed under CC BY 4.0 by the author.