Home telegram - network
Post
Cancel

telegram - network

Network

enum ConnectionStatus

1
2
3
4
// Status used when the connection status is dropped (see `dropConnectionStatus`).
// NOTE(review): presumably also reported while no network is available — the flag-to-status mapping is not shown here; confirm.
case waitingForNetwork
// Carries the proxy address (if any) and whether that proxy is reporting connection issues.
case connecting(proxyAddress: String?, proxyHasConnectionIssues: Bool)
// Carries the proxy address (if any). NOTE(review): presumably "connected but still updating" — confirm.
case updating(proxyAddress: String?)
// Carries the proxy address (if any).
case online(proxyAddress: String?)

struct MTProtoConnectionFlags:OptionSet

1
2
3
4
5
6
7
// Backing bit mask for the option set.
let rawValue: Int

// Each flag occupies its own bit so any combination can be stored in one value.
static let NetworkAvailable = MTProtoConnectionFlags(rawValue: 1 << 0)
static let Connected = MTProtoConnectionFlags(rawValue: 1 << 1)
static let UpdatingConnectionContext = MTProtoConnectionFlags(rawValue: 1 << 2)
static let PerformingServiceTasks = MTProtoConnectionFlags(rawValue: 1 << 3)
static let ProxyHasConnectionIssues = MTProtoConnectionFlags(rawValue: 1 << 4)

struct MTProtoConnectionInfo

1
2
// Combination of connection-related flags currently in effect.
var flags: MTProtoConnectionFlags
// Address of the proxy reported by the latest connection state, or nil when none was reported.
var proxyAddress: String?

class MTProtoConnectionStatusDelegate:MTProtoDelegate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Callback invoked by every delegate method with the value returned from `info.modify`; defaults to a no-op.
var action: (MTProtoConnectionInfo) -> () = { _ in }
// Shared connection snapshot, starting with no flags set and no proxy address.
let info = Atomic<MTProtoConnectionInfo>(value: MTProtoConnectionInfo(flags: [], proxyAddress: nil))

// network availability changed
// Sets or clears `.NetworkAvailable` in the shared snapshot and hands the value
// returned by `info.modify` to `action`.
@objc func mtProtoNetworkAvailabilityChanged(_ mtProto: MTProto!, isNetworkAvailable: Bool) {
    self.action(self.info.modify { current in
        var updated = current
        if isNetworkAvailable {
            updated.flags.insert(.NetworkAvailable)
        } else {
            updated.flags.remove(.NetworkAvailable)
        }
        return updated
    })
}

// connection state changed
// Updates `.Connected`, `.ProxyHasConnectionIssues` and the proxy address from the
// reported connection state, then hands the value returned by `info.modify` to `action`.
@objc func mtProtoConnectionStateChanged(_ mtProto: MTProto!, state: MTProtoConnectionState!) {
    self.action(self.info.modify { current in
        var updated = current
        // The proxy address always mirrors the reported state (nil when there is no state).
        updated.proxyAddress = state?.proxyAddress

        guard let state = state else {
            // No state at all: neither connected nor any known proxy problem.
            updated.flags.remove(.Connected)
            updated.flags.remove(.ProxyHasConnectionIssues)
            return updated
        }

        guard !state.isConnected else {
            // A live connection clears any previously recorded proxy problem.
            updated.flags.insert(.Connected)
            updated.flags.remove(.ProxyHasConnectionIssues)
            return updated
        }

        // Disconnected: the proxy-issue flag follows what the state reports.
        updated.flags.remove(.Connected)
        if state.proxyHasConnectionIssues {
            updated.flags.insert(.ProxyHasConnectionIssues)
        } else {
            updated.flags.remove(.ProxyHasConnectionIssues)
        }
        return updated
    })
}

// connection context update state changed
// Sets or clears `.UpdatingConnectionContext` in the shared snapshot and hands the
// value returned by `info.modify` to `action`.
@objc func mtProtoConnectionContextUpdateStateChanged(_ mtProto: MTProto!, isUpdatingConnectionContext: Bool) {
    self.action(self.info.modify { current in
        var updated = current
        if isUpdatingConnectionContext {
            updated.flags.insert(.UpdatingConnectionContext)
        } else {
            updated.flags.remove(.UpdatingConnectionContext)
        }
        return updated
    })
}

