public protocol SharingStrategyProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
Scheduled on which all sequence events will be delivered.
*/
static var scheduler: SchedulerType { get }
/**
Computation resources sharing strategy for multiple sequence observers.
E.g. One can choose `share(replay:scope:)`
as sequence event sharing strategies, but also do something more exotic, like
implementing promises or lazy loading chains.
*/
static func share<Element>(_ source: Observable<Element>) -> Observable<Element>
1
2
3
4
5
6
public struct DriverSharingStrategy: SharingStrategyProtocol {
public static var scheduler: SchedulerType { return SharingScheduler.make() }
public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
return source.share(replay: 1, scope: .whileConnected)
}
}
public protocol SharedSequenceConvertibleType : ObservableConvertibleType
1
2
3
associatedtype SharingStrategy: SharingStrategyProtocol
func asSharedSequence() -> SharedSequence<SharingStrategy, Element>
1
2
3
4
5
extension SharedSequenceConvertibleType {
public func asObservable() -> Observable<Element> {
return self.asSharedSequence().asObservable()
}
}
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let _source: Observable<Element>
init(_ source: Observable<Element>) {
self._source = SharingStrategy.share(source)
}
init(raw: Observable<Element>) {
self._source = raw
}
public func asObservable() -> Observable<Element> {
return self._source
}
public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
return self
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
extension SharedSequence {
/**
Returns an empty observable sequence, using the specified scheduler to send out the single `Completed` message.
- returns: An observable sequence with no elements.
*/
public static func empty() -> SharedSequence<SharingStrategy, Element> {
return SharedSequence(raw: Observable.empty().subscribeOn(SharingStrategy.scheduler))
}
/**
Returns a non-terminating observable sequence, which can be used to denote an infinite duration.
- returns: An observable sequence whose observers will never get called.
*/
public static func never() -> SharedSequence<SharingStrategy, Element> {
return SharedSequence(raw: Observable.never())
}
/**
Returns an observable sequence that contains a single element.
- parameter element: Single element in the resulting observable sequence.
- returns: An observable sequence containing the single specified element.
*/
public static func just(_ element: Element) -> SharedSequence<SharingStrategy, Element> {
return SharedSequence(raw: Observable.just(element).subscribeOn(SharingStrategy.scheduler))
}
/**
Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
- parameter observableFactory: Observable factory function to invoke for each observer that subscribes to the resulting sequence.
- returns: An observable sequence whose observers trigger an invocation of the given observable factory function.
*/
public static func deferred(_ observableFactory: @escaping () -> SharedSequence<SharingStrategy, Element>)
-> SharedSequence<SharingStrategy, Element> {
return SharedSequence(Observable.deferred { observableFactory().asObservable() })
}
/**
This method creates a new Observable instance with a variable number of elements.
- seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
- parameter elements: Elements to generate.
- returns: The observable sequence whose elements are pulled from the given arguments.
*/
public static func of(_ elements: Element ...) -> SharedSequence<SharingStrategy, Element> {
let source = Observable.from(elements, scheduler: SharingStrategy.scheduler)
return SharedSequence(raw: source)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
extension SharedSequence {
/**
This method converts an array to an observable sequence.
- seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
- returns: The observable sequence whose elements are pulled from the given enumerable sequence.
*/
public static func from(_ array: [Element]) -> SharedSequence<SharingStrategy, Element> {
let source = Observable.from(array, scheduler: SharingStrategy.scheduler)
return SharedSequence(raw: source)
}
/**
This method converts a sequence to an observable sequence.
- seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
- returns: The observable sequence whose elements are pulled from the given enumerable sequence.
*/
public static func from<Sequence: Swift.Sequence>(_ sequence: Sequence) -> SharedSequence<SharingStrategy, Element> where Sequence.Element == Element {
let source = Observable.from(sequence, scheduler: SharingStrategy.scheduler)
return SharedSequence(raw: source)
}
/**
This method converts a optional to an observable sequence.
- seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
- parameter optional: Optional element in the resulting observable sequence.
- returns: An observable sequence containing the wrapped value or not from given optional.
*/
public static func from(optional: Element?) -> SharedSequence<SharingStrategy, Element> {
let source = Observable.from(optional: optional, scheduler: SharingStrategy.scheduler)
return SharedSequence(raw: source)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
extension SharedSequence where Element : RxAbstractInteger {
/**
Returns an observable sequence that produces a value after each period, using the specified scheduler to run timers and to send out observer messages.
- seealso: [interval operator on reactivex.io](http://reactivex.io/documentation/operators/interval.html)
- parameter period: Period for producing the values in the resulting sequence.
- returns: An observable sequence that produces a value after each period.
*/
public static func interval(_ period: RxTimeInterval)
-> SharedSequence<SharingStrategy, Element> {
return SharedSequence(Observable.interval(period, scheduler: SharingStrategy.scheduler))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
extension SharedSequence where Element: RxAbstractInteger {
/**
Returns an observable sequence that periodically produces a value after the specified initial relative due time has elapsed, using the specified scheduler to run timers.
- seealso: [timer operator on reactivex.io](http://reactivex.io/documentation/operators/timer.html)
- parameter dueTime: Relative time at which to produce the first value.
- parameter period: Period to produce subsequent values.
- returns: An observable sequence that produces a value after due time has elapsed and then each period.
*/
public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval)
-> SharedSequence<SharingStrategy, Element> {
return SharedSequence(Observable.timer(dueTime, period: period, scheduler: SharingStrategy.scheduler))
}
}