/// A source of media frames that can be observed through event sinks,
/// asked to generate frames up to a timestamp, and repositioned with a seek.
protocol MediaFrameSource {
    /// Registers `f` to be invoked with `MediaTrackEvent` values.
    ///
    /// - Parameter f: The sink closure to register.
    /// - Returns: An integer handle for the registration; presumably the value
    ///   expected by `removeEventSink(_:)` — confirm against implementers.
    func addEventSink(_ f: @escaping (MediaTrackEvent) -> Void) -> Int

    /// Unregisters an event sink.
    ///
    /// - Parameter index: Handle of the sink to remove; presumably the value
    ///   returned by `addEventSink(_:)` — confirm against implementers.
    func removeEventSink(_ index: Int)

    /// Requests that frames be generated up to `timestamp`.
    ///
    /// - Parameter timestamp: Upper bound of the requested frame range.
    func generateFrames(until timestamp: Double)

    /// Repositions the source at `timestamp`.
    ///
    /// - Parameter timestamp: The position to seek to.
    /// - Returns: A signal that delivers a queue-local `MediaFrameSourceSeekResult`
    ///   or fails with a `MediaFrameSourceSeekError`.
    func seek(timestamp: Double) -> Signal<QueueLocalObject<MediaFrameSourceSeekResult>, MediaFrameSourceSeekError>
}
class FFMpegMediaFrameSource: MediaFrameSource
// MARK: Execution

/// Queue this source is driven from; `performWithContext` asserts that it is the current queue.
private let queue: Queue
/// Task queue handed to the decoding thread's entry point, which runs its `loop()`.
/// Used to keep the thread alive (a while loop).
private let taskQueue: ThreadTaskQueue
/// Dedicated thread created and started in `init`.
private let thread: Thread

// MARK: Resource configuration (forwarded to the context's `initializeState`)

private let postbox: Postbox
private let resourceReference: MediaResourceReference
private let tempFilePath: String?
private let streamable: Bool
private let video: Bool
private let preferSoftwareDecoding: Bool
private let fetchAutomatically: Bool
private let maximumFetchSize: Int?

// MARK: Buffering thresholds (forwarded to each `MediaTrackFrameBuffer` created by `seek`)

private let stallDuration: Double
private let lowWaterDuration: Double
private let highWaterDuration: Double

// MARK: Mutable state

