Home telegram - pendingMessageManager
Post
Cancel

telegram - pendingMessageManager

Listening for unsent message IDs
// Subscribe to the postbox's unsent-message-ids view and forward the id set of every emitted
// view to the pending message manager. The manager is captured weakly by the closure, and the
// disposable returned by `start` is added to `managedOperationsDisposable`.
self.managedOperationsDisposable.add(postbox.unsentMessageIdsView().start(next: { [weak pendingMessageManager] view in
            pendingMessageManager?.updatePendingMessageIds(view.ids)
        }))

class UnsentMessageIdsView
// The id set carried by the view; the subscription above passes it to `updatePendingMessageIds`.
let ids: Set<MessageId>

updatePendingMessageIds
/// Reconciles the manager's bookkeeping with the latest set of unsent message ids.
///
/// The work is dispatched asynchronously onto `self.queue`. Compared with the previously stored
/// `pendingMessageIds`:
/// - ids that disappeared get their context reset (state `.none`, disposables disposed, status
///   cleared with subscribers notified); a context without status subscribers is removed;
/// - ids that appeared are handed to `beginSendingMessages` in sorted order.
///
/// After the new set is stored, waiting uploads are refreshed for the affected peers, affected
/// groups are re-evaluated via `beginSendingGroupIfPossible`, "message delivered" subscribers are
/// notified for secret-chat peers whose removed messages are sent or acknowledged, and the set of
/// peers that still have pending messages is published through `_hasPendingMessages`.
///
/// - Parameter messageIds: The complete, current set of unsent message ids.
func updatePendingMessageIds(_ messageIds: Set<MessageId>) {
    self.queue.async {
        let previousIds = self.pendingMessageIds
        let appearedIds = messageIds.subtracting(previousIds)
        let vanishedIds = previousIds.subtracting(messageIds)
        let vanishedSecretIds = Set(vanishedIds.filter { $0.peerId.namespace == Namespaces.Peer.SecretChat })

        var peerIdsToRefresh = Set<PeerId>()
        var groupIdsToRefresh = Set<Int64>()
        for vanishedId in vanishedIds {
            guard let context = self.messageContexts[vanishedId] else {
                continue
            }

            // The group id has to be read before the state is reset below.
            if let groupId = context.state.groupId {
                groupIdsToRefresh.insert(groupId)
            }
            context.state = .none
            peerIdsToRefresh.insert(vanishedId.peerId)
            context.sendDisposable.dispose()
            context.uploadDisposable.dispose()
            context.activityDisposable.dispose()

            // Tell status subscribers that the message no longer has a pending status.
            if context.status != nil {
                context.status = nil
                context.statusSubscribers.copyItems().forEach { notify in
                    notify(nil, context.error)
                }
            }

            // Keep the context alive only while somebody is still subscribed to its status.
            if context.statusSubscribers.isEmpty {
                self.messageContexts.removeValue(forKey: vanishedId)
            }
        }

        // Start sending newly appeared messages (before the stored id set is replaced).
        if !appearedIds.isEmpty {
            self.beginSendingMessages(appearedIds.sorted())
        }

        self.pendingMessageIds = messageIds

        for peerId in peerIdsToRefresh {
            self.updateWaitingUploads(peerId: peerId)
        }

        for groupId in groupIdsToRefresh {
            self.beginSendingGroupIfPossible(groupId: groupId)
        }

        if !vanishedSecretIds.isEmpty {
            // Collect the peers for which at least one removed secret-chat message was delivered.
            let deliveredPeerIds = self.postbox.transaction { transaction -> Set<PeerId> in
                var delivered = Set<PeerId>()
                for id in vanishedSecretIds {
                    if let message = transaction.getMessage(id), message.isSentOrAcknowledged {
                        delivered.insert(id.peerId)
                    }
                }
                return delivered
            }
            |> deliverOn(self.queue)

            let _ = deliveredPeerIds.start(next: { [weak self] peerIds in
                guard let strongSelf = self else {
                    return
                }
                for peerId in peerIds {
                    guard let summaryContext = strongSelf.peerSummaryContexts[peerId] else {
                        continue
                    }
                    for notifyDelivered in summaryContext.messageDeliveredSubscribers.copyItems() {
                        notifyDelivered()
                    }
                }
            })
        }

        // Publish the peers that still have at least one pending message.
        let peersWithPendingMessages = Set(self.pendingMessageIds.map { $0.peerId })
        self._hasPendingMessages.set(peersWithPendingMessages)
    }
}

class PendingMessageContext
// Current stage of the message in the sending pipeline. `updatePendingMessageIds` resets it to
// `.none` when the message leaves the pending set; `beginSendingMessages` matches on
// `.collectingInfo` and assigns `.waitingForUploadToStart`.
var state: PendingMessageState = .none
// Disposed by `updatePendingMessageIds` when the message is removed (presumably holds the upload subscription — confirm).
let uploadDisposable = MetaDisposable()
// Disposed by `updatePendingMessageIds` when the message is removed (presumably holds the send request — confirm).
let sendDisposable = MetaDisposable()
// Set by `beginSendingMessages` from `uploadActivityTypeForMessage(message)`.
var activityType: PeerInputActivity? = nil
// Set by `beginSendingMessages` to the content type returned by `messageContentToUpload`.
var contentType: PendingMessageUploadedContentType? = nil
// Disposed by `updatePendingMessageIds` when the message is removed (presumably tied to the input-activity indicator — confirm).
let activityDisposable = MetaDisposable()
// Status reported to subscribers; `nil` once the message is no longer pending.
var status: PendingMessageStatus?
// Passed to status subscribers together with `status`.
var error: PendingMessageFailureReason?
// Callbacks invoked with `(status, error)` whenever the status changes.
var statusSubscribers = Bag<(PendingMessageStatus?, PendingMessageFailureReason?) -> Void>()
// Passed as `forceReupload` to `messageContentToUpload` by `beginSendingMessages`.
var forcedReuploadOnce: Bool = false

beginSendingMessages
/// Starts the sending pipeline for the given message ids. Must be called on `self.queue`.
///
/// First, a `PendingMessageContext` is created for every id that has none, and the initial status
/// (`isRunning: false`, `progress: 0.0`) is reported to status subscribers when it differs from
/// the current one. Then the messages are loaded from the postbox; on delivery, every message
/// without the `.Sending` flag goes through `collectUploadingInfo`, after which all contexts in
/// the `.collectingInfo` state are processed in message-index order: their upload either begins
/// right away or the context is parked in `.waitingForUploadToStart`.
///
/// - Parameter ids: Ids of the messages to start sending.
private func beginSendingMessages(_ ids: [MessageId]) {
    assert(self.queue.isCurrent())

    // Ensure a context exists for every id and report the initial status.
    for id in ids.sorted() {
        let context: PendingMessageContext
        if let existing = self.messageContexts[id] {
            context = existing
        } else {
            context = PendingMessageContext()
            self.messageContexts[id] = context
        }

        let initialStatus = PendingMessageStatus(isRunning: false, progress: 0.0)
        guard initialStatus != context.status else {
            continue
        }
        context.status = initialStatus
        for notify in context.statusSubscribers.copyItems() {
            notify(context.status, context.error)
        }
    }

    // Load the messages from the postbox. The disposable unregisters itself from
    // `beginSendingMessagesDisposables` once the signal is disposed.
    let loadDisposable = MetaDisposable()
    let loadedMessages = self.postbox.messagesAtIds(ids)
    |> deliverOn(self.queue)
    |> afterDisposed { [weak self, weak loadDisposable] in
        guard let strongSelf = self, let strongDisposable = loadDisposable else {
            return
        }
        strongSelf.beginSendingMessagesDisposables.remove(strongDisposable)
    }
    self.beginSendingMessagesDisposables.add(loadDisposable)

    loadDisposable.set(loadedMessages.start(next: { [weak self] messages in
        guard let strongSelf = self else {
            return
        }
        assert(strongSelf.queue.isCurrent())

        // Collect uploading info for every message that is not flagged as `.Sending`, in id order.
        let notYetSending = messages
            .filter { !$0.flags.contains(.Sending) }
            .sorted { $0.id < $1.id }
        for message in notYetSending {
            guard let context = strongSelf.messageContexts[message.id] else {
                continue
            }
            context.activityType = uploadActivityTypeForMessage(message)
            strongSelf.collectUploadingInfo(messageContext: context, message: message)
        }

        // Snapshot all contexts that are currently collecting info, ordered by message index.
        let collecting = strongSelf.messageContexts.values
            .compactMap { context -> (PendingMessageContext, Message)? in
                guard case let .collectingInfo(message) = context.state else {
                    return nil
                }
                return (context, message)
            }
            .sorted { $0.1.index < $1.1.index }

        for (context, _) in collecting {
            // Re-check the state: it may have changed since the snapshot was taken.
            guard case let .collectingInfo(message) = context.state else {
                continue
            }

            // Prepare the content to upload.
            let (contentUploadSignal, contentType) = messageContentToUpload(
                network: strongSelf.network,
                postbox: strongSelf.postbox,
                auxiliaryMethods: strongSelf.auxiliaryMethods,
                transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia,
                messageMediaPreuploadManager: strongSelf.messageMediaPreuploadManager,
                revalidationContext: strongSelf.revalidationContext,
                forceReupload: context.forcedReuploadOnce,
                isGrouped: message.groupingKey != nil,
                message: message
            )
            context.contentType = contentType

            if strongSelf.canBeginUploadingMessage(id: message.id, type: contentType) {
                // Begin uploading right away.
                strongSelf.beginUploadingMessage(messageContext: context, id: message.id, groupId: message.groupingKey, uploadSignal: contentUploadSignal)
            } else {
                // Park the upload until it is allowed to start.
                context.state = .waitingForUploadToStart(groupId: message.groupingKey, upload: contentUploadSignal)
            }
        }
    }))
}

PendingMessageUploadedContentResult

enum PendingMessageUploadedContent
// Plain text content; the payload is the message text.
case text(String)
// Media expressed as an `Api.InputMedia`; the `String` is the message text passed along with it.
case media(Api.InputMedia, String)
// Forwarded message, described by its forward-source attribute.
case forward(ForwardSourceInfoAttribute)
// Message backed by an `OutgoingChatContextResultMessageAttribute` (presumably an inline-query result — confirm).
case chatContextResult(OutgoingChatContextResultMessageAttribute)
// Secret-chat media: the encrypted file, the decrypted size and the encryption key
// (built from a `SecretFileMediaResource` in `mediaContentToUpload`).
case secretMedia(Api.InputEncryptedFile, Int32, SecretFileEncryptionKey)
// Returned by `messageContentToUpload` when the first media is a `UChatMediaAction` with action `.historyScreenshot`.
case messageScreenshot
// NOTE(review): `mediaContentToUpload` also constructs `.share(input, text)`, which is not part of
// this listing — confirm the actual enum declares that case.

