Network
enum
ConnectionStatus
1
2
3
4
case waitingForNetwork
case connecting(proxyAddress: String?, proxyHasConnectionIssues: Bool)
case updating(proxyAddress: String?)
case online(proxyAddress: String?)
struct
MTProtoConnectionFlags:
OptionSet
1
2
3
4
5
6
7
let rawValue: Int
static let NetworkAvailable = MTProtoConnectionFlags(rawValue: 1)
static let Connected = MTProtoConnectionFlags(rawValue: 2)
static let UpdatingConnectionContext = MTProtoConnectionFlags(rawValue: 4)
static let PerformingServiceTasks = MTProtoConnectionFlags(rawValue: 8)
static let ProxyHasConnectionIssues = MTProtoConnectionFlags(rawValue: 16)
struct
MTProtoConnectionInfo
1
2
var flags: MTProtoConnectionFlags
var proxyAddress: String?
class
MTProtoConnectionStatusDelegate:
MTProtoDelegate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
var action: (MTProtoConnectionInfo) -> () = { _ in }
let info = Atomic<MTProtoConnectionInfo>(value: MTProtoConnectionInfo(flags: [], proxyAddress: nil))
// network availability changed
@objc func mtProtoNetworkAvailabilityChanged(_ mtProto: MTProto!, isNetworkAvailable: Bool) {
self.action(self.info.modify { info in
var info = info
if isNetworkAvailable {
info.flags = info.flags.union([.NetworkAvailable])
} else {
info.flags = info.flags.subtracting([.NetworkAvailable])
}
return info
})
}
// connection state changed
@objc func mtProtoConnectionStateChanged(_ mtProto: MTProto!, state: MTProtoConnectionState!) {
self.action(self.info.modify { info in
var info = info
if let state = state {
if state.isConnected {
info.flags.insert(.Connected)
//朋友圈网络改变
info.flags.remove(.ProxyHasConnectionIssues)
} else {
info.flags.remove(.Connected)
if state.proxyHasConnectionIssues {
info.flags.insert(.ProxyHasConnectionIssues)
} else {
info.flags.remove(.ProxyHasConnectionIssues)
}
}
} else {
info.flags.remove(.Connected)
info.flags.remove(.ProxyHasConnectionIssues)
}
info.proxyAddress = state?.proxyAddress
return info
})
}
// connection context update state changed
@objc func mtProtoConnectionContextUpdateStateChanged(_ mtProto: MTProto!, isUpdatingConnectionContext: Bool) {
self.action(self.info.modify { info in
var info = info
if isUpdatingConnectionContext {
info.flags = info.flags.union([.UpdatingConnectionContext])
} else {
info.flags = info.flags.subtracting([.UpdatingConnectionContext])
}
return info
})
}
// service tasks state changed
@objc func mtProtoServiceTasksStateChanged(_ mtProto: MTProto!, isPerformingServiceTasks: Bool) {
self.action(self.info.modify { info in
var info = info
if isPerformingServiceTasks {
info.flags = info.flags.union([.PerformingServiceTasks])
} else {
info.flags = info.flags.subtracting([.PerformingServiceTasks])
}
return info
})
}
struct
NetworkContextProxyId
1
2
3
private let ip: String
private let port: Int
private let secret: Data
class
Network
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private let queue: Queue
// datacenter id
public let datacenterId: Int
// context
public let context: MTContext
// proto
let mtProto: MTProto
// message service
let requestService: MTRequestMessageService
let basePath: String
// connection status delegate
private let connectionStatusDelegate: MTProtoConnectionStatusDelegate
// app data disposable
private let appDataDisposable: Disposable
// multiplexed reqeust manager
private var _multiplexedRequestManager: MultiplexedRequestManager?
var multiplexedRequestManager: MultiplexedRequestManager {
return self._multiplexedRequestManager!
}
private let _contextProxyId: ValuePromise<NetworkContextProxyId?>
var contextProxyId: Signal<NetworkContextProxyId?, NoError> {
return self._contextProxyId.get()
}
private let _connectionStatus: Promise<ConnectionStatus>
public var connectionStatus: Signal<ConnectionStatus, NoError> {
return self._connectionStatus.get() |> distinctUntilChanged
}
// drop connection status
public func dropConnectionStatus() {
_connectionStatus.set(.single(.waitingForNetwork))
}
// should keep connection
public let shouldKeepConnection = Promise<Bool>(false)
private let shouldKeepConnectionDisposable = MetaDisposable()
// should explicitely keep worker connections
public let shouldExplicitelyKeepWorkerConnections = Promise<Bool>(false)
// should keep background download connections
public let shouldKeepBackgroundDownloadConnections = Promise<Bool>(false)
// mock connection status
public var mockConnectionStatus: ConnectionStatus? {
didSet {
if let mockConnectionStatus = self.mockConnectionStatus {
self._connectionStatus.set(.single(mockConnectionStatus))
}
}
}
// logged out
var loggedOut: (() -> Void)?
// soft auth reset error received
var didReceiveSoftAuthResetError: (() -> Void)?
// vip state
public var vipState = false
public var vipStateUpdated: ((Bool) -> Void)?
Initialization
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
fileprivate init(queue: Queue, datacenterId: Int, context: MTContext, mtProto: MTProto, requestService: MTRequestMessageService, connectionStatusDelegate: MTProtoConnectionStatusDelegate, _connectionStatus: Promise<ConnectionStatus>, basePath: String, appDataDisposable: Disposable) {
self.queue = queue
self.datacenterId = datacenterId
self.context = context
self._contextProxyId = ValuePromise((context.apiEnvironment.socksProxySettings as MTSocksProxySettings?).flatMap(NetworkContextProxyId.init(settings:)), ignoreRepeated: true)
self.mtProto = mtProto
self.requestService = requestService
self.connectionStatusDelegate = connectionStatusDelegate
self._connectionStatus = _connectionStatus
self.appDataDisposable = appDataDisposable
self.basePath = basePath
super.init()
self.requestService.didReceiveSoftAuthResetError = { [weak self] in
self?.didReceiveSoftAuthResetError?()
}
let _contextProxyId = self._contextProxyId
context.add(NetworkHelper(requestPublicKeys: { [weak self] id in
if let strongSelf = self {
// get publickeys from cdn config
return strongSelf.request(Api.functions.help.getCdnConfig())
|> map(Optional.init)
|> `catch` { _ -> Signal<Api.CdnConfig?, NoError> in
return .single(nil)
}
|> map { result -> NSArray in
let array = NSMutableArray()
if let result = result {
switch result {
case let .cdnConfig(publicKeys):
for key in publicKeys {
switch key {
case let .cdnPublicKey(dcId, publicKey):
if id == Int(dcId) {
let dict = NSMutableDictionary()
dict["key"] = publicKey
dict["fingerprint"] = MTRsaFingerprint(publicKey)
array.add(dict)
}
}
}
}
}
return array
}
} else {
return .never()
}
}, isContextNetworkAccessAllowed: { [weak self] in // network access allowed
if let strongSelf = self {
// keep connection
return strongSelf.shouldKeepConnection.get() |> distinctUntilChanged
} else {
return .single(false)
}
}, contextProxyIdUpdated: { value in // proxy updated
_contextProxyId.set(value)
}))
requestService.delegate = self
// request manager
self._multiplexedRequestManager = MultiplexedRequestManager(takeWorker: { [weak self] target, tag, continueInBackground in
if let strongSelf = self {
// datacenter id
let datacenterId: Int
// id cdn
let isCdn: Bool
// is media
let isMedia: Bool = true
switch target {
case let .main(id):
datacenterId = id
isCdn = false
case let .cdn(id):
datacenterId = id
isCdn = true
}
// make worker
return strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, vipState: strongSelf.vipState, isMedia: isMedia, tag: tag, continueInBackground: continueInBackground)
}
return nil
})
let shouldKeepConnectionSignal = self.shouldKeepConnection.get()
|> distinctUntilChanged |> deliverOn(queue)
self.shouldKeepConnectionDisposable.set(shouldKeepConnectionSignal.start(next: { [weak self] value in
if let strongSelf = self {
if value {
Logger.shared.log("Network", "Resume network connection")
// resume if should keep connection
strongSelf.mtProto.resume()
} else {
// drop transport(pause) if should not keeep connection
Logger.shared.log("Network", "Pause network connection")
strongSelf.mtProto.pause()
}
}
}))
self.vipStateUpdated = { [weak self] vip in
if let strongSelf = self {
strongSelf.vipState = vip
}
}
}
Global Time
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public var globalTime: TimeInterval {
return self.context.globalTime()
}
public var currentGlobalTime: Signal<Double, NoError> {
return Signal { subscriber in
self.context.performBatchUpdates({
subscriber.putNext(self.context.globalTime())
subscriber.putCompletion()
})
return EmptyDisposable
}
}
public func getApproximateRemoteTimestamp() -> Int32 {
return Int32(self.context.globalTime())
}
Authorization Required
1
2
3
4
5
public func requestMessageServiceAuthorizationRequired(_ requestMessageService: MTRequestMessageService!) {
Logger.shared.log("Network", "requestMessageServiceAuthorizationRequired")
// logg out
self.loggedOut?()
}
Make Download Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func download(datacenterId: Int, isMedia: Bool, isCdn: Bool = false, tag: MediaResourceFetchTag?) -> Signal<Download, NoError> {
return self.worker(datacenterId: datacenterId, isCdn: isCdn, vipState: self.vipState, isMedia: isMedia, tag: tag)
}
func upload(tag: MediaResourceFetchTag?) -> Signal<Download, NoError> {
return self.worker(datacenterId: self.datacenterId, isCdn: false, vipState: self.vipState, isMedia: false, tag: tag)
}
func background() -> Signal<Download, NoError> {
return self.worker(datacenterId: self.datacenterId, isCdn: false, vipState: self.vipState, isMedia: false, tag: nil)
}
private func makeWorker(datacenterId: Int, isCdn: Bool, vipState: Bool, isMedia: Bool, tag: MediaResourceFetchTag?, continueInBackground: Bool = false) -> Download {
let queue = Queue.mainQueue()
let shouldKeepWorkerConnection: Signal<Bool, NoError> = combineLatest(queue: queue, self.shouldKeepConnection.get(), self.shouldExplicitelyKeepWorkerConnections.get(), self.shouldKeepBackgroundDownloadConnections.get())
|> map { shouldKeepConnection, shouldExplicitelyKeepWorkerConnections, shouldKeepBackgroundDownloadConnections -> Bool in
return shouldKeepConnection || shouldExplicitelyKeepWorkerConnections || (continueInBackground && shouldKeepBackgroundDownloadConnections)
}
|> distinctUntilChanged
return Download(queue: self.queue, datacenterId: datacenterId, isMedia: isMedia, isCdn: isCdn, vipState: vipState, context: self.context, masterDatacenterId: self.datacenterId, usageInfo: usageCalculationInfo(basePath:
self.basePath, category: (tag as? TelegramMediaResourceFetchTag)?.statsCategory), shouldKeepConnection: shouldKeepWorkerConnection)
}
private func worker(datacenterId: Int, isCdn: Bool, vipState: Bool, isMedia: Bool, tag: MediaResourceFetchTag?) -> Signal<Download, NoError> {
return Signal { [weak self] subscriber in
if let strongSelf = self {
subscriber.putNext(strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, vipState: vipState, isMedia: isMedia, tag: tag))
}
subscriber.putCompletion()
return ActionDisposable {
}
}
}
Merge DataCenter Address
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public func mergeBackupDatacenterAddress(datacenterId: Int32, host: String, port: Int32, secret: Data?) {
self.context.performBatchUpdates {
let address = MTDatacenterAddress(ip: host, port: UInt16(port), preferForMedia: false, restrictToTcp: false, cdn: false, preferForProxy: false, secret: secret)
self.context.addAddressForDatacenter(withId: Int(datacenterId), address: address)
/*let currentScheme = self.context.transportSchemeForDatacenter(withId: Int(datacenterId), media: false, isProxy: false)
if let currentScheme = currentScheme, currentScheme.address.isEqual(to: address) {
} else {
let scheme = MTTransportScheme(transport: MTTcpTransport.self, address: address, media: false)
self.context.updateTransportSchemeForDatacenter(withId: Int(datacenterId), transportScheme: scheme, media: false, isProxy: false)
}*/
let currentSchemes = self.context.transportSchemesForDatacenter(withId: Int(datacenterId), media: false, enforceMedia: false, isProxy: false)
var found = false
for scheme in currentSchemes {
if scheme.address.isEqual(to: address) {
found = true
break
}
}
if !found {
let scheme = MTTransportScheme(transport: MTTcpTransport.self, address: address, media: false)
self.context.updateTransportSchemeForDatacenter(withId: Int(datacenterId), transportScheme: scheme, media: false, isProxy: false)
}
}
}
struct
NetworkRequestAdditionalInfo:
OptionSet
1
2
public static let acknowledgement = NetworkRequestAdditionalInfo(rawValue: 1 << 0)
public static let progress = NetworkRequestAdditionalInfo(rawValue: 1 << 1)
Request With AdditionalInfo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public func requestWithAdditionalInfo<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), info: NetworkRequestAdditionalInfo, tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<NetworkRequestResult<T>, MTRpcError> {
let requestService = self.requestService
return Signal { subscriber in
let request = MTRequest()
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: tag), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
// does not depends on passworkd entry
request.dependsOnPasswordEntry = false
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
return false
}
return true
}
// acknowledgement received
request.acknowledgementReceived = {
if info.contains(.acknowledgement) {
subscriber.putNext(.acknowledged)
}
}
// progess
request.progressUpdated = { progress, packetSize in
if info.contains(.progress) {
subscriber.putNext(.progress(progress, Int32(clamping: packetSize)))
}
}
// completion
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
if let result = (boxedResponse as! BoxedMessage).body as? T {
subscriber.putNext(.result(result))
subscriber.putCompletion()
}
else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}
}
if let tag = tag {
request.shouldDependOnRequest = { other in
if let other = other, let metadata = other.metadata as? WrappedRequestMetadata, let otherTag = metadata.tag {
return tag.shouldDependOn(other: otherTag)
}
return false
}
}
let internalId: Any! = request.internalId
requestService.add(request)
return ActionDisposable { [weak requestService] in
requestService?.removeRequest(byInternalId: internalId)
}
}
}
Request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<T, MTRpcError> {
let requestService = self.requestService
return Signal { subscriber in
let request = MTRequest()
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: tag), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
// does not depends on passworkd entry
request.dependsOnPasswordEntry = false
// should continue execution with error context
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
// not automatic flood wait
if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
return false
}
return true
}
// completed
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
if let result = (boxedResponse as! BoxedMessage).body as? T {
subscriber.putNext(result)
subscriber.putCompletion()
}
else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}
}
if let tag = tag {
request.shouldDependOnRequest = { other in
if let other = other, let metadata = other.metadata as? WrappedRequestMetadata, let otherTag = metadata.tag {
return tag.shouldDependOn(other: otherTag)
}
return false
}
}
let internalId: Any! = request.internalId
requestService.add(request)
return ActionDisposable { [weak requestService] in
requestService?.removeRequest(byInternalId: internalId)
}
}
}