ChatController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Handler invoked by the chat display node when the user sends messages.
// Only acts when the controller is showing a `.peer` chat location.
self.chatDisplayNode.sendMessages = { [weak self] messages, silentPosting, scheduleTime, isAnyMessageTextPartitioned in
    guard let strongSelf = self, case let .peer(peerId) = strongSelf.chatLocation else {
        return
    }
    strongSelf.commitPurposefulAction()

    // For a channel restricted by slow mode, refuse to send more than one
    // forward, or a text that was partitioned into several messages, and
    // show an alert instead of enqueueing anything.
    if let channel = strongSelf.presentationInterfaceState.renderedPeer?.peer as? UChatChannel, channel.isRestrictedBySlowmode {
        var forwardCount = 0
        for message in messages {
            if case .forward = message {
                forwardCount += 1
            }
        }

        let errorText: String?
        if forwardCount > 1 {
            errorText = strongSelf.presentationData.strings.Chat_AttachmentMultipleForwardDisabled
        } else if isAnyMessageTextPartitioned {
            errorText = strongSelf.presentationData.strings.Chat_MultipleTextMessagesDisabled
        } else {
            errorText = nil
        }

        if let errorText = errorText {
            let alert = standardTextAlertController(theme: AlertControllerTheme(presentationTheme: strongSelf.presentationData.theme), title: nil, text: errorText, actions: [TextAlertAction(type: .defaultAction, title: strongSelf.presentationData.strings.Common_OK, action: {})])
            strongSelf.present(alert, in: .window(.root))
            return
        }
    }

    // An explicit silent-posting flag takes precedence over a schedule time;
    // with neither present the messages get the default transformation.
    let transformedMessages: [EnqueueMessage]
    switch (silentPosting, scheduleTime) {
    case let (.some(silent), _):
        transformedMessages = strongSelf.transformEnqueueMessages(messages, silentPosting: silent)
    case let (.none, .some(time)):
        transformedMessages = strongSelf.transformEnqueueMessages(messages, silentPosting: false, scheduleTime: time)
    case (.none, .none):
        transformedMessages = strongSelf.transformEnqueueMessages(messages)
    }

    let _ = (enqueueMessages(account: strongSelf.context.account, peerId: peerId, messages: transformedMessages)
    |> deliverOnMainQueue).start(next: { _ in
        // `self` here is still the weak capture of the outer closure.
        guard let strongSelf = self, !strongSelf.presentationInterfaceState.isScheduledMessages else {
            return
        }
        // Outside the scheduled-messages mode, jump to the newest message.
        strongSelf.chatDisplayNode.historyNode.scrollToEndOfHistory()
    })

    strongSelf.donateIntent()
}
enqueueMessages
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Enqueues `messages` for sending to `peerId`.
///
/// When the account provides a `transformOutgoingMessageMedia` hook, the
/// messages first go through `opportunisticallyTransformOutgoingMedia`;
/// otherwise each message is paired with `false`. The resulting pairs are
/// then enqueued inside a postbox transaction.
///
/// - Returns: A signal with the ids produced by the transaction-level
///   `enqueueMessages` overload (an entry may be `nil`).
public func enqueueMessages(account: Account, peerId: PeerId, messages: [EnqueueMessage]) -> Signal<[MessageId?], NoError> {
    let preparedMessages: Signal<[(Bool, EnqueueMessage)], NoError>
    if let mediaTransform = account.transformOutgoingMessageMedia {
        preparedMessages = opportunisticallyTransformOutgoingMedia(network: account.network, postbox: account.postbox, transformOutgoingMessageMedia: mediaTransform, messages: messages, userInteractive: true)
    } else {
        preparedMessages = .single(messages.map { message in
            return (false, message)
        })
    }

    return preparedMessages
    |> mapToSignal { entries -> Signal<[MessageId?], NoError> in
        return account.postbox.transaction { transaction -> [MessageId?] in
            return enqueueMessages(transaction: transaction, account: account, peerId: peerId, messages: entries)
        }
    }
}
unsentMessageIdsView
1
2
3
4
5
6
7
8
9
10
11
/// Returns a signal of unsent-message-id views.
///
/// The subscription to the view tracker's `unsentMessageIdsViewSignal()` is
/// started asynchronously on `self.queue`; only `next` events are forwarded
/// to the subscriber.
public func unsentMessageIdsView() -> Signal<UnsentMessageIdsView, NoError> {
    return Signal { subscriber in
        let subscription = MetaDisposable()

        self.queue.async {
            let trackerSignal = self.viewTracker.unsentMessageIdsViewSignal()
            subscription.set(trackerSignal.start(next: { unsentView in
                subscriber.putNext(unsentView)
            }))
        }

        return subscription
    }
}
unsentMessageIdsViewSignal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/// Returns a signal that first emits the current unsent message ids and then
/// every later update delivered through a per-subscriber pipe.
///
/// All registration and deregistration happens asynchronously on
/// `self.queue`.
func unsentMessageIdsViewSignal() -> Signal<UnsentMessageIdsView, NoError> {
    return Signal { subscriber in
        let subscription = MetaDisposable()

        self.queue.async {
            // Emit the current state before registering for updates.
            let initialView = UnsentMessageIdsView(self.unsentMessageView.ids)
            subscriber.putNext(initialView)

            let updates = ValuePipe<UnsentMessageIdsView>()
            let registrationIndex = self.unsendMessageIdsViewSubscribers.add(updates)
            let forwarding = updates.signal().start(next: { update in
                subscriber.putNext(update)
            })

            // On disposal, stop forwarding and drop the pipe from the
            // subscriber bag, again on the tracker's queue.
            subscription.set(ActionDisposable {
                self.queue.async {
                    forwarding.dispose()
                    self.unsendMessageIdsViewSubscribers.remove(registrationIndex)
                }
            })
        }

        return subscription
    }
}
unsentViewUpdated
1
2
3
4
5
/// Pushes a view built from the current unsent message ids to every
/// registered unsent-ids subscriber pipe.
private func unsentViewUpdated() {
    self.unsendMessageIdsViewSubscribers.copyItems().forEach { pipe in
        let snapshot = UnsentMessageIdsView(self.unsentMessageView.ids)
        pipe.putNext(snapshot)
    }
}
refreshViewsDueToExternalTransaction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/// Refreshes the tracked views after the stored data changed outside of this
/// tracker's own transactions, publishing a new snapshot for each view that
/// reports a change.
///
/// - Parameters:
///   - postbox: Passed to the views that reload from storage.
///   - fetchUnsentMessageIds: Supplies the unsent message ids for the
///     unsent-message view refresh.
///   - fetchSynchronizePeerReadStateOperations: Supplies the read-state
///     synchronization operations for the corresponding view refresh.
func refreshViewsDueToExternalTransaction(postbox: Postbox, fetchUnsentMessageIds: () -> [MessageId], fetchSynchronizePeerReadStateOperations: () -> [PeerId: PeerReadStateSynchronizationOperation]) {
    // Message history views.
    var historyViewsChanged = false
    for (historyView, historyPipe) in self.messageHistoryViews.copyItems() {
        guard historyView.refreshDueToExternalTransaction(postbox: postbox) else {
            continue
        }
        historyPipe.putNext((MessageHistoryView(historyView), .Generic))
        historyViewsChanged = true
    }

    // Chat list views are re-rendered before being published.
    for (chatListView, chatListPipe) in self.chatListViews.copyItems() {
        guard chatListView.refreshDueToExternalTransaction(postbox: postbox) else {
            continue
        }
        chatListView.render(postbox: postbox, renderMessage: self.renderMessage, getPeer: { peerId in
            return self.getPeer(peerId)
        }, getPeerNotificationSettings: self.getPeerNotificationSettings, getPeerPresence: self.getPeerPresence)
        chatListPipe.putNext((ChatListView(chatListView), .Generic))
    }

    if historyViewsChanged {
        self.updateTrackedHoles()
    }

    let unsentChanged = self.unsentMessageView.refreshDueToExternalTransaction(fetchUnsentMessageIds: fetchUnsentMessageIds)
    if unsentChanged {
        self.unsentViewUpdated()
    }

    let readStatesChanged = self.synchronizeReadStatesView.refreshDueToExternalTransaction(fetchSynchronizePeerReadStateOperations: fetchSynchronizePeerReadStateOperations)
    if readStatesChanged {
        self.synchronizeReadStateViewUpdated()
    }

    // Peer views are reset rather than refreshed.
    for (peerView, peerPipe) in self.peerViews.copyItems() {
        guard peerView.reset(postbox: postbox) else {
            continue
        }
        peerPipe.putNext(PeerView(peerView))
    }
}
afterBegin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/// Runs right after a storage transaction has begun.
///
/// If the transaction state version stored in the metadata table differs
/// from the one held in memory, the in-memory table caches are cleared, the
/// tracked views are refreshed, and the in-memory version and master client
/// id are updated from the metadata table.
private func afterBegin() {
    let storedVersion = self.metadataTable.transactionStateVersion()
    guard storedVersion != self.transactionStateVersion else {
        return
    }

    self.tables.forEach { table in
        table.clearMemoryCache()
    }

    let loadUnsentMessageIds: () -> [MessageId] = {
        return self.messageHistoryUnsentTable.get()
    }
    let loadReadStateOperations: () -> [PeerId: PeerReadStateSynchronizationOperation] = {
        return self.synchronizeReadStateTable.get(getCombinedPeerReadState: { peerId in
            return self.readStateTable.getCombinedState(peerId)
        })
    }
    self.viewTracker.refreshViewsDueToExternalTransaction(postbox: self, fetchUnsentMessageIds: loadUnsentMessageIds, fetchSynchronizePeerReadStateOperations: loadReadStateOperations)

    self.transactionStateVersion = storedVersion
    self.masterClientId.set(.single(self.metadataTable.masterClientId()))
}
updateViews
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/// Replays `transaction` into every tracked mutable view and publishes a new
/// immutable snapshot on the view's pipe whenever the view reports a change.
///
/// - Parameters:
///   - postbox: Forwarded to the views whose replay/refresh needs storage access.
///   - transaction: The set of changes to replay.
func updateViews(postbox: Postbox, transaction: PostboxTransaction) {
// Set when any message history view changed; triggers `updateTrackedHoles()` below.
var updateTrackedHoles = false
if let currentUpdatedState = transaction.currentUpdatedState {
for (mutableView, pipe) in self.postboxStateViews.copyItems() {
if mutableView.replay(updatedState: currentUpdatedState) {
pipe.putNext(PostboxStateView(mutableView))
}
}
}
// Message history views: replay, pick the update type, then re-evaluate the peer ids.
for (mutableView, pipe) in self.messageHistoryViews.copyItems() {
var updated = false
let previousPeerIds = mutableView.peerIds
if mutableView.replay(postbox: postbox, transaction: transaction) {
updated = true
}
// A hole operation touching any peer covered by the view upgrades the
// update type from `.Generic` to `.FillHole`.
var updateType: ViewUpdateType = .Generic
switch mutableView.peerIds {
case let .single(peerId):
for key in transaction.currentPeerHoleOperations.keys {
if key.peerId == peerId {
updateType = .FillHole
break
}
}
case .associated:
var ids = Set<PeerId>()
// Re-match to bind the associated values; `.single` cannot occur here
// because the outer switch already matched `.associated`.
switch mutableView.peerIds {
case .single:
assertionFailure()
case let .associated(mainPeerId, associatedId):
ids.insert(mainPeerId)
if let associatedId = associatedId {
ids.insert(associatedId.peerId)
}
}
if !ids.isEmpty {
for key in transaction.currentPeerHoleOperations.keys {
if ids.contains(key.peerId) {
updateType = .FillHole
break
}
}
}
}
// If the peer ids covered by the view changed, refresh the view and
// report `.UpdateVisible`, regardless of what `replay` returned.
mutableView.updatePeerIds(transaction: transaction)
if mutableView.peerIds != previousPeerIds {
updateType = .UpdateVisible
let _ = mutableView.refreshDueToExternalTransaction(postbox: postbox)
updated = true
}
if updated {
updateTrackedHoles = true
pipe.putNext((MessageHistoryView(mutableView), updateType))
}
}
// Single-message views: only replayed when there are operations for the
// message's peer, updated media, or updated cached peer data.
for (mutableView, pipe) in self.messageViews.copyItems() {
let operations = transaction.currentOperationsByPeerId[mutableView.messageId.peerId]
if operations != nil || !transaction.updatedMedia.isEmpty || !transaction.currentUpdatedCachedPeerData.isEmpty {
if mutableView.replay(operations ?? [], updatedMedia: transaction.updatedMedia, renderIntermediateMessage: self.renderMessage) {
pipe.putNext(MessageView(mutableView))
}
}
}
// Chat list views: skipped entirely unless at least one of the inputs below
// changed. When the block runs, tracked chat list holes are updated even if
// no individual view reported a change.
if !transaction.chatListOperations.isEmpty || !transaction.currentUpdatedPeerNotificationSettings.isEmpty || !transaction.currentUpdatedPeers.isEmpty || !transaction.currentInvalidateMessageTagSummaries.isEmpty || !transaction.currentUpdatedMessageTagSummaries.isEmpty || !transaction.currentOperationsByPeerId.isEmpty || transaction.replacedAdditionalChatListItems != nil || !transaction.currentUpdatedPeerPresences.isEmpty {
for (mutableView, pipe) in self.chatListViews.copyItems() {
let context = MutableChatListViewReplayContext()
if mutableView.replay(postbox: postbox, operations: transaction.chatListOperations, updatedPeerNotificationSettings: transaction.currentUpdatedPeerNotificationSettings, updatedPeers: transaction.currentUpdatedPeers, updatedPeerPresences: transaction.currentUpdatedPeerPresences, transaction: transaction, context: context) {
mutableView.complete(postbox: postbox, context: context)
mutableView.render(postbox: postbox, renderMessage: self.renderMessage, getPeer: { id in
return self.getPeer(id)
}, getPeerNotificationSettings: self.getPeerNotificationSettings, getPeerPresence: self.getPeerPresence)
pipe.putNext((ChatListView(mutableView), .Generic))
}
}
self.updateTrackedChatListHoles()
}
if updateTrackedHoles {
self.updateTrackedHoles()
}
// Singleton views: unsent messages and read-state synchronization.
if self.unsentMessageView.replay(transaction.unsentMessageOperations) {
self.unsentViewUpdated()
}
if self.synchronizeReadStatesView.replay(transaction.updatedSynchronizePeerReadStateOperations) {
self.synchronizeReadStateViewUpdated()
}
for (view, pipe) in self.unreadMessageCountsViews.copyItems() {
if view.replay(postbox: postbox, transaction: transaction) {
pipe.putNext(UnreadMessageCountsView(view))
}
}
// Contact peer id views are only replayed when the contact id set was replaced.
if let replaceContactPeerIds = transaction.replaceContactPeerIds {
for (mutableView, pipe) in self.contactPeerIdsViews.copyItems() {
if mutableView.replay(updateRemoteTotalCount: transaction.replaceRemoteContactCount, replace: replaceContactPeerIds) {
pipe.putNext(ContactPeerIdsView(mutableView))
}
}
}
for (mutableView, pipe) in self.contactPeersViews.copyItems() {
if mutableView.replay(replacePeerIds: transaction.replaceContactPeerIds, updatedPeerPresences: transaction.currentUpdatedPeerPresences, getPeer: self.getPeer, getPeerPresence: self.getPeerPresence) {
pipe.putNext(ContactPeersView(mutableView))
}
}
/// Andrew Young - My groups
for (mutableView, pipe) in self.contactGroupsViews.copyItems() {
if mutableView.replay(updatedGroups: transaction.currentUpdatedGroups, updatedPeerPresences: transaction.currentUpdatedPeerPresences, getContactGroup: self.getContactGroup, getPeer: self.getPeer, getPeerPresence: self.getPeerPresence) {
pipe.putNext(ContactGroupsView(mutableView))
}
}
// The remaining view kinds all follow the same replay-then-publish pattern.
for (mutableView, pipe) in self.peerViews.copyItems() {
if mutableView.replay(postbox: postbox, transaction: transaction) {
pipe.putNext(PeerView(mutableView))
}
}
for (mutableView, pipe) in self.peerMergedOperationLogViews.copyItems() {
if mutableView.replay(operations: transaction.currentPeerMergedOperationLogOperations, getOperations: self.operationLogGetOperations, getTailIndex: self.operationLogGetTailIndex) {
pipe.putNext(PeerMergedOperationLogView(mutableView))
}
}
for (mutableView, pipe) in self.timestampBasedMessageAttributesViews.copyItems() {
if mutableView.replay(operations: transaction.currentTimestampBasedMessageAttributesOperations, getHead: self.getTimestampBasedMessageAttributesHead) {
pipe.putNext(TimestampBasedMessageAttributesView(mutableView))
}
}
for (mutableView, pipe) in self.preferencesViews.copyItems() {
if mutableView.replay(postbox: postbox, transaction: transaction) {
pipe.putNext(PreferencesView(mutableView))
}
}
for (mutableView, pipe) in self.multiplePeersViews.copyItems() {
if mutableView.replay(updatedPeers: transaction.currentUpdatedPeers, updatedPeerPresences: transaction.currentUpdatedPeerPresences) {
pipe.putNext(MultiplePeersView(mutableView))
}
}
for (mutableView, pipe) in self.itemCollectionsViews.copyItems() {
if mutableView.replay(postbox: postbox, transaction: transaction) {
pipe.putNext(ItemCollectionsView(mutableView))
}
}
for (mutableView, pipe) in self.combinedViews.copyItems() {
if mutableView.replay(postbox: postbox, transaction: transaction) {
pipe.putNext(mutableView.immutableView())
}
}
}
beforeCommit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/// Finalizes the pending changes of the current transaction.
///
/// Replays the accumulated history operations into the chat list tables,
/// snapshots the pending `current*` change sets into a `PostboxTransaction`,
/// lets the view tracker replay it when it is not empty, then clears the
/// pending change sets and calls `beforeCommit()` on every table.
///
/// - Returns: The incremented transaction state version and the newly stored
///   master client id. Both are `nil` when the transaction was empty; the
///   master client id is also `nil` when no update to it was pending.
private func beforeCommit() -> (updatedTransactionStateVersion: Int64?, updatedMasterClientId: Int64?) {
// These replays may append to the pending change sets, so they run before
// the snapshot below is taken (`operations` is passed as `inout`).
self.chatListTable.replay(historyOperationsByPeerId: self.currentOperationsByPeerId, updatedPeerChatListEmbeddedStates: self.currentUpdatedPeerChatListEmbeddedStates, updatedChatListInclusions: self.currentUpdatedChatListInclusions, messageHistoryTable: self.messageHistoryTable, peerChatInterfaceStateTable: self.peerChatInterfaceStateTable, operations: &self.currentChatListOperations)
self.peerChatTopTaggedMessageIdsTable.replay(historyOperationsByPeerId: self.currentOperationsByPeerId)
let alteredInitialPeerCombinedReadStates = self.readStateTable.transactionAlteredInitialPeerCombinedReadStates()
let updatedPeers = self.peerTable.transactionUpdatedPeers()
let transactionParticipationInTotalUnreadCountUpdates = self.peerNotificationSettingsTable.transactionParticipationInTotalUnreadCountUpdates(postbox: self)
self.chatListIndexTable.commitWithTransaction(postbox: self, alteredInitialPeerCombinedReadStates: alteredInitialPeerCombinedReadStates, updatedPeers: updatedPeers, transactionParticipationInTotalUnreadCountUpdates: transactionParticipationInTotalUnreadCountUpdates, updatedRootUnreadState: &self.currentUpdatedTotalUnreadState, updatedGroupTotalUnreadSummaries: &self.currentUpdatedGroupTotalUnreadSummaries, currentUpdatedGroupSummarySynchronizeOperations: &self.currentUpdatedGroupSummarySynchronizeOperations)
// Snapshot of everything that changed during this transaction.
let transaction = PostboxTransaction(currentUpdatedState: self.currentUpdatedState, currentPeerHoleOperations: self.currentPeerHoleOperations, currentOperationsByPeerId: self.currentOperationsByPeerId, chatListOperations: self.currentChatListOperations, currentUpdatedChatListInclusions: self.currentUpdatedChatListInclusions, currentUpdatedPeers: self.currentUpdatedPeers, currentUpdatedPeerNotificationSettings: self.currentUpdatedPeerNotificationSettings, currentUpdatedPeerNotificationBehaviorTimestamps: self.currentUpdatedPeerNotificationBehaviorTimestamps, currentUpdatedCachedPeerData: self.currentUpdatedCachedPeerData, currentUpdatedPeerPresences: currentUpdatedPeerPresences, currentUpdatedPeerChatListEmbeddedStates: self.currentUpdatedPeerChatListEmbeddedStates, currentUpdatedTotalUnreadState: self.currentUpdatedTotalUnreadState, currentUpdatedTotalUnreadSummaries: self.currentUpdatedGroupTotalUnreadSummaries, alteredInitialPeerCombinedReadStates: alteredInitialPeerCombinedReadStates, currentPeerMergedOperationLogOperations: self.currentPeerMergedOperationLogOperations, currentTimestampBasedMessageAttributesOperations: self.currentTimestampBasedMessageAttributesOperations, unsentMessageOperations: self.currentUnsentOperations, updatedSynchronizePeerReadStateOperations: self.currentUpdatedSynchronizeReadStateOperations, currentUpdatedGroupSummarySynchronizeOperations: self.currentUpdatedGroupSummarySynchronizeOperations, currentPreferencesOperations: self.currentPreferencesOperations, currentOrderedItemListOperations: self.currentOrderedItemListOperations, currentItemCollectionItemsOperations: self.currentItemCollectionItemsOperations, currentItemCollectionInfosOperations: self.currentItemCollectionInfosOperations, currentUpdatedPeerChatStates: self.currentUpdatedPeerChatStates, currentGlobalTagsOperations: self.currentGlobalTagsOperations, currentLocalTagsOperations: self.currentLocalTagsOperations, updatedMedia: self.currentUpdatedMedia, 
replaceRemoteContactCount: self.currentReplaceRemoteContactCount, replaceContactPeerIds: self.currentReplacedContactPeerIds, currentPendingMessageActionsOperations: self.currentPendingMessageActionsOperations, currentUpdatedMessageActionsSummaries: self.currentUpdatedMessageActionsSummaries, currentUpdatedMessageTagSummaries: self.currentUpdatedMessageTagSummaries, currentInvalidateMessageTagSummaries: self.currentInvalidateMessageTagSummaries, currentUpdatedPendingPeerNotificationSettings: self.currentUpdatedPendingPeerNotificationSettings, replacedAdditionalChatListItems: self.currentReplacedAdditionalChatListItems, updatedNoticeEntryKeys: self.currentUpdatedNoticeEntryKeys, updatedCacheEntryKeys: self.currentUpdatedCacheEntryKeys, currentUpdatedGroups: self.currentUpdatedGroups, currentUpdatedLives: self.currentUpdatedLives, currentUpdatedMasterClientId: currentUpdatedMasterClientId)
var updatedTransactionState: Int64?
var updatedMasterClientId: Int64?
// Only a non-empty transaction updates views, bumps the persisted state
// version, and stores a pending master client id.
if !transaction.isEmpty {
self.viewTracker.updateViews(postbox: self, transaction: transaction)
self.transactionStateVersion = self.metadataTable.incrementTransactionStateVersion()
updatedTransactionState = self.transactionStateVersion
if let currentUpdatedMasterClientId = self.currentUpdatedMasterClientId {
self.metadataTable.setMasterClientId(currentUpdatedMasterClientId)
updatedMasterClientId = currentUpdatedMasterClientId
}
}
// Reset the pending change sets. `currentUpdatedState` is deliberately not
// reset here; `internalTransaction` reads and clears it after the commit.
self.currentPeerHoleOperations.removeAll()
self.currentOperationsByPeerId.removeAll()
self.currentUpdatedChatListInclusions.removeAll()
self.currentUpdatedPeers.removeAll()
self.currentChatListOperations.removeAll()
// NOTE(review): `currentUpdatedChatListInclusions` was already cleared three
// lines above; this second call is redundant.
self.currentUpdatedChatListInclusions.removeAll()
self.currentUnsentOperations.removeAll()
self.currentUpdatedSynchronizeReadStateOperations.removeAll()
self.currentUpdatedGroupSummarySynchronizeOperations.removeAll()
self.currentGlobalTagsOperations.removeAll()
self.currentLocalTagsOperations.removeAll()
self.currentUpdatedMedia.removeAll()
self.currentReplaceRemoteContactCount = nil
self.currentReplacedContactPeerIds = nil
self.currentReplacedAdditionalChatListItems = nil
self.currentUpdatedNoticeEntryKeys.removeAll()
self.currentUpdatedCacheEntryKeys.removeAll()
self.currentUpdatedMasterClientId = nil
self.currentUpdatedPeerNotificationSettings.removeAll()
self.currentUpdatedPeerNotificationBehaviorTimestamps.removeAll()
self.currentUpdatedCachedPeerData.removeAll()
self.currentUpdatedPeerPresences.removeAll()
self.currentUpdatedPeerChatListEmbeddedStates.removeAll()
self.currentUpdatedTotalUnreadState = nil
self.currentUpdatedGroupTotalUnreadSummaries.removeAll()
self.currentPeerMergedOperationLogOperations.removeAll()
self.currentTimestampBasedMessageAttributesOperations.removeAll()
self.currentPreferencesOperations.removeAll()
self.currentOrderedItemListOperations.removeAll()
self.currentItemCollectionItemsOperations.removeAll()
self.currentItemCollectionInfosOperations.removeAll()
self.currentUpdatedPeerChatStates.removeAll()
self.currentPendingMessageActionsOperations.removeAll()
self.currentUpdatedMessageActionsSummaries.removeAll()
self.currentUpdatedMessageTagSummaries.removeAll()
self.currentInvalidateMessageTagSummaries.removeAll()
self.currentUpdatedPendingPeerNotificationSettings.removeAll()
self.currentGroupIdsWithUpdatedReadStats.removeAll()
/// Andrew Young - My groups
self.currentUpdatedGroups.removeAll()
/// Andrew Young - Live-streaming users
self.currentUpdatedLives.removeAll()
// Give every table its own pre-commit hook.
for table in self.tables {
table.beforeCommit()
}
return (updatedTransactionState, updatedMasterClientId)
}
internalTransaction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Runs `f` inside a value-box transaction and returns its result together
/// with the version / master-client-id updates reported by `beforeCommit()`.
///
/// The `Transaction` handed to `f` is marked as disposed as soon as `f`
/// returns. After the commit, a pending `currentUpdatedState` is pushed to
/// `statePipe` and then cleared.
private func internalTransaction<T>(_ f: (Transaction) -> T) -> (result: T, updatedTransactionStateVersion: Int64?, updatedMasterClientId: Int64?) {
    self.valueBox.begin()
    self.afterBegin()

    let activeTransaction = Transaction(postbox: self)
    let output = f(activeTransaction)
    activeTransaction.disposed = true

    let commitInfo = self.beforeCommit()
    self.valueBox.commit()

    if let pendingState = self.currentUpdatedState {
        self.statePipe.putNext(pendingState)
    }
    self.currentUpdatedState = nil

    return (output, commitInfo.updatedTransactionStateVersion, commitInfo.updatedMasterClientId)
}
transactionSignal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// Returns a signal that, on subscription, dispatches a transaction onto the
/// postbox queue and hands both the subscriber and the `Transaction` to `f`.
///
/// - Parameters:
///   - userInteractive: When `true`, the work is dispatched with
///     `DispatchQoS.userInteractive`.
///   - f: Transaction body; the disposable it returns becomes the disposable
///     of the subscription.
public func transactionSignal<T, E>(userInteractive: Bool = false, _ f: @escaping(Subscriber<T, E>, Transaction) -> Disposable) -> Signal<T, E> {
    return Signal { subscriber in
        let subscription = MetaDisposable()

        let performTransaction: () -> Void = {
            self.beginInternalTransaction {
                let (_, _, newMasterClientId) = self.internalTransaction({ transaction in
                    subscription.set(f(subscriber, transaction))
                })
                // The pipeNotifier notification that used to follow a state
                // version / master client id change is disabled:
                //self.pipeNotifier.notify()
                if let newMasterClientId = newMasterClientId {
                    self.masterClientId.set(.single(newMasterClientId))
                }
            }
        }

        if userInteractive {
            self.queue.justDispatchWithQoS(qos: DispatchQoS.userInteractive, performTransaction)
        } else {
            self.queue.justDispatch(performTransaction)
        }

        return subscription
    }
}
transaction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Returns a signal that runs `f` inside a postbox transaction, emits its
/// result once, and completes.
///
/// The transaction runs synchronously only when the caller is already on
/// both the postbox queue and the main queue; otherwise it is dispatched onto
/// the postbox queue. The returned disposable is `EmptyDisposable`.
///
/// - Parameters:
///   - userInteractive: When dispatching, use `DispatchQoS.userInteractive`.
///   - ignoreDisabled: Forwarded to `beginInternalTransaction`.
///   - f: Transaction body whose return value becomes the signal's value.
public func transaction<T>(userInteractive: Bool = false, ignoreDisabled: Bool = false, _ f: @escaping(Transaction) -> T) -> Signal<T, NoError> {
    return Signal { subscriber in
        let performTransaction: () -> Void = {
            self.beginInternalTransaction(ignoreDisabled: ignoreDisabled, {
                let (value, _, newMasterClientId) = self.internalTransaction({ transaction in
                    return f(transaction)
                })
                // The pipeNotifier notification that used to follow a state
                // version / master client id change is disabled:
                //self.pipeNotifier.notify()
                if let newMasterClientId = newMasterClientId {
                    self.masterClientId.set(.single(newMasterClientId))
                }
                subscriber.putNext(value)
                subscriber.putCompletion()
            })
        }

        let canRunInline = self.queue.isCurrent() && Queue.mainQueue().isCurrent()
        if canRunInline {
            performTransaction()
        } else if userInteractive {
            self.queue.justDispatchWithQoS(qos: DispatchQoS.userInteractive, performTransaction)
        } else {
            self.queue.justDispatch(performTransaction)
        }

        return EmptyDisposable
    }
}
unsentMessageOperations
1
/// Unsent-message operations carried by the transaction. `beforeCommit` fills
/// this from `currentUnsentOperations`, and `updateViews` replays it into the
/// unsent message view.
let unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation]
class
PostboxTransaction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
final class PostboxTransaction {
let currentUpdatedState: PostboxCoding?
let currentPeerHoleOperations: [MessageHistoryIndexHoleOperationKey: [MessageHistoryIndexHoleOperation]]
let currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]]
let chatListOperations: [PeerGroupId: [ChatListOperation]]
let currentUpdatedChatListInclusions: [PeerId: PeerChatListInclusion]
let currentUpdatedPeers: [PeerId: Peer]
let currentUpdatedPeerNotificationSettings: [PeerId: PeerNotificationSettings]
let currentUpdatedPeerNotificationBehaviorTimestamps: [PeerId: PeerNotificationSettingsBehaviorTimestamp]
let currentUpdatedCachedPeerData: [PeerId: CachedPeerData]
let currentUpdatedPeerPresences: [PeerId: PeerPresence]
let currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?]
let currentUpdatedTotalUnreadState: ChatListTotalUnreadState?
let currentUpdatedTotalUnreadSummaries: [PeerGroupId: PeerGroupUnreadCountersCombinedSummary]
let alteredInitialPeerCombinedReadStates: [PeerId: CombinedPeerReadState]
let currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation]
let currentTimestampBasedMessageAttributesOperations: [TimestampBasedMessageAttributesOperation]
let currentPreferencesOperations: [PreferencesOperation]
let currentOrderedItemListOperations: [Int32: [OrderedItemListOperation]]
let currentItemCollectionItemsOperations: [ItemCollectionId: [ItemCollectionItemsOperation]]
let currentItemCollectionInfosOperations: [ItemCollectionInfosOperation]
let currentUpdatedPeerChatStates: Set<PeerId>
let currentGlobalTagsOperations: [GlobalMessageHistoryTagsOperation]
let currentLocalTagsOperations: [IntermediateMessageHistoryLocalTagsOperation]
let currentPendingMessageActionsOperations: [PendingMessageActionsOperation]
let currentUpdatedMessageActionsSummaries: [PendingMessageActionsSummaryKey: Int32]
let currentUpdatedMessageTagSummaries: [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary]
let currentInvalidateMessageTagSummaries: [InvalidatedMessageHistoryTagsSummaryEntryOperation]
let currentUpdatedPendingPeerNotificationSettings: Set<PeerId>
let unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation]
let updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?]
let currentUpdatedGroupSummarySynchronizeOperations: [PeerGroupAndNamespace: Bool]
let updatedMedia: [MediaId: Media?]
let replaceRemoteContactCount: Int32?
let replaceContactPeerIds: Set<PeerId>?
let currentUpdatedMasterClientId: Int64?
let replacedAdditionalChatListItems: [PeerId]?
let updatedNoticeEntryKeys: Set<NoticeEntryKey>
let updatedCacheEntryKeys: Set<ItemCacheEntryId>
/// Andrew Young - 我的分组
let currentUpdatedGroups: [ContactsGroupInfo]
/// Andrew Young - 直播用户
let currentUpdatedLives: [PeerLiveUser]
var isEmpty: Bool {
if currentUpdatedState != nil {
return false
}
if !currentPeerHoleOperations.isEmpty {
return false
}
if !currentOperationsByPeerId.isEmpty {
return false
}
if !chatListOperations.isEmpty {
return false
}
if !currentUpdatedChatListInclusions.isEmpty {
return false
}
if !currentUpdatedPeers.isEmpty {
return false
}
if !currentUpdatedPeerNotificationSettings.isEmpty {
return false
}
if !currentUpdatedPeerNotificationBehaviorTimestamps.isEmpty {
return false
}
if !currentUpdatedCachedPeerData.isEmpty {
return false
}
if !currentUpdatedPeerPresences.isEmpty {
return false
}
if !currentUpdatedPeerChatListEmbeddedStates.isEmpty {
return false
}
if !unsentMessageOperations.isEmpty {
return false
}
if !updatedSynchronizePeerReadStateOperations.isEmpty {
return false
}
if !currentUpdatedGroupSummarySynchronizeOperations.isEmpty {
return false
}
if !updatedMedia.isEmpty {
return false
}
if replaceRemoteContactCount != nil {
return false
}
if let replaceContactPeerIds = replaceContactPeerIds, !replaceContactPeerIds.isEmpty {
return false
}
if currentUpdatedMasterClientId != nil {
return false
}
if currentUpdatedTotalUnreadState != nil {
return false
}
if !currentUpdatedTotalUnreadSummaries.isEmpty {
return false
}
if !alteredInitialPeerCombinedReadStates.isEmpty {
return false
}
if !alteredInitialPeerCombinedReadStates.isEmpty {
return false
}
if !currentPeerMergedOperationLogOperations.isEmpty {
return false
}
if !currentTimestampBasedMessageAttributesOperations.isEmpty {
return false
}
if !currentPreferencesOperations.isEmpty {
return false
}
if !currentOrderedItemListOperations.isEmpty {
return false
}
if !currentItemCollectionItemsOperations.isEmpty {
return false
}
if !currentItemCollectionInfosOperations.isEmpty {
return false
}
if !currentUpdatedPeerChatStates.isEmpty {
return false
}
if !self.currentGlobalTagsOperations.isEmpty {
return false
}
if !self.currentLocalTagsOperations.isEmpty {
return false
}
if !self.currentPendingMessageActionsOperations.isEmpty {
return false
}
if !self.currentUpdatedMessageActionsSummaries.isEmpty {
return false
}
if !self.currentUpdatedMessageTagSummaries.isEmpty {
return false
}
if !self.currentInvalidateMessageTagSummaries.isEmpty {
return false
}
if !self.currentUpdatedPendingPeerNotificationSettings.isEmpty {
return false
}
if replacedAdditionalChatListItems != nil {
return false
}
if !updatedNoticeEntryKeys.isEmpty {
return false
}
if !updatedCacheEntryKeys.isEmpty {
return false
}
/// Andrew Young - 我的分组
if !currentUpdatedGroups.isEmpty {
return false
}
/// Andrew Young - 直播用户
if !currentUpdatedLives.isEmpty {
return false
}
return true
}
/// Memberwise initializer: every argument is stored, unchanged, in the property of the same name.
///
/// `currentPeerHoleOperations` is the only parameter with a default (an empty dictionary);
/// all other values must be supplied by the caller.
init(
    currentUpdatedState: PostboxCoding?,
    currentPeerHoleOperations: [MessageHistoryIndexHoleOperationKey: [MessageHistoryIndexHoleOperation]] = [:],
    currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]],
    chatListOperations: [PeerGroupId: [ChatListOperation]],
    currentUpdatedChatListInclusions: [PeerId: PeerChatListInclusion],
    currentUpdatedPeers: [PeerId: Peer],
    currentUpdatedPeerNotificationSettings: [PeerId: PeerNotificationSettings],
    currentUpdatedPeerNotificationBehaviorTimestamps: [PeerId: PeerNotificationSettingsBehaviorTimestamp],
    currentUpdatedCachedPeerData: [PeerId: CachedPeerData],
    currentUpdatedPeerPresences: [PeerId: PeerPresence],
    currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?],
    currentUpdatedTotalUnreadState: ChatListTotalUnreadState?,
    currentUpdatedTotalUnreadSummaries: [PeerGroupId: PeerGroupUnreadCountersCombinedSummary],
    alteredInitialPeerCombinedReadStates: [PeerId: CombinedPeerReadState],
    currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation],
    currentTimestampBasedMessageAttributesOperations: [TimestampBasedMessageAttributesOperation],
    unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation],
    updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?],
    currentUpdatedGroupSummarySynchronizeOperations: [PeerGroupAndNamespace: Bool],
    currentPreferencesOperations: [PreferencesOperation],
    currentOrderedItemListOperations: [Int32: [OrderedItemListOperation]],
    currentItemCollectionItemsOperations: [ItemCollectionId: [ItemCollectionItemsOperation]],
    currentItemCollectionInfosOperations: [ItemCollectionInfosOperation],
    currentUpdatedPeerChatStates: Set<PeerId>,
    currentGlobalTagsOperations: [GlobalMessageHistoryTagsOperation],
    currentLocalTagsOperations: [IntermediateMessageHistoryLocalTagsOperation],
    updatedMedia: [MediaId: Media?],
    replaceRemoteContactCount: Int32?,
    replaceContactPeerIds: Set<PeerId>?,
    currentPendingMessageActionsOperations: [PendingMessageActionsOperation],
    currentUpdatedMessageActionsSummaries: [PendingMessageActionsSummaryKey: Int32],
    currentUpdatedMessageTagSummaries: [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary],
    currentInvalidateMessageTagSummaries: [InvalidatedMessageHistoryTagsSummaryEntryOperation],
    currentUpdatedPendingPeerNotificationSettings: Set<PeerId>,
    replacedAdditionalChatListItems: [PeerId]?,
    updatedNoticeEntryKeys: Set<NoticeEntryKey>,
    updatedCacheEntryKeys: Set<ItemCacheEntryId>,
    currentUpdatedGroups: [ContactsGroupInfo],
    currentUpdatedLives: [PeerLiveUser],
    currentUpdatedMasterClientId: Int64?
) {
    // Assignments follow the parameter order of the signature above.
    self.currentUpdatedState = currentUpdatedState
    self.currentPeerHoleOperations = currentPeerHoleOperations
    self.currentOperationsByPeerId = currentOperationsByPeerId
    self.chatListOperations = chatListOperations
    self.currentUpdatedChatListInclusions = currentUpdatedChatListInclusions

    // Peer-level updates.
    self.currentUpdatedPeers = currentUpdatedPeers
    self.currentUpdatedPeerNotificationSettings = currentUpdatedPeerNotificationSettings
    self.currentUpdatedPeerNotificationBehaviorTimestamps = currentUpdatedPeerNotificationBehaviorTimestamps
    self.currentUpdatedCachedPeerData = currentUpdatedCachedPeerData
    self.currentUpdatedPeerPresences = currentUpdatedPeerPresences
    self.currentUpdatedPeerChatListEmbeddedStates = currentUpdatedPeerChatListEmbeddedStates

    // Unread counters and read states.
    self.currentUpdatedTotalUnreadState = currentUpdatedTotalUnreadState
    self.currentUpdatedTotalUnreadSummaries = currentUpdatedTotalUnreadSummaries
    self.alteredInitialPeerCombinedReadStates = alteredInitialPeerCombinedReadStates

    // Operation logs and synchronization queues.
    self.currentPeerMergedOperationLogOperations = currentPeerMergedOperationLogOperations
    self.currentTimestampBasedMessageAttributesOperations = currentTimestampBasedMessageAttributesOperations
    self.unsentMessageOperations = unsentMessageOperations
    self.updatedSynchronizePeerReadStateOperations = updatedSynchronizePeerReadStateOperations
    self.currentUpdatedGroupSummarySynchronizeOperations = currentUpdatedGroupSummarySynchronizeOperations

    // Preferences, ordered lists and item collections.
    self.currentPreferencesOperations = currentPreferencesOperations
    self.currentOrderedItemListOperations = currentOrderedItemListOperations
    self.currentItemCollectionItemsOperations = currentItemCollectionItemsOperations
    self.currentItemCollectionInfosOperations = currentItemCollectionInfosOperations
    self.currentUpdatedPeerChatStates = currentUpdatedPeerChatStates

    // Tags, media and contacts.
    self.currentGlobalTagsOperations = currentGlobalTagsOperations
    self.currentLocalTagsOperations = currentLocalTagsOperations
    self.updatedMedia = updatedMedia
    self.replaceRemoteContactCount = replaceRemoteContactCount
    self.replaceContactPeerIds = replaceContactPeerIds

    // Pending message actions and tag summaries.
    self.currentPendingMessageActionsOperations = currentPendingMessageActionsOperations
    self.currentUpdatedMessageActionsSummaries = currentUpdatedMessageActionsSummaries
    self.currentUpdatedMessageTagSummaries = currentUpdatedMessageTagSummaries
    self.currentInvalidateMessageTagSummaries = currentInvalidateMessageTagSummaries
    self.currentUpdatedPendingPeerNotificationSettings = currentUpdatedPendingPeerNotificationSettings

    // Additional chat list items, notice and cache entries.
    self.replacedAdditionalChatListItems = replacedAdditionalChatListItems
    self.updatedNoticeEntryKeys = updatedNoticeEntryKeys
    self.updatedCacheEntryKeys = updatedCacheEntryKeys

    // Andrew Young - "My groups" (contact groups).
    self.currentUpdatedGroups = currentUpdatedGroups
    // Andrew Young - live-streaming users.
    self.currentUpdatedLives = currentUpdatedLives

    self.currentUpdatedMasterClientId = currentUpdatedMasterClientId
}
}
addMessages
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/// Adds `messages` through `messageHistoryTable`, then performs per-peer follow-up work.
///
/// For every peer that received messages:
///  * when `location` is `.UpperHistoryBlock`, the hole index is cleared from the earliest
///    added message id of each namespace up to `Int32.max - 1`;
///  * every action installed for that peer in `installedMessageActionsByPeerId` is invoked
///    with the peer's messages and `transaction`.
///
/// - Parameters:
///   - transaction: forwarded unchanged to the installed message actions.
///   - messages: the messages to store.
///   - location: `.Random` performs no hole bookkeeping; `.UpperHistoryBlock` removes holes as described above.
/// - Returns: the dictionary produced by `messageHistoryTable.addMessages`
///   (globally unique id mapped to the stored message id).
fileprivate func addMessages(transaction: Transaction, messages: [StoreMessage], location: AddMessagesLocation) -> [Int64: MessageId] {
    var addedMessagesByPeerId: [PeerId: [StoreMessage]] = [:]
    // The `unsentMessageOperations` argument was previously garbled into `¤tUnsentOperations`
    // (an HTML-entity decoding of `&curren`); it is the in-out reference `&currentUnsentOperations`.
    let addResult = self.messageHistoryTable.addMessages(
        messages: messages,
        operationsByPeerId: &self.currentOperationsByPeerId,
        updatedMedia: &self.currentUpdatedMedia,
        unsentMessageOperations: &currentUnsentOperations,
        updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations,
        globalTagsOperations: &self.currentGlobalTagsOperations,
        pendingActionsOperations: &self.currentPendingMessageActionsOperations,
        updatedMessageActionsSummaries: &self.currentUpdatedMessageActionsSummaries,
        updatedMessageTagSummaries: &self.currentUpdatedMessageTagSummaries,
        invalidateMessageTagSummaries: &self.currentInvalidateMessageTagSummaries,
        localTagsOperations: &self.currentLocalTagsOperations,
        processMessages: { messagesByPeerId in
            // Capture the per-peer grouping computed by the table for the follow-up work below.
            addedMessagesByPeerId = messagesByPeerId
        }
    )
    for (peerId, peerMessages) in addedMessagesByPeerId {
        switch location {
        case .Random:
            break
        case .UpperHistoryBlock:
            // Find the smallest fully specified message id per namespace.
            var earliestByNamespace: [MessageId.Namespace: MessageId] = [:]
            for message in peerMessages {
                if case let .Id(id) = message.id {
                    if let currentEarliestId = earliestByNamespace[id.namespace] {
                        if id < currentEarliestId {
                            earliestByNamespace[id.namespace] = id
                        }
                    } else {
                        earliestByNamespace[id.namespace] = id
                    }
                }
            }
            // Everything from that id upwards is no longer treated as a hole.
            for (_, id) in earliestByNamespace {
                self.messageHistoryHoleIndexTable.remove(peerId: id.peerId, namespace: id.namespace, space: .everywhere, range: id.id ... (Int32.max - 1), operations: &self.currentPeerHoleOperations)
            }
        }
        if let bag = self.installedMessageActionsByPeerId[peerId] {
            // Iterate over a copy of the bag's items.
            for f in bag.copyItems() {
                f(peerMessages, transaction)
            }
        }
    }
    return addResult
}
addMessages
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Stores `messages`, grouped by peer, and reports the resulting changes through the in-out arguments.
///
/// For each peer the messages are converted with `internalStoreMessages`, every global tag they
/// carry is initialized once in `globalTagsTable`, the messages are handed to
/// `messageHistoryIndexTable`, and the index operations it produces are applied by
/// `processIndexOperations`.
///
/// - Parameter processMessages: if non-nil, called once at the end with the per-peer grouping of the
///   input messages.
/// - Returns: a map from each message's `globallyUniqueId` (when present) to its message id.
func addMessages(messages: [StoreMessage], operationsByPeerId: inout [PeerId: [MessageHistoryOperation]], updatedMedia: inout [MediaId: Media?], unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], updatedPeerReadStateOperations: inout [PeerId: PeerReadStateSynchronizationOperation?], globalTagsOperations: inout [GlobalMessageHistoryTagsOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation], processMessages: (([PeerId : [StoreMessage]]) -> Void)?) -> [Int64: MessageId] {
    let groupedMessages = self.messagesGroupedByPeerId(messages)
    var messageIdsByGloballyUniqueId: [Int64: MessageId] = [:]
    // Global tags already initialized during this call, so each tag is initialized at most once.
    var initializedGlobalTags = Set<GlobalMessageTags>()

    for (peerId, storeMessages) in groupedMessages {
        var indexOperations: [MessageHistoryIndexOperation] = []
        let internalMessages = self.internalStoreMessages(storeMessages)

        for internalMessage in internalMessages {
            if let uniqueId = internalMessage.globallyUniqueId {
                messageIdsByGloballyUniqueId[uniqueId] = internalMessage.id
            }
            if internalMessage.globalTags.isEmpty {
                continue
            }
            for tag in internalMessage.globalTags where !initializedGlobalTags.contains(tag) {
                self.globalTagsTable.ensureInitialized(tag)
                initializedGlobalTags.insert(tag)
            }
        }

        self.messageHistoryIndexTable.addMessages(internalMessages, operations: &indexOperations)
        self.processIndexOperations(
            peerId,
            operations: indexOperations,
            processedOperationsByPeerId: &operationsByPeerId,
            updatedMedia: &updatedMedia,
            unsentMessageOperations: &unsentMessageOperations,
            updatedPeerReadStateOperations: &updatedPeerReadStateOperations,
            globalTagsOperations: &globalTagsOperations,
            pendingActionsOperations: &pendingActionsOperations,
            updatedMessageActionsSummaries: &updatedMessageActionsSummaries,
            updatedMessageTagSummaries: &updatedMessageTagSummaries,
            invalidateMessageTagSummaries: &invalidateMessageTagSummaries,
            localTagsOperations: &localTagsOperations
        )
    }

    if let processMessages = processMessages {
        processMessages(groupedMessages)
    }
    return messageIdsByGloballyUniqueId
}
processIndexOperations
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/// Applies the index-level `operations` of one peer to this table and records the resulting changes.
///
/// Each operation is applied to storage through the `just…` helpers, and the matching
/// history-level operations are collected and finally appended to `processedOperationsByPeerId[peerId]`.
/// Removals and added incoming-message indices are batched and flushed lazily; the relative order of
/// those flushes and the other operations is significant, so statements here should not be reordered.
///
/// - Parameters:
///   - peerId: the peer all `operations` belong to.
///   - operations: index operations to apply, in order.
///   - processedOperationsByPeerId: receives the produced history operations for `peerId`.
///   - updatedPeerReadStateOperations: receives a `.Validate` entry when any read-state call asked for invalidation.
///   - The remaining in-out arguments are forwarded to the helpers called below.
private func processIndexOperations(_ peerId: PeerId, operations: [MessageHistoryIndexOperation], processedOperationsByPeerId: inout [PeerId: [MessageHistoryOperation]], updatedMedia: inout [MediaId: Media?], unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], updatedPeerReadStateOperations: inout [PeerId: PeerReadStateSynchronizationOperation?], globalTagsOperations: inout [GlobalMessageHistoryTagsOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation]) {
// Scratch key, buffer and encoder handed to every justInsertMessage/justUpdate call below.
// NOTE(review): presumably shared so the helpers can reuse them instead of allocating — confirm in the helpers.
let sharedKey = self.key(MessageIndex(id: MessageId(peerId: PeerId(namespace: 0, id: 0), namespace: 0, id: 0), timestamp: 0))
let sharedBuffer = WriteBuffer()
let sharedEncoder = PostboxEncoder()
// History-level operations produced by this call; merged into processedOperationsByPeerId at the end.
var outputOperations: [MessageHistoryOperation] = []
// Indices waiting to be removed; flushed by processIndexOperationsCommitAccumulatedRemoveIndices.
var accumulatedRemoveIndices: [(MessageIndex)] = []
// Indices of incoming messages waiting to be reported to the read-state table.
var accumulatedAddedIncomingMessageIndices = Set<MessageIndex>()
// Latest combined read state returned by the read-state table, if any call returned one.
var updatedCombinedState: CombinedPeerReadState?
// Becomes true as soon as any read-state call requests invalidation.
var invalidateReadState = false
// Filled by justInsertMessage; each entry is passed to updateMedia after all operations are applied.
var updateExistingMedia: [MediaId: Media] = [:]
// Flushes the batched incoming-message indices into the read-state table and clears the batch.
let commitAccumulatedAddedIndices: () -> Void = {
if !accumulatedAddedIncomingMessageIndices.isEmpty {
let (combinedState, invalidate) = self.readStateTable.addIncomingMessages(peerId, indices: accumulatedAddedIncomingMessageIndices)
if let combinedState = combinedState {
updatedCombinedState = combinedState
}
if invalidate {
invalidateReadState = true
}
accumulatedAddedIncomingMessageIndices.removeAll()
}
}
for operation in operations {
switch operation {
case let .InsertMessage(storeMessage):
// Pending removals are flushed before inserting. Pending added indices are NOT flushed in this
// case, so consecutive inserts end up in a single read-state update.
processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)
let (message, updatedGroupInfos) = self.justInsertMessage(storeMessage, sharedKey: sharedKey, sharedBuffer: sharedBuffer, sharedEncoder: sharedEncoder, localTagsOperations: &localTagsOperations, updateExistingMedia: &updateExistingMedia)
outputOperations.append(.InsertMessage(message))
if !updatedGroupInfos.isEmpty {
outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
}
// Messages flagged Unsent (and not Failed) are tracked in the unsent table.
if message.flags.contains(.Unsent) && !message.flags.contains(.Failed) {
self.unsentTable.add(message.id, operations: &unsentMessageOperations)
}
// Walk the set bits of the raw tag mask from the lowest bit up and register each single-bit tag;
// the loop stops early once no higher bits remain.
let tags = message.tags.rawValue
if tags != 0 {
for i in 0 ..< 32 {
let currentTags = tags >> UInt32(i)
if currentTags == 0 {
break
}
if (currentTags & 1) != 0 {
let tag = MessageTags(rawValue: 1 << UInt32(i))
self.tagsTable.add(tags: tag, index: message.index, updatedSummaries: &updatedMessageTagSummaries, invalidateSummaries: &invalidateMessageTagSummaries)
}
}
}
// Same bit walk for global tags; an operation is recorded only when addMessage returns true.
let globalTags = message.globalTags.rawValue
if globalTags != 0 {
for i in 0 ..< 32 {
let currentTags = globalTags >> UInt32(i)
if currentTags == 0 {
break
}
if (currentTags & 1) != 0 {
let tag = GlobalMessageTags(rawValue: 1 << UInt32(i))
if self.globalTagsTable.addMessage(tag, index: message.index) {
globalTagsOperations.append(.insertMessage(tag, message))
}
}
}
}
// A newly inserted message has no previous local tags.
if !message.localTags.isEmpty {
self.localTagsTable.set(id: message.id, tags: message.localTags, previousTags: [], operations: &localTagsOperations)
}
// Incoming messages are batched for the read-state table.
if !message.flags.intersection(.IsIncomingMask).isEmpty {
accumulatedAddedIncomingMessageIndices.insert(message.index)
}
case let .InsertExistingMessage(storeMessage):
// Flush both batches, then update the stored message in place, keeping its local tags.
commitAccumulatedAddedIndices()
processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)
var updatedGroupInfos: [MessageId: MessageGroupInfo] = [:]
if let (message, previousTags) = self.justUpdate(storeMessage.index, message: storeMessage, keepLocalTags: true, sharedKey: sharedKey, sharedBuffer: sharedBuffer, sharedEncoder: sharedEncoder, unsentMessageOperations: &unsentMessageOperations, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, updatedGroupInfos: &updatedGroupInfos, localTagsOperations: &localTagsOperations, updatedMedia: &updatedMedia) {
// The update is reported as a removal of the old entry followed by an insertion of the new one.
outputOperations.append(.Remove([(storeMessage.index, previousTags)]))
outputOperations.append(.InsertMessage(message))
if !updatedGroupInfos.isEmpty {
outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
}
}
case let .Remove(index):
// Removals are only queued here; consecutive removals are committed together later.
commitAccumulatedAddedIndices()
accumulatedRemoveIndices.append(index)
case let .Update(index, storeMessage):
// Flush both batches, then update the stored message without keeping its local tags.
commitAccumulatedAddedIndices()
processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)
var updatedGroupInfos: [MessageId: MessageGroupInfo] = [:]
if let (message, previousTags) = self.justUpdate(index, message: storeMessage, keepLocalTags: false, sharedKey: sharedKey, sharedBuffer: sharedBuffer, sharedEncoder: sharedEncoder, unsentMessageOperations: &unsentMessageOperations, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, updatedGroupInfos: &updatedGroupInfos, localTagsOperations: &localTagsOperations, updatedMedia: &updatedMedia) {
outputOperations.append(.Remove([(index, previousTags)]))
outputOperations.append(.InsertMessage(message))
if !updatedGroupInfos.isEmpty {
outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
}
// An incoming message whose index changed: queue the old index in the removal batch and the
// new index in the incoming-added batch so the read state is adjusted on the next flush.
if !message.flags.intersection(.IsIncomingMask).isEmpty {
if index != message.index {
accumulatedRemoveIndices.append(index)
accumulatedAddedIncomingMessageIndices.insert(message.index)
}
}
}
case let .UpdateTimestamp(index, timestamp):
// Flush both batches, then change only the timestamp of the stored message.
commitAccumulatedAddedIndices()
processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)
var updatedGroupInfos: [MessageId: MessageGroupInfo] = [:]
let tagsAndGlobalTags = self.justUpdateTimestamp(index, timestamp: timestamp, unsentMessageOperations: &unsentMessageOperations, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, updatedGroupInfos: &updatedGroupInfos, localTagsOperations: &localTagsOperations, updatedMedia: &updatedMedia)
outputOperations.append(.UpdateTimestamp(index, timestamp))
if !updatedGroupInfos.isEmpty {
outputOperations.append(.UpdateGroupInfos(updatedGroupInfos))
}
// Mirror the timestamp change in the global-tags operations when the message has global tags.
if let (_, globalTags) = tagsAndGlobalTags {
if !globalTags.isEmpty {
globalTagsOperations.append(.updateTimestamp(globalTags, index, timestamp))
}
}
}
}
// Final flush of whatever is still batched after the last operation.
commitAccumulatedAddedIndices()
processIndexOperationsCommitAccumulatedRemoveIndices(peerId: peerId, accumulatedRemoveIndices: &accumulatedRemoveIndices, updatedCombinedState: &updatedCombinedState, invalidateReadState: &invalidateReadState, unsentMessageOperations: &unsentMessageOperations, outputOperations: &outputOperations, globalTagsOperations: &globalTagsOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations)
// Emit at most one read-state update: the last combined state any read-state call returned.
if let updatedCombinedState = updatedCombinedState {
outputOperations.append(.UpdateReadState(peerId, updatedCombinedState))
}
if invalidateReadState {
self.synchronizeReadStateTable.set(peerId, operation: .Validate, operations: &updatedPeerReadStateOperations)
}
// Merge this call's operations into the caller's per-peer list.
if processedOperationsByPeerId[peerId] == nil {
processedOperationsByPeerId[peerId] = outputOperations
} else {
processedOperationsByPeerId[peerId]!.append(contentsOf: outputOperations)
}
// Apply the media collected by justInsertMessage; entries whose media has no id are skipped.
// `updatedMessageIndices` is required by updateMedia but its contents are not used here.
for (_, media) in updateExistingMedia {
if let id = media.id {
var updatedMessageIndices = Set<MessageIndex>()
self.updateMedia(id, media: media, operationsByPeerId: &processedOperationsByPeerId, updatedMedia: &updatedMedia, updatedMessageIndices: &updatedMessageIndices)
}
}
//self.debugCheckTagIndexIntegrity(peerId: peerId)
}
processIndexOperationsCommitAccumulatedRemoveIndices
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/// Commits the batched removals in `accumulatedRemoveIndices` and empties the batch.
///
/// Does nothing when the batch is empty. Otherwise the read-state table is told about the deleted
/// indices first (its combined state / invalidation request is written to `updatedCombinedState` /
/// `invalidateReadState`), then the indices are removed interval by interval, as returned by
/// `continuousIndexIntervalsForRemoving`. Each interval yields one `.Remove` output operation,
/// an optional global-tags `.remove` operation, and an optional `.UpdateGroupInfos` operation.
private func processIndexOperationsCommitAccumulatedRemoveIndices(peerId: PeerId, accumulatedRemoveIndices: inout [MessageIndex], updatedCombinedState: inout CombinedPeerReadState?, invalidateReadState: inout Bool, unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], outputOperations: inout [MessageHistoryOperation], globalTagsOperations: inout [GlobalMessageHistoryTagsOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation]) {
    guard !accumulatedRemoveIndices.isEmpty else {
        return
    }

    // Read state is updated before the entries are removed from storage.
    let (deletedCombinedState, shouldInvalidate) = self.readStateTable.deleteMessages(peerId, indices: accumulatedRemoveIndices, incomingStatsInIndices: { statsPeerId, statsNamespace, statsIndices in
        return self.incomingMessageStatsInIndices(statsPeerId, namespace: statsNamespace, indices: statsIndices)
    })
    if let deletedCombinedState = deletedCombinedState {
        updatedCombinedState = deletedCombinedState
    }
    invalidateReadState = invalidateReadState || shouldInvalidate

    for interval in self.continuousIndexIntervalsForRemoving(accumulatedRemoveIndices) {
        var removedEntries: [(MessageIndex, MessageTags)] = []
        var removedGlobalEntries: [(GlobalMessageTags, MessageIndex)] = []

        for index in interval {
            guard let (tags, globalTags) = self.justRemove(index, unsentMessageOperations: &unsentMessageOperations, pendingActionsOperations: &pendingActionsOperations, updatedMessageActionsSummaries: &updatedMessageActionsSummaries, updatedMessageTagSummaries: &updatedMessageTagSummaries, invalidateMessageTagSummaries: &invalidateMessageTagSummaries, localTagsOperations: &localTagsOperations) else {
                // Nothing was stored under this index; it is still reported, with empty tags.
                removedEntries.append((index, MessageTags()))
                continue
            }
            removedEntries.append((index, tags))
            if !globalTags.isEmpty {
                removedGlobalEntries.append((globalTags, index))
            }
        }

        assert(interval.count == removedEntries.count)
        outputOperations.append(.Remove(removedEntries))
        if !removedGlobalEntries.isEmpty {
            globalTagsOperations.append(.remove(removedGlobalEntries))
        }

        // Groups around the removed interval may need to be combined.
        var regroupedInfos: [MessageId: MessageGroupInfo] = [:]
        self.maybeCombineGroupsInNamespace(at: interval[0], updatedGroupInfos: &regroupedInfos)
        if !regroupedInfos.isEmpty {
            outputOperations.append(.UpdateGroupInfos(regroupedInfos))
        }
    }

    accumulatedRemoveIndices.removeAll()
}
justRemove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/// Removes the message stored at `index` together with its entries in the auxiliary tables.
///
/// The stored entry is decoded, then: embedded media are removed from the media table, the message is
/// dropped from the unsent table (if it was unsent and not failed), from the globally-unique-id table,
/// the pending-actions table, the tag / global-tag / local-tag tables, its referenced media lose one
/// reference each, and its text index entry is removed when the peer namespace requires one.
/// Finally the entry itself is deleted from the value box.
///
/// - Returns: the tags and global tags of the removed message, or `nil` when nothing is stored at `index`.
private func justRemove(_ index: MessageIndex, unsentMessageOperations: inout [IntermediateMessageHistoryUnsentOperation], pendingActionsOperations: inout [PendingMessageActionsOperation], updatedMessageActionsSummaries: inout [PendingMessageActionsSummaryKey: Int32], updatedMessageTagSummaries: inout [MessageHistoryTagsSummaryKey: MessageHistoryTagNamespaceSummary], invalidateMessageTagSummaries: inout [InvalidatedMessageHistoryTagsSummaryEntryOperation], localTagsOperations: inout [IntermediateMessageHistoryLocalTagsOperation]) -> (MessageTags, GlobalMessageTags)? {
    let entryKey = self.key(index)
    guard let storedValue = self.valueBox.get(self.table, key: entryKey) else {
        return nil
    }
    let message = self.readIntermediateEntry(entryKey, value: storedValue).message

    // Embedded media blob: a 4-byte count followed by (4-byte length, encoded media) records.
    let embeddedMedia = message.embeddedMediaData
    if embeddedMedia.length > 4 {
        var mediaCount: Int32 = 0
        embeddedMedia.read(&mediaCount, offset: 0, length: 4)
        for _ in 0 ..< mediaCount {
            var encodedLength: Int32 = 0
            embeddedMedia.read(&encodedLength, offset: 0, length: 4)
            // Decode in place from the blob's memory; the temporary buffer does not own it.
            let mediaBuffer = MemoryBuffer(memory: embeddedMedia.memory + embeddedMedia.offset, capacity: Int(encodedLength), length: Int(encodedLength), freeWhenDone: false)
            if let media = PostboxDecoder(buffer: mediaBuffer).decodeRootObject() as? Media {
                self.messageMediaTable.removeEmbeddedMedia(media)
            }
            embeddedMedia.skip(Int(encodedLength))
        }
    }

    let isPendingSend = message.flags.contains(.Unsent) && !message.flags.contains(.Failed)
    if isPendingSend {
        self.unsentTable.remove(index.id, operations: &unsentMessageOperations)
    }

    if let uniqueId = message.globallyUniqueId {
        self.globallyUniqueMessageIdsTable.remove(peerId: message.id.peerId, globallyUniqueId: uniqueId)
    }

    self.pendingActionsTable.removeMessage(id: message.id, operations: &pendingActionsOperations, updatedSummaries: &updatedMessageActionsSummaries)

    for tag in message.tags {
        self.tagsTable.remove(tags: tag, index: index, updatedSummaries: &updatedMessageTagSummaries, invalidateSummaries: &invalidateMessageTagSummaries)
    }
    for globalTag in message.globalTags {
        self.globalTagsTable.remove(globalTag, index: index)
    }
    if !message.localTags.isEmpty {
        self.localTagsTable.set(id: index.id, tags: [], previousTags: message.localTags, operations: &localTagsOperations)
    }

    for referencedMediaId in message.referencedMedia {
        let _ = self.messageMediaTable.removeReference(referencedMediaId)
    }

    let requiresTextIndex = self.seedConfiguration.peerNamespacesRequiringMessageTextIndex.contains(message.id.peerId.namespace)
    if requiresTextIndex {
        self.textIndexTable.remove(messageId: message.id)
    }

    // Capture the tags before the entry itself is deleted.
    let removedTags = message.tags
    let removedGlobalTags = message.globalTags
    self.valueBox.remove(self.table, key: entryKey, secure: true)
    return (removedTags, removedGlobalTags)
}
unsentTable
1
// Table of message ids that are flagged Unsent and not Failed: ids are added when such a message is
// inserted and removed again in `justRemove`.
let unsentTable: MessageHistoryUnsentTable
class
MessageHistoryTable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Stored collaborators of the message history table.
// NOTE(review): the comments below describe only the uses visible in this listing; confirm against the full class.