enum PendingMessageReuploadInfo
case reuploadFile(FileMediaReference)

struct PendingMessageUploadedContentAndReuploadInfo
// The content that is ready to be sent.
let content: PendingMessageUploadedContent
// Optional reupload information; every construction visible in this excerpt passes `nil`.
let reuploadInfo: PendingMessageReuploadInfo?

enum PendingMessageUploadedContentResult
// Intermediate progress report; `uploadedMediaFileContent` emits `.progress(1.0)` right before the
// final content when a cached file can be reused (presumably a fraction in 0...1 — confirm).
case progress(Float)
// Final result: the content to send plus optional reupload info.
case content(PendingMessageUploadedContentAndReuploadInfo)

messageContentToUpload
/// Decides what has to be uploaded/sent for an outgoing message and classifies the content.
///
/// The first matching rule wins:
/// 1. the first media is a `UChatMediaAction` with action `.historyScreenshot` → `.messageScreenshot`, type `.none`;
/// 2. a `ForwardSourceInfoAttribute` is present (ignored in secret chats) → `.forward`, type `.text`;
/// 3. an `OutgoingChatContextResultMessageAttribute` is present (ignored in secret chats) → `.chatContextResult`, type `.text`;
/// 4. `mediaContentToUpload` returns a signal for the first media → that signal, type `.media`;
/// 5. otherwise → `.text(text)`, type `.text`.
///
/// When several attributes of the same kind are present, the last one is used.
///
/// - Returns: The signal producing the content, paired with the content type.
func messageContentToUpload(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, messageMediaPreuploadManager: MessageMediaPreuploadManager, revalidationContext: MediaReferenceRevalidationContext, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, messageId: MessageId?, attributes: [MessageAttribute], text: String, media: [Media]) -> (Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>, PendingMessageUploadedContentType) {
    // Wraps content that needs no upload into an immediately completing signal.
    func ready(_ content: PendingMessageUploadedContent) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> {
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: content, reuploadInfo: nil)))
    }

    let isSecretChat = peerId.namespace == Namespaces.Peer.SecretChat

    // Single pass over the attributes; context results and forward info are ignored in secret chats.
    var contextResult: OutgoingChatContextResultMessageAttribute?
    var autoremoveAttribute: AutoremoveTimeoutMessageAttribute?
    var forwardInfo: ForwardSourceInfoAttribute?
    for attribute in attributes {
        if let value = attribute as? OutgoingChatContextResultMessageAttribute {
            if !isSecretChat {
                contextResult = value
            }
        } else if let value = attribute as? AutoremoveTimeoutMessageAttribute {
            autoremoveAttribute = value
        }

        if let value = attribute as? ForwardSourceInfoAttribute, !isSecretChat {
            forwardInfo = value
        }
    }

    if let action = media.first as? UChatMediaAction, action.action == .historyScreenshot {
        return (ready(.messageScreenshot), .none)
    }
    if let forwardInfo = forwardInfo {
        return (ready(.forward(forwardInfo)), .text)
    }
    if let contextResult = contextResult {
        return (ready(.chatContextResult(contextResult)), .text)
    }
    if let firstMedia = media.first, let mediaResult = mediaContentToUpload(
        network: network,
        postbox: postbox,
        auxiliaryMethods: auxiliaryMethods,
        transformOutgoingMessageMedia: transformOutgoingMessageMedia,
        messageMediaPreuploadManager: messageMediaPreuploadManager,
        revalidationContext: revalidationContext,
        forceReupload: forceReupload,
        isGrouped: isGrouped,
        peerId: peerId,
        media: firstMedia,
        text: text,
        autoremoveAttribute: autoremoveAttribute,
        messageId: messageId,
        attributes: attributes
    ) {
        return (mediaResult, .media)
    }

    // No media, or media that `mediaContentToUpload` does not handle: send as plain text.
    return (ready(.text(text)), .text)
}

mediaContentToUpload
/// Builds the signal that produces the uploadable content for a single media attachment.
///
/// Handling per media type:
/// - `UChatMediaImage`: in a secret chat an existing `SecretFileMediaResource` is reused; outside
///   secret chats a cloud reference that has a file reference is reused; otherwise the image is
///   uploaded via `uploadedMediaImageContent`.
/// - `UChatMediaFile`: a cloud document in a secret chat is sent as plain text when it is a sticker
///   with a pack reference, and is otherwise uploaded with `forceReupload: true`; outside secret
///   chats the cloud document is referenced directly (after revalidating the file reference when
///   `forceReupload` is set). Any other file resource goes through `uploadedMediaFileContent`.
/// - Contact, shared contact, map, poll, red envelopes and share media are converted into the
///   corresponding API input without an upload. Polls fail with `.generic` in secret chats.
///
/// - Returns: The content signal, or `nil` when the media type (or the share type) is not handled.
func mediaContentToUpload(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, messageMediaPreuploadManager: MessageMediaPreuploadManager, revalidationContext: MediaReferenceRevalidationContext, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, media: Media, text: String, autoremoveAttribute: AutoremoveTimeoutMessageAttribute?, messageId: MessageId?, attributes: [MessageAttribute]) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>? {
    if let image = media as? UChatMediaImage, let largest = largestImageRepresentation(image.representations) {
        // Secret chat: reuse the already encrypted file when the resource is one.
        if peerId.namespace == Namespaces.Peer.SecretChat, let resource = largest.resource as? SecretFileMediaResource {
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .secretMedia(.inputEncryptedFile(id: resource.fileId, accessHash: resource.accessHash), resource.decryptedSize, resource.key), reuploadInfo: nil)))
        }
        // Regular chat: reference the cloud photo directly when a file reference is available.
        if peerId.namespace != Namespaces.Peer.SecretChat, let reference = image.reference, case let .cloud(id, accessHash, maybeFileReference) = reference, let fileReference = maybeFileReference {
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaPhoto(flags: 0, id: Api.InputPhoto.inputPhoto(id: id, accessHash: accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: nil), text), reuploadInfo: nil)))
        } else {
            return uploadedMediaImageContent(network: network, postbox: postbox, transformOutgoingMessageMedia: transformOutgoingMessageMedia, forceReupload: forceReupload, isGrouped: isGrouped, peerId: peerId, image: image, messageId: messageId, text: text, attributes: attributes, autoremoveAttribute: autoremoveAttribute)
        }
    } else if let file = media as? UChatMediaFile {
        if let resource = file.resource as? CloudDocumentMediaResource {
            if peerId.namespace == Namespaces.Peer.SecretChat {
                // A sticker that belongs to a pack is sent as plain text content.
                for attribute in file.attributes {
                    if case let .Sticker(sticker) = attribute {
                        if sticker.packReference != nil {
                            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: PendingMessageUploadedContent.text(text), reuploadInfo: nil)))
                        }
                    }
                }
                // Any other cloud document is always reuploaded for a secret chat.
                return uploadedMediaFileContent(network: network, postbox: postbox, auxiliaryMethods: auxiliaryMethods, transformOutgoingMessageMedia: transformOutgoingMessageMedia, messageMediaPreuploadManager: messageMediaPreuploadManager, forceReupload: true, isGrouped: isGrouped, peerId: peerId, messageId: messageId, text: text, attributes: attributes, file: file)
            } else {
                if forceReupload {
                    // Revalidate the file reference before referencing the cloud document.
                    let mediaReference: AnyMediaReference
                    if file.isSticker {
                        mediaReference = .standalone(media: file)
                    } else {
                        mediaReference = .savedGif(media: file)
                    }
                    return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: UChatCloudMediaResourceFetchInfo(reference: mediaReference.resourceReference(file.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: resource)
                    |> mapError { _ -> PendingMessageUploadError in
                        return .generic
                    }
                    |> mapToSignal { validatedResource -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                        if let validatedResource = validatedResource.updatedResource as? UChatCloudMediaResourceWithFileReference, let reference = validatedResource.fileReference {
                            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaDocument(flags: 0, id: Api.InputDocument.inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: reference)), ttlSeconds: nil), text), reuploadInfo: nil)))
                        } else {
                            return .fail(.generic)
                        }
                    }
                }
                // Reference the cloud document with the stored file reference (empty data when there is none).
                return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaDocument(flags: 0, id: Api.InputDocument.inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: resource.fileReference ?? Data())), ttlSeconds: nil), text), reuploadInfo: nil)))
            }
        } else {
            return uploadedMediaFileContent(network: network, postbox: postbox, auxiliaryMethods: auxiliaryMethods, transformOutgoingMessageMedia: transformOutgoingMessageMedia, messageMediaPreuploadManager: messageMediaPreuploadManager, forceReupload: forceReupload, isGrouped: isGrouped, peerId: peerId, messageId: messageId, text: text, attributes: attributes, file: file)
        }
    } else if let contact = media as? UChatMediaContact {
        let input = Api.InputMedia.inputMediaContact(phoneNumber: contact.phoneNumber, firstName: contact.firstName, lastName: contact.lastName, vcard: contact.vCardData ?? "")
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    } else if let shareContact = media as? UChatMediaShareContact {
        // xlb: sharing a friend's contact card.
        // NOTE(review): a missing `userId` is sent as 0 — confirm the server treats that as intended.
        let input = Api.InputMedia.inputMediaShareContact(userId: shareContact.userId ?? 0)
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    } else if let map = media as? UChatMediaMap {
        let input: Api.InputMedia
        if let liveBroadcastingTimeout = map.liveBroadcastingTimeout {
            input = .inputMediaGeoLive(flags: 1 << 1, geoPoint: Api.InputGeoPoint.inputGeoPoint(lat: map.latitude, long: map.longitude), period: liveBroadcastingTimeout)
        } else if let venue = map.venue {
            input = .inputMediaVenue(geoPoint: Api.InputGeoPoint.inputGeoPoint(lat: map.latitude, long: map.longitude), title: venue.title, address: venue.address ?? "", provider: venue.provider ?? "", venueId: venue.id ?? "", venueType: venue.type ?? "")
        } else {
            input = .inputMediaGeoPoint(geoPoint: Api.InputGeoPoint.inputGeoPoint(lat: map.latitude, long: map.longitude))
        }
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    } else if let poll = media as? UChatMediaPoll {
        // Polls cannot be sent to secret chats.
        if peerId.namespace == Namespaces.Peer.SecretChat {
            return .fail(.generic)
        }
        let inputPoll = Api.InputMedia.inputMediaPoll(poll: Api.Poll.poll(id: 0, flags: 0, question: poll.text, answers: poll.options.map({ $0.apiOption })))
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(inputPoll, text), reuploadInfo: nil)))
    } else if media is UChatMediaRedEnvelopes {
        // hcl: red envelopes. Only the media type matters here; none of its fields are read.
        // NOTE(review): `count` is always sent as 0 — confirm this is intended.
        let input = Api.InputMedia.inputMediaRedPacket(count: 0)
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    } else if let share = media as? UChatMediaShare {
        // xlb: sharing.
        switch share.type {
        case 1:
            // Game share.
            let input = Api.InputShare.inputShareGeme(title: share.gameInfo?.title ?? "", photo: share.gameInfo?.photo ?? "", jumpUrl: share.gameInfo?.jumpUrl ?? "")
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .share(input, text), reuploadInfo: nil)))
        case 2:
            // Live stream share.
            let input = Api.InputShare.inputShareLive(state: share.livelInfo?.live_stat ?? 0, begin_at: share.livelInfo?.begin_at ?? 0, group_id: share.livelInfo?.group_id ?? 0, live_user: share.livelInfo?.live_user ?? 0, live_flag: share.livelInfo?.live_flag ?? "", live_name: share.livelInfo?.live_name ?? "", live_icon: share.livelInfo?.live_icon ?? "")
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .share(input, text), reuploadInfo: nil)))
        default:
            // Unknown share type: nothing to upload.
            return nil
        }
    } else {
        return nil
    }
}

