Home telegram - send message
Post
Cancel

telegram - send message

ChatController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Handler the chat display node calls to send `messages` to the peer of the current chat.
self.chatDisplayNode.sendMessages = { [weak self] messages, silentPosting, scheduleTime, isAnyMessageTextPartitioned in
    // Bail out when the controller is gone or the chat location is not a peer.
    guard let controller = self, case let .peer(peerId) = controller.chatLocation else {
        return
    }
    controller.commitPurposefulAction()

    // For a channel restricted by slowmode, refuse batches containing more than one
    // forward, or text that was partitioned, and show an alert instead of sending.
    if let channel = controller.presentationInterfaceState.renderedPeer?.peer as? UChatChannel, channel.isRestrictedBySlowmode {
        let forwardCount = messages.filter { candidate in
            if case .forward = candidate {
                return true
            }
            return false
        }.count

        let slowmodeError: String?
        if forwardCount > 1 {
            slowmodeError = controller.presentationData.strings.Chat_AttachmentMultipleForwardDisabled
        } else if isAnyMessageTextPartitioned {
            slowmodeError = controller.presentationData.strings.Chat_MultipleTextMessagesDisabled
        } else {
            slowmodeError = nil
        }

        if let slowmodeError = slowmodeError {
            let alertTheme = AlertControllerTheme(presentationTheme: controller.presentationData.theme)
            let dismissAction = TextAlertAction(type: .defaultAction, title: controller.presentationData.strings.Common_OK, action: {})
            let alert = standardTextAlertController(theme: alertTheme, title: nil, text: slowmodeError, actions: [dismissAction])
            controller.present(alert, in: .window(.root))
            return
        }
    }

    // An explicit silent-posting flag takes precedence over a schedule time;
    // with neither set the messages go through the default transformation.
    let transformedMessages: [EnqueueMessage]
    switch (silentPosting, scheduleTime) {
    case let (.some(silent), _):
        transformedMessages = controller.transformEnqueueMessages(messages, silentPosting: silent)
    case let (.none, .some(time)):
        transformedMessages = controller.transformEnqueueMessages(messages, silentPosting: false, scheduleTime: time)
    case (.none, .none):
        transformedMessages = controller.transformEnqueueMessages(messages)
    }

    let _ = (enqueueMessages(account: controller.context.account, peerId: peerId, messages: transformedMessages)
    |> deliverOnMainQueue).start(next: { _ in
        // Scroll to the newest message unless this is the scheduled-messages view.
        guard let controller = self, !controller.presentationInterfaceState.isScheduledMessages else {
            return
        }
        controller.chatDisplayNode.historyNode.scrollToEndOfHistory()
    })

    controller.donateIntent()
}

enqueueMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Enqueues `messages` for `peerId` inside a postbox transaction.
///
/// When the account provides a `transformOutgoingMessageMedia` hook, the messages are first
/// passed through `opportunisticallyTransformOutgoingMedia`; otherwise every message is
/// paired with `false` and forwarded unchanged.
///
/// - Parameters:
///   - account: Account whose network and postbox are used.
///   - peerId: Destination peer.
///   - messages: Messages to enqueue.
/// - Returns: A signal carrying the result of the transaction-level `enqueueMessages`.
public func enqueueMessages(account: Account, peerId: PeerId, messages: [EnqueueMessage]) -> Signal<[MessageId?], NoError> {
    let preparedMessages: Signal<[(Bool, EnqueueMessage)], NoError>
    if let mediaTransform = account.transformOutgoingMessageMedia {
        preparedMessages = opportunisticallyTransformOutgoingMedia(
            network: account.network,
            postbox: account.postbox,
            transformOutgoingMessageMedia: mediaTransform,
            messages: messages,
            userInteractive: true
        )
    } else {
        preparedMessages = .single(messages.map { message in (false, message) })
    }

    return preparedMessages
    |> mapToSignal { prepared -> Signal<[MessageId?], NoError> in
        return account.postbox.transaction { transaction -> [MessageId?] in
            return enqueueMessages(transaction: transaction, account: account, peerId: peerId, messages: prepared)
        }
    }
}

unsentMessageIdsView

1
2
3
4
5
6
7
8
9
10
11
/// Public entry point for observing the unsent-message ids view.
///
/// The subscription to the view tracker's signal is started on `self.queue`; every
/// value it emits is forwarded to the subscriber.
/// - Returns: A signal of `UnsentMessageIdsView` values.
public func unsentMessageIdsView() -> Signal<UnsentMessageIdsView, NoError> {
    return Signal { subscriber in
        let subscription = MetaDisposable()
        self.queue.async {
            let trackerSignal = self.viewTracker.unsentMessageIdsViewSignal()
            subscription.set(trackerSignal.start(next: { subscriber.putNext($0) }))
        }
        return subscription
    }
}

unsentMessageIdsViewSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/// Signal that emits the current unsent-message ids followed by every later update.
///
/// On subscription (on `self.queue`) the current ids are delivered immediately, then a
/// fresh pipe is registered in `unsendMessageIdsViewSubscribers` and its values are
/// forwarded. Disposal unregisters the pipe, again on `self.queue`.
func unsentMessageIdsViewSignal() -> Signal<UnsentMessageIdsView, NoError> {
    return Signal { subscriber in
        let subscription = MetaDisposable()
        self.queue.async {
            // Initial snapshot.
            subscriber.putNext(UnsentMessageIdsView(self.unsentMessageView.ids))

            // Register a pipe before starting to listen to it.
            let updates = ValuePipe<UnsentMessageIdsView>()
            let registrationIndex = self.unsendMessageIdsViewSubscribers.add(updates)
            let forwarding = updates.signal().start(next: { subscriber.putNext($0) })

            let teardown = ActionDisposable {
                self.queue.async {
                    forwarding.dispose()
                    self.unsendMessageIdsViewSubscribers.remove(registrationIndex)
                }
            }
            subscription.set(teardown)
        }
        return subscription
    }
}

unsentViewUpdated

1
2
3
4
5
/// Pushes a snapshot of the current unsent-message ids to every registered subscriber pipe.
private func unsentViewUpdated() {
    // A new view is built for each pipe from the ids as they are at that moment.
    self.unsendMessageIdsViewSubscribers.copyItems().forEach { pipe in
        pipe.putNext(UnsentMessageIdsView(self.unsentMessageView.ids))
    }
}

refreshViewsDueToExternalTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/// Rebuilds tracked views after the underlying storage was changed by an external transaction.
///
/// Each view that reports a change is re-published through its pipe.
/// - Parameters:
///   - postbox: Postbox the views are refreshed against.
///   - fetchUnsentMessageIds: Supplies the unsent message ids for the unsent-message view.
///   - fetchSynchronizePeerReadStateOperations: Supplies the read-state synchronization operations.
func refreshViewsDueToExternalTransaction(postbox: Postbox, fetchUnsentMessageIds: () -> [MessageId], fetchSynchronizePeerReadStateOperations: () -> [PeerId: PeerReadStateSynchronizationOperation]) {
    var historyViewsChanged = false

    // Message history views.
    for (historyView, historyPipe) in self.messageHistoryViews.copyItems() {
        guard historyView.refreshDueToExternalTransaction(postbox: postbox) else {
            continue
        }
        historyPipe.putNext((MessageHistoryView(historyView), .Generic))
        historyViewsChanged = true
    }

    // Chat list views are rendered again before being published.
    for (chatListView, chatListPipe) in self.chatListViews.copyItems() {
        guard chatListView.refreshDueToExternalTransaction(postbox: postbox) else {
            continue
        }
        chatListView.render(postbox: postbox, renderMessage: self.renderMessage, getPeer: { peerId in
            return self.getPeer(peerId)
        }, getPeerNotificationSettings: self.getPeerNotificationSettings, getPeerPresence: self.getPeerPresence)
        chatListPipe.putNext((ChatListView(chatListView), .Generic))
    }

    // Tracked holes are only recomputed when at least one history view changed.
    if historyViewsChanged {
        self.updateTrackedHoles()
    }

    let unsentChanged = self.unsentMessageView.refreshDueToExternalTransaction(fetchUnsentMessageIds: fetchUnsentMessageIds)
    if unsentChanged {
        self.unsentViewUpdated()
    }

    let readStatesChanged = self.synchronizeReadStatesView.refreshDueToExternalTransaction(fetchSynchronizePeerReadStateOperations: fetchSynchronizePeerReadStateOperations)
    if readStatesChanged {
        self.synchronizeReadStateViewUpdated()
    }

    // Peer views.
    for (peerView, peerPipe) in self.peerViews.copyItems() {
        guard peerView.reset(postbox: postbox) else {
            continue
        }
        peerPipe.putNext(PeerView(peerView))
    }
}