// service tasks state changed
// Sets or clears `.PerformingServiceTasks` in the shared snapshot and hands the
// value returned by `info.modify` to `action`.
@objc func mtProtoServiceTasksStateChanged(_ mtProto: MTProto!, isPerformingServiceTasks: Bool) {
    self.action(self.info.modify { current in
        var updated = current
        if isPerformingServiceTasks {
            updated.flags.insert(.PerformingServiceTasks)
        } else {
            updated.flags.remove(.PerformingServiceTasks)
        }
        return updated
    })
}

struct NetworkContextProxyId

1
2
3
// Fields identifying one proxy configuration.
// NOTE(review): presumably populated by `init(settings:)` from `MTSocksProxySettings` (used in `Network.init`); that initializer is not shown here.
private let ip: String
private let port: Int
private let secret: Data

class Network

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Queue on which keep-connection changes are delivered and which is handed to download workers.
private let queue: Queue

// Id of the datacenter this network is bound to (passed to workers as `masterDatacenterId`).
public let datacenterId: Int

// MTProto context shared by the main connection and all workers.
public let context: MTContext

// Main MTProto instance; resumed/paused according to `shouldKeepConnection`.
let mtProto: MTProto

// Message service through which all requests are added and removed.
let requestService: MTRequestMessageService
// Base path passed to `usageCalculationInfo` when creating workers.
let basePath: String

// Delegate that collects connection flags from MTProto.
private let connectionStatusDelegate: MTProtoConnectionStatusDelegate

// Disposable supplied by the creator of this object. NOTE(review): presumably tied to app-data updates — confirm.
private let appDataDisposable: Disposable

// Multiplexed request manager. Created at the end of `init`, which is why the
// storage is optional and the accessor force-unwraps.
private var _multiplexedRequestManager: MultiplexedRequestManager?
var multiplexedRequestManager: MultiplexedRequestManager {
    return self._multiplexedRequestManager!
}

// Current proxy id of the context; updated through the `contextProxyIdUpdated` callback set up in `init`.
private let _contextProxyId: ValuePromise<NetworkContextProxyId?>
var contextProxyId: Signal<NetworkContextProxyId?, NoError> {
    return self._contextProxyId.get()
}

// Connection status, exposed with consecutive duplicates filtered out.
private let _connectionStatus: Promise<ConnectionStatus>
public var connectionStatus: Signal<ConnectionStatus, NoError> {
    return self._connectionStatus.get() |> distinctUntilChanged
}

// Forces the published connection status to `.waitingForNetwork`.
public func dropConnectionStatus() {
    _connectionStatus.set(.single(.waitingForNetwork))
}

// Whether the main connection should be kept alive (drives resume/pause of `mtProto`).
public let shouldKeepConnection = Promise<Bool>(false)
private let shouldKeepConnectionDisposable = MetaDisposable()

// Whether worker connections should be kept explicitly, regardless of `shouldKeepConnection`.
public let shouldExplicitelyKeepWorkerConnections = Promise<Bool>(false)

// Whether workers created with `continueInBackground` should keep their connections.
public let shouldKeepBackgroundDownloadConnections = Promise<Bool>(false)

// Setting a non-nil value publishes it as the connection status; setting nil publishes nothing.
public var mockConnectionStatus: ConnectionStatus? {
    didSet {
        if let mockConnectionStatus = self.mockConnectionStatus {
            self._connectionStatus.set(.single(mockConnectionStatus))
        }
    }
}

// Invoked when the request service reports that authorization is required.
var loggedOut: (() -> Void)?

// Invoked when the request service reports a soft auth reset error.
var didReceiveSoftAuthResetError: (() -> Void)?

// VIP state forwarded to every worker; `vipStateUpdated` is assigned in `init` to store new values here.
public var vipState = false
public var vipStateUpdated: ((Bool) -> Void)?

