Home telegram - mediaBox
Post
Cancel

telegram - mediaBox

class MediaBox

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
// Root directory of the store. Resource files are placed directly under it, and the
// "cache" / "short-cache" subdirectories are created beneath it.
public let basePath: String
// Queue on which `statusContexts` is read and mutated by `resourceStatus`.
private let statusQueue = Queue()
// Queue used by `resourceStatus` for its initial on-disk check.
private let concurrentQueue = Queue.concurrentDefaultQueue()
// Queue on which resource files are written/moved/linked and file contexts are obtained.
private let dataQueue = Queue()
// NOTE(review): not referenced in the code visible here; presumably used for cached representations — confirm.
private let cacheQueue = Queue()
// Cleanup helper configured in `init` with the general and short-lived storage paths.
private let timeBasedCleanup: TimeBasedCleanup
// Per-resource status bookkeeping shared by all `resourceStatus` subscribers of the same resource.
private var statusContexts: [WrappedMediaResourceId: ResourceStatusContext] = [:]
private var cachedRepresentationContexts: [CachedMediaResourceRepresentationKey: CachedMediaResourceRepresentationContext] = [:]
private var fileContexts: [WrappedMediaResourceId: MediaBoxFileContext] = [:]
// Promise mirror of `fetchResource`; updated from that property's `didSet`.
private var wrappedFetchResource = Promise<(MediaResource, Signal<[(Range<Int>, MediaBoxFetchPriority)], NoError>, MediaResourceFetchParameters?) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>>()
// Hook that may return the path of an already-fetched copy of a resource; the default returns nil.
// Consulted by `maybeCopiedPreFetchedResource`.
public var preFetchedResourcePath: (MediaResource) -> String? = { _ in return nil }

/// Closure used to fetch the data of a resource.
///
/// Every assignment is mirrored into `wrappedFetchResource`: a non-nil closure is
/// published as a single-value signal, `nil` is published as a signal that never emits.
public var fetchResource: ((MediaResource, Signal<[(Range<Int>, MediaBoxFetchPriority)], NoError>, MediaResourceFetchParameters?) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>)? {
    didSet {
        guard let fetcher = self.fetchResource else {
            // No fetcher installed: the promise must not deliver a value.
            self.wrappedFetchResource.set(.never())
            return
        }
        self.wrappedFetchResource.set(.single(fetcher))
    }
}

/// Promise mirror of `fetchCachedResourceRepresentation`; updated from that property's `didSet`.
public var wrappedFetchCachedResourceRepresentation = Promise<(MediaResource, CachedMediaResourceRepresentation) -> Signal<CachedMediaResourceRepresentationResult, NoError>>()

/// Closure used to produce a cached representation of a resource.
///
/// Every assignment is mirrored into `wrappedFetchCachedResourceRepresentation`: a non-nil
/// closure is published as a single-value signal, `nil` as a signal that never emits.
public var fetchCachedResourceRepresentation: ((MediaResource, CachedMediaResourceRepresentation) -> Signal<CachedMediaResourceRepresentationResult, NoError>)? {
    didSet {
        guard let generator = self.fetchCachedResourceRepresentation else {
            // No generator installed: the promise must not deliver a value.
            self.wrappedFetchCachedResourceRepresentation.set(.never())
            return
        }
        self.wrappedFetchCachedResourceRepresentation.set(.single(generator))
    }
}


/// Creates the base directory and its "cache" / "short-cache" subdirectories on first access.
///
/// Declared as a lazy `Void` so the work runs only once per instance. Creation failures
/// are not recoverable here: `try!` traps if any directory cannot be created.
lazy var ensureDirectoryCreated: Void = {
    let requiredDirectories = [
        self.basePath,
        self.basePath + "/cache",
        self.basePath + "/short-cache"
    ]
    // Same order as the paths are listed: base directory first, then its subdirectories.
    for directory in requiredDirectories {
        try! FileManager.default.createDirectory(atPath: directory, withIntermediateDirectories: true, attributes: nil)
    }
}()