/// Sinks notified with `.frames` / `.endOfStream` events after each generation pass.
private let eventSinkBag = Bag<(MediaTrackEvent) -> Void>()
/// `true` while a generation pass started by `internalGenerateFrames` is still in flight.
private var generatingFrames: Bool = false
/// Most recently requested generation timestamp; reset to `nil` when a seek succeeds.
private var requestedFrameGenerationTimestamp: Double?
/// Entry point of the dedicated thread.
///
/// Stores a fresh `FFMpegMediaFrameSourceContext` in the current thread's
/// dictionary, runs `taskQueue.loop()`, and removes the context again once
/// the loop returns.
///
/// - Parameter taskQueue: The task queue whose loop is run on this thread.
@objc private static func threadEntry(_ taskQueue: ThreadTaskQueue) {
    autoreleasepool {
        let contextKey = "FFMpegMediaFrameSourceContext"
        let currentThread = Thread.current

        // Per-thread context, used for decoding.
        let decodingContext = FFMpegMediaFrameSourceContext(thread: currentThread)
        currentThread.threadDictionary[contextKey] = decodingContext

        // Runs after the loop below has returned.
        defer {
            currentThread.threadDictionary.removeObject(forKey: contextKey)
        }

        taskQueue.loop()
    }
}
/// Creates the frame source and starts its dedicated thread.
///
/// - Parameters:
///   - queue: Queue the source is driven from.
///   - postbox: Forwarded to the context's `initializeState`.
///   - resourceReference: Forwarded to the context's `initializeState`.
///   - tempFilePath: Forwarded to the context's `initializeState`.
///   - streamable: Forwarded to the context's `initializeState`.
///   - video: Forwarded to the context's `initializeState`.
///   - preferSoftwareDecoding: Forwarded to the context's `initializeState`.
///   - fetchAutomatically: Forwarded to the context's `initializeState`.
///   - maximumFetchSize: Forwarded to the context's `initializeState`; defaults to `nil`.
///   - stallDuration: Passed to each `MediaTrackFrameBuffer` created by `seek`; defaults to `1.0`.
///   - lowWaterDuration: Passed to each `MediaTrackFrameBuffer` created by `seek`; defaults to `2.0`.
///   - highWaterDuration: Passed to each `MediaTrackFrameBuffer` created by `seek`; defaults to `3.0`.
public init(
    queue: Queue,
    postbox: Postbox,
    resourceReference: MediaResourceReference,
    tempFilePath: String?,
    streamable: Bool,
    video: Bool,
    preferSoftwareDecoding: Bool,
    fetchAutomatically: Bool,
    maximumFetchSize: Int? = nil,
    stallDuration: Double = 1.0,
    lowWaterDuration: Double = 2.0,
    highWaterDuration: Double = 3.0
) {
    // Execution queue.
    self.queue = queue

    // Resource configuration.
    self.postbox = postbox
    self.resourceReference = resourceReference
    self.tempFilePath = tempFilePath
    self.streamable = streamable
    self.video = video
    self.preferSoftwareDecoding = preferSoftwareDecoding
    self.fetchAutomatically = fetchAutomatically
    self.maximumFetchSize = maximumFetchSize

    // Buffering thresholds.
    self.stallDuration = stallDuration
    self.lowWaterDuration = lowWaterDuration
    self.highWaterDuration = highWaterDuration

    // Build the task queue and the thread that will run its loop.
    let taskQueue = ThreadTaskQueue()
    let decodingThread = Thread(
        target: FFMpegMediaFrameSource.self,
        selector: #selector(FFMpegMediaFrameSource.threadEntry(_:)),
        object: taskQueue
    )
    decodingThread.name = "FFMpegMediaFrameSourceContext"

    self.taskQueue = taskQueue
    self.thread = decodingThread

    // The thread is started before `super.init()`; its entry point is static
    // and only receives the task queue.
    decodingThread.start()

    super.init()
}
/// Dispatches a task to the dedicated thread.
///
/// Must be called on `queue` (asserted). The task is enqueued on `taskQueue`;
/// when it runs, `f` receives the value returned by `contextForCurrentThread()`.
/// If that lookup yields `nil`, `f` is not called.
///
/// - Parameter f: Work to perform with the thread's `FFMpegMediaFrameSourceContext`.
func performWithContext(_ f: @escaping (FFMpegMediaFrameSourceContext) -> Void) {
    assert(self.queue.isCurrent())

    self.taskQueue.enqueue {
        guard let threadContext = contextForCurrentThread() else {
            return
        }
        f(threadContext)
    }
}
/// Runs one frame-generation pass up to `timestamp` and notifies every event sink.
///
/// Does nothing if a pass is already in flight. Otherwise the pass is scheduled
/// through `performWithContext`; its results are delivered back on `queue`, where
/// each sink receives `.frames(...)` followed by `.endOfStream` when the context
/// reported the end of the stream. If `requestedFrameGenerationTimestamp` is set
/// and differs from `timestamp` at that point, another pass is started for it.
///
/// - Parameter timestamp: Upper bound passed to the context's `takeFrames(until:)`.
private func internalGenerateFrames(until timestamp: Double) {
    // Only one pass at a time; the flag is cleared when results arrive on `queue`.
    guard !self.generatingFrames else {
        return
    }
    self.generatingFrames = true

    // Snapshot the configuration so the closure below does not read it from `self`.
    let postbox = self.postbox
    let resourceReference = self.resourceReference
    let tempFilePath = self.tempFilePath
    let queue = self.queue
    let streamable = self.streamable
    let video = self.video
    let preferSoftwareDecoding = self.preferSoftwareDecoding
    let fetchAutomatically = self.fetchAutomatically
    let maximumFetchSize = self.maximumFetchSize

    // Decode and notify the sinks.
    self.performWithContext { [weak self] context in
        context.initializeState(postbox: postbox, resourceReference: resourceReference, tempFilePath: tempFilePath, streamable: streamable, video: video, preferSoftwareDecoding: preferSoftwareDecoding, fetchAutomatically: fetchAutomatically, maximumFetchSize: maximumFetchSize)
        let (frames, endOfStream) = context.takeFrames(until: timestamp)

        // `self` is already weak here via the outer capture list.
        queue.async {
            guard let strongSelf = self else {
                return
            }
            strongSelf.generatingFrames = false

            for sink in strongSelf.eventSinkBag.copyItems() {
                sink(.frames(frames))
                if endOfStream {
                    sink(.endOfStream)
                }
            }

            // A different target was requested in the meantime: run another pass for it.
            if let requestedTimestamp = strongSelf.requestedFrameGenerationTimestamp, !requestedTimestamp.isEqual(to: timestamp) {
                strongSelf.internalGenerateFrames(until: requestedTimestamp)
            }
        }
    }
}
/// Seeks the decoding context to `timestamp`.
///
/// Must be called on `queue` (asserted). On subscription the seek is scheduled
/// through `performWithContext`. When the context reports stream descriptions,
/// `requestedFrameGenerationTimestamp` is reset and the subscriber receives a
/// queue-local `MediaFrameSourceSeekResult` followed by completion; when it
/// reports `nil`, the subscriber receives `.generic`.
///
/// Disposing the subscription signals the semaphore published by the context,
/// if one is currently set (presumably unblocking a wait inside the context —
/// confirm against `FFMpegMediaFrameSourceContext`).
///
/// NOTE(review): if this source has been deallocated by the time the seek
/// completes, the subscriber receives neither a value, a completion nor an error.
///
/// - Parameter timestamp: The position to seek to.
/// - Returns: A signal delivering the seek result or a `MediaFrameSourceSeekError`.
public func seek(timestamp: Double) -> Signal<QueueLocalObject<MediaFrameSourceSeekResult>, MediaFrameSourceSeekError> {
    assert(self.queue.isCurrent())

    return Signal { subscriber in
        // Snapshot the configuration so the closures below do not read it from `self`.
        let queue = self.queue
        let postbox = self.postbox
        let resourceReference = self.resourceReference
        let tempFilePath = self.tempFilePath
        let streamable = self.streamable
        let video = self.video
        let preferSoftwareDecoding = self.preferSoftwareDecoding
        let fetchAutomatically = self.fetchAutomatically
        let maximumFetchSize = self.maximumFetchSize

        // Holds the context's semaphore while the seek is in flight so that
        // disposal can signal it.
        let currentSemaphore = Atomic<Atomic<DispatchSemaphore?>?>(value: nil)

        let disposable = MetaDisposable()
        disposable.set(ActionDisposable {
            currentSemaphore.with({ $0 })?.with({ $0 })?.signal()
        })

        self.performWithContext { [weak self] context in
            let _ = currentSemaphore.swap(context.currentSemaphore)

            context.initializeState(postbox: postbox, resourceReference: resourceReference, tempFilePath: tempFilePath, streamable: streamable, video: video, preferSoftwareDecoding: preferSoftwareDecoding, fetchAutomatically: fetchAutomatically, maximumFetchSize: maximumFetchSize)

            context.seek(timestamp: timestamp, completed: { seekOutcome in
                queue.async {
                    guard let strongSelf = self else {
                        return
                    }

                    // A `nil` outcome is reported as a generic failure.
                    guard let (streamDescriptions, seekedTimestamp) = seekOutcome else {
                        let _ = currentSemaphore.swap(nil)
                        subscriber.putError(.generic)
                        return
                    }

                    strongSelf.requestedFrameGenerationTimestamp = nil

                    subscriber.putNext(QueueLocalObject(queue: queue, generate: {
                        // Source gone: return a result without buffers.
                        guard let strongSelf = self else {
                            return MediaFrameSourceSeekResult(buffers: MediaPlaybackBuffers(audioBuffer: nil, videoBuffer: nil), extraDecodedVideoFrames: [], timestamp: seekedTimestamp)
                        }

                        var audioBuffer: MediaTrackFrameBuffer?
                        if let audioStream = streamDescriptions.audio {
                            audioBuffer = MediaTrackFrameBuffer(frameSource: strongSelf, decoder: audioStream.decoder, type: .audio, duration: audioStream.duration, rotationAngle: 0.0, aspect: 1.0, stallDuration: strongSelf.stallDuration, lowWaterDuration: strongSelf.lowWaterDuration, highWaterDuration: strongSelf.highWaterDuration)
                        }

                        var videoBuffer: MediaTrackFrameBuffer?
                        var extraDecodedVideoFrames: [MediaTrackFrame] = []
                        if let videoStream = streamDescriptions.video {
                            videoBuffer = MediaTrackFrameBuffer(frameSource: strongSelf, decoder: videoStream.decoder, type: .video, duration: videoStream.duration, rotationAngle: videoStream.rotationAngle, aspect: videoStream.aspect, stallDuration: strongSelf.stallDuration, lowWaterDuration: strongSelf.lowWaterDuration, highWaterDuration: strongSelf.highWaterDuration)
                            // Decode the extra video frames, keeping only those that decode successfully.
                            extraDecodedVideoFrames = streamDescriptions.extraVideoFrames.compactMap { videoStream.decoder.decode(frame: $0) }
                        }

                        return MediaFrameSourceSeekResult(buffers: MediaPlaybackBuffers(audioBuffer: audioBuffer, videoBuffer: videoBuffer), extraDecodedVideoFrames: extraDecodedVideoFrames, timestamp: seekedTimestamp)
                    }))

                    let _ = currentSemaphore.swap(nil)
                    subscriber.putCompletion()
                }
            })
        }

        return disposable
    }
}