uploadedMediaFileContent
/// Builds the signal that turns an outgoing file (document) attachment into uploaded API content.
///
/// Flow, as implemented below:
/// 1. `maybePredownloadedFileResource` is consulted first. If it yields a `UChatMediaFile` backed by a
///    `CloudDocumentMediaResource` with a file reference (and `forceReupload` is false), the signal
///    emits `.progress(1.0)` followed by an `inputMediaDocument` that points at that document.
/// 2. Otherwise the resource is uploaded through `messageMediaPreuploadManager`, while in parallel the
///    media is optionally transformed and its smallest preview representation uploaded as a thumbnail.
/// 3. Both streams are combined; once the upload yields an input file and the transform is done, the
///    final content is produced (directly, or via `messages.uploadMedia` for grouped messages
///    that have no autoremove timeout).
///
/// - Parameters:
///   - forceReupload: When true, the predownloaded-media shortcut is skipped; it is also forwarded as
///     `forceRefresh` to `maybePredownloadedFileResource`.
///   - isGrouped: When true and no autoremove timeout is present, the file is first registered with
///     `messages.uploadMedia` and referenced by id afterwards.
///   - peerId: Target peer; a `SecretChat` namespace enables encrypted upload and skips thumbnail upload.
///   - messageId: Needed for the transform step; when nil no transform is performed.
///   - text: Passed through unchanged into the resulting content.
///   - attributes: Scanned for `OutgoingMessageInfoAttribute` (transform marker) and
///     `AutoremoveTimeoutMessageAttribute` (ttl).
///   - file: The media file to upload.
/// - Returns: A signal of progress / content results. Every underlying error is mapped to `.generic`.
private func uploadedMediaFileContent(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, messageMediaPreuploadManager: MessageMediaPreuploadManager, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, messageId: MessageId?, text: String, attributes: [MessageAttribute], file: UChatMediaFile) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> {
    // Check whether the resource is already known before uploading anything.
    return maybePredownloadedFileResource(postbox: postbox, auxiliaryMethods: auxiliaryMethods, peerId: peerId, resource: file.resource, forceRefresh: forceReupload)
    |> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
        // Key handed to `maybeCacheUploadedResource` after a grouped upload; only set for `.localReference`.
        var referenceKey: CachedSentMediaReferenceKey?
        switch result {
            case let .media(media):
                // Shortcut: reference the existing cloud document instead of uploading again.
                // If the conditions are not met, execution falls out of the switch and uploads normally.
                if !forceReupload, let file = media as? UChatMediaFile, let resource = file.resource as? CloudDocumentMediaResource, let fileReference = resource.fileReference {
                    return .single(.progress(1.0))
                    |> then(
                        .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaDocument(flags: 0, id: Api.InputDocument.inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: nil), text), reuploadInfo: nil)))
                    )
                }
            case let .localReference(key):
                referenceKey = key
            case .none:
                referenceKey = nil
        }
        
        // Hint for the uploader; set further below for non-animated files that either report a
        // non-zero resource header size or have a "video/mp4" mime type.
        var hintFileIsLarge = false
        // Size hint: taken from the file itself, or from a local file reference resource when available.
        var hintSize: Int?
        if let size = file.size {
            hintSize = size
        } else if let resource = file.resource as? LocalFileReferenceMediaResource, let size = resource.size {
            hintSize = Int(size)
        }
        if (file.resource.headerSize != 0 || file.mimeType.hasPrefix("video/mp4")) && !file.isAnimated {
            hintFileIsLarge = true
        }
        // Media reference used to derive the resource reference for the upload.
        let fileReference: AnyMediaReference
        if let partialReference = file.partialReference {
            fileReference = partialReference.mediaReference(file)
        } else {
            fileReference = .standalone(media: file)
        }
        // The main upload; encrypted only for secret chats. All upload errors collapse to `.generic`.
        let upload = messageMediaPreuploadManager.upload(network: network, postbox: postbox, source: .resource(fileReference.resourceReference(file.resource)), encrypt: peerId.namespace == Namespaces.Peer.SecretChat, tag: UChatMediaResourceFetchTag(statsCategory: statsCategoryForFileWithAttributes(file.attributes)), hintFileSize: hintSize, hintFileIsLarge: hintFileIsLarge)
        |> mapError { _ -> PendingMessageUploadError in return .generic
        }
        // A message whose info attribute already carries `.transformedMedia` is not transformed again.
        var alreadyTransformed = false
        for attribute in attributes {
            if let attribute = attribute as? OutgoingMessageInfoAttribute {
                if attribute.flags.contains(.transformedMedia) {
                    alreadyTransformed = true
                }
            }
        }
    
        // Optional media transform. Emits `.pending` first, then `.done` with either the transformed
        // media or the original file when the transform produced nothing.
        let transform: Signal<UploadedMediaTransform, NoError>
        if let transformOutgoingMessageMedia = transformOutgoingMessageMedia, let messageId = messageId, !alreadyTransformed {
            transform = .single(.pending)
            |> then(transformOutgoingMessageMedia(postbox, network, .standalone(media: file), false)
            |> mapToSignal { mediaReference -> Signal<UploadedMediaTransform, NoError> in
                return postbox.transaction { transaction -> UploadedMediaTransform in
                    if let media = mediaReference?.media {
                        if let id = media.id {
                            // Persist the transformed media and flag the message as transformed.
                            let _ = transaction.updateMedia(id, update: media)
                            transaction.updateMessage(messageId, update: { currentMessage in
                                // Forward info is rebuilt for the store message; note that
                                // `authorSignature` is passed as nil here.
                                var storeForwardInfo: StoreMessageForwardInfo?
                                if let forwardInfo = currentMessage.forwardInfo {
                                    storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: nil)
                                }
                                // Add `.transformedMedia` to the existing info attribute, or append a new one.
                                var updatedAttributes = currentMessage.attributes
                                if let index = updatedAttributes.firstIndex(where: { $0 is OutgoingMessageInfoAttribute }){
                                    let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute
                                    updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia]))
                                } else {
                                    updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false))
                                }
                                return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
                            })
                        }
                        return .done(media)
                    } else {
                        return .done(file)
                    }
                }
            })
        } else {
            transform = .single(.done(file))
        }
    
        // Pairs the (possibly transformed) file with its uploaded thumbnail, if any.
        let transformedFileAndThumbnail: Signal<UploadedMediaFileAndThumbnail, PendingMessageUploadError> = .single(.pending)
        |> then(transform
        |> mapToSignalPromotingError { media -> Signal<UploadedMediaFileAndThumbnail, PendingMessageUploadError> in
            switch media {
                case .pending:
                    return .single(.pending)
                case let .done(media):
                    if let media = media as? UChatMediaFile, let smallestThumbnail = smallestImageRepresentation(media.previewRepresentations) {
                        if peerId.namespace == Namespaces.Peer.SecretChat {
                            // Secret chats: no separate thumbnail upload is started here.
                            return .single(.done(media, .none))
                        } else {
                            let fileReference: AnyMediaReference
                            if let partialReference = media.partialReference {
                                fileReference = partialReference.mediaReference(media)
                            } else {
                                fileReference = .standalone(media: media)
                            }
                            
                            // Upload the smallest preview representation as the thumbnail; a nil result
                            // means "no thumbnail" rather than a failure.
                            return uploadedThumbnail(network: network, postbox: postbox, resourceReference: fileReference.resourceReference(smallestThumbnail.resource))
                            |> mapError { _ -> PendingMessageUploadError in return .generic }
                            |> map { result in
                                if let result = result {
                                    return .done(media, .file(result))
                                } else {
                                    return .done(media, .none)
                                }
                            }
                        }
                    } else {
                        // Not a file, or no preview representations: fall back to the original `file`.
                        return .single(.done(file, .none))
                    }
            }
        })
    
        return combineLatest(upload, transformedFileAndThumbnail)
        |> mapToSignal { content, fileAndThumbnailResult -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
            switch content {
                case let .progress(progress):
                    return .single(.progress(progress))
                case let .inputFile(inputFile):
                    // `file` below shadows the function parameter with the transformed file.
                    if case let .done(file, thumbnail) = fileAndThumbnailResult {
                        // Flags for `inputMediaUploadedDocument`. Bit meanings are presumably
                        // thumb (2), ttl_seconds (1) and nosound_video (3) — TODO confirm against the API schema.
                        var flags: Int32 = 0
                        
                        var thumbnailFile: Api.InputFile?
                        if case let .file(file) = thumbnail {
                            thumbnailFile = file
                        }
                        
                        if let _ = thumbnailFile {
                            flags |= 1 << 2
                        }
                        
                        var ttlSeconds: Int32?
                        for attribute in attributes {
                            if let attribute = attribute as? AutoremoveTimeoutMessageAttribute {
                                flags |= 1 << 1
                                ttlSeconds = attribute.timeout
                            }
                        }
                        
                        if !file.isAnimated {
                            flags |= 1 << 3
                        }
                        
                        
                        // Messages with a timeout and non-grouped messages send the uploaded document
                        // inline; both branches build the same content.
                        if ttlSeconds != nil  {
                            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaUploadedDocument(flags: flags, file: inputFile, thumb: thumbnailFile, mimeType: file.mimeType, attributes: inputDocumentAttributesFromFileAttributes(file.attributes), stickers: nil, ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
                        }
                        
                        if !isGrouped {
                            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaUploadedDocument(flags: flags, file: inputFile, thumb: thumbnailFile, mimeType: file.mimeType, attributes: inputDocumentAttributesFromFileAttributes(file.attributes), stickers: nil, ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
                        }
                        
                        // Grouped message without timeout: register the upload with the server first and
                        // reference the resulting document by id.
                        return postbox.transaction { transaction -> Api.InputPeer? in
                            return transaction.getPeer(peerId).flatMap(apiInputPeer)
                        }
                        |> mapError { _ -> PendingMessageUploadError in return .generic }
                        |> mapToSignal { inputPeer -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                            if let inputPeer = inputPeer {
                                return network.request(Api.functions.messages.uploadMedia(peer: inputPeer, media: .inputMediaUploadedDocument(flags: flags, file: inputFile, thumb: thumbnailFile, mimeType: file.mimeType, attributes: inputDocumentAttributesFromFileAttributes(file.attributes), stickers: nil, ttlSeconds: ttlSeconds)))
                                |> mapError { _ -> PendingMessageUploadError in return .generic }
                                |> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                                    switch result {
                                        case let .messageMediaDocument(_, document, _):
                                            if let document = document, let mediaFile = uchatMediaFileFromApiDocument(document), let resource = mediaFile.resource as? CloudDocumentMediaResource, let fileReference = resource.fileReference {
                                                // Hand the result to `maybeCacheUploadedResource` together with `referenceKey`.
                                                return maybeCacheUploadedResource(postbox: postbox, key: referenceKey, result: .content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaDocument(flags: 0, id: .inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: nil), text), reuploadInfo: nil)), media: mediaFile)
                                            }
                                        default:
                                            break
                                    }
                                    // Any unexpected or unusable response is reported as a generic failure.
                                    return .fail(.generic)
                                }
                            } else {
                                // The peer could not be resolved to an input peer.
                                return .fail(.generic)
                            }
                        }
                    } else {
                        // Upload finished but the transform is still pending: emit nothing for this pair.
                        return .complete()
                    }
                case let .inputSecretFile(file, size, key):
                    if case .done = fileAndThumbnailResult {
                        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .secretMedia(file, size, key), reuploadInfo: nil)))
                    } else {
                        return .complete()
                    }
            }
        }
    }
}