Initialization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Stores the supplied collaborators, then wires up:
//  - forwarding of the request service's soft-auth-reset notification,
//  - a `NetworkHelper` registered on the context (CDN public keys, network-access permission, proxy-id updates),
//  - the multiplexed request manager,
//  - resume/pause of `mtProto` driven by `shouldKeepConnection`,
//  - the `vipStateUpdated` callback.
fileprivate init(queue: Queue, datacenterId: Int, context: MTContext, mtProto: MTProto, requestService: MTRequestMessageService, connectionStatusDelegate: MTProtoConnectionStatusDelegate, _connectionStatus: Promise<ConnectionStatus>, basePath: String, appDataDisposable: Disposable) {
    self.queue = queue
    self.datacenterId = datacenterId
    self.context = context
    // Seed the proxy id from the context's current SOCKS proxy settings (nil when there are none).
    self._contextProxyId = ValuePromise((context.apiEnvironment.socksProxySettings as MTSocksProxySettings?).flatMap(NetworkContextProxyId.init(settings:)), ignoreRepeated: true)
    self.mtProto = mtProto
    self.requestService = requestService
    self.connectionStatusDelegate = connectionStatusDelegate
    self._connectionStatus = _connectionStatus
    self.appDataDisposable = appDataDisposable
    self.basePath = basePath

    super.init()

    // Forward the soft auth reset notification to this object's own callback.
    self.requestService.didReceiveSoftAuthResetError = { [weak self] in
        self?.didReceiveSoftAuthResetError?()
    }

    // Captured separately so the proxy-id callback below does not need `self`.
    let _contextProxyId = self._contextProxyId
    context.add(NetworkHelper(requestPublicKeys: { [weak self] id in
        if let strongSelf = self {
            // Fetch the CDN config; a failed request is mapped to nil and yields an empty array.
            return strongSelf.request(Api.functions.help.getCdnConfig())
            |> map(Optional.init)
            |> `catch` { _ -> Signal<Api.CdnConfig?, NoError> in
                return .single(nil)
            }
            |> map { result -> NSArray in
                let array = NSMutableArray()
                if let result = result {
                    switch result {
                    case let .cdnConfig(publicKeys):
                        for key in publicKeys {
                            switch key {
                            case let .cdnPublicKey(dcId, publicKey):
                                // Keep only the keys that belong to the requested datacenter.
                                if id == Int(dcId) {
                                    let dict = NSMutableDictionary()
                                    dict["key"] = publicKey
                                    dict["fingerprint"] = MTRsaFingerprint(publicKey)
                                    array.add(dict)
                                }
                            }
                        }
                    }
                }
                return array
            }
        } else {
            // The network object is gone: never deliver a result.
            return .never()
        }
    }, isContextNetworkAccessAllowed: { [weak self] in // network access follows shouldKeepConnection
        if let strongSelf = self {
            // Network access is allowed exactly while the connection should be kept.
            return strongSelf.shouldKeepConnection.get() |> distinctUntilChanged
        } else {
            return .single(false)
        }
    }, contextProxyIdUpdated: { value in // publish the new proxy id
        _contextProxyId.set(value)
    }))
    requestService.delegate = self

    // Multiplexed request manager; its workers are created through `makeWorker`.
    self._multiplexedRequestManager = MultiplexedRequestManager(takeWorker: { [weak self] target, tag, continueInBackground in
        if let strongSelf = self {
            // Datacenter id taken from the target.
            let datacenterId: Int
            // Whether the target is a CDN datacenter.
            let isCdn: Bool
            // Workers taken by the multiplexed manager are always created as media workers.
            let isMedia: Bool = true
            switch target {
            case let .main(id):
                datacenterId = id
                isCdn = false
            case let .cdn(id):
                datacenterId = id
                isCdn = true
            }
            // Create the worker with the current VIP state.
            return strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, vipState: strongSelf.vipState, isMedia: isMedia, tag: tag, continueInBackground: continueInBackground)
        }
        // The network object is gone: no worker can be provided.
        return nil
    })

    // React to keep-connection changes on `queue`, ignoring consecutive duplicates.
    let shouldKeepConnectionSignal = self.shouldKeepConnection.get()
        |> distinctUntilChanged |> deliverOn(queue)
    self.shouldKeepConnectionDisposable.set(shouldKeepConnectionSignal.start(next: { [weak self] value in
        if let strongSelf = self {
            if value {
                Logger.shared.log("Network", "Resume network connection")
                // Resume when the connection should be kept.
                strongSelf.mtProto.resume()
            } else {
                // Pause when the connection should not be kept.
                Logger.shared.log("Network", "Pause network connection")
                strongSelf.mtProto.pause()
            }
        }
    }))

    // Store VIP state updates so that later workers pick up the new value.
    self.vipStateUpdated = { [weak self] vip in
        if let strongSelf = self {
            strongSelf.vipState = vip
        }
    }
}