afterBegin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/// Compares the stored transaction state version with the one cached in memory and,
/// when they differ, clears table caches and refreshes the tracked views.
private func afterBegin() {
    let storedVersion = self.metadataTable.transactionStateVersion()
    guard storedVersion != self.transactionStateVersion else {
        return
    }

    // Cached table contents may no longer match what is stored.
    self.tables.forEach { table in
        table.clearMemoryCache()
    }

    self.viewTracker.refreshViewsDueToExternalTransaction(
        postbox: self,
        fetchUnsentMessageIds: {
            return self.messageHistoryUnsentTable.get()
        },
        fetchSynchronizePeerReadStateOperations: {
            return self.synchronizeReadStateTable.get(getCombinedPeerReadState: { peerId in
                return self.readStateTable.getCombinedState(peerId)
            })
        }
    )
    self.transactionStateVersion = storedVersion

    // Re-publish the master client id as currently stored.
    self.masterClientId.set(.single(self.metadataTable.masterClientId()))
}

updateViews

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/// Replays `transaction` into every tracked view and publishes a fresh immutable
/// snapshot through the view's pipe whenever the view reports that it changed.
///
/// - Parameters:
///   - postbox: Postbox passed on to the views that need it while replaying.
///   - transaction: The set of changes to replay.
func updateViews(postbox: Postbox, transaction: PostboxTransaction) {
    // Set when at least one message history view was updated; triggers `updateTrackedHoles()` below.
    var updateTrackedHoles = false

    // Postbox state views are only touched when the transaction carries an updated state.
    if let currentUpdatedState = transaction.currentUpdatedState {
        for (mutableView, pipe) in self.postboxStateViews.copyItems() {
            if mutableView.replay(updatedState: currentUpdatedState) {
                pipe.putNext(PostboxStateView(mutableView))
            }
        }
    }

    // Message history views.
    for (mutableView, pipe) in self.messageHistoryViews.copyItems() {
        var updated = false

        // Remembered so a change of the view's peer ids can be detected after the replay.
        let previousPeerIds = mutableView.peerIds

        if mutableView.replay(postbox: postbox, transaction: transaction) {
            updated = true
        }

        // Becomes `.FillHole` when the transaction has hole operations for one of the view's peers.
        var updateType: ViewUpdateType = .Generic
        switch mutableView.peerIds {
        case let .single(peerId):
            for key in transaction.currentPeerHoleOperations.keys {
                if key.peerId == peerId {
                    updateType = .FillHole
                    break
                }
            }
        case .associated:
            var ids = Set<PeerId>()
            // Re-reads the same property to extract the associated values; the `.single`
            // arm is not expected to be hit here because the outer switch matched `.associated`.
            switch mutableView.peerIds {
            case .single:
                assertionFailure()
            case let .associated(mainPeerId, associatedId):
                ids.insert(mainPeerId)
                if let associatedId = associatedId {
                    ids.insert(associatedId.peerId)
                }
            }

            if !ids.isEmpty {
                for key in transaction.currentPeerHoleOperations.keys {
                    if ids.contains(key.peerId) {
                        updateType = .FillHole
                        break
                    }
                }
            }
        }

        // If the view's peer ids changed, refresh the view and force an
        // `.UpdateVisible` update, overriding any update type chosen above.
        mutableView.updatePeerIds(transaction: transaction)
        if mutableView.peerIds != previousPeerIds {
            updateType = .UpdateVisible

            let _ = mutableView.refreshDueToExternalTransaction(postbox: postbox)
            updated = true
        }

        if updated {
            updateTrackedHoles = true
            pipe.putNext((MessageHistoryView(mutableView), updateType))
        }
    }

    // Single-message views: replayed when there are operations for the message's peer,
    // or when any media or cached peer data was updated in the transaction.
    for (mutableView, pipe) in self.messageViews.copyItems() {
        let operations = transaction.currentOperationsByPeerId[mutableView.messageId.peerId]
        if operations != nil || !transaction.updatedMedia.isEmpty || !transaction.currentUpdatedCachedPeerData.isEmpty {
            if mutableView.replay(operations ?? [], updatedMedia: transaction.updatedMedia, renderIntermediateMessage: self.renderMessage) {
                pipe.putNext(MessageView(mutableView))
            }
        }
    }

    // Chat list views: skipped entirely unless the transaction contains at least one of the listed kinds of change.
    if !transaction.chatListOperations.isEmpty || !transaction.currentUpdatedPeerNotificationSettings.isEmpty || !transaction.currentUpdatedPeers.isEmpty || !transaction.currentInvalidateMessageTagSummaries.isEmpty || !transaction.currentUpdatedMessageTagSummaries.isEmpty || !transaction.currentOperationsByPeerId.isEmpty || transaction.replacedAdditionalChatListItems != nil || !transaction.currentUpdatedPeerPresences.isEmpty {
        for (mutableView, pipe) in self.chatListViews.copyItems() {
            let context = MutableChatListViewReplayContext()
            if mutableView.replay(postbox: postbox, operations: transaction.chatListOperations, updatedPeerNotificationSettings: transaction.currentUpdatedPeerNotificationSettings, updatedPeers: transaction.currentUpdatedPeers, updatedPeerPresences: transaction.currentUpdatedPeerPresences, transaction: transaction, context: context) {
                mutableView.complete(postbox: postbox, context: context)
                mutableView.render(postbox: postbox, renderMessage: self.renderMessage, getPeer: { id in
                    return self.getPeer(id)
                }, getPeerNotificationSettings: self.getPeerNotificationSettings, getPeerPresence: self.getPeerPresence)
                pipe.putNext((ChatListView(mutableView), .Generic))
            }
        }

        // Runs whenever the gate above passed, even if no chat list view reported a change.
        self.updateTrackedChatListHoles()
    }

    if updateTrackedHoles {
        self.updateTrackedHoles()
    }

    // Unsent-message view.
    if self.unsentMessageView.replay(transaction.unsentMessageOperations) {
        self.unsentViewUpdated()
    }

    // Read-state synchronization view.
    if self.synchronizeReadStatesView.replay(transaction.updatedSynchronizePeerReadStateOperations) {
        self.synchronizeReadStateViewUpdated()
    }

    // Unread message counts views.
    for (view, pipe) in self.unreadMessageCountsViews.copyItems() {
        if view.replay(postbox: postbox, transaction: transaction) {
            pipe.putNext(UnreadMessageCountsView(view))
        }
    }

    // Contact peer id views are only replayed when the contact list was replaced.
    if let replaceContactPeerIds = transaction.replaceContactPeerIds {
        for (mutableView, pipe) in self.contactPeerIdsViews.copyItems() {
            if mutableView.replay(updateRemoteTotalCount: transaction.replaceRemoteContactCount, replace: replaceContactPeerIds) {
                pipe.putNext(ContactPeerIdsView(mutableView))
            }
        }
    }

    // Contact peers views.
    for (mutableView, pipe) in self.contactPeersViews.copyItems() {
        if mutableView.replay(replacePeerIds: transaction.replaceContactPeerIds, updatedPeerPresences: transaction.currentUpdatedPeerPresences, getPeer: self.getPeer, getPeerPresence: self.getPeerPresence) {
            pipe.putNext(ContactPeersView(mutableView))
        }
    }

    /// Andrew Young - my groups
    for (mutableView, pipe) in self.contactGroupsViews.copyItems() {
        if mutableView.replay(updatedGroups: transaction.currentUpdatedGroups, updatedPeerPresences: transaction.currentUpdatedPeerPresences, getContactGroup: self.getContactGroup, getPeer: self.getPeer, getPeerPresence: self.getPeerPresence) {
            pipe.putNext(ContactGroupsView(mutableView))
        }
    }

    // Peer views.
    for (mutableView, pipe) in self.peerViews.copyItems() {
        if mutableView.replay(postbox: postbox, transaction: transaction) {
            pipe.putNext(PeerView(mutableView))
        }
    }

    // Peer merged operation log views.
    for (mutableView, pipe) in self.peerMergedOperationLogViews.copyItems() {
        if mutableView.replay(operations: transaction.currentPeerMergedOperationLogOperations, getOperations: self.operationLogGetOperations, getTailIndex: self.operationLogGetTailIndex) {
            pipe.putNext(PeerMergedOperationLogView(mutableView))
        }
    }

    // Timestamp-based message attribute views.
    for (mutableView, pipe) in self.timestampBasedMessageAttributesViews.copyItems() {
        if mutableView.replay(operations: transaction.currentTimestampBasedMessageAttributesOperations, getHead: self.getTimestampBasedMessageAttributesHead) {
            pipe.putNext(TimestampBasedMessageAttributesView(mutableView))
        }
    }

    // Preferences views.
    for (mutableView, pipe) in self.preferencesViews.copyItems() {
        if mutableView.replay(postbox: postbox, transaction: transaction) {
            pipe.putNext(PreferencesView(mutableView))
        }
    }

    // Multiple-peers views.
    for (mutableView, pipe) in self.multiplePeersViews.copyItems() {
        if mutableView.replay(updatedPeers: transaction.currentUpdatedPeers, updatedPeerPresences: transaction.currentUpdatedPeerPresences) {
            pipe.putNext(MultiplePeersView(mutableView))
        }
    }

    // Item collections views.
    for (mutableView, pipe) in self.itemCollectionsViews.copyItems() {
        if mutableView.replay(postbox: postbox, transaction: transaction) {
            pipe.putNext(ItemCollectionsView(mutableView))
        }
    }

    // Combined views produce their own immutable snapshot.
    for (mutableView, pipe) in self.combinedViews.copyItems() {
        if mutableView.replay(postbox: postbox, transaction: transaction) {
            pipe.putNext(mutableView.immutableView())
        }
    }
}

