class
MediaBox
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
public let basePath: String
private let statusQueue = Queue()
private let concurrentQueue = Queue.concurrentDefaultQueue()
private let dataQueue = Queue()
private let cacheQueue = Queue()
private let timeBasedCleanup: TimeBasedCleanup
private var statusContexts: [WrappedMediaResourceId: ResourceStatusContext] = [:]
private var cachedRepresentationContexts: [CachedMediaResourceRepresentationKey: CachedMediaResourceRepresentationContext] = [:]
private var fileContexts: [WrappedMediaResourceId: MediaBoxFileContext] = [:]
private var wrappedFetchResource = Promise<(MediaResource, Signal<[(Range<Int>, MediaBoxFetchPriority)], NoError>, MediaResourceFetchParameters?) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>>()
public var preFetchedResourcePath: (MediaResource) -> String? = { _ in return nil }
// fetch
public var fetchResource: ((MediaResource, Signal<[(Range<Int>, MediaBoxFetchPriority)], NoError>, MediaResourceFetchParameters?) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>)? {
didSet {
if let fetchResource = self.fetchResource {
wrappedFetchResource.set(.single(fetchResource))
} else {
wrappedFetchResource.set(.never())
}
}
}
// prefetch
public var wrappedFetchCachedResourceRepresentation = Promise<(MediaResource, CachedMediaResourceRepresentation) -> Signal<CachedMediaResourceRepresentationResult, NoError>>()
public var fetchCachedResourceRepresentation: ((MediaResource, CachedMediaResourceRepresentation) -> Signal<CachedMediaResourceRepresentationResult, NoError>)? {
didSet {
if let fetchCachedResourceRepresentation = self.fetchCachedResourceRepresentation {
wrappedFetchCachedResourceRepresentation.set(.single(fetchCachedResourceRepresentation))
} else {
wrappedFetchCachedResourceRepresentation.set(.never())
}
}
}
lazy var ensureDirectoryCreated: Void = {
try! FileManager.default.createDirectory(atPath: self.basePath, withIntermediateDirectories: true, attributes: nil)
try! FileManager.default.createDirectory(atPath: self.basePath + "/cache", withIntermediateDirectories: true, attributes: nil)
try! FileManager.default.createDirectory(atPath: self.basePath + "/short-cache", withIntermediateDirectories: true, attributes: nil)
}()
public init(basePath: String) {
self.basePath = basePath
self.timeBasedCleanup = TimeBasedCleanup(generalPaths: [
self.basePath,
self.basePath + "/cache"
], shortLivedPaths: [
self.basePath + "/short-cache"
])
let _ = self.ensureDirectoryCreated
}
// timebased cleanup
public func setMaxStoreTimes(general: Int32, shortLived: Int32) {
self.timeBasedCleanup.setMaxStoreTimes(general: general, shortLived: shortLived)
}
// path for resource
private func fileNameForId(_ id: MediaResourceId) -> String {
return "\(id.uniqueId)"
}
private func pathForId(_ id: MediaResourceId) -> String {
return "\(self.basePath)/\(fileNameForId(id))"
}
private func storePathsForId(_ id: MediaResourceId) -> ResourceStorePaths {
return ResourceStorePaths(partial: "\(self.basePath)/\(fileNameForId(id))_partial", complete: "\(self.basePath)/\(fileNameForId(id))")
}
private func cachedRepresentationPathForId(_ id: MediaResourceId, representation: CachedMediaResourceRepresentation) -> String {
let cacheString: String
switch representation.keepDuration {
case .general:
cacheString = "cache"
case .shortLived:
cacheString = "short-cache"
}
return "\(self.basePath)/\(cacheString)/\(fileNameForId(id)):\(representation.uniqueId)"
}
// move file
public func storeResourceData(_ id: MediaResourceId, data: Data) {
self.dataQueue.async {
let paths = self.storePathsForId(id)
let _ = try? data.write(to: URL(fileURLWithPath: paths.complete), options: [.atomic])
}
}
public func moveResourceData(_ id: MediaResourceId, fromTempPath: String) {
self.dataQueue.async {
let paths = self.storePathsForId(id)
let _ = try? FileManager.default.moveItem(at: URL(fileURLWithPath: fromTempPath), to: URL(fileURLWithPath: paths.complete))
}
}
public func copyResourceData(_ id: MediaResourceId, fromTempPath: String) {
self.dataQueue.async {
let paths = self.storePathsForId(id)
let _ = try? FileManager.default.copyItem(at: URL(fileURLWithPath: fromTempPath), to: URL(fileURLWithPath: paths.complete))
}
}
public func moveResourceData(from: MediaResourceId, to: MediaResourceId) {
self.dataQueue.async {
let pathsFrom = self.storePathsForId(from)
let pathsTo = self.storePathsForId(to)
link(pathsFrom.partial, pathsTo.partial)
link(pathsFrom.complete, pathsTo.complete)
}
}
private func maybeCopiedPreFetchedResource(completePath: String, resource: MediaResource) {
if let path = self.preFetchedResourcePath(resource) {
let _ = try? FileManager.default.copyItem(atPath: path, toPath: completePath)
}
}
// resource status
public func resourceStatus(_ resource: MediaResource, approximateSynchronousValue: Bool = false) -> Signal<MediaResourceStatus, NoError> {
let signal = Signal<MediaResourceStatus, NoError> { subscriber in
let disposable = MetaDisposable()
self.concurrentQueue.async {
let paths = self.storePathsForId(resource.id)
if let _ = fileSize(paths.complete) {
self.timeBasedCleanup.touch(paths: [
paths.complete
])
subscriber.putNext(.Local)
subscriber.putCompletion()
} else {
self.maybeCopiedPreFetchedResource(completePath: paths.complete, resource: resource)
if let _ = fileSize(paths.complete) {
self.timeBasedCleanup.touch(paths: [
paths.complete
])
subscriber.putNext(.Local)
subscriber.putCompletion()
return
}
self.statusQueue.async {
let resourceId = WrappedMediaResourceId(resource.id)
let statusContext: ResourceStatusContext
var statusUpdateDisposable: MetaDisposable?
if let current = self.statusContexts[resourceId] {
statusContext = current
} else {
let statusUpdateDisposableValue = MetaDisposable()
statusContext = ResourceStatusContext(disposable: statusUpdateDisposableValue)
self.statusContexts[resourceId] = statusContext
statusUpdateDisposable = statusUpdateDisposableValue
}
let index = statusContext.subscribers.add({ status in
subscriber.putNext(status)
})
if let status = statusContext.status {
subscriber.putNext(status)
}
if let statusUpdateDisposable = statusUpdateDisposable {
let statusQueue = self.statusQueue
self.dataQueue.async {
if let (fileContext, releaseContext) = self.fileContext(for: resource) {
let statusDisposable = fileContext.status(next: { [weak statusContext] value in
statusQueue.async {
if let current = self.statusContexts[resourceId], current === statusContext, current.status != value {
current.status = value
for subscriber in current.subscribers.copyItems() {
subscriber(value)
}
}
}
}, completed: { [weak statusContext] in
statusQueue.async {
if let current = self.statusContexts[resourceId], current === statusContext {
current.subscribers.remove(index)
if current.subscribers.isEmpty {
self.statusContexts.removeValue(forKey: resourceId)
current.disposable.dispose()
}
}
}
}, size: resource.size.flatMap(Int32.init))
statusUpdateDisposable.set(ActionDisposable {
statusDisposable.dispose()
releaseContext()
})
}
}
}
disposable.set(ActionDisposable { [weak statusContext] in
self.statusQueue.async {
if let current = self.statusContexts[WrappedMediaResourceId(resource.id)], current === statusContext {
current.subscribers.remove(index)
if current.subscribers.isEmpty {
self.statusContexts.removeValue(forKey: WrappedMediaResourceId(resource.id))
current.disposable.dispose()
}
}
}
})
}
}
}
return disposable
}
if approximateSynchronousValue {
return Signal<Signal<MediaResourceStatus, NoError>, NoError> { subscriber in
let paths = self.storePathsForId(resource.id)
if let _ = fileSize(paths.complete) {
subscriber.putNext(.single(.Local))
} else {
subscriber.putNext(.single(.Remote) |> then(signal))
}
subscriber.putCompletion()
return EmptyDisposable
} |> switchToLatest
} else {
return signal
}
}
// resource data
public func resourceData(_ resource: MediaResource, pathExtension: String? = nil, option: ResourceDataRequestOption = .complete(waitUntilFetchStatus: false), attemptSynchronously: Bool = false) -> Signal<MediaResourceData, NoError> {
...
self.dataQueue.async {
if let (fileContext, releaseContext) = self.fileContext(for: resource) {
let waitUntilAfterInitialFetch: Bool
switch option {
case let .complete(waitUntilFetchStatus):
waitUntilAfterInitialFetch = waitUntilFetchStatus
case let .incremental(waitUntilFetchStatus):
waitUntilAfterInitialFetch = waitUntilFetchStatus
}
let dataDisposable = fileContext.data(range: 0 ..< Int32.max, waitUntilAfterInitialFetch: waitUntilAfterInitialFetch, next: { value in
self.dataQueue.async {
if value.complete {
if let pathExtension = pathExtension {
let symlinkPath = paths.complete + ".\(pathExtension)"
if fileSize(symlinkPath) == nil {
let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath)
}
subscriber.putNext(MediaResourceData(path: symlinkPath, offset: 0, size: value.size, complete: true))
} else {
subscriber.putNext(value)
}
subscriber.putCompletion()
} else {
subscriber.putNext(value)
}
}
})
disposable.set(ActionDisposable {
dataDisposable.dispose()
releaseContext()
})
}
}
...
}
MediaBoxFileContext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private let queue: Queue
private let path: String
private let partialPath: String
private let metaPath: String
private var content: MediaBoxFileContent
private let references = CounterBag()
func data(range: Range<Int32>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable {
switch self.content {
case let .complete(path, size):
next(MediaResourceData(path: path, offset: Int(range.lowerBound), size: min(Int(range.upperBound), size) - Int(range.lowerBound), complete: true))
return EmptyDisposable
case let .partial(file):
return file.data(range: range, waitUntilAfterInitialFetch: waitUntilAfterInitialFetch, next: next)
}
}
enum
MediaBoxFileContent
1
2
case complete(String, Int)
case partial(MediaBoxPartialFile)
MediaBoxPartialFile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private let queue: Queue
private let path: String
private let metaPath: String
private let completePath: String
private let completed: (Int32) -> Void
private let metadataFd: ManagedFile
private let fd: ManagedFile
fileprivate let fileMap: MediaBoxFileMap
private var dataRequests = Bag<MediaBoxPartialFileDataRequest>()
private let missingRanges: MediaBoxFileMissingRanges
private let rangeStatusRequests = Bag<((IndexSet) -> Void, () -> Void)>()
private let statusRequests = Bag<((MediaResourceStatus) -> Void, Int32?)>()
private let fullRangeRequests = Bag<Disposable>()
private var currentFetch: (Promise<[(Range<Int>, MediaBoxFetchPriority)]>, Disposable)?
private var processedAtLeastOneFetch: Bool = false
func data(range: Range<Int32>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable {
...
let index = self.dataRequests.add(MediaBoxPartialFileDataRequest(range: range, waitingUntilAfterInitialFetch: waitingUntilAfterInitialFetch, completion: { data in
next(data)
}))
...
}
ManagedFile
1
2
3
4
5
6
7
8
9
10
private let queue: Queue?
private let fd: Int32
private let mode: ManagedFileMode
public func write(_ data: UnsafeRawPointer, count: Int) -> Int
public func read(_ data: UnsafeMutableRawPointer, _ count: Int) -> Int
public func readData(count: Int) -> Data
public func seek(position: Int64)
public func truncate(count: Int64)
public func getSize() -> Int?
ManagedFileMode
1
2
3
case read
case readwrite
case append
MediaBoxFileMap
1
2
3
4
fileprivate(set) var sum: Int32
private(set) var ranges: IndexSet
private(set) var truncationSize: Int32?
private(set) var progress: Float?
MediaBoxPartialFileDataRequest
1
2
3
let range: Range<Int32>
var waitingUntilAfterInitialFetch: Bool
let completion: (MediaResourceData) -> Void
MediaBoxFileMissingRange
1
2
3
4
5
var range: Range<Int32>
let priority: MediaBoxFetchPriority
var remainingRanges: IndexSet
let error: (MediaResourceDataFetchError) -> Void
let completion: () -> Void
MediaBoxFileMissingRanges
1
2
3
4
5
6
7
8
9
10
11
private var requestedRanges = Bag<MediaBoxFileMissingRange>()
private var missingRangesFlattened = IndexSet()
private var missingRangesByPriority: [MediaBoxFetchPriority: IndexSet] = [:]
private func missingRequestedIntervals() -> [(Range<Int>, MediaBoxFetchPriority)]
func fill(_ range: Range<Int32>) -> ([(Range<Int>, MediaBoxFetchPriority)], [() -> Void])?
func addRequest(fileMap: MediaBoxFileMap, range: Range<Int32>, priority: MediaBoxFetchPriority, error: @escaping (MediaResourceDataFetchError) -> Void, completion: @escaping () -> Void) -> (Int, [(Range<Int>, MediaBoxFetchPriority)]?)
func removeRequest(fileMap: MediaBoxFileMap, index: Int) -> [(Range<Int>, MediaBoxFetchPriority)]?
private func update(fileMap: MediaBoxFileMap) -> [(Range<Int>, MediaBoxFetchPriority)]?
enum
ResourceDataRangeMode
1
2
3
case complete
case incremental
case partial
enum
FetchResourceSourceType
1
2
case local
case remote
enum
FetchResourceError
1
case generic
struct
ResourceStorePaths
1
2
let partial: String
let complete: String
CachedMediaResourceRepresentation
enum
CachedMediaResourceRepresentationResult
1
2
case temporaryPath(String)
case tempFile(TempBoxFile)
enum
CachedMediaRepresentationKeepDuration
1
2
case general
case shortLived
protocol
CachedMediaResourceRepresentation
1
2
3
var uniqueId: String { get }
var keepDuration: CachedMediaRepresentationKeepDuration { get }
func isEqual(to: CachedMediaResourceRepresentation) -> Bool
struct
CachedMediaResourceRepresentationKey
1
2
let resourceId: MediaResourceId
let representation: CachedMediaResourceRepresentation
1
参考另一篇《Telegram-MediaResource》
class
CachedMediaResourceRepresentationContext
1
2
3
4
var currentData: MediaResourceData?
let dataSubscribers = Bag<(MediaResourceData) -> Void>()
let disposable = MetaDisposable()
var initialized = false
ResourceStatusContext
enum
MediaResourceStatus
1
2
3
case Remote
case Local
case Fetching(isActive: Bool, progress: Float)
class
ResourceStatusContext
1
2
3
var status: MediaResourceStatus?
let subscribers = Bag<(MediaResourceStatus) -> Void>()
let disposable: Disposable
ResourceDataContext
struct
MediaResourceData
1
2
3
4
public let path: String
public let offset: Int
public let size: Int
public let complete: Bool
class
ResourceDataContext
1
2
3
4
5
6
var data: MediaResourceData
var processedFetch: Bool = false
let progresiveDataSubscribers = Bag<(waitUntilFetchStatus: Bool, sink: (MediaReso
let completeDataSubscribers = Bag<(waitUntilFetchStatus: Bool, sink: (MediaResour
var fetchDisposable: Disposable?
let fetchSubscribers = Bag<Void>()
MediaResourceDataFetchCopyLocalItem
protocol
MediaResourceDataFetchCopyLocalItem
1
func copyTo(url: URL) -> Bool
MediaBoxFetchPriority
enum
MediaBoxFetchPriority
1
2
3
case `default` = 0
case elevated = 1
case maximum = 2
MediaResourceDataFetchResult
enum
MediaResourceDataFetchResult
1
2
3
4
5
6
7
8
case dataPart(resourceOffset: Int, data: Data, range: Range<Int>, complete: Bool)
case resourceSizeUpdated(Int)
case progressUpdated(Float)
case replaceHeader(data: Data, range: Range<Int>)
case moveLocalFile(path: String)
case moveTempFile(file: TempBoxFile)
case copyLocalItem(MediaResourceDataFetchCopyLocalItem)
case reset
TimeBasedCleanupImpl
1
2
3
suspendAwareDelay
setMaxStoreTimes(general: Int32, shortLived: Int32)