Global Time

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Current global time as reported by the MTProto context.
public var globalTime: TimeInterval {
    return self.context.globalTime()
}

// One-shot signal: reads the context's global time inside `performBatchUpdates`,
// emits it once and completes. There is nothing to cancel, so disposal is a no-op.
public var currentGlobalTime: Signal<Double, NoError> {
    return Signal { subscriber in
        self.context.performBatchUpdates {
            let now = self.context.globalTime()
            subscriber.putNext(now)
            subscriber.putCompletion()
        }
        return EmptyDisposable
    }
}

// Returns the context's global time converted to `Int32` (the fractional part is dropped).
// NOTE(review): `Int32(_:)` on a floating-point value traps if the value is not
// representable as `Int32`; this relies on `globalTime()` staying in range.
public func getApproximateRemoteTimestamp() -> Int32 {
    return Int32(self.context.globalTime())
}

Authorization Required

1
2
3
4
5
// Called by the request message service when authorization is required:
// logs the event and invokes the `loggedOut` callback, if one is set.
public func requestMessageServiceAuthorizationRequired(_ requestMessageService: MTRequestMessageService!) {
    Logger.shared.log("Network", "requestMessageServiceAuthorizationRequired")
    // Notify the owner that the session is logged out.
    self.loggedOut?()
}

Make Download Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Signal that yields a worker for the given datacenter, created with the current VIP state.
func download(datacenterId: Int, isMedia: Bool, isCdn: Bool = false, tag: MediaResourceFetchTag?) -> Signal<Download, NoError> {
    return self.worker(datacenterId: datacenterId, isCdn: isCdn, vipState: self.vipState, isMedia: isMedia, tag: tag)
}
// Signal that yields a non-media, non-CDN worker on this network's own datacenter, carrying `tag`.
func upload(tag: MediaResourceFetchTag?) -> Signal<Download, NoError> {
    return self.worker(datacenterId: self.datacenterId, isCdn: false, vipState: self.vipState, isMedia: false, tag: tag)
}
// Signal that yields a non-media, non-CDN worker on this network's own datacenter, without a tag.
func background() -> Signal<Download, NoError> {
    return self.worker(datacenterId: self.datacenterId, isCdn: false, vipState: self.vipState, isMedia: false, tag: nil)
}


// Creates a `Download` worker for the given datacenter.
//
// The worker keeps its connection while any of these holds:
//  - the main connection should be kept,
//  - worker connections are explicitly kept,
//  - the worker continues in background and background download connections are kept.
private func makeWorker(datacenterId: Int, isCdn: Bool, vipState: Bool, isMedia: Bool, tag: MediaResourceFetchTag?, continueInBackground: Bool = false) -> Download {
    // The three inputs are combined on the main queue; the worker itself is handed `self.queue`.
    let keepAliveInputs = combineLatest(queue: Queue.mainQueue(), self.shouldKeepConnection.get(), self.shouldExplicitelyKeepWorkerConnections.get(), self.shouldKeepBackgroundDownloadConnections.get())
    let shouldKeepWorkerConnection: Signal<Bool, NoError> = keepAliveInputs
    |> map { keepMain, keepExplicitly, keepBackground -> Bool in
        if keepMain || keepExplicitly {
            return true
        }
        return continueInBackground && keepBackground
    }
    |> distinctUntilChanged

    // The stats category is only available when the tag is a `TelegramMediaResourceFetchTag`.
    let statsCategory = (tag as? TelegramMediaResourceFetchTag)?.statsCategory
    let usageInfo = usageCalculationInfo(basePath: self.basePath, category: statsCategory)

    return Download(queue: self.queue, datacenterId: datacenterId, isMedia: isMedia, isCdn: isCdn, vipState: vipState, context: self.context, masterDatacenterId: self.datacenterId, usageInfo: usageInfo, shouldKeepConnection: shouldKeepWorkerConnection)
}