beforeCommit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/// Finalizes the accumulated per-transaction state.
///
/// Replays the pending history operations into the chat list and top-tagged-message
/// tables, commits the chat list index table, packages all accumulated changes into a
/// `PostboxTransaction`, lets the view tracker replay it when it is not empty, then
/// resets the accumulators and calls `beforeCommit()` on every table.
///
/// - Returns: The incremented transaction state version and the newly stored master
///   client id; each is `nil` when it was not changed by this call.
private func beforeCommit() -> (updatedTransactionStateVersion: Int64?, updatedMasterClientId: Int64?) {
    // Replaying history operations may append to `currentChatListOperations` (passed inout).
    self.chatListTable.replay(historyOperationsByPeerId: self.currentOperationsByPeerId, updatedPeerChatListEmbeddedStates: self.currentUpdatedPeerChatListEmbeddedStates, updatedChatListInclusions: self.currentUpdatedChatListInclusions, messageHistoryTable: self.messageHistoryTable, peerChatInterfaceStateTable: self.peerChatInterfaceStateTable, operations: &self.currentChatListOperations)

    self.peerChatTopTaggedMessageIdsTable.replay(historyOperationsByPeerId: self.currentOperationsByPeerId)

    // Inputs for the chat list index commit; the unread state/summaries and the group
    // summary synchronize operations are written back through the inout parameters.
    let alteredInitialPeerCombinedReadStates = self.readStateTable.transactionAlteredInitialPeerCombinedReadStates()
    let updatedPeers = self.peerTable.transactionUpdatedPeers()
    let transactionParticipationInTotalUnreadCountUpdates = self.peerNotificationSettingsTable.transactionParticipationInTotalUnreadCountUpdates(postbox: self)
    self.chatListIndexTable.commitWithTransaction(postbox: self, alteredInitialPeerCombinedReadStates: alteredInitialPeerCombinedReadStates, updatedPeers: updatedPeers, transactionParticipationInTotalUnreadCountUpdates: transactionParticipationInTotalUnreadCountUpdates, updatedRootUnreadState: &self.currentUpdatedTotalUnreadState, updatedGroupTotalUnreadSummaries: &self.currentUpdatedGroupTotalUnreadSummaries, currentUpdatedGroupSummarySynchronizeOperations: &self.currentUpdatedGroupSummarySynchronizeOperations)

    // Snapshot of everything accumulated during the transaction.
    let transaction = PostboxTransaction(currentUpdatedState: self.currentUpdatedState, currentPeerHoleOperations: self.currentPeerHoleOperations, currentOperationsByPeerId: self.currentOperationsByPeerId, chatListOperations: self.currentChatListOperations, currentUpdatedChatListInclusions: self.currentUpdatedChatListInclusions, currentUpdatedPeers: self.currentUpdatedPeers, currentUpdatedPeerNotificationSettings: self.currentUpdatedPeerNotificationSettings, currentUpdatedPeerNotificationBehaviorTimestamps: self.currentUpdatedPeerNotificationBehaviorTimestamps, currentUpdatedCachedPeerData: self.currentUpdatedCachedPeerData, currentUpdatedPeerPresences: currentUpdatedPeerPresences, currentUpdatedPeerChatListEmbeddedStates: self.currentUpdatedPeerChatListEmbeddedStates, currentUpdatedTotalUnreadState: self.currentUpdatedTotalUnreadState, currentUpdatedTotalUnreadSummaries: self.currentUpdatedGroupTotalUnreadSummaries, alteredInitialPeerCombinedReadStates: alteredInitialPeerCombinedReadStates, currentPeerMergedOperationLogOperations: self.currentPeerMergedOperationLogOperations, currentTimestampBasedMessageAttributesOperations: self.currentTimestampBasedMessageAttributesOperations, unsentMessageOperations: self.currentUnsentOperations, updatedSynchronizePeerReadStateOperations: self.currentUpdatedSynchronizeReadStateOperations, currentUpdatedGroupSummarySynchronizeOperations: self.currentUpdatedGroupSummarySynchronizeOperations, currentPreferencesOperations: self.currentPreferencesOperations, currentOrderedItemListOperations: self.currentOrderedItemListOperations, currentItemCollectionItemsOperations: self.currentItemCollectionItemsOperations, currentItemCollectionInfosOperations: self.currentItemCollectionInfosOperations, currentUpdatedPeerChatStates: self.currentUpdatedPeerChatStates, currentGlobalTagsOperations: self.currentGlobalTagsOperations, currentLocalTagsOperations: self.currentLocalTagsOperations, updatedMedia: self.currentUpdatedMedia, 
replaceRemoteContactCount: self.currentReplaceRemoteContactCount, replaceContactPeerIds: self.currentReplacedContactPeerIds, currentPendingMessageActionsOperations: self.currentPendingMessageActionsOperations, currentUpdatedMessageActionsSummaries: self.currentUpdatedMessageActionsSummaries, currentUpdatedMessageTagSummaries: self.currentUpdatedMessageTagSummaries, currentInvalidateMessageTagSummaries: self.currentInvalidateMessageTagSummaries, currentUpdatedPendingPeerNotificationSettings: self.currentUpdatedPendingPeerNotificationSettings, replacedAdditionalChatListItems: self.currentReplacedAdditionalChatListItems, updatedNoticeEntryKeys: self.currentUpdatedNoticeEntryKeys, updatedCacheEntryKeys: self.currentUpdatedCacheEntryKeys, currentUpdatedGroups: self.currentUpdatedGroups, currentUpdatedLives: self.currentUpdatedLives, currentUpdatedMasterClientId: currentUpdatedMasterClientId)
    var updatedTransactionState: Int64?
    var updatedMasterClientId: Int64?
    // Views are updated and the state version is incremented only when something changed.
    if !transaction.isEmpty {
        self.viewTracker.updateViews(postbox: self, transaction: transaction)
        self.transactionStateVersion = self.metadataTable.incrementTransactionStateVersion()
        updatedTransactionState = self.transactionStateVersion

        if let currentUpdatedMasterClientId = self.currentUpdatedMasterClientId {
            self.metadataTable.setMasterClientId(currentUpdatedMasterClientId)
            updatedMasterClientId = currentUpdatedMasterClientId
        }
    }

    // Reset the accumulators. `currentUpdatedState` is not reset here; the visible
    // `internalTransaction` publishes and clears it after the commit.
    self.currentPeerHoleOperations.removeAll()
    self.currentOperationsByPeerId.removeAll()
    self.currentUpdatedChatListInclusions.removeAll()
    self.currentUpdatedPeers.removeAll()
    self.currentChatListOperations.removeAll()
    // NOTE(review): `currentUpdatedChatListInclusions` was already cleared three lines above; this call is redundant.
    self.currentUpdatedChatListInclusions.removeAll()
    self.currentUnsentOperations.removeAll()
    self.currentUpdatedSynchronizeReadStateOperations.removeAll()
    self.currentUpdatedGroupSummarySynchronizeOperations.removeAll()
    self.currentGlobalTagsOperations.removeAll()
    self.currentLocalTagsOperations.removeAll()
    self.currentUpdatedMedia.removeAll()
    self.currentReplaceRemoteContactCount = nil
    self.currentReplacedContactPeerIds = nil
    self.currentReplacedAdditionalChatListItems = nil
    self.currentUpdatedNoticeEntryKeys.removeAll()
    self.currentUpdatedCacheEntryKeys.removeAll()
    self.currentUpdatedMasterClientId = nil
    self.currentUpdatedPeerNotificationSettings.removeAll()
    self.currentUpdatedPeerNotificationBehaviorTimestamps.removeAll()
    self.currentUpdatedCachedPeerData.removeAll()
    self.currentUpdatedPeerPresences.removeAll()
    self.currentUpdatedPeerChatListEmbeddedStates.removeAll()
    self.currentUpdatedTotalUnreadState = nil
    self.currentUpdatedGroupTotalUnreadSummaries.removeAll()
    self.currentPeerMergedOperationLogOperations.removeAll()
    self.currentTimestampBasedMessageAttributesOperations.removeAll()
    self.currentPreferencesOperations.removeAll()
    self.currentOrderedItemListOperations.removeAll()
    self.currentItemCollectionItemsOperations.removeAll()
    self.currentItemCollectionInfosOperations.removeAll()
    self.currentUpdatedPeerChatStates.removeAll()
    self.currentPendingMessageActionsOperations.removeAll()
    self.currentUpdatedMessageActionsSummaries.removeAll()
    self.currentUpdatedMessageTagSummaries.removeAll()
    self.currentInvalidateMessageTagSummaries.removeAll()
    self.currentUpdatedPendingPeerNotificationSettings.removeAll()
    self.currentGroupIdsWithUpdatedReadStats.removeAll()
    /// Andrew Young - my groups
    self.currentUpdatedGroups.removeAll()
    /// Andrew Young - live-streaming users
    self.currentUpdatedLives.removeAll()

    // Give every table its own pre-commit hook.
    for table in self.tables {
        table.beforeCommit()
    }

    return (updatedTransactionState, updatedMasterClientId)
}

internalTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Runs `f` inside a value-box transaction.
///
/// - Parameter f: Work to perform; it receives a `Transaction` that is marked as disposed
///   as soon as `f` returns.
/// - Returns: The value produced by `f`, together with the transaction state version and
///   master client id reported by `beforeCommit()`.
private func internalTransaction<T>(_ f: (Transaction) -> T) -> (result: T, updatedTransactionStateVersion: Int64?, updatedMasterClientId: Int64?) {
    self.valueBox.begin()
    self.afterBegin()

    let transaction = Transaction(postbox: self)
    let result = f(transaction)
    transaction.disposed = true

    let commitInfo = self.beforeCommit()
    self.valueBox.commit()

    // Publish the updated state, if any, only after the commit, then clear it.
    if let publishedState = self.currentUpdatedState {
        self.statePipe.putNext(publishedState)
    }
    self.currentUpdatedState = nil

    return (result, commitInfo.updatedTransactionStateVersion, commitInfo.updatedMasterClientId)
}

transactionSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// Builds a signal whose subscription runs `f` inside a postbox transaction.
///
/// - Parameters:
///   - userInteractive: When `true`, the work is dispatched with user-interactive QoS.
///   - f: Receives the subscriber and the transaction; the disposable it returns becomes
///     the disposable of the subscription.
/// - Returns: A signal driven entirely by what `f` puts on the subscriber.
public func transactionSignal<T, E>(userInteractive: Bool = false, _ f: @escaping(Subscriber<T, E>, Transaction) -> Disposable) -> Signal<T, E> {
    return Signal { subscriber in
        let transactionDisposable = MetaDisposable()

        let runTransaction: () -> Void = {
            self.beginInternalTransaction {
                let (_, newTransactionState, newMasterClientId) = self.internalTransaction({ transaction in
                    transactionDisposable.set(f(subscriber, transaction))
                })

                // The notification that used to be sent here is disabled.
                if newTransactionState != nil || newMasterClientId != nil {
                    //self.pipeNotifier.notify()
                }

                if let newMasterClientId = newMasterClientId {
                    self.masterClientId.set(.single(newMasterClientId))
                }
            }
        }

        if userInteractive {
            self.queue.justDispatchWithQoS(qos: DispatchQoS.userInteractive, runTransaction)
        } else {
            self.queue.justDispatch(runTransaction)
        }

        return transactionDisposable
    }
}

transaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Builds a signal that runs `f` inside a postbox transaction, emits its result and completes.
///
/// - Parameters:
///   - userInteractive: When `true`, the work is dispatched with user-interactive QoS.
///   - ignoreDisabled: Forwarded to `beginInternalTransaction`.
///   - f: Work to run against the transaction.
/// - Returns: A signal carrying the value returned by `f`. Its disposable is empty, so
///   disposing the subscription does not cancel the transaction.
public func transaction<T>(userInteractive: Bool = false, ignoreDisabled: Bool = false, _ f: @escaping(Transaction) -> T) -> Signal<T, NoError> {
    return Signal { subscriber in
        let runTransaction: () -> Void = {
            self.beginInternalTransaction(ignoreDisabled: ignoreDisabled, {
                let (value, newTransactionState, newMasterClientId) = self.internalTransaction(f)

                // The notification that used to be sent here is disabled.
                if newTransactionState != nil || newMasterClientId != nil {
                    //self.pipeNotifier.notify()
                }

                if let newMasterClientId = newMasterClientId {
                    self.masterClientId.set(.single(newMasterClientId))
                }

                subscriber.putNext(value)
                subscriber.putCompletion()
            })
        }

        // Run inline only when both the postbox queue and the main queue report being current.
        if self.queue.isCurrent() && Queue.mainQueue().isCurrent() {
            runTransaction()
        } else if userInteractive {
            self.queue.justDispatchWithQoS(qos: DispatchQoS.userInteractive, runTransaction)
        } else {
            self.queue.justDispatch(runTransaction)
        }
        return EmptyDisposable
    }
}

unsentMessageOperations

1
/// Unsent-message operations carried by a `PostboxTransaction`; `updateViews` replays
/// them into the view tracker's unsent-message view.
/// NOTE(review): the meaning of each operation is defined by
/// `IntermediateMessageHistoryUnsentOperation`, which is not visible here.
let unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation]

class PostboxTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
final class PostboxTransaction {
    let currentUpdatedState: PostboxCoding?
    let currentPeerHoleOperations: [MessageHistoryIndexHoleOperationKey: [MessageHistoryIndexHoleOperation]]
    let currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]]
    let chatListOperations: [PeerGroupId: [ChatListOperation]]
    let currentUpdatedChatListInclusions: [PeerId: PeerChatListInclusion]
    let currentUpdatedPeers: [PeerId: Peer]
    let currentUpdatedPeerNotificationSettings: [PeerId: PeerNotificationSettings]
    let currentUpdatedPeerNotificationBehaviorTimestamps: [PeerId: PeerNotificationSettingsBehaviorTimestamp]
    let currentUpdatedCachedPeerData: [PeerId: CachedPeerData]
    let currentUpdatedPeerPresences: [PeerId: PeerPresence]
    let currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?]
    let currentUpdatedTotalUnreadState: ChatListTotalUnreadState?
    let currentUpdatedTotalUnreadSummaries: [PeerGroupId: PeerGroupUnreadCountersCombinedSummary]
    let alteredInitialPeerCombinedReadStates: [PeerId: CombinedPeerReadState]
    let currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation]
    let currentTimestampBasedMessageAttributesOperations: [TimestampBasedMessageAttributesOperation]
    let currentPreferencesOperations: [PreferencesOperation]
    let currentOrderedItemListOperations: [Int32: [OrderedItemListOperation]]
    let currentItemCollectionItemsOperations: [ItemCollectionId: [ItemCollectionItemsOperation]]
    let currentItemCollectionInfosOperations: [ItemCollectionInfosOperation]
    let currentUpdatedPeerChatStates: Set<PeerId>
    let currentGlobalTagsOperations: [GlobalMessageHistoryTagsOperation]
    let currentLocalTagsOperations: [IntermediateMessageHistoryLocalTagsOperation]
    let currentPendingMessageActionsOperations: [PendingMessageActionsOperation]
    let currentUpdatedMessageActionsSummaries: [PendingMessageActionsSummaryKey: Int32]
    let currentUpdatedMessageTagSummaries: [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary]
    let currentInvalidateMessageTagSummaries: [InvalidatedMessageHistoryTagsSummaryEntryOperation]
    let currentUpdatedPendingPeerNotificationSettings: Set<PeerId>
    
    let unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation]
    let updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?]
    let currentUpdatedGroupSummarySynchronizeOperations: [PeerGroupAndNamespace: Bool]
    let updatedMedia: [MediaId: Media?]
    let replaceRemoteContactCount: Int32?
    let replaceContactPeerIds: Set<PeerId>?
    let currentUpdatedMasterClientId: Int64?
    let replacedAdditionalChatListItems: [PeerId]?
    let updatedNoticeEntryKeys: Set<NoticeEntryKey>
    let updatedCacheEntryKeys: Set<ItemCacheEntryId>
    /// Andrew Young - 我的分组
    let currentUpdatedGroups: [ContactsGroupInfo]
    /// Andrew Young - 直播用户
    let currentUpdatedLives: [PeerLiveUser]
    
    /// `true` when none of the fields checked below carries a change.
    ///
    /// Collections count as a change when they are non-empty, and optional
    /// values count when they are non-`nil`. The one exception is
    /// `replaceContactPeerIds`, which must be both non-`nil` and non-empty.
    ///
    /// The check of `alteredInitialPeerCombinedReadStates` used to appear twice
    /// in a row; the second copy could never be reached with a different
    /// outcome and has been removed.
    var isEmpty: Bool {
        if currentUpdatedState != nil {
            return false
        }
        if !currentPeerHoleOperations.isEmpty {
            return false
        }
        if !currentOperationsByPeerId.isEmpty {
            return false
        }
        if !chatListOperations.isEmpty {
            return false
        }
        if !currentUpdatedChatListInclusions.isEmpty {
            return false
        }
        if !currentUpdatedPeers.isEmpty {
            return false
        }
        if !currentUpdatedPeerNotificationSettings.isEmpty {
            return false
        }
        if !currentUpdatedPeerNotificationBehaviorTimestamps.isEmpty {
            return false
        }
        if !currentUpdatedCachedPeerData.isEmpty {
            return false
        }
        if !currentUpdatedPeerPresences.isEmpty {
            return false
        }
        if !currentUpdatedPeerChatListEmbeddedStates.isEmpty {
            return false
        }
        if !unsentMessageOperations.isEmpty {
            return false
        }
        if !updatedSynchronizePeerReadStateOperations.isEmpty {
            return false
        }
        if !currentUpdatedGroupSummarySynchronizeOperations.isEmpty {
            return false
        }
        if !updatedMedia.isEmpty {
            return false
        }
        if replaceRemoteContactCount != nil {
            return false
        }
        // A present-but-empty replacement set is deliberately treated as "no change".
        if let replaceContactPeerIds = replaceContactPeerIds, !replaceContactPeerIds.isEmpty {
            return false
        }
        if currentUpdatedMasterClientId != nil {
            return false
        }
        if currentUpdatedTotalUnreadState != nil {
            return false
        }
        if !currentUpdatedTotalUnreadSummaries.isEmpty {
            return false
        }
        if !alteredInitialPeerCombinedReadStates.isEmpty {
            return false
        }
        if !currentPeerMergedOperationLogOperations.isEmpty {
            return false
        }
        if !currentTimestampBasedMessageAttributesOperations.isEmpty {
            return false
        }
        if !currentPreferencesOperations.isEmpty {
            return false
        }
        if !currentOrderedItemListOperations.isEmpty {
            return false
        }
        if !currentItemCollectionItemsOperations.isEmpty {
            return false
        }
        if !currentItemCollectionInfosOperations.isEmpty {
            return false
        }
        if !currentUpdatedPeerChatStates.isEmpty {
            return false
        }
        if !currentGlobalTagsOperations.isEmpty {
            return false
        }
        if !currentLocalTagsOperations.isEmpty {
            return false
        }
        if !currentPendingMessageActionsOperations.isEmpty {
            return false
        }
        if !currentUpdatedMessageActionsSummaries.isEmpty {
            return false
        }
        if !currentUpdatedMessageTagSummaries.isEmpty {
            return false
        }
        if !currentInvalidateMessageTagSummaries.isEmpty {
            return false
        }
        if !currentUpdatedPendingPeerNotificationSettings.isEmpty {
            return false
        }
        if replacedAdditionalChatListItems != nil {
            return false
        }
        if !updatedNoticeEntryKeys.isEmpty {
            return false
        }
        if !updatedCacheEntryKeys.isEmpty {
            return false
        }
        // Andrew Young - My groups
        if !currentUpdatedGroups.isEmpty {
            return false
        }
        // Andrew Young - Live-streaming users
        if !currentUpdatedLives.isEmpty {
            return false
        }
        return true
    }
    
    /// Memberwise initializer: each argument is stored unchanged in the property
    /// of the same name. Only `currentPeerHoleOperations` has a default (`[:]`).
    init(
        currentUpdatedState: PostboxCoding?,
        currentPeerHoleOperations: [MessageHistoryIndexHoleOperationKey: [MessageHistoryIndexHoleOperation]] = [:],
        currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]],
        chatListOperations: [PeerGroupId: [ChatListOperation]],
        currentUpdatedChatListInclusions: [PeerId: PeerChatListInclusion],
        currentUpdatedPeers: [PeerId: Peer],
        currentUpdatedPeerNotificationSettings: [PeerId: PeerNotificationSettings],
        currentUpdatedPeerNotificationBehaviorTimestamps: [PeerId: PeerNotificationSettingsBehaviorTimestamp],
        currentUpdatedCachedPeerData: [PeerId: CachedPeerData],
        currentUpdatedPeerPresences: [PeerId: PeerPresence],
        currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?],
        currentUpdatedTotalUnreadState: ChatListTotalUnreadState?,
        currentUpdatedTotalUnreadSummaries: [PeerGroupId: PeerGroupUnreadCountersCombinedSummary],
        alteredInitialPeerCombinedReadStates: [PeerId: CombinedPeerReadState],
        currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation],
        currentTimestampBasedMessageAttributesOperations: [TimestampBasedMessageAttributesOperation],
        unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation],
        updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?],
        currentUpdatedGroupSummarySynchronizeOperations: [PeerGroupAndNamespace: Bool],
        currentPreferencesOperations: [PreferencesOperation],
        currentOrderedItemListOperations: [Int32: [OrderedItemListOperation]],
        currentItemCollectionItemsOperations: [ItemCollectionId: [ItemCollectionItemsOperation]],
        currentItemCollectionInfosOperations: [ItemCollectionInfosOperation],
        currentUpdatedPeerChatStates: Set<PeerId>,
        currentGlobalTagsOperations: [GlobalMessageHistoryTagsOperation],
        currentLocalTagsOperations: [IntermediateMessageHistoryLocalTagsOperation],
        updatedMedia: [MediaId: Media?],
        replaceRemoteContactCount: Int32?,
        replaceContactPeerIds: Set<PeerId>?,
        currentPendingMessageActionsOperations: [PendingMessageActionsOperation],
        currentUpdatedMessageActionsSummaries: [PendingMessageActionsSummaryKey: Int32],
        currentUpdatedMessageTagSummaries: [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary],
        currentInvalidateMessageTagSummaries: [InvalidatedMessageHistoryTagsSummaryEntryOperation],
        currentUpdatedPendingPeerNotificationSettings: Set<PeerId>,
        replacedAdditionalChatListItems: [PeerId]?,
        updatedNoticeEntryKeys: Set<NoticeEntryKey>,
        updatedCacheEntryKeys: Set<ItemCacheEntryId>,
        currentUpdatedGroups: [ContactsGroupInfo],
        currentUpdatedLives: [PeerLiveUser],
        currentUpdatedMasterClientId: Int64?
    ) {
        // State and message-history operations.
        self.currentUpdatedState = currentUpdatedState
        self.currentUpdatedMasterClientId = currentUpdatedMasterClientId
        self.currentPeerHoleOperations = currentPeerHoleOperations
        self.currentOperationsByPeerId = currentOperationsByPeerId
        self.currentPeerMergedOperationLogOperations = currentPeerMergedOperationLogOperations
        self.currentTimestampBasedMessageAttributesOperations = currentTimestampBasedMessageAttributesOperations
        self.unsentMessageOperations = unsentMessageOperations
        self.updatedMedia = updatedMedia

        // Chat list and unread counters.
        self.chatListOperations = chatListOperations
        self.currentUpdatedChatListInclusions = currentUpdatedChatListInclusions
        self.currentUpdatedTotalUnreadState = currentUpdatedTotalUnreadState
        self.currentUpdatedTotalUnreadSummaries = currentUpdatedTotalUnreadSummaries
        self.alteredInitialPeerCombinedReadStates = alteredInitialPeerCombinedReadStates
        self.updatedSynchronizePeerReadStateOperations = updatedSynchronizePeerReadStateOperations
        self.currentUpdatedGroupSummarySynchronizeOperations = currentUpdatedGroupSummarySynchronizeOperations
        self.replacedAdditionalChatListItems = replacedAdditionalChatListItems

        // Per-peer data.
        self.currentUpdatedPeers = currentUpdatedPeers
        self.currentUpdatedPeerNotificationSettings = currentUpdatedPeerNotificationSettings
        self.currentUpdatedPeerNotificationBehaviorTimestamps = currentUpdatedPeerNotificationBehaviorTimestamps
        self.currentUpdatedPendingPeerNotificationSettings = currentUpdatedPendingPeerNotificationSettings
        self.currentUpdatedCachedPeerData = currentUpdatedCachedPeerData
        self.currentUpdatedPeerPresences = currentUpdatedPeerPresences
        self.currentUpdatedPeerChatListEmbeddedStates = currentUpdatedPeerChatListEmbeddedStates
        self.currentUpdatedPeerChatStates = currentUpdatedPeerChatStates

        // Contacts.
        self.replaceRemoteContactCount = replaceRemoteContactCount
        self.replaceContactPeerIds = replaceContactPeerIds

        // Preferences, item lists and item collections.
        self.currentPreferencesOperations = currentPreferencesOperations
        self.currentOrderedItemListOperations = currentOrderedItemListOperations
        self.currentItemCollectionItemsOperations = currentItemCollectionItemsOperations
        self.currentItemCollectionInfosOperations = currentItemCollectionInfosOperations

        // Tags, pending actions and their summaries.
        self.currentGlobalTagsOperations = currentGlobalTagsOperations
        self.currentLocalTagsOperations = currentLocalTagsOperations
        self.currentPendingMessageActionsOperations = currentPendingMessageActionsOperations
        self.currentUpdatedMessageActionsSummaries = currentUpdatedMessageActionsSummaries
        self.currentUpdatedMessageTagSummaries = currentUpdatedMessageTagSummaries
        self.currentInvalidateMessageTagSummaries = currentInvalidateMessageTagSummaries

        // Notice and cache entries.
        self.updatedNoticeEntryKeys = updatedNoticeEntryKeys
        self.updatedCacheEntryKeys = updatedCacheEntryKeys

        // Andrew Young - My groups
        self.currentUpdatedGroups = currentUpdatedGroups
        // Andrew Young - Live-streaming users
        self.currentUpdatedLives = currentUpdatedLives
    }
}

addMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/// Stores `messages` through `messageHistoryTable`, then performs per-peer follow-up work.
///
/// For `.UpperHistoryBlock`, the range from the earliest explicitly identified
/// message of each namespace up to `Int32.max - 1` is removed from the hole
/// index. `.Random` does no hole bookkeeping. In both cases every action
/// installed for the peer is then called with that peer's messages.
///
/// - Parameters:
///   - transaction: Passed through to the installed per-peer message actions.
///   - messages: Messages to store.
///   - location: Selects the hole bookkeeping described above.
/// - Returns: The value returned by `messageHistoryTable.addMessages`
///   (globally unique id -> message id).
fileprivate func addMessages(transaction: Transaction, messages: [StoreMessage], location: AddMessagesLocation) -> [Int64: MessageId] {
    // Filled in by the `processMessages` callback of the table call below.
    var storedMessagesByPeer: [PeerId: [StoreMessage]] = [:]

    let globallyUniqueIdMapping = self.messageHistoryTable.addMessages(
        messages: messages,
        operationsByPeerId: &self.currentOperationsByPeerId,
        updatedMedia: &self.currentUpdatedMedia,
        unsentMessageOperations: &currentUnsentOperations,
        updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations,
        globalTagsOperations: &self.currentGlobalTagsOperations,
        pendingActionsOperations: &self.currentPendingMessageActionsOperations,
        updatedMessageActionsSummaries: &self.currentUpdatedMessageActionsSummaries,
        updatedMessageTagSummaries: &self.currentUpdatedMessageTagSummaries,
        invalidateMessageTagSummaries: &self.currentInvalidateMessageTagSummaries,
        localTagsOperations: &self.currentLocalTagsOperations,
        processMessages: { grouped in
            storedMessagesByPeer = grouped
        }
    )

    for (peerId, peerMessages) in storedMessagesByPeer {
        switch location {
        case .UpperHistoryBlock:
            // Track the smallest explicit id seen in each namespace.
            var lowestIdByNamespace: [MessageId.Namespace: MessageId] = [:]
            for message in peerMessages {
                guard case let .Id(id) = message.id else {
                    continue
                }
                if let knownLowest = lowestIdByNamespace[id.namespace], !(id < knownLowest) {
                    continue
                }
                lowestIdByNamespace[id.namespace] = id
            }
            for lowestId in lowestIdByNamespace.values {
                self.messageHistoryHoleIndexTable.remove(peerId: lowestId.peerId, namespace: lowestId.namespace, space: .everywhere, range: lowestId.id ... (Int32.max - 1), operations: &self.currentPeerHoleOperations)
            }
        case .Random:
            break
        }

        guard let installedActions = self.installedMessageActionsByPeerId[peerId] else {
            continue
        }
        for action in installedActions.copyItems() {
            action(peerMessages, transaction)
        }
    }

    return globallyUniqueIdMapping
}

addMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Adds `messages` to the history, one peer at a time.
///
/// For each peer the messages are converted with `internalStoreMessages`,
/// handed to `messageHistoryIndexTable.addMessages`, and the resulting index
/// operations are applied through `processIndexOperations`. Side effects are
/// accumulated in the `inout` parameters.
///
/// - Parameter processMessages: Called once, after all peers are processed,
///   with the input messages grouped by peer id.
/// - Returns: A mapping from each message's `globallyUniqueId` (when present)
///   to its message id.
func addMessages(messages: [StoreMessage], operationsByPeerId: inout [PeerId: [MessageHistoryOperation]], updatedMedia: inout [MediaId: Media?], unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], updatedPeerReadStateOperations: inout [PeerId: PeerReadStateSynchronizationOperation?], globalTagsOperations: inout [GlobalMessageHistoryTagsOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation], processMessages: (([PeerId : [StoreMessage]]) -> Void)?) -> [Int64: MessageId] {
    let groupedMessages = self.messagesGroupedByPeerId(messages)
    var messageIdsByGloballyUniqueId: [Int64: MessageId] = [:]
    // Global tags already passed to `ensureInitialized` during this call.
    var initializedGlobalTags = Set<GlobalMessageTags>()

    for (peerId, peerMessages) in groupedMessages {
        let internalMessages = self.internalStoreMessages(peerMessages)

        for message in internalMessages {
            if let uniqueId = message.globallyUniqueId {
                messageIdsByGloballyUniqueId[uniqueId] = message.id
            }
            guard !message.globalTags.isEmpty else {
                continue
            }
            for tag in message.globalTags where !initializedGlobalTags.contains(tag) {
                self.globalTagsTable.ensureInitialized(tag)
                initializedGlobalTags.insert(tag)
            }
        }

        var indexOperations: [MessageHistoryIndexOperation] = []
        self.messageHistoryIndexTable.addMessages(internalMessages, operations: &indexOperations)

        self.processIndexOperations(peerId, operations: indexOperations, processedOperationsByPeerId: &operationsByPeerId, updatedMedia: &updatedMedia, unsentMessageOperations: &unsentMessageOperations, updatedPeerReadStateOperations: &updatedPeerReadStateOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)
    }

    processMessages?(groupedMessages)

    return messageIdsByGloballyUniqueId
}