// Read in `justRemove` for `peerNamespacesRequiringMessageTextIndex`.
private let seedConfiguration: SeedConfiguration
// Receives converted messages in `addMessages` and produces the index operations to process.
let messageHistoryIndexTable: MessageHistoryIndexTable
// Usage not visible in this listing.
let messageHistoryHoleIndexTable: MessageHistoryHoleIndexTable
// `justRemove` removes embedded media and media references through this table.
let messageMediaTable: MessageMediaTable
// Supplies the next message id for partially specified ids in `internalStoreMessages`.
let historyMetadataTable: MessageHistoryMetadataTable
// `justRemove` deletes a message's globally unique id mapping here.
let globallyUniqueMessageIdsTable: MessageGloballyUniqueIdTable
// Tracks ids of messages flagged Unsent and not Failed.
let unsentTable: MessageHistoryUnsentTable
// Per-tag message index, updated on insert and in `justRemove`.
let tagsTable: MessageHistoryTagsTable
// Global-tag message index, initialized per tag in `addMessages`.
let globalTagsTable: GlobalMessageHistoryTagsTable
// Local tags per message id, set on insert and cleared in `justRemove`.
let localTagsTable: LocalMessageHistoryTagsTable
// Informed about added incoming messages and deleted messages; returns the combined read state.
let readStateTable: MessageHistoryReadStateTable
// Receives a `.Validate` operation when a read-state call requests invalidation.
let synchronizeReadStateTable: MessageHistorySynchronizeReadStateTable
// `justRemove` removes a message's text index entry for namespaces that require one.
let textIndexTable: MessageHistoryTextIndexTable
// Usage not visible in this listing.
let summaryTable: MessageHistoryTagsSummaryTable
// `justRemove` drops a removed message's pending actions here.
let pendingActionsTable: PendingMessageActionsTable
class
MessageHistoryUnsentTable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/// Registers `id` in the table by writing an empty value under the id's key, and records an
/// `.Insert` operation for it in `operations`.
func add(_ id: MessageId, operations: inout [IntermediateMessageHistoryUnsentOperation]) {
    let entryKey = self.key(id)
    // Only the key carries information; the stored value is empty.
    let emptyValue = MemoryBuffer()
    self.valueBox.set(self.table, key: entryKey, value: emptyValue)
    operations.append(.Insert(id))
}
/// Deletes the entry stored under the key of `id` (non-secure removal) and records a `.Remove`
/// operation for it in `operations`.
func remove(_ id: MessageId, operations: inout [IntermediateMessageHistoryUnsentOperation]) {
    let entryKey = self.key(id)
    self.valueBox.remove(self.table, key: entryKey, secure: false)
    operations.append(.Remove(id))
}
/// Returns the message ids decoded from every key between `lowerBound()` and `upperBound()`.
///
/// Key layout as read here: namespace (Int32 at offset 0), message id (Int32 at offset 4),
/// peer id (Int64 at offset 8).
func get() -> [MessageId] {
    var unsentIds: [MessageId] = []
    let rangeStart = self.lowerBound()
    let rangeEnd = self.upperBound()
    // NOTE(review): `limit: 0` presumably means "no limit" — confirm against the value box API.
    self.valueBox.range(self.table, start: rangeStart, end: rangeEnd, keys: { entryKey in
        let namespace = entryKey.getInt32(0)
        let messageId = entryKey.getInt32(4)
        let peerId = PeerId(entryKey.getInt64(4 + 4))
        unsentIds.append(MessageId(peerId: peerId, namespace: namespace, id: messageId))
        return true
    }, limit: 0)
    return unsentIds
}
unsentTable.add
1
2
3
// Call-site excerpt: a message flagged Unsent that has not Failed is registered in the unsent table,
// and the resulting operation is appended to `unsentMessageOperations`.
if message.flags.contains(.Unsent) && !message.flags.contains(.Failed) {
self.unsentTable.add(message.id, operations: &unsentMessageOperations)
}
messagesGroupedByPeerId
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Groups `messages` by the peer id of their message id.
///
/// Within each group the messages keep their relative order from `messages`.
private func messagesGroupedByPeerId(_ messages: [StoreMessage]) -> [PeerId: [StoreMessage]] {
    return Dictionary(grouping: messages, by: { storeMessage in
        return storeMessage.id.peerId
    })
}
internalStoreMessages
1
2
3
4
5
6
7
8
9
10
11
12
13
/// Converts store messages into internal store messages, in input order.
///
/// A message with a full id (`.Id`) keeps it. A message with a partial id (`.Partial`) gets the
/// next id for its peer and namespace from `historyMetadataTable`, which also advances that counter.
/// All other fields are copied unchanged.
func internalStoreMessages(_ messages: [StoreMessage]) -> [InternalStoreMessage] {
    var converted: [InternalStoreMessage] = []
    for storeMessage in messages {
        // Resolve the final message id first; everything else is identical for both cases.
        let resolvedId: MessageId
        switch storeMessage.id {
        case let .Id(existingId):
            resolvedId = existingId
        case let .Partial(peerId, namespace):
            resolvedId = self.historyMetadataTable.getNextMessageIdAndIncrement(peerId, namespace: namespace)
        }
        let internalMessage = InternalStoreMessage(
            id: resolvedId,
            timestamp: storeMessage.timestamp,
            globallyUniqueId: storeMessage.globallyUniqueId,
            groupingKey: storeMessage.groupingKey,
            flags: storeMessage.flags,
            tags: storeMessage.tags,
            globalTags: storeMessage.globalTags,
            localTags: storeMessage.localTags,
            forwardInfo: storeMessage.forwardInfo,
            authorId: storeMessage.authorId,
            text: storeMessage.text,
            attributes: storeMessage.attributes,
            media: storeMessage.media
        )
        converted.append(internalMessage)
    }
    return converted
}
messageHistoryIndexTable
1
// Index table that `addMessages` hands converted messages to; it reports the index operations to apply.
let messageHistoryIndexTable: MessageHistoryIndexTable
messageHistoryIndexTable.addMessages
1
// Call-site excerpt from `addMessages`: the index table appends the operations it produces to
// `operations`, which are then applied by `processIndexOperations`.
self.messageHistoryIndexTable.addMessages(internalPeerMessages, operations: &operations)