/// Creates a media box rooted at `basePath`.
///
/// Configures time-based cleanup for the base and "cache" directories (general) and the
/// "short-cache" directory (short-lived), then makes sure those directories exist.
///
/// - Parameter basePath: Directory under which all resource files are stored.
public init(basePath: String) {
    self.basePath = basePath

    let generalPaths = [
        basePath,
        basePath + "/cache"
    ]
    let shortLivedPaths = [
        basePath + "/short-cache"
    ]
    self.timeBasedCleanup = TimeBasedCleanup(generalPaths: generalPaths, shortLivedPaths: shortLivedPaths)

    // Touch the lazy property to trigger directory creation.
    _ = self.ensureDirectoryCreated
}

/// Forwards the maximum store times to the time-based cleanup helper.
///
/// - Parameters:
///   - general: Value passed through for the general storage paths.
///   - shortLived: Value passed through for the short-lived storage paths.
/// NOTE(review): the unit of both values is not visible here (presumably seconds) — confirm.
public func setMaxStoreTimes(general: Int32, shortLived: Int32) {
    self.timeBasedCleanup.setMaxStoreTimes(general: general, shortLived: shortLived)
}

/// Returns the on-disk file name for a resource: the string form of the id's `uniqueId`.
private func fileNameForId(_ id: MediaResourceId) -> String {
    return "\(id.uniqueId)"
}
/// Returns the path of the resource file for `id`: `<basePath>/<file name>`.
private func pathForId(_ id: MediaResourceId) -> String {
    let fileName = fileNameForId(id)
    return self.basePath + "/" + fileName
}
/// Returns both storage paths for `id`: the complete file at `<basePath>/<file name>` and
/// the partial file at the same path with a `_partial` suffix.
private func storePathsForId(_ id: MediaResourceId) -> ResourceStorePaths {
    let completePath = "\(self.basePath)/\(fileNameForId(id))"
    let partialPath = completePath + "_partial"
    return ResourceStorePaths(partial: partialPath, complete: completePath)
}

/// Returns the path of a cached representation of the resource `id`.
///
/// The file lives in the "cache" directory for `.general` representations and in
/// "short-cache" for `.shortLived` ones; its name is `<file name>:<representation uniqueId>`.
private func cachedRepresentationPathForId(_ id: MediaResourceId, representation: CachedMediaResourceRepresentation) -> String {
    let directoryName: String = {
        switch representation.keepDuration {
        case .general:
            return "cache"
        case .shortLived:
            return "short-cache"
        }
    }()
    let resourceFileName = fileNameForId(id)
    return "\(self.basePath)/\(directoryName)/\(resourceFileName):\(representation.uniqueId)"
}

/// Writes `data` atomically to the complete-file path of `id`.
///
/// The write is dispatched onto `dataQueue`; a failed write is ignored (best effort).
public func storeResourceData(_ id: MediaResourceId, data: Data) {
    self.dataQueue.async {
        let destination = URL(fileURLWithPath: self.storePathsForId(id).complete)
        _ = try? data.write(to: destination, options: [.atomic])
    }
}
/// Moves the file at `fromTempPath` to the complete-file path of `id`.
///
/// The move is dispatched onto `dataQueue`; a failed move is ignored (best effort).
public func moveResourceData(_ id: MediaResourceId, fromTempPath: String) {
    self.dataQueue.async {
        let sourceURL = URL(fileURLWithPath: fromTempPath)
        let destinationURL = URL(fileURLWithPath: self.storePathsForId(id).complete)
        _ = try? FileManager.default.moveItem(at: sourceURL, to: destinationURL)
    }
}
/// Copies the file at `fromTempPath` to the complete-file path of `id`.
///
/// The copy is dispatched onto `dataQueue`; a failed copy is ignored (best effort).
public func copyResourceData(_ id: MediaResourceId, fromTempPath: String) {
    self.dataQueue.async {
        let sourceURL = URL(fileURLWithPath: fromTempPath)
        let destinationURL = URL(fileURLWithPath: self.storePathsForId(id).complete)
        _ = try? FileManager.default.copyItem(at: sourceURL, to: destinationURL)
    }
}
/// Makes the stored files of resource `from` available under resource `to`.
///
/// On `dataQueue`, calls `link` for the partial file and then for the complete file, from
/// the paths of `from` to the paths of `to`. The result of each call is ignored, and the
/// source files are not removed by this method.
public func moveResourceData(from: MediaResourceId, to: MediaResourceId) {
    self.dataQueue.async {
        let sourcePaths = self.storePathsForId(from)
        let destinationPaths = self.storePathsForId(to)
        // Partial file first, then the complete file; failures are deliberately ignored.
        _ = link(sourcePaths.partial, destinationPaths.partial)
        _ = link(sourcePaths.complete, destinationPaths.complete)
    }
}

