Home telegram - messageMediaPreuploadManager
Post
Cancel

telegram - messageMediaPreuploadManager

MessageMediaPreuploadManager

enum MultipartUploadSource

1
2
3
// Source identified by a media resource reference.
case resource(MediaResourceReference)
// Source supplied directly as a `Data` value.
case data(Data)
// Source supplied as a caller-provided signal of `MediaResourceData` that never errors.
case custom(Signal<MediaResourceData, NoError>)

enum MultipartUploadResult

1
2
3
// Intermediate progress report (presumably a 0...1 fraction; `upload` emits 1.0 right before a cached result).
case progress(Float)
// Result for an unencrypted upload, carrying the API input file.
case inputFile(Api.InputFile)
// Result for an encrypted upload: the encrypted input file, an Int32 and the encryption key.
// NOTE(review): the meaning of the Int32 is not visible here (possibly the file size) — confirm.
case inputSecretFile(Api.InputEncryptedFile, Int32, SecretFileEncryptionKey)			

class MessageMediaPreuploadManagerUploadContext

1
2
3
4
// Holds the disposable of the pre-upload started for this context.
let disposable = MetaDisposable()
// Most recent progress value received; nil until the first `.progress` event.
var progress: Float?
// Final upload result; nil until a non-progress event is received.
var result: MultipartUploadResult?
// Callbacks that are invoked with every event received for this pre-upload.
let subscribers = Bag<(MultipartUploadResult) -> Void>()

class MessageMediaPreuploadManagerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Queue on which pre-upload events are handled and on which the signals returned by `upload` run.
private let queue: Queue
// Pre-upload contexts keyed by the Int64 id passed to `add`.
private var uploadContexts: [Int64: MessageMediaPreuploadManagerUploadContext] = [:]

// Starts pre-uploading `source` and registers the upload under `id`, so that a later
// `upload(...)` call for the matching resource can attach to it instead of uploading again.
//
// - network, postbox, encrypt, tag: forwarded unchanged to `multipartUpload`.
// - id: key under which the upload context is stored.
// - source: signal producing the data to upload; wrapped as `.custom(source)`.
//
// Only `next` events of the upload signal are observed here; errors and completion
// are not forwarded to subscribers.
func add(network: Network, postbox: Postbox, id: Int64, encrypt: Bool, tag: MediaResourceFetchTag?, source: Signal<MediaResourceData, NoError>) {
    // If a pre-upload is already registered under this id, cancel it before replacing it.
    // Otherwise the old upload keeps running and, because events are routed by id,
    // keeps feeding its events into the new context.
    self.uploadContexts[id]?.disposable.dispose()

    let context = MessageMediaPreuploadManagerUploadContext()
    self.uploadContexts[id] = context
    let queue = self.queue

    let uploadSignal = multipartUpload(network: network, postbox: postbox, source: .custom(source), encrypt: encrypt, tag: tag, hintFileSize: nil, hintFileIsLarge: false)
    context.disposable.set(uploadSignal.start(next: { [weak self] next in
        // Hop onto the manager queue before touching `uploadContexts`.
        queue.async {
            guard let strongSelf = self, let context = strongSelf.uploadContexts[id] else {
                return
            }
            // Record the latest state so that late subscribers can be caught up in `upload`.
            // The switch is exhaustive on purpose: a new result case must be handled explicitly.
            switch next {
                case let .progress(value):
                    context.progress = value
                case .inputFile, .inputSecretFile:
                    context.result = next
            }
            // Notify a snapshot of the current subscribers about the event.
            for subscriber in context.subscribers.copyItems() {
                subscriber(next)
            }
        }
    }))
}