// Wraps `makeWorker` in a signal: on subscription it emits one freshly created worker
// (or nothing if this object has already been released) and then completes.
private func worker(datacenterId: Int, isCdn: Bool, vipState: Bool, isMedia: Bool, tag: MediaResourceFetchTag?) -> Signal<Download, NoError> {
    return Signal { [weak self] subscriber in
        if let created = self?.makeWorker(datacenterId: datacenterId, isCdn: isCdn, vipState: vipState, isMedia: isMedia, tag: tag) {
            subscriber.putNext(created)
        }
        subscriber.putCompletion()

        // Nothing to tear down on disposal.
        return ActionDisposable {
        }
    }
}

Merge DataCenter Address

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Registers a backup address for a datacenter and, unless one of the datacenter's
// existing non-media, non-proxy transport schemes already targets that address,
// installs a TCP transport scheme for it.
//
// - datacenterId: id of the datacenter the address belongs to.
// - host: IP/host of the backup address.
// - port: port of the backup address; must fit in 0...65535, otherwise the call is
//   logged and ignored (a plain `UInt16(port)` conversion would trap on such input).
// - secret: optional secret attached to the address.
public func mergeBackupDatacenterAddress(datacenterId: Int32, host: String, port: Int32, secret: Data?) {
    guard let validPort = UInt16(exactly: port) else {
        Logger.shared.log("Network", "mergeBackupDatacenterAddress: ignoring address with invalid port \(port)")
        return
    }
    let dcId = Int(datacenterId)

    self.context.performBatchUpdates {
        let address = MTDatacenterAddress(ip: host, port: validPort, preferForMedia: false, restrictToTcp: false, cdn: false, preferForProxy: false, secret: secret)
        self.context.addAddressForDatacenter(withId: dcId, address: address)

        // Only install a new scheme when no current scheme already points at this address.
        let currentSchemes = self.context.transportSchemesForDatacenter(withId: dcId, media: false, enforceMedia: false, isProxy: false)
        let alreadyKnown = currentSchemes.contains(where: { $0.address.isEqual(to: address) })
        if !alreadyKnown {
            let scheme = MTTransportScheme(transport: MTTcpTransport.self, address: address, media: false)
            self.context.updateTransportSchemeForDatacenter(withId: dcId, transportScheme: scheme, media: false, isProxy: false)
        }
    }
}

struct NetworkRequestAdditionalInfo:OptionSet

1
2
// When set, `requestWithAdditionalInfo` also emits `.acknowledged` once the request is acknowledged.
public static let acknowledgement = NetworkRequestAdditionalInfo(rawValue: 1 << 0)
// When set, `requestWithAdditionalInfo` also emits `.progress` events as progress is reported.
public static let progress = NetworkRequestAdditionalInfo(rawValue: 1 << 1)