/// Copies a pre-fetched copy of `resource` to `completePath`, if one is available.
///
/// Does nothing when `preFetchedResourcePath` returns nil; a failed copy is ignored.
private func maybeCopiedPreFetchedResource(completePath: String, resource: MediaResource) {
    guard let preFetchedPath = self.preFetchedResourcePath(resource) else {
        return
    }
    _ = try? FileManager.default.copyItem(atPath: preFetchedPath, toPath: completePath)
}

/// Returns a signal reporting the status of `resource` in this media box.
///
/// On subscription the work hops through three queues:
/// 1. `concurrentQueue`: if the complete file has a size on disk (directly, or after copying a
///    pre-fetched copy into place), `.Local` is emitted and the signal completes.
/// 2. `statusQueue`: otherwise the subscriber is registered in the shared
///    `ResourceStatusContext` for this resource id (created on demand) and immediately
///    receives the context's current status, if any.
/// 3. `dataQueue`: only for a newly created context, the file context's status is observed
///    and every changed value is fanned out to all registered subscribers.
///
/// - Parameters:
///   - resource: The resource whose status is requested.
///   - approximateSynchronousValue: When true, the returned signal first checks the complete
///     file directly inside its subscription closure (no queue hop) and emits `.Local`, or
///     `.Remote` followed by the regular status signal.
/// - Returns: A signal of `MediaResourceStatus` values.
public func resourceStatus(_ resource: MediaResource, approximateSynchronousValue: Bool = false) -> Signal<MediaResourceStatus, NoError> {
    let signal = Signal<MediaResourceStatus, NoError> { subscriber in
        let disposable = MetaDisposable()
        
        self.concurrentQueue.async {
            let paths = self.storePathsForId(resource.id)
            
            // Fast path: the complete file already has a size on disk.
            if let _ = fileSize(paths.complete) {
                self.timeBasedCleanup.touch(paths: [
                    paths.complete
                ])
                subscriber.putNext(.Local)
                subscriber.putCompletion()
            } else {
                // Try to materialize the file from a pre-fetched copy, then re-check.
                self.maybeCopiedPreFetchedResource(completePath: paths.complete, resource: resource)
                if let _ = fileSize(paths.complete) {
                    self.timeBasedCleanup.touch(paths: [
                        paths.complete
                    ])
                    subscriber.putNext(.Local)
                    subscriber.putCompletion()
                    return
                }
                
                self.statusQueue.async {
                    let resourceId = WrappedMediaResourceId(resource.id)
                    let statusContext: ResourceStatusContext
                    // Non-nil only when this call created the context and therefore has to
                    // start observing the file context's status.
                    var statusUpdateDisposable: MetaDisposable?
                    if let current = self.statusContexts[resourceId] {
                        statusContext = current
                    } else {
                        let statusUpdateDisposableValue = MetaDisposable()
                        statusContext = ResourceStatusContext(disposable: statusUpdateDisposableValue)
                        self.statusContexts[resourceId] = statusContext
                        statusUpdateDisposable = statusUpdateDisposableValue
                    }
                    
                    let index = statusContext.subscribers.add({ status in
                        subscriber.putNext(status)
                    })
                    
                    // Replay the last known status to the new subscriber.
                    if let status = statusContext.status {
                        subscriber.putNext(status)
                    }
                    
                    if let statusUpdateDisposable = statusUpdateDisposable {
                        let statusQueue = self.statusQueue
                        self.dataQueue.async {
                            if let (fileContext, releaseContext) = self.fileContext(for: resource) {
                                let statusDisposable = fileContext.status(next: { [weak statusContext] value in
                                    statusQueue.async {
                                        // Only act if the registered context is still the one created above,
                                        // and only notify when the status actually changed.
                                        if let current = self.statusContexts[resourceId], current === statusContext, current.status != value {
                                            current.status = value
                                            for subscriber in current.subscribers.copyItems() {
                                                subscriber(value)
                                            }
                                        }
                                    }
                                }, completed: { [weak statusContext] in
                                    statusQueue.async {
                                        // NOTE(review): only the subscriber that created the context (`index`) is
                                        // removed here; the context is dropped only if that leaves it empty — confirm intent.
                                        if let current = self.statusContexts[resourceId], current ===  statusContext {
                                            current.subscribers.remove(index)
                                            if current.subscribers.isEmpty {
                                                self.statusContexts.removeValue(forKey: resourceId)
                                                current.disposable.dispose()
                                            }
                                        }
                                    }
                                }, size: resource.size.flatMap(Int32.init))
                                // Disposing the context stops the status observation and releases the file context.
                                statusUpdateDisposable.set(ActionDisposable {
                                    statusDisposable.dispose()
                                    releaseContext()
                                })
                            }
                        }
                    }
                    
                    // On disposal: unregister this subscriber and tear the context down when it was the last one.
                    disposable.set(ActionDisposable { [weak statusContext] in
                        self.statusQueue.async {
                            if let current = self.statusContexts[WrappedMediaResourceId(resource.id)], current ===  statusContext {
                                current.subscribers.remove(index)
                                if current.subscribers.isEmpty {
                                    self.statusContexts.removeValue(forKey: WrappedMediaResourceId(resource.id))
                                    current.disposable.dispose()
                                }
                            }
                        }
                    })
                }
            }
        }
        
        return disposable
    }
    if approximateSynchronousValue {
        // Emit an immediate approximation, then switch to the full status signal if the file is not local.
        return Signal<Signal<MediaResourceStatus, NoError>, NoError> { subscriber in
            let paths = self.storePathsForId(resource.id)
            if let _ = fileSize(paths.complete) {
                subscriber.putNext(.single(.Local))
            } else {
                subscriber.putNext(.single(.Remote) |> then(signal))
            }
            subscriber.putCompletion()
            return EmptyDisposable
        } |> switchToLatest
    } else {
        return signal
    }
}