// Returns a signal that uploads `source`.
//
// When `source` is a `.resource` whose local id matches a pre-upload registered via `add`,
// the signal attaches to that pre-upload instead of starting a new one; in every other
// case it runs `multipartUpload` directly. The signal is run on the manager queue.
func upload(network: Network, postbox: Postbox, source: MultipartUploadSource, encrypt: Bool, tag: MediaResourceFetchTag?, hintFileSize: Int?, hintFileIsLarge: Bool) -> Signal<MultipartUploadResult, MultipartUploadError> {
    let queue = self.queue
    return Signal { [weak self] subscriber in
        // The manager context is gone: report a generic failure.
        guard let strongSelf = self else {
            subscriber.putError(.generic)
            return EmptyDisposable
        }

        // No matching pre-upload: perform a regular multipart upload and forward its events.
        guard case let .resource(resource) = source, let id = localIdForResource(resource.resource), let context = strongSelf.uploadContexts[id] else {
            let directUpload = multipartUpload(network: network, postbox: postbox, source: source, encrypt: encrypt, tag: tag, hintFileSize: hintFileSize, hintFileIsLarge: hintFileIsLarge)
            return directUpload.start(next: { event in
                subscriber.putNext(event)
            }, error: { failure in
                subscriber.putError(failure)
            }, completed: {
                subscriber.putCompletion()
            })
        }

        // The pre-upload has already finished: replay full progress and the stored result.
        if let finished = context.result {
            subscriber.putNext(.progress(1.0))
            subscriber.putNext(finished)
            subscriber.putCompletion()
            return EmptyDisposable
        }

        // The pre-upload is still running: replay the last known progress, if any.
        if let lastProgress = context.progress {
            subscriber.putNext(.progress(lastProgress))
        }

        // Attach to the running pre-upload; complete once a final result arrives.
        let token = context.subscribers.add({ event in
            subscriber.putNext(event)
            switch event {
                case .progress:
                    break
                case .inputFile, .inputSecretFile:
                    subscriber.putCompletion()
            }
        })

        // On disposal, detach from the pre-upload context on the manager queue.
        return ActionDisposable {
            queue.async {
                guard let strongSelf = self, let context = strongSelf.uploadContexts[id] else {
                    return
                }
                context.subscribers.remove(token)
            }
        }
    } |> runOn(queue)
}

class MessageMediaPreuploadManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Wrapped manager context; every access in this class goes through `impl.with { ... }`.
// NOTE(review): presumably `QueueLocalObject` confines the context to its queue — confirm.
private let impl: QueueLocalObject<MessageMediaPreuploadManagerContext>

// Registers a pre-upload of `source` under `id` by forwarding all arguments
// to the wrapped context's `add`.
public func add(network: Network, postbox: Postbox, id: Int64, encrypt: Bool, tag: MediaResourceFetchTag?, source: Signal<MediaResourceData, NoError>) {
    self.impl.with { managerContext in
        managerContext.add(
            network: network,
            postbox: postbox,
            id: id,
            encrypt: encrypt,
            tag: tag,
            source: source
        )
    }
}

// Returns the upload signal for `source`, obtained from the wrapped context.
//
// The context's `upload` is called inside `impl.with`, so the resulting signal is
// delivered through an outer signal and then flattened with `switchToLatest`.
func upload(network: Network, postbox: Postbox, source: MultipartUploadSource, encrypt: Bool, tag: MediaResourceFetchTag?, hintFileSize: Int?, hintFileIsLarge: Bool) -> Signal<MultipartUploadResult, MultipartUploadError> {
    let deferredUpload = Signal<Signal<MultipartUploadResult, MultipartUploadError>, MultipartUploadError> { outer in
        self.impl.with { managerContext in
            let inner = managerContext.upload(network: network, postbox: postbox, source: source, encrypt: encrypt, tag: tag, hintFileSize: hintFileSize, hintFileIsLarge: hintFileIsLarge)
            // Hand over the actual upload signal, then finish the outer signal.
            outer.putNext(inner)
            outer.putCompletion()
        }
        return EmptyDisposable
    }
    return deferredUpload |> switchToLatest
}
This post is licensed under CC BY 4.0 by the author.