uploadedMediaImageContent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/// Builds the signal that turns an outgoing image attachment into uploaded API content.
///
/// Flow, as implemented below:
/// 1. If the image has no representations at all, the message is sent as plain text.
/// 2. `maybePredownloadedImageResource` is consulted; when it yields a `UChatMediaImage` with a cloud
///    reference that includes a file reference (and `forceReupload` is false), the signal emits
///    `.progress(1.0)` followed by an `inputMediaPhoto` pointing at the existing photo.
/// 3. Otherwise the image is optionally transformed, its largest representation is uploaded with
///    `multipartUpload`, and the result is either sent inline (autoremove timeout present) or
///    registered through `messages.uploadMedia` and referenced by id.
///
/// - Parameters:
///   - forceReupload: When true, the predownloaded-media shortcut is skipped; it is also forwarded as
///     `forceRefresh` to `maybePredownloadedImageResource`.
///   - isGrouped: Not read anywhere in this function.
///   - peerId: Target peer; a `SecretChat` namespace enables encrypted upload.
///   - messageId: Needed for the transform step; when nil no transform is performed.
///   - text: Passed through unchanged into the resulting content.
///   - attributes: Scanned for an `OutgoingMessageInfoAttribute` carrying `.transformedMedia`.
///   - autoremoveAttribute: When present, supplies `ttlSeconds` and the matching flag bit.
/// - Returns: A signal of progress / content results. Every underlying error is mapped to `.generic`.
private func uploadedMediaImageContent(network: Network, postbox: Postbox, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, image: UChatMediaImage, messageId: MessageId?, text: String, attributes: [MessageAttribute], autoremoveAttribute: AutoremoveTimeoutMessageAttribute?) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> {
    // No representation to upload: degrade to a text-only message.
    guard let largestRepresentation = largestImageRepresentation(image.representations) else {
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .text(text), reuploadInfo: nil)))
    }
    
    let predownloadedResource: Signal<PredownloadedResource, PendingMessageUploadError> = maybePredownloadedImageResource(postbox: postbox, peerId: peerId, resource: largestRepresentation.resource, forceRefresh: forceReupload)
    return predownloadedResource
    |> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
        // Key handed to `maybeCacheUploadedResource` after upload; only set for `.localReference`.
        var referenceKey: CachedSentMediaReferenceKey?
        switch result {
            case let .media(media):
                // Shortcut: reference the existing cloud photo instead of uploading again.
                // If the conditions are not met, execution falls out of the switch and uploads normally.
                if !forceReupload, let image = media as? UChatMediaImage, let reference = image.reference, case let .cloud(id, accessHash, maybeFileReference) = reference, let fileReference = maybeFileReference {
                    // For `inputMediaPhoto` the timeout is signalled with bit 0.
                    var flags: Int32 = 0
                    var ttlSeconds: Int32?
                    if let autoremoveAttribute = autoremoveAttribute {
                        flags |= 1 << 0
                        ttlSeconds = autoremoveAttribute.timeout
                    }
                    return .single(.progress(1.0))
                    |> then(
                        .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaPhoto(flags: flags, id: .inputPhoto(id: id, accessHash: accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
                    )
                }
            case let .localReference(key):
                referenceKey = key
            case .none:
                referenceKey = nil
        }
        
        // A message whose info attribute already carries `.transformedMedia` is not transformed again.
        var alreadyTransformed = false
        for attribute in attributes {
            if let attribute = attribute as? OutgoingMessageInfoAttribute {
                if attribute.flags.contains(.transformedMedia) {
                    alreadyTransformed = true
                }
            }
        }
        // Optional media transform. Emits `.pending` first, then `.done` with either the transformed
        // media or the original image when the transform produced nothing.
        let transform: Signal<UploadedMediaTransform, NoError>
        if let transformOutgoingMessageMedia = transformOutgoingMessageMedia, let messageId = messageId, !alreadyTransformed {
            transform = .single(.pending)
            |> then(
                transformOutgoingMessageMedia(postbox, network, .standalone(media: image), false)
                |> mapToSignal { mediaReference -> Signal<UploadedMediaTransform, NoError> in
                    return postbox.transaction { transaction -> UploadedMediaTransform in
                        if let media = mediaReference?.media {
                            if let id = media.id {
                                // Persist the transformed media and flag the message as transformed.
                                let _ = transaction.updateMedia(id, update: media)
                                transaction.updateMessage(messageId, update: { currentMessage in
                                    // Forward info is rebuilt for the store message; note that
                                    // `authorSignature` is passed as nil here.
                                    var storeForwardInfo: StoreMessageForwardInfo?
                                    if let forwardInfo = currentMessage.forwardInfo {
                                        storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: nil)
                                    }
                                    // Add `.transformedMedia` to the existing info attribute, or append a new one.
                                    var updatedAttributes = currentMessage.attributes
                                    if let index = updatedAttributes.firstIndex(where: { $0 is OutgoingMessageInfoAttribute }){
                                        let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute
                                        updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia]))
                                    } else {
                                        updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false))
                                    }
                                    return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
                                })
                            }
                            return .done(media)
                        } else {
                            return .done(image)
                        }
                    }
                }
            )
        } else {
            transform = .single(.done(image))
        }
        
        return transform
        |> mapError { _ -> PendingMessageUploadError in
            return .generic
        }
        |> mapToSignal { transformResult -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
            switch transformResult {
            case .pending:
                // While the transform is running, report zero progress.
                return .single(.progress(0.0))
            case let .done(transformedMedia):
                // Fall back to the original image if the transform returned something that is not an image.
                let transformedImage = (transformedMedia as? UChatMediaImage) ?? image
                guard let largestRepresentation = largestImageRepresentation(transformedImage.representations) else {
                    return .fail(.generic)
                }
                let imageReference: AnyMediaReference
                if let partialReference = transformedImage.partialReference {
                    imageReference = partialReference.mediaReference(transformedImage)
                } else {
                    imageReference = .standalone(media: transformedImage)
                }
                // Upload the largest representation; encrypted only for secret chats.
                return multipartUpload(network: network, postbox: postbox, source: .resource(imageReference.resourceReference(largestRepresentation.resource)), encrypt: peerId.namespace == Namespaces.Peer.SecretChat, tag: UChatMediaResourceFetchTag(statsCategory: .image), hintFileSize: nil, hintFileIsLarge: false)
                |> mapError { _ -> PendingMessageUploadError in return .generic }
                |> mapToSignal { next -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                    switch next {
                        case let .progress(progress):
                            return .single(.progress(progress))
                        case let .inputFile(file):
                            // For `inputMediaUploadedPhoto` the timeout is signalled with bit 1.
                            var flags: Int32 = 0
                            var ttlSeconds: Int32?
                            if let autoremoveAttribute = autoremoveAttribute {
                                flags |= 1 << 1
                                ttlSeconds = autoremoveAttribute.timeout
                            }
                            return postbox.transaction { transaction -> Api.InputPeer? in
                                return transaction.getPeer(peerId).flatMap(apiInputPeer)
                            }
                            |> mapError { _ -> PendingMessageUploadError in return .generic }
                            |> mapToSignal { inputPeer -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                                if let inputPeer = inputPeer {
                                    // Photos with a timeout are sent inline as uploaded photos.
                                    if autoremoveAttribute != nil {
                                        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaUploadedPhoto(flags: flags, file: file, stickers: nil, ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
                                    }
                                    
                                    // Otherwise register the upload with the server and reference the photo by id.
                                    return network.request(Api.functions.messages.uploadMedia(peer: inputPeer, media: Api.InputMedia.inputMediaUploadedPhoto(flags: flags, file: file, stickers: nil, ttlSeconds: ttlSeconds)))
                                    |> mapError { _ -> PendingMessageUploadError in return .generic }
                                    |> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                                        switch result {
                                            case let .messageMediaPhoto(_, photo, _):
                                                if let photo = photo, let mediaImage = uchatMediaImageFromApiPhoto(photo), let reference = mediaImage.reference, case let .cloud(id, accessHash, maybeFileReference) = reference, let fileReference = maybeFileReference {
                                                    // NOTE(review): `autoremoveAttribute` is always nil on this path
                                                    // (the non-nil case returned above), so the `if let` below never
                                                    // fires and `flags` / `ttlSeconds` stay 0 / nil.
                                                    var flags: Int32 = 0
                                                    var ttlSeconds: Int32?
                                                    if let autoremoveAttribute = autoremoveAttribute {
                                                        flags |= 1 << 0
                                                        ttlSeconds = autoremoveAttribute.timeout
                                                    }
                                                    return maybeCacheUploadedResource(postbox: postbox, key: referenceKey, result: .content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaPhoto(flags: flags, id: .inputPhoto(id: id, accessHash: accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: ttlSeconds), text), reuploadInfo: nil)), media: mediaImage)
                                                }
                                            default:
                                                break
                                        }
                                        // Any unexpected or unusable response is reported as a generic failure.
                                        return .fail(.generic)
                                    }
                                } else {
                                    // The peer could not be resolved to an input peer.
                                    return .fail(.generic)
                                }
                            }
                        case let .inputSecretFile(file, size, key):
                            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .secretMedia(file, size, key), reuploadInfo: nil)))
                    }
                }
            }
        }
    }
}

beginUploadingMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/// Moves `messageContext` into the uploading state and subscribes to `uploadSignal`.
///
/// - Status subscribers are immediately notified with a running status at progress 0.0.
/// - A local input activity is acquired (thread-scoped when `threadId` is set, global otherwise)
///   if the context has an activity type.
/// - Progress events update the status of the context currently registered for `id`.
/// - A content event starts sending the message, re-evaluates waiting uploads for the peer and,
///   for grouped messages, attempts to send the group.
/// - An upload error rewrites the stored message with `[.Failed]` flags and then completes silently.
///
/// The subscription is stored in `messageContext.uploadDisposable`.
private func beginUploadingMessage(messageContext: PendingMessageContext, id: MessageId, threadId: Int64?, groupId: Int64?, uploadSignal: Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>) {
    messageContext.state = .uploading(groupId: groupId)

    // Publish the initial "running, nothing uploaded yet" status.
    messageContext.status = PendingMessageStatus(isRunning: true, progress: 0.0)
    for notify in messageContext.statusSubscribers.copyItems() {
        notify(messageContext.status, messageContext.error)
    }

    // Scope the input activity to the thread when there is one.
    let activitySpace: PeerActivitySpace
    if let threadId = threadId {
        activitySpace = PeerActivitySpace(peerId: id.peerId, category: .thread(threadId))
    } else {
        activitySpace = PeerActivitySpace(peerId: id.peerId, category: .global)
    }
    self.addContextActivityIfNeeded(messageContext, peerId: activitySpace)

    let deliveryQueue = self.queue

    // Upload errors are absorbed here: the message is marked as failed and the stream just ends.
    let recoveredUpload: Signal<PendingMessageUploadedContentResult, NoError> = uploadSignal
    |> deliverOn(deliveryQueue)
    |> `catch` { [weak self] _ -> Signal<PendingMessageUploadedContentResult, NoError> in
        guard let strongSelf = self else {
            return .complete()
        }
        let markFailed = strongSelf.postbox.transaction { transaction -> Void in
            transaction.updateMessage(id, update: { currentMessage in
                let storeForwardInfo = currentMessage.forwardInfo.map { forwardInfo in
                    StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
                }
                return .update(StoreMessage(id: id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
            })
        }
        return markFailed
        |> mapToSignal { _ in
            return .complete()
        }
    }
    |> deliverOn(deliveryQueue)

    let subscription = recoveredUpload.start(next: { [weak self] event in
        guard let strongSelf = self else {
            return
        }
        assert(strongSelf.queue.isCurrent())

        switch event {
            case let .progress(value):
                guard let current = strongSelf.messageContexts[id] else {
                    return
                }
                current.status = PendingMessageStatus(isRunning: true, progress: value)
                for notify in current.statusSubscribers.copyItems() {
                    notify(current.status, current.error)
                }
            case let .content(uploaded):
                guard let current = strongSelf.messageContexts[id] else {
                    return
                }
                // Upload finished: hand over to the send stage, then let the next queued upload start.
                strongSelf.beginSendingMessage(messageContext: current, messageId: id, groupId: groupId, content: uploaded)
                strongSelf.updateWaitingUploads(peerId: id.peerId)
                if let groupId = groupId {
                    strongSelf.beginSendingGroupIfPossible(groupId: groupId)
                }
        }
    })
    messageContext.uploadDisposable.set(subscription)
}

addContextActivityIfNeeded

1
2
3
4
5
/// Acquires a local input activity for `context` in the given activity space and stores it in
/// `context.activityDisposable`. Does nothing when the context has no activity type.
private func addContextActivityIfNeeded(_ context: PendingMessageContext, peerId: PeerActivitySpace) {
        guard let activityType = context.activityType else {
            return
        }
        let acquired = self.localInputActivityManager.acquireActivity(chatPeerId: peerId, peerId: self.accountPeerId, activity: activityType)
        context.activityDisposable.set(acquired)
    }

updateWaitingUploads

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/// Starts the next queued upload for `peerId`, if one may start now.
///
/// Contexts belonging to the peer are visited in sorted message-id order. The first context found in
/// the `.waitingForUploadToStart` state is started when `canBeginUploadingMessage` allows it; the
/// scan stops at that first waiting context whether or not it could be started.
///
/// Must be called on `self.queue`.
private func updateWaitingUploads(peerId: PeerId) {
    assert(self.queue.isCurrent())

    let messageIdsForPeer: [MessageId] = self.messageContexts.keys.filter({ $0.peerId == peerId }).sorted()
    loop: for contextId in messageIdsForPeer {
        // The ids were just taken from the dictionary, so the lookup is expected to succeed.
        guard let context = self.messageContexts[contextId] else {
            continue
        }
        if case let .waitingForUploadToStart(groupId, uploadSignal) = context.state {
            if self.canBeginUploadingMessage(id: contextId, type: context.contentType ?? .media) {
                context.state = .uploading(groupId: groupId)
                // Publish the initial "running, nothing uploaded yet" status.
                let status = PendingMessageStatus(isRunning: true, progress: 0.0)
                context.status = status
                for subscriber in context.statusSubscribers.copyItems() {
                    subscriber(context.status, context.error)
                }

                let activityCategory: PeerActivitySpace.Category
                if let threadId = context.threadId {
                    activityCategory = .thread(threadId)
                } else {
                    activityCategory = .global
                }

                self.addContextActivityIfNeeded(context, peerId: PeerActivitySpace(peerId: peerId, category: activityCategory))
                // NOTE(review): unlike `beginUploadingMessage`, no error handler is attached to
                // `uploadSignal` here, so a failed upload is not marked as failed on this path — confirm
                // whether that is intended.
                context.uploadDisposable.set((uploadSignal
                |> deliverOn(self.queue)).start(next: { [weak self] next in
                    if let strongSelf = self {
                        assert(strongSelf.queue.isCurrent())

                        switch next {
                            case let .progress(progress):
                                if let current = strongSelf.messageContexts[contextId] {
                                    let status = PendingMessageStatus(isRunning: true, progress: progress)
                                    current.status = status
                                    // Notify with the context that was just updated. Using the outer
                                    // `context` here could report a stale status and would make this
                                    // closure (owned by `context.uploadDisposable`) retain `context`.
                                    for subscriber in current.statusSubscribers.copyItems() {
                                        subscriber(current.status, current.error)
                                    }
                                }
                            case let .content(content):
                                if let current = strongSelf.messageContexts[contextId] {
                                    strongSelf.beginSendingMessage(messageContext: current, messageId: contextId, groupId: groupId, content: content)
                                    if let groupId = groupId {
                                        strongSelf.beginSendingGroupIfPossible(groupId: groupId)
                                    }
                                    // This upload is done; give the next waiting message a chance to start.
                                    strongSelf.updateWaitingUploads(peerId: peerId)
                                }
                        }
                    }
                }))
            }
            break loop
        }
    }
}

sendGroupMessagesContent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/// Sends a group of pending messages with a single network request.
///
/// All work is prepared inside one postbox transaction:
/// - an empty `group` completes immediately;
/// - if any message of the group cannot be loaded, the whole group is failed;
/// - secret-chat peers hand every message to `sendSecretMessageContent` and complete;
/// - otherwise the group is sent either with `messages.forwardMessages` (when the first
///   message carries a `ForwardSourceInfoAttribute`) or with `messages.sendMultiMedia`.
///
/// Request-level options (reply, mute, schedule, forward options, send-as) are read from
/// the attributes of the first message (in index order) only.
///
/// On an RPC error the handler runs on the manager queue: file-reference errors trigger one
/// re-send attempt per group, recognised errors are reported to status / failure
/// subscribers, and in every non-retry case the group is marked as failed.
///
/// - Parameters:
///   - network: Network used to issue the request.
///   - postbox: Storage the messages are read from and failure is written to.
///   - stateManager: Forwarded to `applySentGroupMessages` together with the result.
///   - accountPeerId: Used to resolve the "send as" peer via `apiInputPeerOrSelf`.
///   - group: Message ids paired with their uploaded content.
/// - Returns: A signal that never fails; failures are recorded via `failMessages`.
private func sendGroupMessagesContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, accountPeerId: PeerId, group: [(messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]) -> Signal<Void, NoError> {
    let queue = self.queue
    return postbox.transaction { [weak self] transaction -> Signal<Void, NoError> in
        if group.isEmpty {
            return .complete()
        }

        let peerId = group[0].messageId.peerId

        var messages: [(Message, PendingMessageUploadedContentAndReuploadInfo)] = []
        for (id, content) in group {
            if let message = transaction.getMessage(id) {
                messages.append((message, content))
            } else {
                return failMessages(postbox: postbox, ids: group.map { $0.0 })
            }
        }

        messages.sort { $0.0.index < $1.0.index }

        if peerId.namespace == Namespaces.Peer.SecretChat {
            for (message, content) in messages {
                PendingMessageManager.sendSecretMessageContent(transaction: transaction, message: message, content: content)
            }

            return .complete()
        } else if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) {
            var isForward = false
            var hideSendersNames = false
            var hideCaptions = false
            var replyMessageId: Int32?
            var scheduleTime: Int32?
            var sendAsPeerId: PeerId?

            var flags: Int32 = 0

            // Request-wide options are taken from the first message of the sorted group.
            for attribute in messages[0].0.attributes {
                if let replyAttribute = attribute as? ReplyMessageAttribute {
                    replyMessageId = replyAttribute.messageId.id
                } else if let _ = attribute as? ForwardSourceInfoAttribute {
                    isForward = true
                } else if let attribute = attribute as? NotificationInfoMessageAttribute {
                    if attribute.flags.contains(.muted) {
                        flags |= Int32(1 << 5)
                    }
                } else if let attribute = attribute as? OutgoingScheduleInfoMessageAttribute {
                    flags |= Int32(1 << 10)
                    scheduleTime = attribute.scheduleTime
                } else if let attribute = attribute as? ForwardOptionsMessageAttribute {
                    hideSendersNames = attribute.hideNames
                    hideCaptions = attribute.hideCaptions
                } else if let attribute = attribute as? SendAsMessageAttribute {
                    sendAsPeerId = attribute.peerId
                }
            }

            let sendMessageRequest: Signal<Api.Updates, MTRpcError>
            if isForward {
                if messages.contains(where: { $0.0.groupingKey != nil }) {
                    flags |= (1 << 9)
                }
                if hideSendersNames {
                    flags |= (1 << 11)
                }
                if hideCaptions {
                    flags |= (1 << 12)
                }

                var sendAsInputPeer: Api.InputPeer?
                if let sendAsPeerId = sendAsPeerId, let sendAsPeer = transaction.getPeer(sendAsPeerId), let inputPeer = apiInputPeerOrSelf(sendAsPeer, accountPeerId: accountPeerId) {
                    sendAsInputPeer = inputPeer
                    flags |= (1 << 13)
                }

                var forwardIds: [(MessageId, Int64)] = []
                for (message, content) in messages {
                    var uniqueId: Int64?
                    inner: for attribute in message.attributes {
                        if let outgoingInfo = attribute as? OutgoingMessageInfoAttribute {
                            uniqueId = outgoingInfo.uniqueId
                            break inner
                        }
                    }

                    if let uniqueId = uniqueId {
                        switch content.content {
                            case let .forward(forwardAttribute):
                                forwardIds.append((forwardAttribute.messageId, uniqueId))
                            default:
                                // A forward group must contain only forward content. Fail the
                                // group (as the multi-media branch does) instead of completing
                                // silently and leaving the messages stuck in the sending state.
                                assertionFailure()
                                return failMessages(postbox: postbox, ids: group.map { $0.0 })
                        }
                    } else {
                        // Without a random id the message cannot be sent; fail the group
                        // rather than silently dropping the request.
                        return failMessages(postbox: postbox, ids: group.map { $0.0 })
                    }
                }
                let forwardPeerIds = Set(forwardIds.map { $0.0.peerId })
                if forwardPeerIds.count != 1 {
                    assertionFailure()
                    sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "Invalid forward peer ids"))
                } else if let inputSourcePeerId = forwardPeerIds.first, let inputSourcePeer = transaction.getPeer(inputSourcePeerId).flatMap(apiInputPeer) {
                    let dependencyTag = PendingMessageRequestDependencyTag(messageId: messages[0].0.id)

                    sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: flags, fromPeer: inputSourcePeer, id: forwardIds.map { $0.0.id }, randomId: forwardIds.map { $0.1 }, toPeer: inputPeer, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), tag: dependencyTag)
                } else {
                    assertionFailure()
                    sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "Invalid forward source"))
                }
            } else {
                flags |= (1 << 7)
                if let _ = replyMessageId {
                    flags |= Int32(1 << 0)
                }

                var sendAsInputPeer: Api.InputPeer?
                if let sendAsPeerId = sendAsPeerId, let sendAsPeer = transaction.getPeer(sendAsPeerId), let inputPeer = apiInputPeerOrSelf(sendAsPeer, accountPeerId: accountPeerId) {
                    sendAsInputPeer = inputPeer
                    flags |= (1 << 13)
                }

                var singleMedias: [Api.InputSingleMedia] = []
                for (message, content) in messages {
                    var uniqueId: Int64?
                    inner: for attribute in message.attributes {
                        if let outgoingInfo = attribute as? OutgoingMessageInfoAttribute {
                            uniqueId = outgoingInfo.uniqueId
                            break inner
                        }
                    }
                    if let uniqueId = uniqueId {
                        switch content.content {
                            case let .media(inputMedia, text):
                                var messageEntities: [Api.MessageEntity]?
                                for attribute in message.attributes {
                                    if let attribute = attribute as? TextEntitiesMessageAttribute {
                                        messageEntities = apiTextAttributeEntities(attribute, associatedPeers: message.peers)
                                    }
                                }

                                var singleFlags: Int32 = 0
                                if let _ = messageEntities {
                                    singleFlags |= 1 << 0
                                }

                                singleMedias.append(.inputSingleMedia(flags: singleFlags, media: inputMedia, randomId: uniqueId, message: text, entities: messageEntities))
                            default:
                                return failMessages(postbox: postbox, ids: group.map { $0.0 })
                        }
                    } else {
                        return failMessages(postbox: postbox, ids: group.map { $0.0 })
                    }
                }

                sendMessageRequest = network.request(Api.functions.messages.sendMultiMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, multiMedia: singleMedias, scheduleDate: scheduleTime, sendAs: sendAsInputPeer))
            }

            return sendMessageRequest
            |> deliverOn(queue)
            |> mapToSignal { result -> Signal<Void, MTRpcError> in
                if let strongSelf = self {
                    return strongSelf.applySentGroupMessages(postbox: postbox, stateManager: stateManager, messages: messages.map { $0.0 }, result: result)
                    |> mapError { _ -> MTRpcError in
                    }
                } else {
                    return .never()
                }
            }
            |> `catch` { error -> Signal<Void, NoError> in
                return deferred {
                    if let strongSelf = self {
                        if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
                            // Retry only if every message still has a context and none of
                            // them has already been through a forced re-upload.
                            var allFoundAndValid = true
                            for (message, _) in messages {
                                if let context = strongSelf.messageContexts[message.id] {
                                    if context.forcedReuploadOnce {
                                        allFoundAndValid = false
                                        break
                                    }
                                } else {
                                    allFoundAndValid = false
                                    break
                                }
                            }

                            if allFoundAndValid {
                                for (message, _) in messages {
                                    if let context = strongSelf.messageContexts[message.id] {
                                        context.forcedReuploadOnce = true
                                    }
                                }

                                strongSelf.beginSendingMessages(messages.map({ $0.0.id }))
                                return .complete()
                            }
                        } else if let failureReason = reasonForError(error.errorDescription), let message = messages.first?.0 {
                            for (message, _) in messages {
                                if let context = strongSelf.messageContexts[message.id] {
                                    context.error = failureReason
                                    for f in context.statusSubscribers.copyItems() {
                                        f(context.status, context.error)
                                    }
                                }
                            }

                            if let context = strongSelf.peerSummaryContexts[message.id.peerId] {
                                for subscriber in context.messageFailedSubscribers.copyItems() {
                                    subscriber(failureReason)
                                }
                            }
                        }
                    }
                    return failMessages(postbox: postbox, ids: group.map { $0.0 })
                } |> runOn(queue)
            }
        } else {
            assertionFailure()
            return failMessages(postbox: postbox, ids: group.map { $0.0 })
        }
    }
    |> switchToLatest
}