processIndexOperations

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/// Applies a list of index-level operations for a single peer and records the
/// resulting history operations.
///
/// Inserts of incoming messages and removals are accumulated and flushed in
/// batches: pending removals are flushed before every insert/update/timestamp
/// operation, and pending incoming-message indices are flushed before every
/// operation except `.InsertMessage`. Both are flushed once more at the end.
///
/// - Parameters:
///   - peerId: Peer the operations belong to; used for read-state updates and
///     as the key into `processedOperationsByPeerId`.
///   - operations: Index operations to apply, in order.
///   - processedOperationsByPeerId: Receives the produced `MessageHistoryOperation`s
///     (appended to any existing entry for `peerId`).
///   - updatedMedia, unsentMessageOperations, updatedPeerReadStateOperations,
///     globalTagsOperations, pendingActionsOperations, updatedMessageActionsSummaries,
///     updatedMessageTagSummaries, invalidateMessageTagSummaries, localTagsOperations:
///     Accumulators forwarded to the helper calls and tables used below.
private func processIndexOperations(_ peerId: PeerId, operations: [MessageHistoryIndexOperation], processedOperationsByPeerId: inout [PeerId: [MessageHistoryOperation]], updatedMedia: inout [MediaId: Media?], unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], updatedPeerReadStateOperations: inout [PeerId: PeerReadStateSynchronizationOperation?], globalTagsOperations: inout [GlobalMessageHistoryTagsOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation]) {
    // Scratch key/buffer/encoder handed to every justInsertMessage/justUpdate call
    // (presumably reused to avoid per-message allocations — TODO confirm).
    let sharedKey = self.key(MessageIndex(id: MessageId(peerId: PeerId(namespace: 0, id: 0), namespace: 0, id: 0), timestamp: 0))
    let sharedBuffer = WriteBuffer()
    let sharedEncoder = PostboxEncoder()

    var outputOperations: [MessageHistoryOperation] = []
    // Indices waiting to be removed in one batch.
    var accumulatedRemoveIndices: [(MessageIndex)] = []
    // Indices of newly added incoming messages waiting to be reported to the read-state table.
    var accumulatedAddedIncomingMessageIndices = Set<MessageIndex>()

    // Latest combined read state produced by any read-state table call in this pass.
    var updatedCombinedState: CombinedPeerReadState?
    var invalidateReadState = false

    // Filled by justInsertMessage; applied through updateMedia at the end.
    var updateExistingMedia: [MediaId: Media] = [:]

    // Flushes the accumulated incoming-message indices into the read-state table.
    let commitAccumulatedAddedIndices: () -> Void = {
        if !accumulatedAddedIncomingMessageIndices.isEmpty {
            let (combinedState, invalidate) = self.readStateTable.addIncomingMessages(peerId, indices: accumulatedAddedIncomingMessageIndices)
            if let combinedState = combinedState {
                updatedCombinedState = combinedState
            }
            if invalidate {
                invalidateReadState = true
            }

            accumulatedAddedIncomingMessageIndices.removeAll()
        }
    }

    for operation in operations {
        switch operation {
            case let .InsertMessage(storeMessage):
                // Only pending removals are flushed here; incoming additions keep accumulating
                // across consecutive inserts.
                processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)

                let (message, updatedGroupInfos) = self.justInsertMessage(storeMessage, sharedKey: sharedKey, sharedBuffer: sharedBuffer, sharedEncoder: sharedEncoder, localTagsOperations: &localTagsOperations, updateExistingMedia: &updateExistingMedia)
                outputOperations.append(.InsertMessage(message))
                if !updatedGroupInfos.isEmpty {
                    outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
                }

                // Messages flagged Unsent (and not Failed) are registered in the unsent table.
                if message.flags.contains(.Unsent) && !message.flags.contains(.Failed) {
                    self.unsentTable.add(message.id, operations: &unsentMessageOperations)
                }
                // Walk the set bits of the tag mask (up to 32), registering each single-bit tag;
                // the loop stops early once no higher bits remain.
                let tags = message.tags.rawValue
                if tags != 0 {
                    for i in 0 ..< 32 {
                        let currentTags = tags >> UInt32(i)
                        if currentTags == 0 {
                            break
                        }

                        if (currentTags & 1) != 0 {
                            let tag = MessageTags(rawValue: 1 << UInt32(i))
                            self.tagsTable.add(tags: tag, index: message.index, updatedSummaries: &updatedMessageTagSummaries, invalidateSummaries: &invalidateMessageTagSummaries)
                        }
                    }
                }
                // Same bit walk for global tags; an operation is emitted only when
                // globalTagsTable.addMessage returns true.
                let globalTags = message.globalTags.rawValue
                if globalTags != 0 {
                    for i in 0 ..< 32 {
                        let currentTags = globalTags >> UInt32(i)
                        if currentTags == 0 {
                            break
                        }

                        if (currentTags & 1) != 0 {
                            let tag = GlobalMessageTags(rawValue: 1 << UInt32(i))
                            if self.globalTagsTable.addMessage(tag, index: message.index) {
                                globalTagsOperations.append(.insertMessage(tag, message))
                            }
                        }
                    }
                }
                if !message.localTags.isEmpty {
                    self.localTagsTable.set(id: message.id, tags: message.localTags, previousTags: [], operations: &localTagsOperations)
                }
                // Incoming messages are queued for the next read-state flush.
                if !message.flags.intersection(.IsIncomingMask).isEmpty {
                    accumulatedAddedIncomingMessageIndices.insert(message.index)
                }
            case let .InsertExistingMessage(storeMessage):
                commitAccumulatedAddedIndices()
                processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)

                // Updates the message at its own index (keepLocalTags: true) and reports it as
                // a remove followed by an insert.
                var updatedGroupInfos: [MessageId: MessageGroupInfo] = [:]
                if let (message, previousTags) = self.justUpdate(storeMessage.index, message: storeMessage, keepLocalTags: true, sharedKey: sharedKey, sharedBuffer: sharedBuffer, sharedEncoder: sharedEncoder, unsentMessageOperations: &unsentMessageOperations, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, updatedGroupInfos: &updatedGroupInfos, localTagsOperations: &localTagsOperations, updatedMedia: &updatedMedia) {
                    outputOperations.append(.Remove([(storeMessage.index, previousTags)]))
                    outputOperations.append(.InsertMessage(message))
                    if !updatedGroupInfos.isEmpty {
                        outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
                    }
            }
            case let .Remove(index):
                // Removals are batched; only pending additions are flushed first.
                commitAccumulatedAddedIndices()
                accumulatedRemoveIndices.append(index)
            case let .Update(index, storeMessage):
                commitAccumulatedAddedIndices()
                processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)

                var updatedGroupInfos: [MessageId: MessageGroupInfo] = [:]
                if let (message, previousTags) = self.justUpdate(index, message: storeMessage, keepLocalTags: false, sharedKey: sharedKey, sharedBuffer: sharedBuffer, sharedEncoder: sharedEncoder, unsentMessageOperations: &unsentMessageOperations, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, updatedGroupInfos: &updatedGroupInfos, localTagsOperations: &localTagsOperations, updatedMedia: &updatedMedia) {
                    outputOperations.append(.Remove([(index, previousTags)]))
                    outputOperations.append(.InsertMessage(message))
                    if !updatedGroupInfos.isEmpty {
                        outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
                    }

                    // An incoming message whose index changed is queued both as a removal of the
                    // old index and as an addition of the new one.
                    if !message.flags.intersection(.IsIncomingMask).isEmpty {
                        if index != message.index {
                            accumulatedRemoveIndices.append(index)
                            accumulatedAddedIncomingMessageIndices.insert(message.index)
                        }
                    }
                }
            case let .UpdateTimestamp(index, timestamp):
                commitAccumulatedAddedIndices()
                processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)

                var updatedGroupInfos: [MessageId: MessageGroupInfo] = [:]
                let tagsAndGlobalTags = self.justUpdateTimestamp(index, timestamp: timestamp, unsentMessageOperations: &unsentMessageOperations, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, updatedGroupInfos: &updatedGroupInfos, localTagsOperations: &localTagsOperations, updatedMedia: &updatedMedia)
                // The timestamp operation is emitted whether or not justUpdateTimestamp returned a value.
                outputOperations.append(.UpdateTimestamp(index, timestamp))
                if !updatedGroupInfos.isEmpty {
                    outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
                }
                if let (_, globalTags) = tagsAndGlobalTags {
                    if !globalTags.isEmpty {
                        globalTagsOperations.append(.updateTimestamp(globalTags, index, timestamp))
                    }
                }
        }
    }

    // Final flush of whatever is still pending after the last operation.
    commitAccumulatedAddedIndices()
    processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)

    if let updatedCombinedState = updatedCombinedState {
        outputOperations.append(.UpdateReadState(peerId, updatedCombinedState))
    }

    if invalidateReadState {
        self.synchronizeReadStateTable.set(peerId, operation: .Validate, operations: &updatedPeerReadStateOperations)
    }

    // Merge this pass's operations into the per-peer accumulator (the force unwrap is
    // guarded by the nil check in the other branch).
    if processedOperationsByPeerId[peerId] == nil {
        processedOperationsByPeerId[peerId] = outputOperations
    } else {
        processedOperationsByPeerId[peerId]!.append(contentsOf: outputOperations)
    }

    // Apply media collected by justInsertMessage; entries without an id are skipped.
    for (_, media) in updateExistingMedia {
        if let id = media.id {
            var updatedMessageIndices = Set<MessageIndex>()
            self.updateMedia(id, media: media, operationsByPeerId: &processedOperationsByPeerId, updatedMedia: &updatedMedia, updatedMessageIndices: &updatedMessageIndices)
        }
    }

    //self.debugCheckTagIndexIntegrity(peerId: peerId)
}

processIndexOperationsCommitAccumulatedRemoveIndices

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/// Flushes the batched removals in `accumulatedRemoveIndices`.
///
/// The read-state table is told about the deleted indices first. The indices
/// are then split into intervals by `continuousIndexIntervalsForRemoving`,
/// and each interval is removed message by message via `justRemove`. For every
/// interval a `.Remove` history operation is emitted (indices not found in the
/// store are reported with empty tags), followed by any global-tag removals
/// and group-info updates. The accumulator is cleared afterwards. Does nothing
/// when the accumulator is empty.
private func processIndexOperationsCommitAccumulatedRemoveIndices(peerId: PeerId, accumulatedRemoveIndices: inout [MessageIndex], updatedCombinedState: inout CombinedPeerReadState?, invalidateReadState: inout Bool, unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], outputOperations: inout [MessageHistoryOperation], globalTagsOperations: inout [GlobalMessageHistoryTagsOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation]) {
    guard !accumulatedRemoveIndices.isEmpty else {
        return
    }

    let (newCombinedState, shouldInvalidate) = self.readStateTable.deleteMessages(peerId, indices: accumulatedRemoveIndices, incomingStatsInIndices: { statsPeerId, statsNamespace, statsIndices in
        return self.incomingMessageStatsInIndices(statsPeerId, namespace: statsNamespace, indices: statsIndices)
    })
    if let newCombinedState = newCombinedState {
        updatedCombinedState = newCombinedState
    }
    if shouldInvalidate {
        invalidateReadState = true
    }

    for interval in self.continuousIndexIntervalsForRemoving(accumulatedRemoveIndices) {
        var removedWithTags: [(MessageIndex, MessageTags)] = []
        var removedGlobalTags: [(GlobalMessageTags, MessageIndex)] = []

        for index in interval {
            guard let (tags, globalTags) = self.justRemove(index, unsentMessageOperations: &unsentMessageOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations) else {
                // Nothing stored at this index: still report the removal, with empty tags.
                removedWithTags.append((index, MessageTags()))
                continue
            }

            removedWithTags.append((index, tags))
            if !globalTags.isEmpty {
                removedGlobalTags.append((globalTags, index))
            }
        }

        assert(interval.count == removedWithTags.count)
        outputOperations.append(.Remove(removedWithTags))
        if !removedGlobalTags.isEmpty {
            globalTagsOperations.append(.remove(removedGlobalTags))
        }

        var regroupedInfos: [MessageId: MessageGroupInfo] = [:]
        self.maybeCombineGroupsInNamespace(at: interval[0], updatedGroupInfos: &regroupedInfos)
        if !regroupedInfos.isEmpty {
            outputOperations.append(.UpdateGroupInfos(regroupedInfos))
        }
    }

    accumulatedRemoveIndices.removeAll()
}

