MessageMediaPreuploadManager
enum
MultipartUploadSource
1
2
3
/// Upload input identified by a `MediaResourceReference`.
case resource(MediaResourceReference)
/// Upload input supplied directly as a `Data` value.
case data(Data)
/// Upload input delivered by a caller-provided signal of `MediaResourceData`.
case custom(Signal<MediaResourceData, NoError>)
enum
MultipartUploadResult
1
2
3
/// Intermediate progress report for an upload that has not finished yet.
/// NOTE(review): presumably a 0...1 fraction (the preupload manager emits `1.0` before replaying a stored result) — confirm.
case progress(Float)
/// Final result carrying an `Api.InputFile`.
case inputFile(Api.InputFile)
/// Final result carrying an `Api.InputEncryptedFile`, an `Int32` and a `SecretFileEncryptionKey`.
/// NOTE(review): the meaning of the `Int32` payload is not visible here — TODO confirm.
case inputSecretFile(Api.InputEncryptedFile, Int32, SecretFileEncryptionKey)
class
MessageMediaPreuploadManagerUploadContext
1
2
3
4
/// Holds the disposable of the upload subscription started for this context.
let disposable = MetaDisposable()
/// Value of the most recent `.progress` event; `nil` until one has been received.
var progress: Float?
/// The most recent non-progress event; `nil` while no such event has been received.
var result: MultipartUploadResult?
/// Callbacks invoked with every event received for this context.
let subscribers = Bag<(MultipartUploadResult) -> Void>()
class
MessageMediaPreuploadManagerContext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/// Queue on which upload events are handled and on which the signals returned by `upload` are run.
private let queue: Queue
/// Preupload contexts keyed by the `id` passed to `add`; `upload` looks them up via `localIdForResource`.
private var uploadContexts: [Int64: MessageMediaPreuploadManagerUploadContext] = [:]
// add upload work
/// Starts a preupload for `id` and records its progress and result, so that a later `upload` call
/// resolving to the same id can attach to it.
///
/// If a preupload is already registered under `id`, its upload subscription is disposed before the
/// new context replaces it, and late events from the superseded upload are ignored instead of being
/// written into the replacement context.
///
/// - Parameters:
///   - network: Forwarded to `multipartUpload`.
///   - postbox: Forwarded to `multipartUpload`.
///   - id: Key under which the upload context is stored in `uploadContexts`.
///   - encrypt: Forwarded to `multipartUpload`.
///   - tag: Forwarded to `multipartUpload`.
///   - source: Data signal, passed to `multipartUpload` wrapped as `.custom(source)`.
func add(network: Network, postbox: Postbox, id: Int64, encrypt: Bool, tag: MediaResourceFetchTag?, source: Signal<MediaResourceData, NoError>) {
    // Cancel the upload that is about to be replaced; otherwise it keeps running with nobody
    // able to observe it through the dictionary.
    self.uploadContexts[id]?.disposable.dispose()

    let context = MessageMediaPreuploadManagerUploadContext()
    self.uploadContexts[id] = context
    let queue = self.queue
    // Only a `next` handler is installed: errors and completion of the upload signal are not observed here.
    // `context` is captured weakly because it owns the subscription (via `disposable`) that owns this closure.
    context.disposable.set(multipartUpload(network: network, postbox: postbox, source: .custom(source), encrypt: encrypt, tag: tag, hintFileSize: nil, hintFileIsLarge: false).start(next: { [weak self, weak context] next in
        queue.async {
            // Drop events belonging to a context that has been released or replaced under the same id.
            guard let strongSelf = self, let context = context, strongSelf.uploadContexts[id] === context else {
                return
            }
            switch next {
            case let .progress(value):
                print("progress")
                context.progress = value
            case .inputFile, .inputSecretFile:
                print("result")
                context.result = next
            }
            // Notify a snapshot of the current subscribers with the event.
            for subscriber in context.subscribers.copyItems() {
                subscriber(next)
            }
        }
    }))
}
// upload source
/// Returns a signal that uploads `source`, reusing a registered preupload when one matches.
///
/// When `source` is a `.resource` whose `localIdForResource` id has a context in `uploadContexts`:
/// - if that context already has a result, the signal emits `.progress(1.0)`, the result, and completes;
/// - otherwise it emits the last known progress (if any) and subscribes to the context's events,
///   completing after `.inputFile` or `.inputSecretFile`.
/// In every other case the work is delegated to `multipartUpload`.
/// If the manager context has been released by subscription time, the signal fails with `.generic`.
///
/// The returned signal is run on this context's queue.
func upload(network: Network, postbox: Postbox, source: MultipartUploadSource, encrypt: Bool, tag: MediaResourceFetchTag?, hintFileSize: Int?, hintFileIsLarge: Bool) -> Signal<MultipartUploadResult, MultipartUploadError> {
    let queue = self.queue
    return Signal { [weak self] subscriber in
        guard let strongSelf = self else {
            subscriber.putError(.generic)
            return EmptyDisposable
        }

        guard case let .resource(reference) = source, let id = localIdForResource(reference.resource), let preupload = strongSelf.uploadContexts[id] else {
            // No matching preupload: run a regular multipart upload and relay its events.
            return multipartUpload(network: network, postbox: postbox, source: source, encrypt: encrypt, tag: tag, hintFileSize: hintFileSize, hintFileIsLarge: hintFileIsLarge).start(next: { event in
                subscriber.putNext(event)
            }, error: { failure in
                subscriber.putError(failure)
            }, completed: {
                subscriber.putCompletion()
            })
        }

        // Preupload already finished: replay the stored result right away.
        if let finished = preupload.result {
            subscriber.putNext(.progress(1.0))
            subscriber.putNext(finished)
            subscriber.putCompletion()
            return EmptyDisposable
        }

        // Preupload still running: report the last known progress first, if there is one.
        if let lastProgress = preupload.progress {
            subscriber.putNext(.progress(lastProgress))
        }

        // Attach to the running preupload; complete once a non-progress event arrives.
        let token = preupload.subscribers.add({ event in
            subscriber.putNext(event)
            switch event {
            case .progress:
                break
            case .inputFile, .inputSecretFile:
                subscriber.putCompletion()
            }
        })

        return ActionDisposable {
            queue.async {
                // The context is looked up again by id rather than captured by this closure.
                self?.uploadContexts[id]?.subscribers.remove(token)
            }
        }
    } |> runOn(queue)
}
class
MessageMediaPreuploadManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/// Wrapped `MessageMediaPreuploadManagerContext`; the methods of this class reach it through `impl.with { ... }`.
private let impl: QueueLocalObject<MessageMediaPreuploadManagerContext>
// add
/// Registers a preupload by forwarding all arguments unchanged to the wrapped context's `add`.
public func add(network: Network, postbox: Postbox, id: Int64, encrypt: Bool, tag: MediaResourceFetchTag?, source: Signal<MediaResourceData, NoError>) {
    self.impl.with {
        $0.add(network: network, postbox: postbox, id: id, encrypt: encrypt, tag: tag, source: source)
    }
}
// upload
/// Returns the upload signal produced by the wrapped context's `upload` for the given arguments.
///
/// The context's signal is obtained inside `impl.with` at subscription time, emitted as the single
/// value of an outer signal, and flattened with `switchToLatest`.
func upload(network: Network, postbox: Postbox, source: MultipartUploadSource, encrypt: Bool, tag: MediaResourceFetchTag?, hintFileSize: Int?, hintFileIsLarge: Bool) -> Signal<MultipartUploadResult, MultipartUploadError> {
    let deferred = Signal<Signal<MultipartUploadResult, MultipartUploadError>, MultipartUploadError> { outer in
        self.impl.with { context in
            let inner = context.upload(network: network, postbox: postbox, source: source, encrypt: encrypt, tag: tag, hintFileSize: hintFileSize, hintFileIsLarge: hintFileIsLarge)
            outer.putNext(inner)
            outer.putCompletion()
        }
        // Nothing to cancel for the outer signal itself.
        return EmptyDisposable
    }
    return deferred |> switchToLatest
}