// Resource data (excerpt: the `...` lines mark parts of the body omitted from this listing).
// The visible part requests the full byte range from the resource's file context on
// `dataQueue` and forwards each data value to the subscriber.
public func resourceData(_ resource: MediaResource, pathExtension: String? = nil, option: ResourceDataRequestOption = .complete(waitUntilFetchStatus: false), attemptSynchronously: Bool = false) -> Signal<MediaResourceData, NoError> {
  ...
  self.dataQueue.async {
                        if let (fileContext, releaseContext) = self.fileContext(for: resource) {
                            // Both request options carry the same flag; extract it regardless of the case.
                            let waitUntilAfterInitialFetch: Bool
                            switch option {
                                case let .complete(waitUntilFetchStatus):
                                    waitUntilAfterInitialFetch = waitUntilFetchStatus
                                case let .incremental(waitUntilFetchStatus):
                                    waitUntilAfterInitialFetch = waitUntilFetchStatus
                            }
                            let dataDisposable = fileContext.data(range: 0 ..< Int32.max, waitUntilAfterInitialFetch: waitUntilAfterInitialFetch, next: { value in
                                self.dataQueue.async {
                                    if value.complete {
                                        if let pathExtension = pathExtension {
                                            // Expose the complete file under a name carrying the requested extension.
                                            // Despite the variable name, `linkItem` creates a hard link, not a symlink.
                                            let symlinkPath = paths.complete + ".\(pathExtension)"
                                            if fileSize(symlinkPath) == nil {
                                                let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath)
                                            }
                                            subscriber.putNext(MediaResourceData(path: symlinkPath, offset: 0, size: value.size, complete: true))
                                        } else {
                                            subscriber.putNext(value)
                                        }
                                        // A complete value is the last one delivered.
                                        subscriber.putCompletion()
                                    } else {
                                        subscriber.putNext(value)
                                    }
                                }
                            })
                            // Disposal cancels the data request and releases the file context.
                            disposable.set(ActionDisposable {
                                dataDisposable.dispose()
                                releaseContext()
                            })
                        }
                    }
  ...
}

MediaBoxFileContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private let queue: Queue
private let path: String
private let partialPath: String
private let metaPath: String

private var content: MediaBoxFileContent
private let references = CounterBag()

/// Delivers the data for `range` to `next`.
///
/// - For partially downloaded content the request is forwarded to the partial file, and
///   its disposable is returned.
/// - For complete content `next` is called immediately with a complete
///   `MediaResourceData` whose offset is the range's lower bound and whose size is
///   `min(range.upperBound, fileSize) - range.lowerBound` (not clamped at zero); an
///   empty disposable is returned.
func data(range: Range<Int32>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable {
    switch self.content {
    case let .partial(partialFile):
        return partialFile.data(range: range, waitUntilAfterInitialFetch: waitUntilAfterInitialFetch, next: next)
    case let .complete(completePath, totalSize):
        let startOffset = Int(range.lowerBound)
        let endOffset = min(Int(range.upperBound), totalSize)
        let result = MediaResourceData(path: completePath, offset: startOffset, size: endOffset - startOffset, complete: true)
        next(result)
        return EmptyDisposable
    }
}

enum MediaBoxFileContent

1
2
case complete(String, Int)
case partial(MediaBoxPartialFile)

MediaBoxPartialFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private let queue: Queue
private let path: String
private let metaPath: String
private let completePath: String
private let completed: (Int32) -> Void
private let metadataFd: ManagedFile
private let fd: ManagedFile
fileprivate let fileMap: MediaBoxFileMap
private var dataRequests = Bag<MediaBoxPartialFileDataRequest>()
private let missingRanges: MediaBoxFileMissingRanges
private let rangeStatusRequests = Bag<((IndexSet) -> Void, () -> Void)>()
private let statusRequests = Bag<((MediaResourceStatus) -> Void, Int32?)>()

private let fullRangeRequests = Bag<Disposable>()

private var currentFetch: (Promise<[(Range<Int>, MediaBoxFetchPriority)]>, Disposable)?
private var processedAtLeastOneFetch: Bool = false

// Excerpt: the `...` lines mark parts of the body omitted from this listing.
// The visible part registers a `MediaBoxPartialFileDataRequest` for `range` whose
// completion forwards the resulting data to `next`.
func data(range: Range<Int32>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable {
  ...
  // NOTE(review): `waitingUntilAfterInitialFetch` differs from the parameter name
  // `waitUntilAfterInitialFetch`; presumably a local defined in the omitted code above — confirm.
  let index = self.dataRequests.add(MediaBoxPartialFileDataRequest(range: range, waitingUntilAfterInitialFetch: waitingUntilAfterInitialFetch, completion: { data in
            next(data)
        }))
  ...
}

ManagedFile

1
2
3
4
5
6
7
8
9
10
private let queue: Queue?
private let fd: Int32
private let mode: ManagedFileMode

public func write(_ data: UnsafeRawPointer, count: Int) -> Int
public func read(_ data: UnsafeMutableRawPointer, _ count: Int) -> Int
public func readData(count: Int) -> Data
public func seek(position: Int64)
public func truncate(count: Int64)
public func getSize() -> Int?

ManagedFileMode

1
2
3
case read
case readwrite
case append

MediaBoxFileMap

1
2
3
4
fileprivate(set) var sum: Int32
private(set) var ranges: IndexSet
private(set) var truncationSize: Int32?
private(set) var progress: Float?

MediaBoxPartialFileDataRequest

1
2
3
let range: Range<Int32>
var waitingUntilAfterInitialFetch: Bool
let completion: (MediaResourceData) -> Void

MediaBoxFileMissingRange

1
2
3
4
5
var range: Range<Int32>
let priority: MediaBoxFetchPriority
var remainingRanges: IndexSet
let error: (MediaResourceDataFetchError) -> Void
let completion: () -> Void

MediaBoxFileMissingRanges

1
2
3
4
5
6
7
8
9
10
11
private var requestedRanges = Bag<MediaBoxFileMissingRange>()
private var missingRangesFlattened = IndexSet()
private var missingRangesByPriority: [MediaBoxFetchPriority: IndexSet] = [:]

private func missingRequestedIntervals() -> [(Range<Int>, MediaBoxFetchPriority)] 
func fill(_ range: Range<Int32>) -> ([(Range<Int>, MediaBoxFetchPriority)], [() -> Void])?

func addRequest(fileMap: MediaBoxFileMap, range: Range<Int32>, priority: MediaBoxFetchPriority, error: @escaping (MediaResourceDataFetchError) -> Void, completion: @escaping () -> Void) -> (Int, [(Range<Int>, MediaBoxFetchPriority)]?)

func removeRequest(fileMap: MediaBoxFileMap, index: Int) -> [(Range<Int>, MediaBoxFetchPriority)]?
private func update(fileMap: MediaBoxFileMap) -> [(Range<Int>, MediaBoxFetchPriority)]?

enum ResourceDataRangeMode

1
2
3
case complete
case incremental
case partial

enum FetchResourceSourceType

1
2
case local
case remote

enum FetchResourceError

1
case generic

struct ResourceStorePaths

1
2
let partial: String
let complete: String

CachedMediaResourceRepresentation

enum CachedMediaResourceRepresentationResult

1
2
case temporaryPath(String)
case tempFile(TempBoxFile)

enum CachedMediaRepresentationKeepDuration

1
2
case general
case shortLived

protocol CachedMediaResourceRepresentation

1
2
3
var uniqueId: String { get }
var keepDuration: CachedMediaRepresentationKeepDuration { get }
func isEqual(to: CachedMediaResourceRepresentation) -> Bool

struct CachedMediaResourceRepresentationKey

1
2
let resourceId: MediaResourceId
let representation: CachedMediaResourceRepresentation
1
参考另一篇《Telegram-MediaResource》

class CachedMediaResourceRepresentationContext

1
2
3
4
var currentData: MediaResourceData?
let dataSubscribers = Bag<(MediaResourceData) -> Void>()
let disposable = MetaDisposable()
var initialized = false

ResourceStatusContext

enum MediaResourceStatus

1
2
3
case Remote
case Local
case Fetching(isActive: Bool, progress: Float)

class ResourceStatusContext

1
2
3
var status: MediaResourceStatus?
let subscribers = Bag<(MediaResourceStatus) -> Void>()
let disposable: Disposable

ResourceDataContext

struct MediaResourceData

1
2
3
4
public let path: String
public let offset: Int
public let size: Int
public let complete: Bool

class ResourceDataContext

1
2
3
4
5
6
// Stored properties of ResourceDataContext.
var data: MediaResourceData
var processedFetch: Bool = false
// NOTE(review): the two subscriber declarations below were cut off mid-type in the listing
// ("sink: (MediaReso…"); the tail of the tuple type is reconstructed to match the
// `(MediaResourceData) -> Void` sink shape used elsewhere in this file — confirm against the original source.
let progresiveDataSubscribers = Bag<(waitUntilFetchStatus: Bool, sink: (MediaResourceData) -> Void)>()
let completeDataSubscribers = Bag<(waitUntilFetchStatus: Bool, sink: (MediaResourceData) -> Void)>()
var fetchDisposable: Disposable?
let fetchSubscribers = Bag<Void>()

MediaResourceDataFetchCopyLocalItem

protocol MediaResourceDataFetchCopyLocalItem

1
func copyTo(url: URL) -> Bool

MediaBoxFetchPriority

enum MediaBoxFetchPriority

1
2
3
case `default` = 0
case elevated = 1
case maximum = 2

MediaResourceDataFetchResult

enum MediaResourceDataFetchResult

1
2
3
4
5
6
7
8
case dataPart(resourceOffset: Int, data: Data, range: Range<Int>, complete: Bool)
case resourceSizeUpdated(Int)
case progressUpdated(Float)
case replaceHeader(data: Data, range: Range<Int>)
case moveLocalFile(path: String)
case moveTempFile(file: TempBoxFile)
case copyLocalItem(MediaResourceDataFetchCopyLocalItem)
case reset

TimeBasedCleanupImpl

1
2
3
suspendAwareDelay

setMaxStoreTimes(general: Int32, shortLived: Int32)
This post is licensed under CC BY 4.0 by the author.