commitSendingMessageGroup

1
2
3
4
5
6
7
8
9
10
11
/// Marks every message of a group as sending and starts the grouped send request.
///
/// The resulting disposable is stored on the first message's context, so disposing that
/// context's `sendDisposable` cancels the request for the whole group.
///
/// - Parameters:
///   - groupId: Identifier recorded in each context's `.sending` state.
///   - messages: Contexts, ids and uploaded contents of the group. An empty array is a no-op.
private func commitSendingMessageGroup(groupId: Int64, messages: [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]) {
    // Indexing `messages[0]` would trap on an empty group; there is nothing to send anyway.
    guard let first = messages.first else {
        return
    }

    for (context, _, _) in messages {
        context.state = .sending(groupId: groupId)
    }
    let sendMessage: Signal<PendingMessageResult, NoError> = self.sendGroupMessagesContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, accountPeerId: self.accountPeerId, group: messages.map { ($0.messageId, $0.content) })
    |> map { _ -> PendingMessageResult in
        return .progress(1.0)
    }
    first.messageContext.sendDisposable.set((sendMessage
    |> deliverOn(self.queue)).start())
}

sendSecretMessageContent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/// Queues a secret-chat message for sending inside the given transaction.
///
/// The message is marked as failed when the peer has no `SecretChatState` or the state is
/// `.terminated` / `.handshake`. Otherwise an outgoing operation is enqueued: a dedicated
/// one for autoremove-timeout and screenshot actions, a plain `.sendMessage` for everything
/// else. The stored message then gets the `.Sending` flag unless it is already `.Failed`.
///
/// - Parameters:
///   - transaction: Open postbox transaction used for all reads and writes.
///   - message: The pending message as it was loaded by the caller.
///   - content: Uploaded content; only `.secretMedia` contributes an outgoing file.
static func sendSecretMessageContent(transaction: Transaction, message: Message, content: PendingMessageUploadedContentAndReuploadInfo) {
    let peerId = message.id.peerId

    // Only secret media produces a file to attach to the outgoing operation.
    var outgoingFile: SecretChatOutgoingFile?
    if case let .secretMedia(file, size, key) = content.content, let reference = SecretChatOutgoingFileReference(file) {
        outgoingFile = SecretChatOutgoingFile(reference: reference, size: size, key: key)
    }

    let chatState = transaction.getPeerChatState(peerId) as? SecretChatState
    var activeLayer: SecretChatLayer?
    if let chatState = chatState {
        switch chatState.embeddedState {
            case .terminated, .handshake:
                break
            case .basicLayer:
                activeLayer = .layer8
            case let .sequenceBasedLayer(sequenceState):
                activeLayer = sequenceState.layerNegotiationState.activeLayer.secretChatLayer
        }
    }

    guard let state = chatState, let layer = activeLayer else {
        // No usable chat state: replace the message flags with `.Failed`.
        transaction.updateMessage(message.id, update: { currentMessage in
            let storeForwardInfo: StoreMessageForwardInfo? = currentMessage.forwardInfo.map { forwardInfo in
                StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
            }
            return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
        })
        return
    }

    // Persists the chat state only when enqueuing the operation actually changed it.
    func storeIfChanged(_ updatedState: SecretChatState) {
        if updatedState != state {
            transaction.setPeerChatState(peerId, state: updatedState)
        }
    }

    // Only the first action media is inspected; any other action kind falls through to a
    // regular `.sendMessage` operation below.
    var sentAsAction = false
    if let actionMedia = message.media.first(where: { $0 is TelegramMediaAction }) as? TelegramMediaAction {
        switch actionMedia.action {
            case let .messageAutoremoveTimeoutUpdated(value):
                sentAsAction = true
                storeIfChanged(addSecretChatOutgoingOperation(transaction: transaction, peerId: peerId, operation: .setMessageAutoremoveTimeout(layer: layer, actionGloballyUniqueId: message.globallyUniqueId!, timeout: value, messageId: message.id), state: state))
            case .historyScreenshot:
                sentAsAction = true
                storeIfChanged(addSecretChatOutgoingOperation(transaction: transaction, peerId: peerId, operation: .screenshotMessages(layer: layer, actionGloballyUniqueId: message.globallyUniqueId!, globallyUniqueIds: [], messageId: message.id), state: state))
            default:
                break
        }
    }

    if !sentAsAction {
        storeIfChanged(addSecretChatOutgoingOperation(transaction: transaction, peerId: peerId, operation: .sendMessage(layer: layer, id: message.id, file: outgoingFile), state: state))
    }

    // Flags start from the caller's snapshot of the message, not from the stored one.
    transaction.updateMessage(message.id, update: { currentMessage in
        var flags = StoreMessageFlags(message.flags)
        if !flags.contains(.Failed) {
            flags.insert(.Sending)
        }
        let storeForwardInfo: StoreMessageForwardInfo? = currentMessage.forwardInfo.map { forwardInfo in
            StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
        }
        return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: flags, tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
    })
}