Request With AdditionalInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    // Sends an API request and reports, besides the final result, the additional
    // events selected by `info`.
    //
    // - data: function description, serialized payload and response deserializer.
    // - info: which extra events to emit (`.acknowledgement`, `.progress`).
    // - tag: optional dependency tag; when present, this request depends on other
    //   requests whose tag it says it should depend on.
    // - automaticFloodWait: when false, execution is not continued after an error
    //   context that carries a flood wait.
    //
    // The returned signal emits `.result` followed by completion on success, the
    // request's error on failure, and a 500 "TL_VERIFICATION_ERROR" when the response
    // is missing or is not of the expected type. Disposing the signal removes the
    // request from the request service.
    public func requestWithAdditionalInfo<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), info: NetworkRequestAdditionalInfo, tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<NetworkRequestResult<T>, MTRpcError> {
        let requestService = self.requestService
        return Signal { subscriber in
            let request = MTRequest()

            request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: tag), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
                if let result = data.2.parse(Buffer(data: response)) {
                    return BoxedMessage(result)
                }
                return nil
            })

            // This request does not depend on password entry.
            request.dependsOnPasswordEntry = false

            // Stop executing on a flood wait unless automatic flood wait handling is enabled.
            request.shouldContinueExecutionWithErrorContext = { errorContext in
                guard let errorContext = errorContext else {
                    return true
                }
                if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
                    return false
                }
                return true
            }

            // Acknowledgement received.
            request.acknowledgementReceived = {
                if info.contains(.acknowledgement) {
                    subscriber.putNext(.acknowledged)
                }
            }

            // Progress updated.
            request.progressUpdated = { progress, packetSize in
                if info.contains(.progress) {
                    subscriber.putNext(.progress(progress, Int32(clamping: packetSize)))
                }
            }

            // Completion. A conditional cast is used so that a missing or unexpected
            // response surfaces as an error instead of crashing on a forced cast.
            request.completed = { (boxedResponse, _, error) -> () in
                if let error = error {
                    subscriber.putError(error)
                } else if let boxed = boxedResponse as? BoxedMessage, let result = boxed.body as? T {
                    subscriber.putNext(.result(result))
                    subscriber.putCompletion()
                } else {
                    subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
                }
            }

            if let tag = tag {
                request.shouldDependOnRequest = { other in
                    if let other = other, let metadata = other.metadata as? WrappedRequestMetadata, let otherTag = metadata.tag {
                        return tag.shouldDependOn(other: otherTag)
                    }
                    return false
                }
            }

            let internalId: Any! = request.internalId

            requestService.add(request)

            // Disposal removes the request from the service.
            return ActionDisposable { [weak requestService] in
                requestService?.removeRequest(byInternalId: internalId)
            }
        }
    }

Request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    // Sends an API request and delivers its deserialized result.
    //
    // - data: function description, serialized payload and response deserializer.
    // - tag: optional dependency tag; when present, this request depends on other
    //   requests whose tag it says it should depend on.
    // - automaticFloodWait: when false, execution is not continued after an error
    //   context that carries a flood wait.
    //
    // The returned signal emits the result followed by completion on success, the
    // request's error on failure, and a 500 "TL_VERIFICATION_ERROR" when the response
    // is missing or is not of the expected type. Disposing the signal removes the
    // request from the request service.
    public func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<T, MTRpcError> {
        let requestService = self.requestService
        return Signal { subscriber in
            let request = MTRequest()
            request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: tag), shortMetadata: WrappedRequestShortMetadata(shortMetadata: WrappedShortFunctionDescription(data.0)), responseParser: { response in
                if let result = data.2.parse(Buffer(data: response)) {
                    return BoxedMessage(result)
                }
                return nil
            })

            // This request does not depend on password entry.
            request.dependsOnPasswordEntry = false

            // Stop executing on a flood wait unless automatic flood wait handling is enabled.
            request.shouldContinueExecutionWithErrorContext = { errorContext in
                guard let errorContext = errorContext else {
                    return true
                }
                if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
                    return false
                }
                return true
            }

            // Completion. A conditional cast is used so that a missing or unexpected
            // response surfaces as an error instead of crashing on a forced cast.
            request.completed = { (boxedResponse, _, error) -> () in
                if let error = error {
                    subscriber.putError(error)
                } else if let boxed = boxedResponse as? BoxedMessage, let result = boxed.body as? T {
                    subscriber.putNext(result)
                    subscriber.putCompletion()
                } else {
                    subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
                }
            }

            if let tag = tag {
                request.shouldDependOnRequest = { other in
                    if let other = other, let metadata = other.metadata as? WrappedRequestMetadata, let otherTag = metadata.tag {
                        return tag.shouldDependOn(other: otherTag)
                    }
                    return false
                }
            }

            let internalId: Any! = request.internalId

            requestService.add(request)

            // Disposal removes the request from the service.
            return ActionDisposable { [weak requestService] in
                requestService?.removeRequest(byInternalId: internalId)
            }
        }
    }
This post is licensed under CC BY 4.0 by the author.