Listening for unsent message IDs
1
2
3
// Subscribe to the postbox's unsent-message-ids view and forward each emitted id set
// to the pending message manager. The manager is captured weakly, so an update that
// arrives after it has been released is dropped.
self.managedOperationsDisposable.add(postbox.unsentMessageIdsView().start(next: { [weak pendingMessageManager] view in
    guard let manager = pendingMessageManager else {
        return
    }
    manager.updatePendingMessageIds(view.ids)
}))
class
UnsentMessageIdsView
1
// Set of message ids carried by this view. The `unsentMessageIdsView()` subscriber reads
// `view.ids` and hands it to `updatePendingMessageIds(_:)` (assuming that signal emits this
// view type — confirm against the full declaration).
let ids: Set<MessageId>
updatePendingMessageIds
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/// Reconciles the manager's pending set with `messageIds`.
///
/// The work is dispatched asynchronously onto `self.queue` and proceeds in this order:
/// 1. Contexts of ids that left the pending set are reset (state `.none`, disposables
///    disposed, subscribers told the status is now `nil`) and dropped when nobody observes them.
/// 2. Sending is started for ids that newly entered the set.
/// 3. `pendingMessageIds` is replaced with `messageIds`.
/// 4. Waiting uploads are re-evaluated for affected peers and grouped sends for affected groups.
/// 5. For removed secret-chat ids, the postbox is queried and delivery subscribers of peers
///    with a sent or acknowledged message are invoked.
/// 6. The set of peers that still have pending messages is published.
///
/// - Parameter messageIds: The complete, current set of pending message ids.
func updatePendingMessageIds(_ messageIds: Set<MessageId>) {
    self.queue.async {
        let previousIds = self.pendingMessageIds
        let newlyPendingIds = messageIds.subtracting(previousIds)
        let noLongerPendingIds = previousIds.subtracting(messageIds)
        let noLongerPendingSecretIds = Set(noLongerPendingIds.filter { $0.peerId.namespace == Namespaces.Peer.SecretChat })

        var peerIdsNeedingUploadUpdate = Set<PeerId>()
        var groupIdsNeedingSendCheck = Set<Int64>()

        for removedId in noLongerPendingIds {
            guard let context = self.messageContexts[removedId] else {
                continue
            }

            // Remember the group before the state is wiped, so the group can be re-checked later.
            if let groupId = context.state.groupId {
                groupIdsNeedingSendCheck.insert(groupId)
            }
            context.state = .none
            peerIdsNeedingUploadUpdate.insert(removedId.peerId)

            context.sendDisposable.dispose()
            context.uploadDisposable.dispose()
            context.activityDisposable.dispose()

            if context.status != nil {
                context.status = nil
                // Iterate over a copy so subscribers may unregister while being notified.
                for notify in context.statusSubscribers.copyItems() {
                    notify(nil, context.error)
                }
            }

            if context.statusSubscribers.isEmpty {
                self.messageContexts.removeValue(forKey: removedId)
            }
        }

        // Start sending before the stored set is replaced, as the original ordering does.
        if !newlyPendingIds.isEmpty {
            self.beginSendingMessages(newlyPendingIds.sorted())
        }

        self.pendingMessageIds = messageIds

        for peerId in peerIdsNeedingUploadUpdate {
            self.updateWaitingUploads(peerId: peerId)
        }
        for groupId in groupIdsNeedingSendCheck {
            self.beginSendingGroupIfPossible(groupId: groupId)
        }

        if !noLongerPendingSecretIds.isEmpty {
            let deliveredPeerIds = self.postbox.transaction { transaction -> Set<PeerId> in
                var delivered = Set<PeerId>()
                for secretId in noLongerPendingSecretIds {
                    if let message = transaction.getMessage(secretId), message.isSentOrAcknowledged {
                        delivered.insert(secretId.peerId)
                    }
                }
                return delivered
            }
            |> deliverOn(self.queue)

            let _ = deliveredPeerIds.start(next: { [weak self] peerIds in
                guard let strongSelf = self else {
                    return
                }
                for peerId in peerIds {
                    guard let summaryContext = strongSelf.peerSummaryContexts[peerId] else {
                        continue
                    }
                    for notifyDelivered in summaryContext.messageDeliveredSubscribers.copyItems() {
                        notifyDelivered()
                    }
                }
            })
        }

        let peersWithPendingMessages = Set(self.pendingMessageIds.map { $0.peerId })
        self._hasPendingMessages.set(peersWithPendingMessages)
    }
}
class
PendingMessageContext
1
2
3
4
5
6
7
8
9
10
// Current pipeline state of the message. `updatePendingMessageIds` resets it to `.none` when the
// message leaves the pending set; `beginSendingMessages` reads `.collectingInfo` and may set
// `.waitingForUploadToStart`.
var state: PendingMessageState = .none
// Disposable slot for the upload; disposed by `updatePendingMessageIds` on removal.
let uploadDisposable = MetaDisposable()
// Disposable slot for the send; disposed by `updatePendingMessageIds` on removal.
let sendDisposable = MetaDisposable()
// Assigned from `uploadActivityTypeForMessage(message)` in `beginSendingMessages`.
var activityType: PeerInputActivity? = nil
// Assigned from the second element returned by `messageContentToUpload` in `beginSendingMessages`.
var contentType: PendingMessageUploadedContentType? = nil
// Disposable slot for the activity; disposed by `updatePendingMessageIds` on removal.
let activityDisposable = MetaDisposable()
// Last status handed to `statusSubscribers`; set back to `nil` when the message leaves the pending set.
var status: PendingMessageStatus?
// Failure reason passed to `statusSubscribers` together with `status`.
var error: PendingMessageFailureReason?
// Callbacks invoked with `(status, error)` whenever the status changes.
var statusSubscribers = Bag<(PendingMessageStatus?, PendingMessageFailureReason?) -> Void>()
// Passed as `forceReupload` to `messageContentToUpload` by `beginSendingMessages`.
var forcedReuploadOnce: Bool = false
beginSendingMessages
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/// Starts the send pipeline for the given message ids. Must be called on `self.queue`.
///
/// First, every id gets a `PendingMessageContext` (an existing one is reused) and its status is
/// set to "not running, progress 0", with status subscribers notified if that is a change.
/// Then the messages are loaded from the postbox. Once they arrive on `self.queue`:
/// - upload info is collected for every loaded message that is not flagged `.Sending`;
/// - every context currently in `.collectingInfo`, ordered by message index, either begins
///   uploading or is parked in `.waitingForUploadToStart`.
///
/// - Parameter ids: Ids of the messages to start sending.
private func beginSendingMessages(_ ids: [MessageId]) {
    assert(self.queue.isCurrent())

    for messageId in ids.sorted() {
        // Reuse the context when one is already registered for this id.
        let context: PendingMessageContext
        if let existing = self.messageContexts[messageId] {
            context = existing
        } else {
            context = PendingMessageContext()
            self.messageContexts[messageId] = context
        }

        let queuedStatus = PendingMessageStatus(isRunning: false, progress: 0.0)
        guard queuedStatus != context.status else {
            continue
        }
        context.status = queuedStatus
        for notify in context.statusSubscribers.copyItems() {
            notify(context.status, context.error)
        }
    }

    let loadDisposable = MetaDisposable()
    let loadedMessages = self.postbox.messagesAtIds(ids)
    |> deliverOn(self.queue)
    |> afterDisposed { [weak self, weak loadDisposable] in
        // Unregister the bookkeeping disposable once the load signal has been disposed.
        guard let strongSelf = self, let strongDisposable = loadDisposable else {
            return
        }
        strongSelf.beginSendingMessagesDisposables.remove(strongDisposable)
    }
    self.beginSendingMessagesDisposables.add(loadDisposable)

    loadDisposable.set(loadedMessages.start(next: { [weak self] loaded in
        guard let strongSelf = self else {
            return
        }
        assert(strongSelf.queue.isCurrent())

        // Collect upload info for messages that are not already flagged as sending.
        let notYetSending = loaded.filter { !$0.flags.contains(.Sending) }
        for message in notYetSending.sorted(by: { $0.id < $1.id }) {
            if let context = strongSelf.messageContexts[message.id] {
                context.activityType = uploadActivityTypeForMessage(message)
                strongSelf.collectUploadingInfo(messageContext: context, message: message)
            }
        }

        // Snapshot all contexts that are collecting info, then process them in message-index order.
        let collecting = strongSelf.messageContexts.values.compactMap { context -> (PendingMessageContext, Message)? in
            guard case let .collectingInfo(message) = context.state else {
                return nil
            }
            return (context, message)
        }
        for (context, _) in collecting.sorted(by: { $0.1.index < $1.1.index }) {
            // Re-read the state: it is checked again at processing time, not taken from the snapshot.
            guard case let .collectingInfo(message) = context.state else {
                continue
            }

            let (contentUploadSignal, contentType) = messageContentToUpload(network: strongSelf.network, postbox: strongSelf.postbox, auxiliaryMethods: strongSelf.auxiliaryMethods, transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia, messageMediaPreuploadManager: strongSelf.messageMediaPreuploadManager, revalidationContext: strongSelf.revalidationContext, forceReupload: context.forcedReuploadOnce, isGrouped: message.groupingKey != nil, message: message)
            context.contentType = contentType

            if strongSelf.canBeginUploadingMessage(id: message.id, type: contentType) {
                strongSelf.beginUploadingMessage(messageContext: context, id: message.id, groupId: message.groupingKey, uploadSignal: contentUploadSignal)
            } else {
                // Park the prepared upload until it is allowed to start.
                context.state = .waitingForUploadToStart(groupId: message.groupingKey, upload: contentUploadSignal)
            }
        }
    }))
}
PendingMessageUploadedContentResult
enum
PendingMessageUploadedContent
1
2
3
4
5
6
// Plain text payload; `messageContentToUpload` falls back to this when nothing else applies.
case text(String)
// API input media plus a string; every call site in view passes the message `text` as the string.
case media(Api.InputMedia, String)
// Forwarded message; produced when a `ForwardSourceInfoAttribute` is present outside secret chats.
case forward(ForwardSourceInfoAttribute)
// Chat-context result; produced when an `OutgoingChatContextResultMessageAttribute` is present outside secret chats.
case chatContextResult(OutgoingChatContextResultMessageAttribute)
// Secret-chat media; call sites pass the encrypted input file, a size (`resource.decryptedSize`) and the key.
case secretMedia(Api.InputEncryptedFile, Int32, SecretFileEncryptionKey)
// Produced when the first media is a `UChatMediaAction` whose action is `.historyScreenshot`.
case messageScreenshot
// NOTE(review): `mediaContentToUpload` also constructs `.share(input, text)`, which is not listed
// in this excerpt — confirm the case exists in the full declaration.
enum
PendingMessageReuploadInfo
1
// Carries a file media reference. Presumably identifies a file that must be re-uploaded;
// every producer visible in this file passes `reuploadInfo: nil`, so usage cannot be confirmed here.
case reuploadFile(FileMediaReference)
struct
PendingMessageUploadedContentAndReuploadInfo
1
2
// The uploaded (or directly sendable) content.
let content: PendingMessageUploadedContent
// Optional re-upload information; `nil` at every construction site visible in this file.
let reuploadInfo: PendingMessageReuploadInfo?
enum
PendingMessageUploadedContentResult
1
2
// Intermediate progress value; upload signals forward the uploader's progress, and emit
// `.progress(1.0)` right before content that is reused from a cached reference.
case progress(Float)
// Final result: the content together with its optional re-upload info.
case content(PendingMessageUploadedContentAndReuploadInfo)
messageContentToUpload
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// Chooses the content to send for a message described by its raw parts.
///
/// The first matching rule wins:
/// 1. first media is a `UChatMediaAction` with `.historyScreenshot` → `.messageScreenshot`, type `.none`;
/// 2. a `ForwardSourceInfoAttribute` is present (ignored in secret chats) → `.forward`, type `.text`;
/// 3. an `OutgoingChatContextResultMessageAttribute` is present (ignored in secret chats) →
///    `.chatContextResult`, type `.text`;
/// 4. `mediaContentToUpload` yields a signal for the first media → that signal, type `.media`;
/// 5. otherwise → `.text(text)`, type `.text`.
///
/// When several attributes of the same kind exist, the last one in `attributes` is used.
///
/// - Returns: The content signal together with the content type classification.
func messageContentToUpload(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, messageMediaPreuploadManager: MessageMediaPreuploadManager, revalidationContext: MediaReferenceRevalidationContext, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, messageId: MessageId?, attributes: [MessageAttribute], text: String, media: [Media]) -> (Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>, PendingMessageUploadedContentType) {
    let isSecretChat = peerId.namespace == Namespaces.Peer.SecretChat

    var inlineResultAttribute: OutgoingChatContextResultMessageAttribute?
    var autoremoveAttribute: AutoremoveTimeoutMessageAttribute?
    var forwardAttribute: ForwardSourceInfoAttribute?
    for attribute in attributes {
        if let inlineResult = attribute as? OutgoingChatContextResultMessageAttribute {
            if !isSecretChat {
                inlineResultAttribute = inlineResult
            }
        } else if let autoremove = attribute as? AutoremoveTimeoutMessageAttribute {
            autoremoveAttribute = autoremove
        }
        // Checked independently of the chain above, mirroring the separate scan it replaces.
        if let forward = attribute as? ForwardSourceInfoAttribute, !isSecretChat {
            forwardAttribute = forward
        }
    }

    // Wraps content that needs no upload into an immediately available result.
    func immediate(_ content: PendingMessageUploadedContent) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> {
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: content, reuploadInfo: nil)))
    }

    if let action = media.first as? UChatMediaAction, action.action == .historyScreenshot {
        return (immediate(.messageScreenshot), .none)
    }
    if let forwardAttribute = forwardAttribute {
        return (immediate(.forward(forwardAttribute)), .text)
    }
    if let inlineResultAttribute = inlineResultAttribute {
        return (immediate(.chatContextResult(inlineResultAttribute)), .text)
    }
    if let firstMedia = media.first, let mediaSignal = mediaContentToUpload(network: network, postbox: postbox, auxiliaryMethods: auxiliaryMethods, transformOutgoingMessageMedia: transformOutgoingMessageMedia, messageMediaPreuploadManager: messageMediaPreuploadManager, revalidationContext: revalidationContext, forceReupload: forceReupload, isGrouped: isGrouped, peerId: peerId, media: firstMedia, text: text, autoremoveAttribute: autoremoveAttribute, messageId: messageId, attributes: attributes) {
        return (mediaSignal, .media)
    }
    return (immediate(.text(text)), .text)
}
mediaContentToUpload
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/// Maps a single `Media` attachment to the signal that produces its uploaded content.
///
/// Handling by media type, as implemented below:
/// - `UChatMediaImage`: secret-chat images backed by a `SecretFileMediaResource` are referenced
///   directly; non-secret images with a cloud reference that has a file reference are sent as
///   `inputMediaPhoto`; everything else goes through `uploadedMediaImageContent`.
/// - `UChatMediaFile`: cloud documents are referenced directly (after revalidation when
///   `forceReupload` is set); in secret chats they are re-uploaded, except stickers with a pack
///   reference, which are sent as text. Non-cloud files go through `uploadedMediaFileContent`.
/// - `UChatMediaContact`, `UChatMediaShareContact`, `UChatMediaMap`, `UChatMediaPoll`,
///   `UChatMediaRedEnvelopes`, `UChatMediaShare`: converted to the matching API input immediately.
///
/// - Returns: A signal yielding progress/content, or `nil` when the media type (or share type)
///   is not handled, in which case the caller falls back to sending text.
func mediaContentToUpload(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, messageMediaPreuploadManager: MessageMediaPreuploadManager, revalidationContext: MediaReferenceRevalidationContext, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, media: Media, text: String, autoremoveAttribute: AutoremoveTimeoutMessageAttribute?, messageId: MessageId?, attributes: [MessageAttribute]) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>? {
    if let image = media as? UChatMediaImage, let largest = largestImageRepresentation(image.representations) {
        // Secret chat: an already-encrypted file can be referenced without uploading again.
        if peerId.namespace == Namespaces.Peer.SecretChat, let resource = largest.resource as? SecretFileMediaResource {
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .secretMedia(.inputEncryptedFile(id: resource.fileId, accessHash: resource.accessHash), resource.decryptedSize, resource.key), reuploadInfo: nil)))
        }
        // Regular chat: reuse the cloud photo when a file reference is available.
        if peerId.namespace != Namespaces.Peer.SecretChat, let reference = image.reference, case let .cloud(id, accessHash, maybeFileReference) = reference, let fileReference = maybeFileReference {
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaPhoto(flags: 0, id: Api.InputPhoto.inputPhoto(id: id, accessHash: accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: nil), text), reuploadInfo: nil)))
        } else {
            return uploadedMediaImageContent(network: network, postbox: postbox, transformOutgoingMessageMedia: transformOutgoingMessageMedia, forceReupload: forceReupload, isGrouped: isGrouped, peerId: peerId, image: image, messageId: messageId, text: text, attributes: attributes, autoremoveAttribute: autoremoveAttribute)
        }
    } else if let file = media as? UChatMediaFile {
        if let resource = file.resource as? CloudDocumentMediaResource {
            if peerId.namespace == Namespaces.Peer.SecretChat {
                // Stickers that belong to a pack are sent as plain text in secret chats.
                for attribute in file.attributes {
                    if case let .Sticker(sticker) = attribute, sticker.packReference != nil {
                        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: PendingMessageUploadedContent.text(text), reuploadInfo: nil)))
                    }
                }
                // Any other cloud document is always re-uploaded for a secret chat.
                return uploadedMediaFileContent(network: network, postbox: postbox, auxiliaryMethods: auxiliaryMethods, transformOutgoingMessageMedia: transformOutgoingMessageMedia, messageMediaPreuploadManager: messageMediaPreuploadManager, forceReupload: true, isGrouped: isGrouped, peerId: peerId, messageId: messageId, text: text, attributes: attributes, file: file)
            } else {
                if forceReupload {
                    // Revalidate the file reference before referencing the document again.
                    let mediaReference: AnyMediaReference
                    if file.isSticker {
                        mediaReference = .standalone(media: file)
                    } else {
                        mediaReference = .savedGif(media: file)
                    }
                    return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: UChatCloudMediaResourceFetchInfo(reference: mediaReference.resourceReference(file.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: resource)
                    |> mapError { _ -> PendingMessageUploadError in
                        return .generic
                    }
                    |> mapToSignal { validatedResource -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
                        if let validatedResource = validatedResource.updatedResource as? UChatCloudMediaResourceWithFileReference, let reference = validatedResource.fileReference {
                            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaDocument(flags: 0, id: Api.InputDocument.inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: reference)), ttlSeconds: nil), text), reuploadInfo: nil)))
                        } else {
                            return .fail(.generic)
                        }
                    }
                }
                // No forced re-upload: reference the document with whatever file reference is stored (empty if none).
                return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaDocument(flags: 0, id: Api.InputDocument.inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: resource.fileReference ?? Data())), ttlSeconds: nil), text), reuploadInfo: nil)))
            }
        } else {
            return uploadedMediaFileContent(network: network, postbox: postbox, auxiliaryMethods: auxiliaryMethods, transformOutgoingMessageMedia: transformOutgoingMessageMedia, messageMediaPreuploadManager: messageMediaPreuploadManager, forceReupload: forceReupload, isGrouped: isGrouped, peerId: peerId, messageId: messageId, text: text, attributes: attributes, file: file)
        }
    } else if let contact = media as? UChatMediaContact {
        let input = Api.InputMedia.inputMediaContact(phoneNumber: contact.phoneNumber, firstName: contact.firstName, lastName: contact.lastName, vcard: contact.vCardData ?? "")
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    }
    // xlb: sharing a friend's contact card
    else if let shareContact = media as? UChatMediaShareContact {
        // A missing user id is sent as 0.
        let input = Api.InputMedia.inputMediaShareContact(userId: shareContact.userId ?? 0)
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    } else if let map = media as? UChatMediaMap {
        let input: Api.InputMedia
        if let liveBroadcastingTimeout = map.liveBroadcastingTimeout {
            input = .inputMediaGeoLive(flags: 1 << 1, geoPoint: Api.InputGeoPoint.inputGeoPoint(lat: map.latitude, long: map.longitude), period: liveBroadcastingTimeout)
        } else if let venue = map.venue {
            input = .inputMediaVenue(geoPoint: Api.InputGeoPoint.inputGeoPoint(lat: map.latitude, long: map.longitude), title: venue.title, address: venue.address ?? "", provider: venue.provider ?? "", venueId: venue.id ?? "", venueType: venue.type ?? "")
        } else {
            input = .inputMediaGeoPoint(geoPoint: Api.InputGeoPoint.inputGeoPoint(lat: map.latitude, long: map.longitude))
        }
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    } else if let poll = media as? UChatMediaPoll {
        // Polls are rejected in secret chats.
        if peerId.namespace == Namespaces.Peer.SecretChat {
            return .fail(.generic)
        }
        let inputPoll = Api.InputMedia.inputMediaPoll(poll: Api.Poll.poll(id: 0, flags: 0, question: poll.text, answers: poll.options.map({ $0.apiOption })))
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(inputPoll, text), reuploadInfo: nil)))
    // hcl: red envelopes
    } else if media is UChatMediaRedEnvelopes {
        // Only the type is checked; no field of the media is read.
        // NOTE(review): `count` is hard-coded to 0 — confirm this is intended.
        let input = Api.InputMedia.inputMediaRedPacket(count: 0)
        return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(input, text), reuploadInfo: nil)))
    }
    // xlb: share
    else if let share = media as? UChatMediaShare {
        switch share.type {
        case 1:
            // Game
            let input = Api.InputShare.inputShareGeme(title: share.gameInfo?.title ?? "", photo: share.gameInfo?.photo ?? "", jumpUrl: share.gameInfo?.jumpUrl ?? "")
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .share(input, text), reuploadInfo: nil)))
        case 2:
            // Live stream
            let input = Api.InputShare.inputShareLive(state: share.livelInfo?.live_stat ?? 0, begin_at: share.livelInfo?.begin_at ?? 0, group_id: share.livelInfo?.group_id ?? 0, live_user: share.livelInfo?.live_user ?? 0, live_flag: share.livelInfo?.live_flag ?? "", live_name: share.livelInfo?.live_name ?? "", live_icon: share.livelInfo?.live_icon ?? "")
            return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .share(input, text), reuploadInfo: nil)))
        default:
            // Unknown share type: nothing to upload.
            return nil
        }
    } else {
        return nil
    }
}
uploadedMediaFileContent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/// Builds the signal that reports upload progress and finally yields the uploaded content for `file`.
///
/// Visible steps:
/// 1. `maybePredownloadedFileResource` is consulted; unless `forceReupload` is set, a result that is
///    a cloud document with a file reference is reused without uploading.
/// 2. Otherwise the file is uploaded through `messageMediaPreuploadManager` (encrypted for secret chats).
/// 3. In parallel, the optional `transformOutgoingMessageMedia` hook is run (once per message, tracked by
///    the `.transformedMedia` flag) and, outside secret chats, the smallest preview is uploaded as thumbnail.
/// 4. When both are finished, the result is either an `inputMediaUploadedDocument` (ungrouped or
///    auto-removing messages), a server-side `messages.uploadMedia` round trip whose document is then
///    referenced (grouped messages), or `.secretMedia` (secret chats).
///
/// Every underlying error is mapped to `PendingMessageUploadError.generic`.
private func uploadedMediaFileContent(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, messageMediaPreuploadManager: MessageMediaPreuploadManager, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, messageId: MessageId?, text: String, attributes: [MessageAttribute], file: UChatMediaFile) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> {
// Ask for a predownloaded/cached resource first; its result decides whether an upload is needed.
return maybePredownloadedFileResource(postbox: postbox, auxiliaryMethods: auxiliaryMethods, peerId: peerId, resource: file.resource, forceRefresh: forceReupload)
|> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
// Key later handed to `maybeCacheUploadedResource`; only set for the `.localReference` result.
var referenceKey: CachedSentMediaReferenceKey?
switch result {
case let .media(media):
// Reuse an existing cloud document: report full progress, then reference it by id.
if !forceReupload, let file = media as? UChatMediaFile, let resource = file.resource as? CloudDocumentMediaResource, let fileReference = resource.fileReference {
return .single(.progress(1.0))
|> then(
.single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(Api.InputMedia.inputMediaDocument(flags: 0, id: Api.InputDocument.inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: nil), text), reuploadInfo: nil)))
)
}
case let .localReference(key):
referenceKey = key
case .none:
referenceKey = nil
}
// Upload hint: whether the file should be treated as large (decided below).
var hintFileIsLarge = false
// Upload hint: size taken from the file, or from a local file reference resource when the file has none.
var hintSize: Int?
if let size = file.size {
hintSize = size
} else if let resource = file.resource as? LocalFileReferenceMediaResource, let size = resource.size {
hintSize = Int(size)
}
// Non-animated files with a non-zero header size or a "video/mp4" MIME type are hinted as large.
if (file.resource.headerSize != 0 || file.mimeType.hasPrefix("video/mp4")) && !file.isAnimated {
hintFileIsLarge = true
}
// Media reference used to address the resource: the partial reference if present, else standalone.
let fileReference: AnyMediaReference
if let partialReference = file.partialReference {
fileReference = partialReference.mediaReference(file)
} else {
fileReference = .standalone(media: file)
}
// The upload itself; encrypted when the peer is a secret chat.
let upload = messageMediaPreuploadManager.upload(network: network, postbox: postbox, source: .resource(fileReference.resourceReference(file.resource)), encrypt: peerId.namespace == Namespaces.Peer.SecretChat, tag: UChatMediaResourceFetchTag(statsCategory: statsCategoryForFileWithAttributes(file.attributes)), hintFileSize: hintSize, hintFileIsLarge: hintFileIsLarge)
|> mapError { _ -> PendingMessageUploadError in return .generic
}
// The transform hook is skipped when the message is already flagged `.transformedMedia`.
var alreadyTransformed = false
for attribute in attributes {
if let attribute = attribute as? OutgoingMessageInfoAttribute {
if attribute.flags.contains(.transformedMedia) {
alreadyTransformed = true
}
}
}
// Emits `.pending` first, then `.done` with the transformed media (or the original file).
let transform: Signal<UploadedMediaTransform, NoError>
if let transformOutgoingMessageMedia = transformOutgoingMessageMedia, let messageId = messageId, !alreadyTransformed {
transform = .single(.pending)
|> then(transformOutgoingMessageMedia(postbox, network, .standalone(media: file), false)
|> mapToSignal { mediaReference -> Signal<UploadedMediaTransform, NoError> in
return postbox.transaction { transaction -> UploadedMediaTransform in
if let media = mediaReference?.media {
if let id = media.id {
// Persist the transformed media and mark the stored message as transformed.
let _ = transaction.updateMedia(id, update: media)
transaction.updateMessage(messageId, update: { currentMessage in
// Forward info is rebuilt for the store message; the author signature is written as nil.
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: nil)
}
var updatedAttributes = currentMessage.attributes
if let index = updatedAttributes.firstIndex(where: { $0 is OutgoingMessageInfoAttribute }){
// The forced cast cannot fail: the index was selected by the `is` predicate above.
let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute
updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia]))
} else {
updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false))
}
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
})
}
return .done(media)
} else {
// The hook produced no media: continue with the original file.
return .done(file)
}
}
})
} else {
transform = .single(.done(file))
}
// Pairs the (possibly transformed) file with its uploaded thumbnail, if any.
let transformedFileAndThumbnail: Signal<UploadedMediaFileAndThumbnail, PendingMessageUploadError> = .single(.pending)
|> then(transform
|> mapToSignalPromotingError { media -> Signal<UploadedMediaFileAndThumbnail, PendingMessageUploadError> in
switch media {
case .pending:
return .single(.pending)
case let .done(media):
if let media = media as? UChatMediaFile, let smallestThumbnail = smallestImageRepresentation(media.previewRepresentations) {
// No thumbnail is uploaded for secret chats.
if peerId.namespace == Namespaces.Peer.SecretChat {
return .single(.done(media, .none))
} else {
let fileReference: AnyMediaReference
if let partialReference = media.partialReference {
fileReference = partialReference.mediaReference(media)
} else {
fileReference = .standalone(media: media)
}
return uploadedThumbnail(network: network, postbox: postbox, resourceReference: fileReference.resourceReference(smallestThumbnail.resource))
|> mapError { _ -> PendingMessageUploadError in return .generic }
|> map { result in
if let result = result {
return .done(media, .file(result))
} else {
return .done(media, .none)
}
}
}
} else {
// NOTE(review): this branch returns the original `file`, not the transformed media — confirm intended.
return .single(.done(file, .none))
}
}
})
// Combine upload progress/result with the transform + thumbnail state.
return combineLatest(upload, transformedFileAndThumbnail)
|> mapToSignal { content, fileAndThumbnailResult -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
switch content {
case let .progress(progress):
return .single(.progress(progress))
case let .inputFile(inputFile):
if case let .done(file, thumbnail) = fileAndThumbnailResult {
var flags: Int32 = 0
var thumbnailFile: Api.InputFile?
if case let .file(file) = thumbnail {
thumbnailFile = file
}
// Bit 2 is set when a thumbnail is attached.
if let _ = thumbnailFile {
flags |= 1 << 2
}
// Bit 1 is set (and the timeout forwarded) when an auto-remove attribute is present.
var ttlSeconds: Int32?
for attribute in attributes {
if let attribute = attribute as? AutoremoveTimeoutMessageAttribute {
flags |= 1 << 1
ttlSeconds = attribute.timeout
}
}
// Bit 3 is set for files that are not animated.
if !file.isAnimated {
flags |= 1 << 3
}
// NOTE(review): the two branches below return identical content; they could share one condition.
if ttlSeconds != nil {
return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaUploadedDocument(flags: flags, file: inputFile, thumb: thumbnailFile, mimeType: file.mimeType, attributes: inputDocumentAttributesFromFileAttributes(file.attributes), stickers: nil, ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
}
if !isGrouped {
return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaUploadedDocument(flags: flags, file: inputFile, thumb: thumbnailFile, mimeType: file.mimeType, attributes: inputDocumentAttributesFromFileAttributes(file.attributes), stickers: nil, ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
}
// Grouped message without TTL: register the media via `messages.uploadMedia`, then reference the returned document.
return postbox.transaction { transaction -> Api.InputPeer? in
return transaction.getPeer(peerId).flatMap(apiInputPeer)
}
|> mapError { _ -> PendingMessageUploadError in return .generic }
|> mapToSignal { inputPeer -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
if let inputPeer = inputPeer {
return network.request(Api.functions.messages.uploadMedia(peer: inputPeer, media: .inputMediaUploadedDocument(flags: flags, file: inputFile, thumb: thumbnailFile, mimeType: file.mimeType, attributes: inputDocumentAttributesFromFileAttributes(file.attributes), stickers: nil, ttlSeconds: ttlSeconds)))
|> mapError { _ -> PendingMessageUploadError in return .generic }
|> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
switch result {
case let .messageMediaDocument(_, document, _):
if let document = document, let mediaFile = uchatMediaFileFromApiDocument(document), let resource = mediaFile.resource as? CloudDocumentMediaResource, let fileReference = resource.fileReference {
// Hand the result to `maybeCacheUploadedResource` together with `referenceKey`.
return maybeCacheUploadedResource(postbox: postbox, key: referenceKey, result: .content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaDocument(flags: 0, id: .inputDocument(id: resource.fileId, accessHash: resource.accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: nil), text), reuploadInfo: nil)), media: mediaFile)
}
default:
break
}
// Any other response shape is treated as a failure.
return .fail(.generic)
}
} else {
// The peer could not be resolved to an API input peer.
return .fail(.generic)
}
}
} else {
// Upload finished but transform/thumbnail is still pending: emit nothing for this combination.
return .complete()
}
case let .inputSecretFile(file, size, key):
if case .done = fileAndThumbnailResult {
return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .secretMedia(file, size, key), reuploadInfo: nil)))
} else {
// Same as above: wait for the transform/thumbnail side to finish.
return .complete()
}
}
}
}
}
uploadedMediaImageContent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
private func uploadedMediaImageContent(network: Network, postbox: Postbox, transformOutgoingMessageMedia: TransformOutgoingMessageMedia?, forceReupload: Bool, isGrouped: Bool, peerId: PeerId, image: UChatMediaImage, messageId: MessageId?, text: String, attributes: [MessageAttribute], autoremoveAttribute: AutoremoveTimeoutMessageAttribute?) -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> {
guard let largestRepresentation = largestImageRepresentation(image.representations) else {
return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .text(text), reuploadInfo: nil)))
}
let predownloadedResource: Signal<PredownloadedResource, PendingMessageUploadError> = maybePredownloadedImageResource(postbox: postbox, peerId: peerId, resource: largestRepresentation.resource, forceRefresh: forceReupload)
return predownloadedResource
|> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
var referenceKey: CachedSentMediaReferenceKey?
switch result {
case let .media(media):
if !forceReupload, let image = media as? UChatMediaImage, let reference = image.reference, case let .cloud(id, accessHash, maybeFileReference) = reference, let fileReference = maybeFileReference {
var flags: Int32 = 0
var ttlSeconds: Int32?
if let autoremoveAttribute = autoremoveAttribute {
flags |= 1 << 0
ttlSeconds = autoremoveAttribute.timeout
}
return .single(.progress(1.0))
|> then(
.single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaPhoto(flags: flags, id: .inputPhoto(id: id, accessHash: accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
)
}
case let .localReference(key):
referenceKey = key
case .none:
referenceKey = nil
}
var alreadyTransformed = false
for attribute in attributes {
if let attribute = attribute as? OutgoingMessageInfoAttribute {
if attribute.flags.contains(.transformedMedia) {
alreadyTransformed = true
}
}
}
let transform: Signal<UploadedMediaTransform, NoError>
if let transformOutgoingMessageMedia = transformOutgoingMessageMedia, let messageId = messageId, !alreadyTransformed {
transform = .single(.pending)
|> then(
transformOutgoingMessageMedia(postbox, network, .standalone(media: image), false)
|> mapToSignal { mediaReference -> Signal<UploadedMediaTransform, NoError> in
return postbox.transaction { transaction -> UploadedMediaTransform in
if let media = mediaReference?.media {
if let id = media.id {
let _ = transaction.updateMedia(id, update: media)
transaction.updateMessage(messageId, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: nil)
}
var updatedAttributes = currentMessage.attributes
if let index = updatedAttributes.firstIndex(where: { $0 is OutgoingMessageInfoAttribute }){
let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute
updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia]))
} else {
updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false))
}
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
})
}
return .done(media)
} else {
return .done(image)
}
}
}
)
} else {
transform = .single(.done(image))
}
return transform
|> mapError { _ -> PendingMessageUploadError in
return .generic
}
|> mapToSignal { transformResult -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
switch transformResult {
case .pending:
return .single(.progress(0.0))
case let .done(transformedMedia):
let transformedImage = (transformedMedia as? UChatMediaImage) ?? image
guard let largestRepresentation = largestImageRepresentation(transformedImage.representations) else {
return .fail(.generic)
}
let imageReference: AnyMediaReference
if let partialReference = transformedImage.partialReference {
imageReference = partialReference.mediaReference(transformedImage)
} else {
imageReference = .standalone(media: transformedImage)
}
return multipartUpload(network: network, postbox: postbox, source: .resource(imageReference.resourceReference(largestRepresentation.resource)), encrypt: peerId.namespace == Namespaces.Peer.SecretChat, tag: UChatMediaResourceFetchTag(statsCategory: .image), hintFileSize: nil, hintFileIsLarge: false)
|> mapError { _ -> PendingMessageUploadError in return .generic }
|> mapToSignal { next -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
switch next {
case let .progress(progress):
return .single(.progress(progress))
case let .inputFile(file):
var flags: Int32 = 0
var ttlSeconds: Int32?
if let autoremoveAttribute = autoremoveAttribute {
flags |= 1 << 1
ttlSeconds = autoremoveAttribute.timeout
}
return postbox.transaction { transaction -> Api.InputPeer? in
return transaction.getPeer(peerId).flatMap(apiInputPeer)
}
|> mapError { _ -> PendingMessageUploadError in return .generic }
|> mapToSignal { inputPeer -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
if let inputPeer = inputPeer {
if autoremoveAttribute != nil {
return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaUploadedPhoto(flags: flags, file: file, stickers: nil, ttlSeconds: ttlSeconds), text), reuploadInfo: nil)))
}
return network.request(Api.functions.messages.uploadMedia(peer: inputPeer, media: Api.InputMedia.inputMediaUploadedPhoto(flags: flags, file: file, stickers: nil, ttlSeconds: ttlSeconds)))
|> mapError { _ -> PendingMessageUploadError in return .generic }
|> mapToSignal { result -> Signal<PendingMessageUploadedContentResult, PendingMessageUploadError> in
switch result {
case let .messageMediaPhoto(_, photo, _):
if let photo = photo, let mediaImage = uchatMediaImageFromApiPhoto(photo), let reference = mediaImage.reference, case let .cloud(id, accessHash, maybeFileReference) = reference, let fileReference = maybeFileReference {
var flags: Int32 = 0
var ttlSeconds: Int32?
if let autoremoveAttribute = autoremoveAttribute {
flags |= 1 << 0
ttlSeconds = autoremoveAttribute.timeout
}
return maybeCacheUploadedResource(postbox: postbox, key: referenceKey, result: .content(PendingMessageUploadedContentAndReuploadInfo(content: .media(.inputMediaPhoto(flags: flags, id: .inputPhoto(id: id, accessHash: accessHash, fileReference: Buffer(data: fileReference)), ttlSeconds: ttlSeconds), text), reuploadInfo: nil)), media: mediaImage)
}
default:
break
}
return .fail(.generic)
}
} else {
return .fail(.generic)
}
}
case let .inputSecretFile(file, size, key):
return .single(.content(PendingMessageUploadedContentAndReuploadInfo(content: .secretMedia(file, size, key), reuploadInfo: nil)))
}
}
}
}
}
}
beginUploadingMessage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/// Moves `messageContext` into the uploading state and starts observing `uploadSignal`.
///
/// Status subscribers are notified immediately with a running status at zero progress,
/// and again for every `.progress` value. When the upload yields `.content`, sending is
/// started for that content. If the upload signal errors, the stored message is
/// rewritten with its flags set to `[.Failed]`.
///
/// - Parameters:
///   - messageContext: Context whose state, status and upload disposable are updated.
///   - id: Identifier of the message being uploaded.
///   - threadId: When non-nil the activity is registered under `.thread(threadId)`, otherwise `.global`.
///   - groupId: Group the message belongs to, if any; passed on to the sending step.
///   - uploadSignal: Signal producing upload progress and the final uploaded content.
private func beginUploadingMessage(messageContext: PendingMessageContext, id: MessageId, threadId: Int64?, groupId: Int64?, uploadSignal: Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>) {
    messageContext.state = .uploading(groupId: groupId)
    messageContext.status = PendingMessageStatus(isRunning: true, progress: 0.0)
    for notify in messageContext.statusSubscribers.copyItems() {
        notify(messageContext.status, messageContext.error)
    }

    let activityCategory: PeerActivitySpace.Category = threadId.map { PeerActivitySpace.Category.thread($0) } ?? .global
    let activitySpace = PeerActivitySpace(peerId: id.peerId, category: activityCategory)
    self.addContextActivityIfNeeded(messageContext, peerId: activitySpace)

    let queue = self.queue
    let monitoredUpload = uploadSignal
    |> deliverOn(queue)
    |> `catch` { [weak self] _ -> Signal<PendingMessageUploadedContentResult, NoError> in
        guard let strongSelf = self else {
            return .complete()
        }
        // The upload failed: persist the message with only the `.Failed` flag set.
        let markFailed = strongSelf.postbox.transaction { transaction -> Void in
            transaction.updateMessage(id, update: { currentMessage in
                let storeForwardInfo = currentMessage.forwardInfo.map { forwardInfo in
                    StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
                }
                return .update(StoreMessage(id: id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
            })
        }
        return markFailed
        |> mapToSignal { _ in
            return .complete()
        }
    }
    |> deliverOn(queue)

    messageContext.uploadDisposable.set(monitoredUpload.start(next: { [weak self] next in
        guard let strongSelf = self else {
            return
        }
        assert(strongSelf.queue.isCurrent())
        // Both result kinds are ignored once the context is no longer registered.
        guard let current = strongSelf.messageContexts[id] else {
            return
        }
        switch next {
        case let .progress(progress):
            current.status = PendingMessageStatus(isRunning: true, progress: progress)
            for notify in current.statusSubscribers.copyItems() {
                notify(current.status, current.error)
            }
        case let .content(content):
            // Begin the send request now that the upload has produced its content.
            strongSelf.beginSendingMessage(messageContext: current, messageId: id, groupId: groupId, content: content)
            strongSelf.updateWaitingUploads(peerId: id.peerId)
            if let groupId = groupId {
                strongSelf.beginSendingGroupIfPossible(groupId: groupId)
            }
        }
    }))
}
addContextActivityIfNeeded
1
2
3
4
5
/// Acquires an input activity for `context` when it declares an `activityType`.
///
/// The value returned by `localInputActivityManager.acquireActivity` is stored in
/// `context.activityDisposable`. Contexts without an activity type are left untouched.
///
/// - Parameters:
///   - context: Pending message context that may carry an activity type.
///   - peerId: Activity space passed as `chatPeerId` to the activity manager.
private func addContextActivityIfNeeded(_ context: PendingMessageContext, peerId: PeerActivitySpace) {
    guard let activityType = context.activityType else {
        return
    }
    let acquiredActivity = self.localInputActivityManager.acquireActivity(chatPeerId: peerId, peerId: self.accountPeerId, activity: activityType)
    context.activityDisposable.set(acquiredActivity)
}
updateWaitingUploads
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/// Starts the next deferred upload for `peerId`, if one is waiting and allowed to begin.
///
/// Contexts for the peer are visited in sorted message-id order. Only the first context
/// in the `.waitingForUploadToStart` state is considered: if `canBeginUploadingMessage`
/// allows it, the context moves to `.uploading`, subscribers are notified and the stored
/// upload signal is started. The scan stops after that first waiting context either way.
///
/// Must be called on `self.queue`.
///
/// - Parameter peerId: Peer whose pending message contexts are examined.
private func updateWaitingUploads(peerId: PeerId) {
    assert(self.queue.isCurrent())
    let messageIdsForPeer: [MessageId] = self.messageContexts.keys.filter({ $0.peerId == peerId }).sorted()
    loop: for contextId in messageIdsForPeer {
        guard let context = self.messageContexts[contextId] else {
            continue
        }
        if case let .waitingForUploadToStart(groupId, uploadSignal) = context.state {
            if self.canBeginUploadingMessage(id: contextId, type: context.contentType ?? .media) {
                context.state = .uploading(groupId: groupId)
                let status = PendingMessageStatus(isRunning: true, progress: 0.0)
                context.status = status
                for subscriber in context.statusSubscribers.copyItems() {
                    subscriber(context.status, context.error)
                }
                let activityCategory: PeerActivitySpace.Category
                if let threadId = context.threadId {
                    activityCategory = .thread(threadId)
                } else {
                    activityCategory = .global
                }
                self.addContextActivityIfNeeded(context, peerId: PeerActivitySpace(peerId: peerId, category: activityCategory))
                // NOTE(review): unlike `beginUploadingMessage`, no `catch` is attached to the
                // upload signal here, so an upload error is not handled by this subscription.
                // Confirm whether that is intended.
                context.uploadDisposable.set((uploadSignal
                |> deliverOn(self.queue)).start(next: { [weak self] next in
                    if let strongSelf = self {
                        assert(strongSelf.queue.isCurrent())
                        switch next {
                        case let .progress(progress):
                            if let current = strongSelf.messageContexts[contextId] {
                                let status = PendingMessageStatus(isRunning: true, progress: progress)
                                current.status = status
                                // Report the context that was just updated. Using the looked-up
                                // `current` (not the outer `context`) keeps the reported status in
                                // sync and keeps this closure from strongly capturing the context
                                // that owns its disposable.
                                for subscriber in current.statusSubscribers.copyItems() {
                                    subscriber(current.status, current.error)
                                }
                            }
                        case let .content(content):
                            if let current = strongSelf.messageContexts[contextId] {
                                strongSelf.beginSendingMessage(messageContext: current, messageId: contextId, groupId: groupId, content: content)
                                if let groupId = groupId {
                                    strongSelf.beginSendingGroupIfPossible(groupId: groupId)
                                }
                                strongSelf.updateWaitingUploads(peerId: peerId)
                            }
                        }
                    }
                }))
            }
            break loop
        }
    }
}
sendGroupMessagesContent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/// Sends all messages of an uploaded group with a single request.
///
/// Inside one postbox transaction the stored messages are loaded and sorted by index.
/// Secret-chat peers are handled per message through `sendSecretMessageContent`. For other
/// peers the request options are read from the attributes of the first message, and either
/// `messages.forwardMessages` (when a `ForwardSourceInfoAttribute` is present) or
/// `messages.sendMultiMedia` is issued. The response is applied through
/// `applySentGroupMessages`.
///
/// Whenever the group cannot be sent (missing stored message, missing unique id, unexpected
/// content kind, unresolvable peer, request error) the whole group is handed to
/// `failMessages`. The one exception is a file-reference error on a group that has not yet
/// been force-reuploaded, which restarts sending for the group instead.
///
/// - Parameters:
///   - network: Network used to issue the API request.
///   - postbox: Storage used for the transaction and for failing messages.
///   - stateManager: Passed through to `applySentGroupMessages`.
///   - accountPeerId: Account peer, used when resolving the "send as" peer.
///   - group: Message ids with their uploaded content; an empty group completes immediately.
/// - Returns: A signal that completes once the send attempt has been processed.
private func sendGroupMessagesContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, accountPeerId: PeerId, group: [(messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]) -> Signal<Void, NoError> {
    let queue = self.queue
    return postbox.transaction { [weak self] transaction -> Signal<Void, NoError> in
        if group.isEmpty {
            return .complete()
        }
        let peerId = group[0].messageId.peerId
        var messages: [(Message, PendingMessageUploadedContentAndReuploadInfo)] = []
        for (id, content) in group {
            if let message = transaction.getMessage(id) {
                messages.append((message, content))
            } else {
                return failMessages(postbox: postbox, ids: group.map { $0.0 })
            }
        }
        messages.sort { $0.0.index < $1.0.index }
        if peerId.namespace == Namespaces.Peer.SecretChat {
            for (message, content) in messages {
                PendingMessageManager.sendSecretMessageContent(transaction: transaction, message: message, content: content)
            }
            return .complete()
        } else if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) {
            var isForward = false
            var hideSendersNames = false
            var hideCaptions = false
            var replyMessageId: Int32?
            var scheduleTime: Int32?
            var sendAsPeerId: PeerId?
            var flags: Int32 = 0
            // Request-level options are taken from the first message of the group only.
            for attribute in messages[0].0.attributes {
                if let replyAttribute = attribute as? ReplyMessageAttribute {
                    replyMessageId = replyAttribute.messageId.id
                } else if let _ = attribute as? ForwardSourceInfoAttribute {
                    isForward = true
                } else if let attribute = attribute as? NotificationInfoMessageAttribute {
                    if attribute.flags.contains(.muted) {
                        flags |= Int32(1 << 5)
                    }
                } else if let attribute = attribute as? OutgoingScheduleInfoMessageAttribute {
                    flags |= Int32(1 << 10)
                    scheduleTime = attribute.scheduleTime
                } else if let attribute = attribute as? ForwardOptionsMessageAttribute {
                    hideSendersNames = attribute.hideNames
                    hideCaptions = attribute.hideCaptions
                } else if let attribute = attribute as? SendAsMessageAttribute {
                    sendAsPeerId = attribute.peerId
                }
            }
            let sendMessageRequest: Signal<Api.Updates, MTRpcError>
            if isForward {
                if messages.contains(where: { $0.0.groupingKey != nil }) {
                    flags |= (1 << 9)
                }
                if hideSendersNames {
                    flags |= (1 << 11)
                }
                if hideCaptions {
                    flags |= (1 << 12)
                }
                var sendAsInputPeer: Api.InputPeer?
                if let sendAsPeerId = sendAsPeerId, let sendAsPeer = transaction.getPeer(sendAsPeerId), let inputPeer = apiInputPeerOrSelf(sendAsPeer, accountPeerId: accountPeerId) {
                    sendAsInputPeer = inputPeer
                    flags |= (1 << 13)
                }
                var forwardIds: [(MessageId, Int64)] = []
                for (message, content) in messages {
                    var uniqueId: Int64?
                    inner: for attribute in message.attributes {
                        if let outgoingInfo = attribute as? OutgoingMessageInfoAttribute {
                            uniqueId = outgoingInfo.uniqueId
                            break inner
                        }
                    }
                    if let uniqueId = uniqueId {
                        switch content.content {
                        case let .forward(forwardAttribute):
                            forwardIds.append((forwardAttribute.messageId, uniqueId))
                        default:
                            assertionFailure()
                            // Fail the group instead of completing silently, as the
                            // non-forward branch does for unexpected content.
                            return failMessages(postbox: postbox, ids: group.map { $0.0 })
                        }
                    } else {
                        // Without a unique id the message cannot be sent; fail the group,
                        // matching the non-forward branch.
                        return failMessages(postbox: postbox, ids: group.map { $0.0 })
                    }
                }
                // A single forward request can only reference one source peer.
                let forwardPeerIds = Set(forwardIds.map { $0.0.peerId })
                if forwardPeerIds.count != 1 {
                    assertionFailure()
                    sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "Invalid forward peer ids"))
                } else if let inputSourcePeerId = forwardPeerIds.first, let inputSourcePeer = transaction.getPeer(inputSourcePeerId).flatMap(apiInputPeer) {
                    let dependencyTag = PendingMessageRequestDependencyTag(messageId: messages[0].0.id)
                    sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: flags, fromPeer: inputSourcePeer, id: forwardIds.map { $0.0.id }, randomId: forwardIds.map { $0.1 }, toPeer: inputPeer, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), tag: dependencyTag)
                } else {
                    assertionFailure()
                    sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "Invalid forward source"))
                }
            } else {
                flags |= (1 << 7)
                if let _ = replyMessageId {
                    flags |= Int32(1 << 0)
                }
                var sendAsInputPeer: Api.InputPeer?
                if let sendAsPeerId = sendAsPeerId, let sendAsPeer = transaction.getPeer(sendAsPeerId), let inputPeer = apiInputPeerOrSelf(sendAsPeer, accountPeerId: accountPeerId) {
                    sendAsInputPeer = inputPeer
                    flags |= (1 << 13)
                }
                var singleMedias: [Api.InputSingleMedia] = []
                for (message, content) in messages {
                    var uniqueId: Int64?
                    inner: for attribute in message.attributes {
                        if let outgoingInfo = attribute as? OutgoingMessageInfoAttribute {
                            uniqueId = outgoingInfo.uniqueId
                            break inner
                        }
                    }
                    if let uniqueId = uniqueId {
                        switch content.content {
                        case let .media(inputMedia, text):
                            var messageEntities: [Api.MessageEntity]?
                            for attribute in message.attributes {
                                if let attribute = attribute as? TextEntitiesMessageAttribute {
                                    messageEntities = apiTextAttributeEntities(attribute, associatedPeers: message.peers)
                                }
                            }
                            var singleFlags: Int32 = 0
                            if let _ = messageEntities {
                                singleFlags |= 1 << 0
                            }
                            singleMedias.append(.inputSingleMedia(flags: singleFlags, media: inputMedia, randomId: uniqueId, message: text, entities: messageEntities))
                        default:
                            return failMessages(postbox: postbox, ids: group.map { $0.0 })
                        }
                    } else {
                        return failMessages(postbox: postbox, ids: group.map { $0.0 })
                    }
                }
                sendMessageRequest = network.request(Api.functions.messages.sendMultiMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, multiMedia: singleMedias, scheduleDate: scheduleTime, sendAs: sendAsInputPeer))
            }
            return sendMessageRequest
            |> deliverOn(queue)
            |> mapToSignal { result -> Signal<Void, MTRpcError> in
                if let strongSelf = self {
                    return strongSelf.applySentGroupMessages(postbox: postbox, stateManager: stateManager, messages: messages.map { $0.0 }, result: result)
                    |> mapError { _ -> MTRpcError in
                    }
                } else {
                    return .never()
                }
            }
            |> `catch` { error -> Signal<Void, NoError> in
                return deferred {
                    if let strongSelf = self {
                        if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
                            // Retry once with a forced reupload, but only if every message
                            // still has a context that has not already been retried.
                            var allFoundAndValid = true
                            for (message, _) in messages {
                                if let context = strongSelf.messageContexts[message.id] {
                                    if context.forcedReuploadOnce {
                                        allFoundAndValid = false
                                        break
                                    }
                                } else {
                                    allFoundAndValid = false
                                    break
                                }
                            }
                            if allFoundAndValid {
                                for (message, _) in messages {
                                    if let context = strongSelf.messageContexts[message.id] {
                                        context.forcedReuploadOnce = true
                                    }
                                }
                                strongSelf.beginSendingMessages(messages.map({ $0.0.id }))
                                return .complete()
                            }
                        } else if let failureReason = reasonForError(error.errorDescription), let message = messages.first?.0 {
                            // Publish the failure reason to per-message and per-peer subscribers.
                            for (message, _) in messages {
                                if let context = strongSelf.messageContexts[message.id] {
                                    context.error = failureReason
                                    for f in context.statusSubscribers.copyItems() {
                                        f(context.status, context.error)
                                    }
                                }
                            }
                            if let context = strongSelf.peerSummaryContexts[message.id.peerId] {
                                for subscriber in context.messageFailedSubscribers.copyItems() {
                                    subscriber(failureReason)
                                }
                            }
                        }
                    }
                    return failMessages(postbox: postbox, ids: group.map { $0.0 })
                } |> runOn(queue)
            }
        } else {
            assertionFailure()
            return failMessages(postbox: postbox, ids: group.map { $0.0 })
        }
    }
    |> switchToLatest
}
commitSendingMessageGroup
1
2
3
4
5
6
7
8
9
10
11
/// Marks every context of the group as sending and starts the grouped send request.
///
/// The send signal is retained by the `sendDisposable` of the first message's context.
/// An empty `messages` array is ignored, because there is no context to own the
/// subscription and indexing the first element would trap.
///
/// - Parameters:
///   - groupId: Identifier of the message group being sent.
///   - messages: Contexts, message ids and uploaded content of all group members.
private func commitSendingMessageGroup(groupId: Int64, messages: [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]) {
    guard let firstContext = messages.first?.messageContext else {
        return
    }
    for (context, _, _) in messages {
        context.state = .sending(groupId: groupId)
    }
    let sendMessage: Signal<PendingMessageResult, NoError> = self.sendGroupMessagesContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, accountPeerId: self.accountPeerId, group: messages.map { ($0.1, $0.2) })
    |> map { next -> PendingMessageResult in
        return .progress(1.0)
    }
    firstContext.sendDisposable.set((sendMessage
    |> deliverOn(self.queue)).start())
}
sendSecretMessageContent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/// Queues the secret-chat outgoing operation for `message` and updates its stored flags.
///
/// The layer is derived from the peer's `SecretChatState`. If there is no state, or the chat
/// is terminated or still in handshake, the message is stored with flags `[.Failed]`.
/// Otherwise an autoremove-timeout or screenshot action message is queued as its dedicated
/// operation and any other message as `.sendMessage`. The stored message then gets
/// `.Sending` added unless its flags already contain `.Failed`.
///
/// - Parameters:
///   - transaction: Open postbox transaction used for all reads and writes.
///   - message: Message to send; `globallyUniqueId` is force-unwrapped for action messages.
///   - content: Uploaded content; only `.secretMedia` contributes an outgoing file.
static func sendSecretMessageContent(transaction: Transaction, message: Message, content: PendingMessageUploadedContentAndReuploadInfo) {
    let chatPeerId = message.id.peerId

    var secretFile: SecretChatOutgoingFile?
    if case let .secretMedia(file, size, key) = content.content, let fileReference = SecretChatOutgoingFileReference(file) {
        secretFile = SecretChatOutgoingFile(reference: fileReference, size: size, key: key)
    }

    let chatState = transaction.getPeerChatState(chatPeerId) as? SecretChatState
    var activeLayer: SecretChatLayer?
    if let chatState = chatState {
        switch chatState.embeddedState {
        case .terminated, .handshake:
            break
        case .basicLayer:
            activeLayer = .layer8
        case let .sequenceBasedLayer(sequenceState):
            activeLayer = sequenceState.layerNegotiationState.activeLayer.secretChatLayer
        }
    }

    guard let state = chatState, let layer = activeLayer else {
        // No usable secret chat state: mark the message as failed.
        transaction.updateMessage(message.id, update: { currentMessage in
            let storeForwardInfo = currentMessage.forwardInfo.map { forwardInfo in
                StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
            }
            return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
        })
        return
    }

    // Only the first action media is inspected; the scan stops there regardless of its kind.
    var sentAsAction = false
    mediaScan: for media in message.media {
        guard let actionMedia = media as? TelegramMediaAction else {
            continue
        }
        switch actionMedia.action {
        case let .messageAutoremoveTimeoutUpdated(value):
            sentAsAction = true
            let updatedState = addSecretChatOutgoingOperation(transaction: transaction, peerId: chatPeerId, operation: .setMessageAutoremoveTimeout(layer: layer, actionGloballyUniqueId: message.globallyUniqueId!, timeout: value, messageId: message.id), state: state)
            if updatedState != state {
                transaction.setPeerChatState(chatPeerId, state: updatedState)
            }
        case .historyScreenshot:
            sentAsAction = true
            let updatedState = addSecretChatOutgoingOperation(transaction: transaction, peerId: chatPeerId, operation: .screenshotMessages(layer: layer, actionGloballyUniqueId: message.globallyUniqueId!, globallyUniqueIds: [], messageId: message.id), state: state)
            if updatedState != state {
                transaction.setPeerChatState(chatPeerId, state: updatedState)
            }
        default:
            break
        }
        break mediaScan
    }

    if !sentAsAction {
        // Not an action message: queue it as a regular send, with the file if any.
        let updatedState = addSecretChatOutgoingOperation(transaction: transaction, peerId: chatPeerId, operation: .sendMessage(layer: layer, id: message.id, file: secretFile), state: state)
        if updatedState != state {
            transaction.setPeerChatState(chatPeerId, state: updatedState)
        }
    }

    // Both paths finish by storing the message with `.Sending` set (unless already failed).
    transaction.updateMessage(message.id, update: { currentMessage in
        var flags = StoreMessageFlags(message.flags)
        if !flags.contains(.Failed) {
            flags.insert(.Sending)
        }
        let storeForwardInfo = currentMessage.forwardInfo.map { forwardInfo in
            StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
        }
        return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: flags, tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
    })
}
sendMessageContent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/// Sends a single message's uploaded content and applies the server response.
///
/// Inside one postbox transaction the stored message is loaded; if it no longer exists the
/// signal completes without doing anything. Secret-chat peers are delegated to
/// `sendSecretMessageContent`. For other peers, request options are collected from the
/// message attributes and the request matching `content.content` is issued. Acknowledgements
/// and results are applied via `applyAcknowledgedMessage` / `applySentMessage`.
///
/// On a request error: a file-reference error triggers one forced re-send per context;
/// otherwise a recognised failure reason is published to subscribers and the stored
/// message is rewritten with flags `[.Failed]`. If the destination peer cannot be resolved,
/// the message is likewise rewritten with flags `[.Failed]`.
///
/// - Parameters:
///   - network: Network used to issue the API request.
///   - postbox: Storage used for the transaction and for marking failures.
///   - stateManager: Passed through to `applySentMessage`.
///   - accountPeerId: Account peer, used when resolving the "send as" peer.
///   - messageId: Identifier of the message to send.
///   - content: Uploaded content that selects which request is made.
/// - Returns: A signal that completes once the send attempt has been processed.
private func sendMessageContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, accountPeerId: PeerId, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo) -> Signal<Void, NoError> {
let queue = self.queue
return postbox.transaction { [weak self] transaction -> Signal<Void, NoError> in
guard let message = transaction.getMessage(messageId) else {
return .complete()
}
if messageId.peerId.namespace == Namespaces.Peer.SecretChat {
PendingMessageManager.sendSecretMessageContent(transaction: transaction, message: message, content: content)
return .complete()
} else if let peer = transaction.getPeer(messageId.peerId), let inputPeer = apiInputPeer(peer) {
// uniqueId stays 0 when the message has no OutgoingMessageInfoAttribute.
var uniqueId: Int64 = 0
var forwardSourceInfoAttribute: ForwardSourceInfoAttribute?
var messageEntities: [Api.MessageEntity]?
var replyMessageId: Int32?
var scheduleTime: Int32?
var sendAsPeerId: PeerId?
var flags: Int32 = 0
// Collect request parameters and flag bits from the message's attributes.
for attribute in message.attributes {
if let replyAttribute = attribute as? ReplyMessageAttribute {
replyMessageId = replyAttribute.messageId.id
} else if let outgoingInfo = attribute as? OutgoingMessageInfoAttribute {
uniqueId = outgoingInfo.uniqueId
} else if let attribute = attribute as? ForwardSourceInfoAttribute {
forwardSourceInfoAttribute = attribute
} else if let attribute = attribute as? TextEntitiesMessageAttribute {
messageEntities = apiTextAttributeEntities(attribute, associatedPeers: message.peers)
} else if let attribute = attribute as? OutgoingContentInfoMessageAttribute {
if attribute.flags.contains(.disableLinkPreviews) {
flags |= Int32(1 << 1)
}
} else if let attribute = attribute as? NotificationInfoMessageAttribute {
if attribute.flags.contains(.muted) {
flags |= Int32(1 << 5)
}
} else if let attribute = attribute as? OutgoingScheduleInfoMessageAttribute {
flags |= Int32(1 << 10)
scheduleTime = attribute.scheduleTime
} else if let attribute = attribute as? SendAsMessageAttribute {
sendAsPeerId = attribute.peerId
}
}
// Forwards skip bit 7 and the reply/entities bits; every other content kind sets them.
// NOTE(review): the meaning of each bit comes from the API schema, not shown here — confirm.
if case .forward = content.content {
} else {
flags |= (1 << 7)
if let _ = replyMessageId {
flags |= Int32(1 << 0)
}
if let _ = messageEntities {
flags |= Int32(1 << 3)
}
}
var sendAsInputPeer: Api.InputPeer?
if let sendAsPeerId = sendAsPeerId, let sendAsPeer = transaction.getPeer(sendAsPeerId), let inputPeer = apiInputPeerOrSelf(sendAsPeer, accountPeerId: accountPeerId) {
sendAsInputPeer = inputPeer
flags |= (1 << 13)
}
let dependencyTag = PendingMessageRequestDependencyTag(messageId: messageId)
let sendMessageRequest: Signal<NetworkRequestResult<Api.Updates>, MTRpcError>
// Pick the API request by content kind. Only the text request asks for acknowledgement
// info; the others wrap their response in `NetworkRequestResult.result`.
switch content.content {
case .text:
sendMessageRequest = network.requestWithAdditionalInfo(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), info: .acknowledgement, tag: dependencyTag)
case let .media(inputMedia, text):
sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), tag: dependencyTag)
|> map(NetworkRequestResult.result)
case let .forward(sourceInfo):
// A forward needs both the source-info attribute and a resolvable source peer.
if let forwardSourceInfoAttribute = forwardSourceInfoAttribute, let sourcePeer = transaction.getPeer(forwardSourceInfoAttribute.messageId.peerId), let sourceInputPeer = apiInputPeer(sourcePeer) {
sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: flags, fromPeer: sourceInputPeer, id: [sourceInfo.messageId.id], randomId: [uniqueId], toPeer: inputPeer, scheduleDate: scheduleTime, sendAs: sendAsInputPeer), tag: dependencyTag)
|> map(NetworkRequestResult.result)
} else {
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
}
case let .chatContextResult(chatContextResult):
if chatContextResult.hideVia {
flags |= Int32(1 << 11)
}
// NOTE(review): this request and the screenshot one below are issued without `dependencyTag` — confirm intended.
sendMessageRequest = network.request(Api.functions.messages.sendInlineBotResult(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, randomId: uniqueId, queryId: chatContextResult.queryId, id: chatContextResult.id, scheduleDate: scheduleTime, sendAs: sendAsInputPeer))
|> map(NetworkRequestResult.result)
case .messageScreenshot:
sendMessageRequest = network.request(Api.functions.messages.sendScreenshotNotification(peer: inputPeer, replyToMsgId: replyMessageId ?? 0, randomId: uniqueId))
|> map(NetworkRequestResult.result)
case .secretMedia:
// Secret media is only expected for secret-chat peers, which were handled above.
assertionFailure()
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
}
return sendMessageRequest
|> deliverOn(queue)
|> mapToSignal { result -> Signal<Void, MTRpcError> in
guard let strongSelf = self else {
return .never()
}
switch result {
case .progress:
return .complete()
case .acknowledged:
return strongSelf.applyAcknowledgedMessage(postbox: postbox, message: message)
|> mapError { _ -> MTRpcError in
}
case let .result(result):
return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result)
|> mapError { _ -> MTRpcError in
}
}
}
|> `catch` { error -> Signal<Void, NoError> in
// Error handling runs asynchronously on the manager queue; the signal itself completes right away.
queue.async {
// If the manager or the message's context is gone, nothing is done (the message is not marked failed here).
guard let strongSelf = self, let context = strongSelf.messageContexts[messageId] else {
return
}
if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
// Retry once per context; a second file-reference error falls through to the failure path.
if !context.forcedReuploadOnce {
context.forcedReuploadOnce = true
strongSelf.beginSendingMessages([messageId])
return
}
} else if let failureReason = reasonForError(error.errorDescription) {
if let context = strongSelf.messageContexts[message.id] {
context.error = failureReason
for f in context.statusSubscribers.copyItems() {
f(context.status, context.error)
}
}
if let context = strongSelf.peerSummaryContexts[message.id.peerId] {
for subscriber in context.messageFailedSubscribers.copyItems() {
subscriber(failureReason)
}
}
}
// Persist the failure: the stored message keeps its content but its flags become [.Failed].
let _ = (postbox.transaction { transaction -> Void in
transaction.updateMessage(message.id, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
}
return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
})
}).start()
}
return .complete()
}
} else {
// The destination peer could not be resolved: mark the message as failed.
return postbox.transaction { transaction -> Void in
transaction.updateMessage(message.id, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
}
return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
})
}
}
} |> switchToLatest
}
commitSendingSingleMessage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/// Marks `messageContext` as sending (without a group) and starts the send request.
///
/// Each value emitted by `sendMessageContent` is mapped to `.progress(1.0)`; on every such
/// value the currently registered context for `messageId`, if any, gets a running status
/// with that progress and its status subscribers are notified. The subscription is held by
/// `messageContext.sendDisposable`.
///
/// - Parameters:
///   - messageContext: Context whose state and send disposable are updated.
///   - messageId: Identifier of the message being sent.
///   - content: Uploaded content handed to `sendMessageContent`.
private func commitSendingSingleMessage(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo) {
    messageContext.state = .sending(groupId: nil)

    let sendProgress: Signal<PendingMessageResult, NoError> = self.sendMessageContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, accountPeerId: self.accountPeerId, messageId: messageId, content: content)
    |> map { _ -> PendingMessageResult in
        return .progress(1.0)
    }

    let subscription = (sendProgress
    |> deliverOn(self.queue)).start(next: { [weak self] next in
        guard let strongSelf = self else {
            return
        }
        assert(strongSelf.queue.isCurrent())
        switch next {
        case let .progress(progress):
            // Look the context up again: it may have been removed while the request ran.
            guard let current = strongSelf.messageContexts[messageId] else {
                return
            }
            current.status = PendingMessageStatus(isRunning: true, progress: progress)
            for notify in current.statusSubscribers.copyItems() {
                notify(current.status, current.error)
            }
        }
    })
    messageContext.sendDisposable.set(subscription)
}
applyAcknowledgedMessage
apply after send request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/// Flags the message's `OutgoingMessageInfoAttribute` as acknowledged inside a postbox transaction.
///
/// Only the first `OutgoingMessageInfoAttribute` in the attribute list is updated. When the list
/// contains none, the stored message is left untouched (`.skip`). Every other field of the
/// rewritten message is copied from the message currently stored in the postbox.
///
/// - Parameters:
///   - postbox: Postbox in which the transaction runs.
///   - message: Message whose id selects the stored message and whose attributes are the base
///     for the updated attribute list.
/// - Returns: The signal produced by the postbox transaction.
private func applyAcknowledgedMessage(postbox: Postbox, message: Message) -> Signal<Void, NoError> {
    return postbox.transaction { transaction -> Void in
        transaction.updateMessage(message.id, update: { currentMessage in
            // NOTE(review): the attribute list comes from the `message` argument rather than from
            // `currentMessage` — confirm that overwriting the stored attributes this way is intended.
            var updatedAttributes = message.attributes
            guard let infoIndex = updatedAttributes.firstIndex(where: { $0 is OutgoingMessageInfoAttribute }),
                  let infoAttribute = updatedAttributes[infoIndex] as? OutgoingMessageInfoAttribute else {
                return .skip
            }
            updatedAttributes[infoIndex] = infoAttribute.withUpdatedAcknowledged(true)

            // Convert the stored forward info (if any) into its store representation.
            let storeForwardInfo: StoreMessageForwardInfo? = currentMessage.forwardInfo.map { info in
                return StoreMessageForwardInfo(authorId: info.author?.id, sourceId: info.source?.id, sourceMessageId: info.sourceMessageId, date: info.date, authorSignature: info.authorSignature, psaType: info.psaType, flags: info.flags)
            }

            return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
        })
    }
}
applySentMessage
apply after send request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Applies the server result of a sent message and notifies delivery subscribers afterwards.
///
/// The namespace reported to subscribers defaults to `Namespaces.Message.Cloud`. It is replaced
/// by the namespace of the matching API message's id when such a message is found in `result`
/// (first result message whose id belongs to the same peer as `message`).
///
/// - Parameters:
///   - postbox: Passed through to `applyUpdateMessage`.
///   - stateManager: Passed through to `applyUpdateMessage`.
///   - message: The local message that was sent.
///   - result: Updates returned for the send request.
/// - Returns: The `applyUpdateMessage` signal; once it is disposed, the delivered-message
///   subscribers registered for the message's peer are called on the manager queue.
private func applySentMessage(postbox: Postbox, stateManager: AccountStateManager, message: Message, result: Api.Updates) -> Signal<Void, NoError> {
    // Namespace used to build candidate ids while searching the result for the sent message.
    let lookupNamespace = Namespaces.Message.allScheduled.contains(message.id.namespace) ? Namespaces.Message.ScheduledCloud : Namespaces.Message.Cloud
    let apiMessage = result.messages.first(where: { candidate in
        guard let candidateId = candidate.id(namespace: lookupNamespace) else {
            return false
        }
        return candidateId.peerId == message.id.peerId
    })

    var resolvedNamespace = Namespaces.Message.Cloud
    if let apiMessage = apiMessage {
        // Treated as scheduled only when the local schedule time equals the server timestamp.
        let matchesScheduleTime = message.scheduleTime != nil && message.scheduleTime == apiMessage.timestamp
        if let id = apiMessage.id(namespace: matchesScheduleTime ? Namespaces.Message.ScheduledCloud : Namespaces.Message.Cloud) {
            resolvedNamespace = id.namespace
        }
    }
    let deliveredNamespace = resolvedNamespace

    return applyUpdateMessage(postbox: postbox, stateManager: stateManager, message: message, result: result, accountPeerId: self.accountPeerId)
    |> afterDisposed { [weak self] in
        guard let strongSelf = self else {
            return
        }
        strongSelf.queue.async {
            guard let summary = strongSelf.peerSummaryContexts[message.id.peerId] else {
                return
            }
            for notify in summary.messageDeliveredSubscribers.copyItems() {
                notify(deliveredNamespace)
            }
        }
    }
}
applySentGroupMessages
apply after send request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// Applies the server result of a sent message group and notifies delivery subscribers afterwards.
///
/// The namespace reported to subscribers is `Namespaces.Message.ScheduledCloud` when the first
/// local message has a schedule time equal to the timestamp of the first result message;
/// otherwise it is `Namespaces.Message.Cloud`.
///
/// - Parameters:
///   - postbox: Passed through to `applyUpdateGroupMessages`.
///   - stateManager: Passed through to `applyUpdateGroupMessages`.
///   - messages: The local messages that were sent as a group.
///   - result: Updates returned for the send request.
/// - Returns: The `applyUpdateGroupMessages` signal; once it is disposed, the delivered-message
///   subscribers registered for the first message's peer are called on the manager queue.
private func applySentGroupMessages(postbox: Postbox, stateManager: AccountStateManager, messages: [Message], result: Api.Updates) -> Signal<Void, NoError> {
    var resolvedNamespace = Namespaces.Message.Cloud
    if let firstMessage = messages.first, let firstApiMessage = result.messages.first {
        if firstMessage.scheduleTime != nil && firstMessage.scheduleTime == firstApiMessage.timestamp {
            resolvedNamespace = Namespaces.Message.ScheduledCloud
        }
    }
    let deliveredNamespace = resolvedNamespace

    return applyUpdateGroupMessages(postbox: postbox, stateManager: stateManager, messages: messages, result: result)
    |> afterDisposed { [weak self] in
        guard let strongSelf = self else {
            return
        }
        strongSelf.queue.async {
            guard let firstMessage = messages.first, let summary = strongSelf.peerSummaryContexts[firstMessage.id.peerId] else {
                return
            }
            for notify in summary.messageDeliveredSubscribers.copyItems() {
                notify(deliveredNamespace)
            }
        }
    }
}
deliveredMessageEvents
Listen for successfully delivered messages
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// Returns a signal that emits the namespace of each message delivered to `peerId`.
///
/// On subscription a callback is registered, on the manager queue, in the per-peer summary
/// context (created on demand). Disposing the subscription removes that callback again.
/// This function only forwards values with `putNext`; it never completes the signal itself.
///
/// - Parameter peerId: Peer whose delivered-message events should be observed.
/// - Returns: A signal of message namespaces, one per delivery notification.
public func deliveredMessageEvents(peerId: PeerId) -> Signal<MessageId.Namespace, NoError> {
    return Signal { subscriber in
        let disposable = MetaDisposable()
        self.queue.async {
            let summaryContext: PeerPendingMessagesSummaryContext
            if let current = self.peerSummaryContexts[peerId] {
                summaryContext = current
            } else {
                summaryContext = PeerPendingMessagesSummaryContext()
                self.peerSummaryContexts[peerId] = summaryContext
            }
            let index = summaryContext.messageDeliveredSubscribers.add({ namespace in
                subscriber.putNext(namespace)
            })
            disposable.set(ActionDisposable {
                self.queue.async {
                    if let current = self.peerSummaryContexts[peerId] {
                        current.messageDeliveredSubscribers.remove(index)
                        // The summary context also stores the failure subscribers for this peer.
                        // Drop it only when nothing at all is registered; otherwise removing the
                        // last delivery listener would silently detach live failure listeners.
                        if current.messageDeliveredSubscribers.isEmpty && current.messageFailedSubscribers.isEmpty {
                            self.peerSummaryContexts.removeValue(forKey: peerId)
                        }
                    }
                }
            })
        }
        return disposable
    }
}
failedMessageEvents
Listen for failed message sends
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// Returns a signal that emits the failure reason of each message that failed to send to `peerId`.
///
/// On subscription a callback is registered, on the manager queue, in the per-peer summary
/// context (created on demand). Disposing the subscription removes that callback again.
/// This function only forwards values with `putNext`; it never completes the signal itself.
///
/// - Parameter peerId: Peer whose failed-message events should be observed.
/// - Returns: A signal of failure reasons, one per failure notification.
public func failedMessageEvents(peerId: PeerId) -> Signal<PendingMessageFailureReason, NoError> {
    return Signal { subscriber in
        let disposable = MetaDisposable()
        self.queue.async {
            let summaryContext: PeerPendingMessagesSummaryContext
            if let current = self.peerSummaryContexts[peerId] {
                summaryContext = current
            } else {
                summaryContext = PeerPendingMessagesSummaryContext()
                self.peerSummaryContexts[peerId] = summaryContext
            }
            let index = summaryContext.messageFailedSubscribers.add({ reason in
                subscriber.putNext(reason)
            })
            disposable.set(ActionDisposable {
                self.queue.async {
                    if let current = self.peerSummaryContexts[peerId] {
                        current.messageFailedSubscribers.remove(index)
                        // The summary context also stores the delivery subscribers for this peer.
                        // Drop it only when nothing at all is registered; otherwise removing the
                        // last failure listener would silently detach live delivery listeners.
                        if current.messageFailedSubscribers.isEmpty && current.messageDeliveredSubscribers.isEmpty {
                            self.peerSummaryContexts.removeValue(forKey: peerId)
                        }
                    }
                }
            })
        }
        return disposable
    }
}