sendMessageContent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/// Sends a single pending message.
///
/// Inside one postbox transaction the message is loaded and routed:
/// - a missing message completes without doing anything;
/// - secret-chat peers are handed to `sendSecretMessageContent`;
/// - peers that resolve to an API input peer get a request chosen by `content.content`;
/// - any other peer results in the message being stored with the `.Failed` flag.
///
/// - Parameters:
///   - network: Network used to issue the request.
///   - postbox: Storage the message is read from and updated in.
///   - stateManager: Forwarded to `applySentMessage` together with the result.
///   - accountPeerId: Used to resolve the "send as" peer via `apiInputPeerOrSelf`.
///   - messageId: Id of the pending message to send.
///   - content: Uploaded content that selects which API function is called.
/// - Returns: A signal that never fails; RPC errors are handled in the `catch` stage.
private func sendMessageContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, accountPeerId: PeerId, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo) -> Signal<Void, NoError> {
    let queue = self.queue
    return postbox.transaction { [weak self] transaction -> Signal<Void, NoError> in
        guard let message = transaction.getMessage(messageId) else {
            return .complete()
        }

        if messageId.peerId.namespace == Namespaces.Peer.SecretChat {
            PendingMessageManager.sendSecretMessageContent(transaction: transaction, message: message, content: content)
            return .complete()
        } else if let peer = transaction.getPeer(messageId.peerId), let inputPeer = apiInputPeer(peer) {
            // Stays 0 when the message has no OutgoingMessageInfoAttribute.
            var uniqueId: Int64 = 0
            var forwardSourceInfoAttribute: ForwardSourceInfoAttribute?
            var messageEntities: [Api.MessageEntity]?
            var replyMessageId: Int32?
            var scheduleTime: Int32?
            var sendAsPeerId: PeerId?

            var flags: Int32 = 0

            // Collect request parameters and flag bits from the message attributes.
            for attribute in message.attributes {
                if let replyAttribute = attribute as? ReplyMessageAttribute {
                    replyMessageId = replyAttribute.messageId.id
                } else if let outgoingInfo = attribute as? OutgoingMessageInfoAttribute {
                    uniqueId = outgoingInfo.uniqueId
                } else if let attribute = attribute as? ForwardSourceInfoAttribute {
                    forwardSourceInfoAttribute = attribute
                } else if let attribute = attribute as? TextEntitiesMessageAttribute {
                    messageEntities = apiTextAttributeEntities(attribute, associatedPeers: message.peers)
                } else if let attribute = attribute as? OutgoingContentInfoMessageAttribute {
                    if attribute.flags.contains(.disableLinkPreviews) {
                        flags |= Int32(1 << 1)
                    }
                } else if let attribute = attribute as? NotificationInfoMessageAttribute {
                    if attribute.flags.contains(.muted) {
                        flags |= Int32(1 << 5)
                    }
                } else if let attribute = attribute as? OutgoingScheduleInfoMessageAttribute {
                    flags |= Int32(1 << 10)
                    scheduleTime = attribute.scheduleTime
                } else if let attribute = attribute as? SendAsMessageAttribute {
                    sendAsPeerId = attribute.peerId
                }
            }

            // Forwards get none of the bits below; every other content kind always sets
            // bit 7, plus bit 0 when replying and bit 3 when entities are present.
            if case .forward = content.content {
            } else {
                flags |= (1 << 7)

                if let _ = replyMessageId {
                    flags |= Int32(1 << 0)
                }
                if let _ = messageEntities {
                    flags |= Int32(1 << 3)
                }
            }

            // Bit 13 is set only when the "send as" peer actually resolves to an input peer.
            var sendAsInputPeer: Api.InputPeer?
            if let sendAsPeerId = sendAsPeerId, let sendAsPeer = transaction.getPeer(sendAsPeerId), let inputPeer = apiInputPeerOrSelf(sendAsPeer, accountPeerId: accountPeerId) {
                sendAsInputPeer = inputPeer
                flags |= (1 << 13)
            }

            let dependencyTag = PendingMessageRequestDependencyTag(messageId: messageId)

            // Only the text request asks for acknowledgement info; the other requests are
            // wrapped into `.result` so that all branches share one result type.
            let sendMessageRequest: Signal<NetworkRequestResult<Api.Updates>, MTRpcError>
            switch content.content {
                case .text:
                    sendMessageRequest = network.requestWithAdditionalInfo(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), info: .acknowledgement, tag: dependencyTag)
                case let .media(inputMedia, text):
                    sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), tag: dependencyTag)
                    |> map(NetworkRequestResult.result)
                case let .forward(sourceInfo):
                    // The source peer comes from the message's ForwardSourceInfoAttribute,
                    // while the forwarded message id comes from the uploaded content.
                    if let forwardSourceInfoAttribute = forwardSourceInfoAttribute, let sourcePeer = transaction.getPeer(forwardSourceInfoAttribute.messageId.peerId), let sourceInputPeer = apiInputPeer(sourcePeer) {
                        sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: flags, fromPeer: sourceInputPeer, id: [sourceInfo.messageId.id], randomId: [uniqueId], toPeer: inputPeer, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), tag: dependencyTag)
                        |> map(NetworkRequestResult.result)
                    } else {
                        sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
                    }
                case let .chatContextResult(chatContextResult):
                    if chatContextResult.hideVia {
                        flags |= Int32(1 << 11)
                    }
                    sendMessageRequest = network.request(Api.functions.messages.sendInlineBotResult(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, randomId: uniqueId, queryId: chatContextResult.queryId, id: chatContextResult.id, scheduleDate: scheduleTime, sendAs: sendAsInputPeer))
                    |> map(NetworkRequestResult.result)
                case .messageScreenshot:
                    sendMessageRequest = network.request(Api.functions.messages.sendScreenshotNotification(peer: inputPeer, replyToMsgId: replyMessageId ?? 0, randomId: uniqueId))
                    |> map(NetworkRequestResult.result)
                case .secretMedia:
                    // Secret media is not expected for a non-secret peer.
                    assertionFailure()
                    sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
            }

            return sendMessageRequest
            |> deliverOn(queue)
            |> mapToSignal { result -> Signal<Void, MTRpcError> in
                guard let strongSelf = self else {
                    return .never()
                }
                switch result {
                    case .progress:
                        return .complete()
                    case .acknowledged:
                        return strongSelf.applyAcknowledgedMessage(postbox: postbox, message: message)
                        |> mapError { _ -> MTRpcError in
                        }
                    case let .result(result):
                        return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result)
                        |> mapError { _ -> MTRpcError in
                        }
                }
            }
            |> `catch` { error -> Signal<Void, NoError> in
                // Failure handling is dispatched onto the queue; the signal itself
                // completes immediately below.
                queue.async {
                    // Nothing is done (the message is not marked as failed) when the
                    // manager or the message context no longer exists.
                    guard let strongSelf = self, let context = strongSelf.messageContexts[messageId] else {
                        return
                    }
                    if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
                        // File-reference errors restart sending once per context; a second
                        // occurrence falls through to marking the message as failed.
                        if !context.forcedReuploadOnce {
                            context.forcedReuploadOnce = true
                            strongSelf.beginSendingMessages([messageId])
                            return
                        }
                    } else if let failureReason = reasonForError(error.errorDescription) {
                        if let context = strongSelf.messageContexts[message.id] {
                            context.error = failureReason
                            for f in context.statusSubscribers.copyItems() {
                                f(context.status, context.error)
                            }
                        }

                        if let context = strongSelf.peerSummaryContexts[message.id.peerId] {
                            for subscriber in context.messageFailedSubscribers.copyItems() {
                                subscriber(failureReason)
                            }
                        }
                    }
                    // Store the message with its flags replaced by `.Failed`.
                    let _ = (postbox.transaction { transaction -> Void in
                        transaction.updateMessage(message.id, update: { currentMessage in
                            var storeForwardInfo: StoreMessageForwardInfo?
                            if let forwardInfo = currentMessage.forwardInfo {
                                storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
                            }
                            return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
                        })
                    }).start()
                }

                return .complete()
            }
        } else {
            // The peer could not be resolved to an input peer: mark the message as failed.
            return postbox.transaction { transaction -> Void in
                transaction.updateMessage(message.id, update: { currentMessage in
                    var storeForwardInfo: StoreMessageForwardInfo?
                    if let forwardInfo = currentMessage.forwardInfo {
                        storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
                    }
                    return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
                })
            }
        }
    } |> switchToLatest
}

commitSendingSingleMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/// Moves a single (non-grouped) message into the sending state and starts its request.
///
/// Every value emitted by `sendMessageContent` is mapped to `.progress(1.0)`, which is then
/// published as a running status to the status subscribers of the message's context.
///
/// - Parameters:
///   - messageContext: Context whose state and `sendDisposable` are updated.
///   - messageId: Id of the message being sent; also used to look up the live context.
///   - content: Uploaded content passed through to `sendMessageContent`.
private func commitSendingSingleMessage(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo) {
    messageContext.state = .sending(groupId: nil)

    let sendResult: Signal<PendingMessageResult, NoError> = self.sendMessageContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, accountPeerId: self.accountPeerId, messageId: messageId, content: content)
    |> map { _ -> PendingMessageResult in
        return .progress(1.0)
    }

    messageContext.sendDisposable.set((sendResult
    |> deliverOn(self.queue)).start(next: { [weak self] result in
        guard let strongSelf = self else {
            return
        }
        assert(strongSelf.queue.isCurrent())

        switch result {
            case let .progress(progress):
                // The context is looked up again because it may have been removed meanwhile.
                guard let activeContext = strongSelf.messageContexts[messageId] else {
                    return
                }
                activeContext.status = PendingMessageStatus(isRunning: true, progress: progress)
                for notify in activeContext.statusSubscribers.copyItems() {
                    notify(activeContext.status, activeContext.error)
                }
        }
    }))
}

applyAcknowledgedMessage

Applied when the server acknowledges the send request (the `.acknowledged` result).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/// Flags the stored message's outgoing-info attribute as acknowledged.
///
/// Runs inside a postbox transaction. The first `OutgoingMessageInfoAttribute` found in
/// `message.attributes` is replaced with its acknowledged variant; if there is none, the
/// stored message is left untouched (`.skip`). All other fields of the rewritten message are
/// copied from the currently stored message.
///
/// - Parameters:
///   - postbox: Storage in which the transaction runs.
///   - message: Message whose id selects the stored record and whose attributes are the base for the update.
/// - Returns: A signal representing the transaction.
private func applyAcknowledgedMessage(postbox: Postbox, message: Message) -> Signal<Void, NoError> {
    return postbox.transaction { transaction -> Void in
        transaction.updateMessage(message.id, update: { currentMessage in
            // NOTE(review): the attribute list comes from the passed-in `message`, not from
            // `currentMessage` — confirm this is intentional.
            var updatedAttributes = message.attributes

            guard let infoIndex = updatedAttributes.firstIndex(where: { $0 is OutgoingMessageInfoAttribute }),
                  let info = updatedAttributes[infoIndex] as? OutgoingMessageInfoAttribute else {
                return .skip
            }
            updatedAttributes[infoIndex] = info.withUpdatedAcknowledged(true)

            let storeForwardInfo: StoreMessageForwardInfo? = currentMessage.forwardInfo.map { forwardInfo in
                StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
            }

            return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
        })
    }
}

applySentMessage

Applied after the send request.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Applies the server's response for a single sent message and, once the resulting signal is
/// disposed, notifies the peer's delivered-message subscribers.
///
/// The namespace reported to subscribers is taken from the id of the first message in `result`
/// that belongs to the same peer; it falls back to `Namespaces.Message.Cloud` when no such
/// message (or id) is available.
///
/// - Parameters:
///   - postbox: Storage passed through to `applyUpdateMessage`.
///   - stateManager: State manager passed through to `applyUpdateMessage`.
///   - message: The local message that was sent.
///   - result: Updates returned by the send request.
/// - Returns: The signal produced by `applyUpdateMessage`, with the notification attached to its disposal.
private func applySentMessage(postbox: Postbox, stateManager: AccountStateManager, message: Message, result: Api.Updates) -> Signal<Void, NoError> {
    // Namespace used only to resolve ids while searching the result for this peer's message.
    let lookupNamespace = Namespaces.Message.allScheduled.contains(message.id.namespace) ? Namespaces.Message.ScheduledCloud : Namespaces.Message.Cloud
    let matchedApiMessage = result.messages.first(where: { candidate in
        guard let candidateId = candidate.id(namespace: lookupNamespace) else {
            return false
        }
        return candidateId.peerId == message.id.peerId
    })

    // The message counts as scheduled only when its schedule time equals the server timestamp.
    let confirmedId = matchedApiMessage.flatMap { matched in
        matched.id(namespace: message.scheduleTime != nil && message.scheduleTime == matched.timestamp ? Namespaces.Message.ScheduledCloud : Namespaces.Message.Cloud)
    }
    let deliveredNamespace = confirmedId?.namespace ?? Namespaces.Message.Cloud

    return applyUpdateMessage(postbox: postbox, stateManager: stateManager, message: message, result: result, accountPeerId: self.accountPeerId)
    |> afterDisposed { [weak self] in
        guard let strongSelf = self else {
            return
        }
        strongSelf.queue.async {
            guard let summary = strongSelf.peerSummaryContexts[message.id.peerId] else {
                return
            }
            for notify in summary.messageDeliveredSubscribers.copyItems() {
                notify(deliveredNamespace)
            }
        }
    }
}

applySentGroupMessages

Applied after the send request.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// Applies the server's response for a group of sent messages and, once the resulting signal
/// is disposed, notifies the delivered-message subscribers of the first message's peer.
///
/// The reported namespace is `ScheduledCloud` when the first local message has a schedule time
/// equal to the timestamp of the first message in `result`; otherwise it is `Cloud`.
///
/// - Parameters:
///   - postbox: Storage passed through to `applyUpdateGroupMessages`.
///   - stateManager: State manager passed through to `applyUpdateGroupMessages`.
///   - messages: The local messages that were sent together.
///   - result: Updates returned by the send request.
/// - Returns: The signal produced by `applyUpdateGroupMessages`, with the notification attached to its disposal.
private func applySentGroupMessages(postbox: Postbox, stateManager: AccountStateManager, messages: [Message], result: Api.Updates) -> Signal<Void, NoError> {
    let sentAsScheduled: Bool
    if let firstLocal = messages.first, let firstRemote = result.messages.first {
        sentAsScheduled = firstLocal.scheduleTime != nil && firstLocal.scheduleTime == firstRemote.timestamp
    } else {
        sentAsScheduled = false
    }
    let deliveredNamespace = sentAsScheduled ? Namespaces.Message.ScheduledCloud : Namespaces.Message.Cloud

    return applyUpdateGroupMessages(postbox: postbox, stateManager: stateManager, messages: messages, result: result)
    |> afterDisposed { [weak self] in
        guard let strongSelf = self else {
            return
        }
        strongSelf.queue.async {
            guard let firstLocal = messages.first, let summary = strongSelf.peerSummaryContexts[firstLocal.id.peerId] else {
                return
            }
            for notify in summary.messageDeliveredSubscribers.copyItems() {
                notify(deliveredNamespace)
            }
        }
    }
}

deliveredMessageEvents

Listen for successfully sent messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// Emits the namespace of each message delivered to `peerId`.
///
/// On subscription a handler is registered (on `self.queue`) in the peer's summary context,
/// creating that context if it does not exist yet. Disposing the subscription unregisters the
/// handler again on `self.queue`.
///
/// - Parameter peerId: Peer whose delivered-message events should be observed.
/// - Returns: A signal of message namespaces that never fails.
public func deliveredMessageEvents(peerId: PeerId) -> Signal<MessageId.Namespace, NoError> {
    return Signal { subscriber in
        let disposable = MetaDisposable()

        self.queue.async {
            let summaryContext: PeerPendingMessagesSummaryContext
            if let current = self.peerSummaryContexts[peerId] {
                summaryContext = current
            } else {
                summaryContext = PeerPendingMessagesSummaryContext()
                self.peerSummaryContexts[peerId] = summaryContext
            }

            let index = summaryContext.messageDeliveredSubscribers.add({ namespace in
                subscriber.putNext(namespace)
            })

            disposable.set(ActionDisposable {
                self.queue.async {
                    if let current = self.peerSummaryContexts[peerId] {
                        current.messageDeliveredSubscribers.remove(index)
                        // The summary context also carries the failed-message subscribers for this
                        // peer, so it may only be dropped once both subscriber bags are empty;
                        // otherwise live failure subscribers would silently stop receiving events.
                        if current.messageDeliveredSubscribers.isEmpty && current.messageFailedSubscribers.isEmpty {
                            self.peerSummaryContexts.removeValue(forKey: peerId)
                        }
                    }
                }
            })
        }

        return disposable
    }
}

failedMessageEvents

Listen for messages that failed to send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// Emits the failure reason of each message to `peerId` that failed to send.
///
/// On subscription a handler is registered (on `self.queue`) in the peer's summary context,
/// creating that context if it does not exist yet. Disposing the subscription unregisters the
/// handler again on `self.queue`.
///
/// - Parameter peerId: Peer whose failed-message events should be observed.
/// - Returns: A signal of failure reasons that never fails.
public func failedMessageEvents(peerId: PeerId) -> Signal<PendingMessageFailureReason, NoError> {
    return Signal { subscriber in
        let disposable = MetaDisposable()

        self.queue.async {
            let summaryContext: PeerPendingMessagesSummaryContext
            if let current = self.peerSummaryContexts[peerId] {
                summaryContext = current
            } else {
                summaryContext = PeerPendingMessagesSummaryContext()
                self.peerSummaryContexts[peerId] = summaryContext
            }

            let index = summaryContext.messageFailedSubscribers.add({ reason in
                subscriber.putNext(reason)
            })

            disposable.set(ActionDisposable {
                self.queue.async {
                    if let current = self.peerSummaryContexts[peerId] {
                        current.messageFailedSubscribers.remove(index)
                        // The summary context also carries the delivered-message subscribers for
                        // this peer, so it may only be dropped once both subscriber bags are empty;
                        // otherwise live delivery subscribers would silently stop receiving events.
                        if current.messageFailedSubscribers.isEmpty && current.messageDeliveredSubscribers.isEmpty {
                            self.peerSummaryContexts.removeValue(forKey: peerId)
                        }
                    }
                }
            })
        }

        return disposable
    }
}
This post is licensed under CC BY 4.0 by the author.