justRemove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/// Removes the message stored at `index` together with its entries in the
/// dependent tables.
///
/// Cleanup order: embedded media, unsent table, globally-unique-id table,
/// pending actions, tags, global tags, local tags, referenced media, text
/// index (only for peer namespaces that require it), then the message record.
///
/// - Returns: The removed message's tags and global tags, or `nil` when
///   nothing is stored under `index`.
private func justRemove(_ index: MessageIndex, unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation]) -> (MessageTags, GlobalMessageTags)? {
    let storageKey = self.key(index)
    guard let storedValue = self.valueBox.get(self.table, key: storageKey) else {
        return nil
    }
    let message = self.readIntermediateEntry(storageKey, value: storedValue).message

    // Embedded media blob: a 4-byte count, then for each entry a 4-byte length
    // followed by that many bytes of encoded media.
    let mediaBlob = message.embeddedMediaData
    if mediaBlob.length > 4 {
        var mediaCount: Int32 = 0
        mediaBlob.read(&mediaCount, offset: 0, length: 4)
        for _ in 0 ..< mediaCount {
            var encodedLength: Int32 = 0
            mediaBlob.read(&encodedLength, offset: 0, length: 4)
            let byteCount = Int(encodedLength)
            // Decode in place from the blob's current position without copying or taking ownership.
            let encodedMedia = MemoryBuffer(memory: mediaBlob.memory + mediaBlob.offset, capacity: byteCount, length: byteCount, freeWhenDone: false)
            if let media = PostboxDecoder(buffer: encodedMedia).decodeRootObject() as? Media {
                self.messageMediaTable.removeEmbeddedMedia(media)
            }
            mediaBlob.skip(byteCount)
        }
    }

    let isUnsentAndNotFailed = message.flags.contains(.Unsent) && !message.flags.contains(.Failed)
    if isUnsentAndNotFailed {
        self.unsentTable.remove(index.id, operations: &unsentMessageOperations)
    }

    if let uniqueId = message.globallyUniqueId {
        self.globallyUniqueMessageIdsTable.remove(peerId: message.id.peerId, globallyUniqueId: uniqueId)
    }

    self.pendingActionsTable.removeMessage(id: message.id, operations: &pendingActionsOperations, updatedSummaries: &updatedMessageActionsSummaries)

    for tag in message.tags {
        self.tagsTable.remove(tags: tag, index: index, updatedSummaries: &updatedMessageTagSummaries, invalidateSummaries: &invalidateMessageTagSummaries)
    }
    for globalTag in message.globalTags {
        self.globalTagsTable.remove(globalTag, index: index)
    }
    if !message.localTags.isEmpty {
        self.localTagsTable.set(id: index.id, tags: [], previousTags: message.localTags, operations: &localTagsOperations)
    }

    for referencedId in message.referencedMedia {
        let _ = self.messageMediaTable.removeReference(referencedId)
    }

    let peerNamespace = message.id.peerId.namespace
    if self.seedConfiguration.peerNamespacesRequiringMessageTextIndex.contains(peerNamespace) {
        self.textIndexTable.remove(messageId: message.id)
    }

    // Capture the result before deleting the record itself.
    let removedTags = (message.tags, message.globalTags)
    self.valueBox.remove(self.table, key: storageKey, secure: true)
    return removedTags
}

unsentTable

1
let unsentTable: MessageHistoryUnsentTable

class MessageHistoryTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Stored properties of `MessageHistoryTable`: its seed configuration followed by
// the auxiliary tables it holds references to (index, holes, media, metadata,
// unique ids, unsent messages, tags, read state, text index, summaries and
// pending actions).
private let seedConfiguration: SeedConfiguration
let messageHistoryIndexTable: MessageHistoryIndexTable
let messageHistoryHoleIndexTable: MessageHistoryHoleIndexTable
let messageMediaTable: MessageMediaTable
let historyMetadataTable: MessageHistoryMetadataTable
let globallyUniqueMessageIdsTable: MessageGloballyUniqueIdTable
let unsentTable: MessageHistoryUnsentTable
let tagsTable: MessageHistoryTagsTable
let globalTagsTable: GlobalMessageHistoryTagsTable
let localTagsTable: LocalMessageHistoryTagsTable
let readStateTable: MessageHistoryReadStateTable
let synchronizeReadStateTable: MessageHistorySynchronizeReadStateTable
let textIndexTable: MessageHistoryTextIndexTable
let summaryTable: MessageHistoryTagsSummaryTable
let pendingActionsTable: PendingMessageActionsTable

class MessageHistoryUnsentTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Records `id` in the unsent table and logs the insertion.
///
/// - Parameters:
///   - id: Identifier of the message to record.
///   - operations: Accumulator that receives an `.Insert(id)` entry.
func add(_ id: MessageId, operations: inout [IntermediateMessageHistoryUnsentOperation]) {
    // The entry is stored under the key derived from `id`, with a
    // default-constructed buffer as its value.
    let storageKey = self.key(id)
    let placeholderValue = MemoryBuffer()
    self.valueBox.set(self.table, key: storageKey, value: placeholderValue)
    operations.append(.Insert(id))
}
/// Deletes the entry for `id` from the unsent table and logs the removal.
///
/// - Parameters:
///   - id: Identifier of the message whose entry is deleted.
///   - operations: Accumulator that receives a `.Remove(id)` entry.
func remove(_ id: MessageId, operations: inout [IntermediateMessageHistoryUnsentOperation]) {
    let storageKey = self.key(id)
    self.valueBox.remove(self.table, key: storageKey, secure: false)
    operations.append(.Remove(id))
}
/// Returns the message ids decoded from every key between `lowerBound()` and
/// `upperBound()` of this table.
///
/// Each key is read as: an `Int32` namespace at offset 0, an `Int32` message id
/// at offset 4, and an `Int64` peer id at offset 8.
func get() -> [MessageId] {
    var collected: [MessageId] = []
    let rangeStart = self.lowerBound()
    let rangeEnd = self.upperBound()
    // NOTE(review): `limit: 0` presumably means "no limit" — confirm against ValueBox.
    self.valueBox.range(self.table, start: rangeStart, end: rangeEnd, keys: { entryKey in
        let namespace = entryKey.getInt32(0)
        let localId = entryKey.getInt32(4)
        let peerId = PeerId(entryKey.getInt64(4 + 4))
        collected.append(MessageId(peerId: peerId, namespace: namespace, id: localId))
        // Returning true on every key, as the original does.
        return true
    }, limit: 0)
    return collected
}

unsentTable.add

1
2
3
// Register the message in the unsent table only when it carries the `.Unsent`
// flag and is not also marked `.Failed`.
if message.flags.contains(.Unsent) && !message.flags.contains(.Failed) {
    self.unsentTable.add(message.id, operations: &unsentMessageOperations)
}

messagesGroupedByPeerId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Buckets `messages` by the peer id of each message's id.
///
/// - Parameter messages: Messages to group.
/// - Returns: A dictionary from peer id to the messages for that peer, each
///   bucket keeping the relative order the messages had in `messages`.
private func messagesGroupedByPeerId(_ messages: [StoreMessage]) -> [PeerId: [StoreMessage]] {
    return Dictionary(grouping: messages, by: { $0.id.peerId })
}

internalStoreMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
/// Converts each `StoreMessage` into an `InternalStoreMessage` with a concrete id.
///
/// Messages whose id is already `.Id` keep that id. Messages with a `.Partial`
/// id obtain one from `historyMetadataTable.getNextMessageIdAndIncrement`, in
/// input order. All other fields are copied through unchanged.
///
/// - Parameter messages: Messages to convert.
/// - Returns: One `InternalStoreMessage` per input message, in the same order.
func internalStoreMessages(_ messages: [StoreMessage]) -> [InternalStoreMessage] {
    return messages.map { source -> InternalStoreMessage in
        switch source.id {
            case let .Id(existingId):
                return InternalStoreMessage(
                    id: existingId,
                    timestamp: source.timestamp,
                    globallyUniqueId: source.globallyUniqueId,
                    groupingKey: source.groupingKey,
                    flags: source.flags,
                    tags: source.tags,
                    globalTags: source.globalTags,
                    localTags: source.localTags,
                    forwardInfo: source.forwardInfo,
                    authorId: source.authorId,
                    text: source.text,
                    attributes: source.attributes,
                    media: source.media
                )
            case let .Partial(peerId, namespace):
                // Obtain a fresh id from the metadata table for this peer/namespace.
                let allocatedId = self.historyMetadataTable.getNextMessageIdAndIncrement(peerId, namespace: namespace)
                return InternalStoreMessage(
                    id: allocatedId,
                    timestamp: source.timestamp,
                    globallyUniqueId: source.globallyUniqueId,
                    groupingKey: source.groupingKey,
                    flags: source.flags,
                    tags: source.tags,
                    globalTags: source.globalTags,
                    localTags: source.localTags,
                    forwardInfo: source.forwardInfo,
                    authorId: source.authorId,
                    text: source.text,
                    attributes: source.attributes,
                    media: source.media
                )
        }
    }
}

messageHistoryIndexTable

1
let messageHistoryIndexTable: MessageHistoryIndexTable

messageHistoryIndexTable.addMessages

1
self.messageHistoryIndexTable.addMessages(internalPeerMessages, operations: &operations)
This post is licensed under CC BY 4.0 by the author.