Home telegram - ffmpeg - mediaFrameSource
Post
Cancel

telegram - ffmpeg - mediaFrameSource

protocol MediaFrameSource

1
2
3
4
/// Registers `f` to receive `MediaTrackEvent` values from this source.
/// NOTE(review): presumably the returned `Int` is the handle later passed to
/// `removeEventSink(_:)` — confirm against the conforming type.
func addEventSink(_ f: @escaping (MediaTrackEvent) -> Void) -> Int
/// Unregisters the event sink identified by `index`.
/// NOTE(review): assumes `index` is a value previously returned by
/// `addEventSink(_:)` — confirm.
func removeEventSink(_ index: Int)
/// Asks the source to generate frames up to `timestamp`.
/// NOTE(review): the unit of `timestamp` is not visible here (presumably seconds) — confirm.
func generateFrames(until timestamp: Double)
/// Seeks to `timestamp`.
/// - Returns: A signal that carries a queue-local `MediaFrameSourceSeekResult`
///   or fails with a `MediaFrameSourceSeekError`.
func seek(timestamp: Double) -> Signal<QueueLocalObject<MediaFrameSourceSeekResult>, MediaFrameSourceSeekError>

class FFMpegMediaFrameSource:MediaFrameSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/// Queue this object is driven from: `performWithContext(_:)` and `seek(timestamp:)`
/// assert that it is current, and worker-thread results are delivered back on it.
private let queue: Queue
// The values below are forwarded unchanged to `context.initializeState(...)`
// each time work is dispatched to the worker thread.
private let postbox: Postbox
private let resourceReference: MediaResourceReference
private let tempFilePath: String?
private let streamable: Bool
// Buffering thresholds handed to every `MediaTrackFrameBuffer` created in `seek(timestamp:)`.
private let stallDuration: Double
private let lowWaterDuration: Double
private let highWaterDuration: Double
// Also forwarded to `context.initializeState(...)`.
private let video: Bool
private let preferSoftwareDecoding: Bool
private let fetchAutomatically: Bool
private let maximumFetchSize: Int?

/// Keeps the worker thread alive: `threadEntry(_:)` runs its `loop()`, and
/// `performWithContext(_:)` enqueues closures onto it.
private let taskQueue: ThreadTaskQueue
/// Dedicated thread created and started in `init`; its entry point is `threadEntry(_:)`.
private let thread: Thread

/// Registered event sinks; each one is called with `.frames` (and `.endOfStream`
/// when applicable) after a generation pass.
private let eventSinkBag = Bag<(MediaTrackEvent) -> Void>()
/// `true` while a generation pass started by `internalGenerateFrames(until:)` is in flight.
private var generatingFrames = false
/// Most recently requested generation timestamp; cleared after a successful seek.
/// NOTE(review): the code that sets this is not visible in this excerpt
/// (presumably `generateFrames(until:)`) — confirm.
private var requestedFrameGenerationTimestamp: Double?

/// Entry point of the dedicated worker thread.
///
/// Stores a fresh `FFMpegMediaFrameSourceContext` in the current thread's
/// dictionary, runs `taskQueue.loop()`, and removes the context again once
/// the loop returns.
@objc private static func threadEntry(_ taskQueue: ThreadTaskQueue) {
    let contextKey = "FFMpegMediaFrameSourceContext"

    autoreleasepool {
        let workerThread = Thread.current

        // Per-thread context, used for decoding.
        let decodingContext = FFMpegMediaFrameSourceContext(thread: workerThread)
        workerThread.threadDictionary[contextKey] = decodingContext

        taskQueue.loop()

        workerThread.threadDictionary.removeObject(forKey: contextKey)
    }
}

/// Creates the frame source and immediately starts its dedicated worker thread.
///
/// - Parameters:
///   - queue: Queue on which this object is used and on which results are delivered.
///   - postbox: Forwarded to the decoding context's `initializeState`.
///   - resourceReference: Forwarded to the decoding context's `initializeState`.
///   - tempFilePath: Forwarded to the decoding context's `initializeState`.
///   - streamable: Forwarded to the decoding context's `initializeState`.
///   - video: Forwarded to the decoding context's `initializeState`.
///   - preferSoftwareDecoding: Forwarded to the decoding context's `initializeState`.
///   - fetchAutomatically: Forwarded to the decoding context's `initializeState`.
///   - maximumFetchSize: Forwarded to the decoding context's `initializeState`; defaults to `nil`.
///   - stallDuration: Passed to the frame buffers created on seek; defaults to `1.0`.
///   - lowWaterDuration: Passed to the frame buffers created on seek; defaults to `2.0`.
///   - highWaterDuration: Passed to the frame buffers created on seek; defaults to `3.0`.
public init(
    queue: Queue,
    postbox: Postbox,
    resourceReference: MediaResourceReference,
    tempFilePath: String?,
    streamable: Bool,
    video: Bool,
    preferSoftwareDecoding: Bool,
    fetchAutomatically: Bool,
    maximumFetchSize: Int? = nil,
    stallDuration: Double = 1.0,
    lowWaterDuration: Double = 2.0,
    highWaterDuration: Double = 3.0
) {
    // Resource / decoding configuration.
    self.queue = queue
    self.postbox = postbox
    self.resourceReference = resourceReference
    self.tempFilePath = tempFilePath
    self.streamable = streamable
    self.video = video
    self.preferSoftwareDecoding = preferSoftwareDecoding
    self.fetchAutomatically = fetchAutomatically
    self.maximumFetchSize = maximumFetchSize

    // Buffering thresholds.
    self.stallDuration = stallDuration
    self.lowWaterDuration = lowWaterDuration
    self.highWaterDuration = highWaterDuration

    // Build the worker thread around a fresh task queue; the thread receives
    // the queue as the argument of `threadEntry(_:)`.
    let workerTaskQueue = ThreadTaskQueue()
    let workerThread = Thread(
        target: FFMpegMediaFrameSource.self,
        selector: #selector(FFMpegMediaFrameSource.threadEntry(_:)),
        object: workerTaskQueue
    )
    workerThread.name = "FFMpegMediaFrameSourceContext"

    self.taskQueue = workerTaskQueue
    self.thread = workerThread
    workerThread.start()

    super.init()
}

/// Dispatches `f` to the worker thread's task queue.
///
/// Must be called on `queue` (checked with an assertion). When the task runs,
/// `f` receives the context returned by `contextForCurrentThread()`; if no
/// context is returned, `f` is not called.
func performWithContext(_ f: @escaping (FFMpegMediaFrameSourceContext) -> Void) {
    assert(self.queue.isCurrent())

    self.taskQueue.enqueue {
        guard let threadContext = contextForCurrentThread() else {
            return
        }
        f(threadContext)
    }
}

/// Runs one frame-generation pass on the worker thread and reports the result
/// to every registered event sink.
///
/// Does nothing if a pass is already in flight. After a pass finishes, a new
/// pass is started when `requestedFrameGenerationTimestamp` is set and differs
/// from the timestamp that was just processed.
///
/// - Parameter timestamp: Upper bound handed to `context.takeFrames(until:)`.
private func internalGenerateFrames(until timestamp: Double) {
    // Only one generation pass at a time.
    guard !self.generatingFrames else {
        return
    }
    self.generatingFrames = true

    // Copy the configuration into locals so the worker-thread closure uses
    // these values instead of reading them through `self`.
    let deliveryQueue = self.queue
    let postbox = self.postbox
    let resourceReference = self.resourceReference
    let tempFilePath = self.tempFilePath
    let streamable = self.streamable
    let video = self.video
    let preferSoftwareDecoding = self.preferSoftwareDecoding
    let fetchAutomatically = self.fetchAutomatically
    let maximumFetchSize = self.maximumFetchSize

    // Decode on the worker thread, then notify the sinks back on `queue`.
    self.performWithContext { [weak self] context in
        context.initializeState(
            postbox: postbox,
            resourceReference: resourceReference,
            tempFilePath: tempFilePath,
            streamable: streamable,
            video: video,
            preferSoftwareDecoding: preferSoftwareDecoding,
            fetchAutomatically: fetchAutomatically,
            maximumFetchSize: maximumFetchSize
        )

        let (frames, endOfStream) = context.takeFrames(until: timestamp)

        deliveryQueue.async { [weak self] in
            guard let strongSelf = self else {
                return
            }
            strongSelf.generatingFrames = false

            for sink in strongSelf.eventSinkBag.copyItems() {
                sink(.frames(frames))
                if endOfStream {
                    sink(.endOfStream)
                }
            }

            // A different timestamp was requested in the meantime: run another pass.
            if let requested = strongSelf.requestedFrameGenerationTimestamp, !requested.isEqual(to: timestamp) {
                strongSelf.internalGenerateFrames(until: requested)
            }
        }
    }
}

/// Seeks the underlying media to `timestamp`.
///
/// Must be called on `queue` (checked with an assertion). The work starts when
/// the returned signal is subscribed to: the seek runs on the worker thread and
/// the outcome is reported back on `queue`.
///
/// - Parameter timestamp: Position handed to `context.seek(timestamp:completed:)`.
/// - Returns: A signal that emits one queue-local `MediaFrameSourceSeekResult`
///   and completes, or fails with `.generic` when the context's completion
///   handler is called with `nil`.
///
/// NOTE(review): the signal closure has no capture list, so it holds `self`
/// strongly. Also, if `self` is already deallocated when the completion runs on
/// `queue`, the subscriber receives neither a value, a completion, nor an
/// error — confirm this is intended.
public func seek(timestamp: Double) -> Signal<QueueLocalObject<MediaFrameSourceSeekResult>, MediaFrameSourceSeekError> {
    assert(self.queue.isCurrent())

    return Signal { subscriber in
        let disposable = MetaDisposable()

        // Copy the configuration into locals for use inside the worker-thread closure.
        let queue = self.queue
        let postbox = self.postbox
        let resourceReference = self.resourceReference
        let tempFilePath = self.tempFilePath
        let streamable = self.streamable
        let video = self.video
        let preferSoftwareDecoding = self.preferSoftwareDecoding
        let fetchAutomatically = self.fetchAutomatically
        let maximumFetchSize = self.maximumFetchSize

        // Holds the context's semaphore container while the seek is in flight;
        // it is filled in on the worker thread and reset to nil once the seek finishes.
        let currentSemaphore = Atomic<Atomic<DispatchSemaphore?>?>(value: nil)

        // On disposal, signal whichever semaphore is currently stored (if any).
        // NOTE(review): presumably this unblocks a worker thread that is waiting
        // inside the context — confirm against FFMpegMediaFrameSourceContext.
        disposable.set(ActionDisposable {
            currentSemaphore.with({ $0 })?.with({ $0 })?.signal()
        })

        self.performWithContext { [weak self] context in
            // Publish the context's semaphore so the disposable above can reach it.
            let _ = currentSemaphore.swap(context.currentSemaphore)

            context.initializeState(postbox: postbox, resourceReference: resourceReference, tempFilePath: tempFilePath, streamable: streamable, video: video, preferSoftwareDecoding: preferSoftwareDecoding, fetchAutomatically: fetchAutomatically, maximumFetchSize: maximumFetchSize)

            context.seek(timestamp: timestamp, completed: { streamDescriptionsAndTimestamp in
                // Report the outcome on the owning queue.
                queue.async {
                    if let strongSelf = self {
                        if let (streamDescriptions, timestamp) = streamDescriptionsAndTimestamp {
                            // A successful seek discards any pending frame-generation request.
                            strongSelf.requestedFrameGenerationTimestamp = nil
                            // The seek result is built inside the `generate` closure of the queue-local object.
                            subscriber.putNext(QueueLocalObject(queue: queue, generate: {
                                if let strongSelf = self {
                                    var audioBuffer: MediaTrackFrameBuffer?
                                    var videoBuffer: MediaTrackFrameBuffer?

                                    // Audio buffers use a fixed rotation angle of 0.0 and aspect of 1.0.
                                    if let audio = streamDescriptions.audio {
                                        audioBuffer = MediaTrackFrameBuffer(frameSource: strongSelf, decoder: audio.decoder, type: .audio, duration: audio.duration, rotationAngle: 0.0, aspect: 1.0, stallDuration: strongSelf.stallDuration, lowWaterDuration: strongSelf.lowWaterDuration, highWaterDuration: strongSelf.highWaterDuration)
                                    }

                                    var extraDecodedVideoFrames: [MediaTrackFrame] = []
                                    if let video = streamDescriptions.video {
                                        videoBuffer = MediaTrackFrameBuffer(frameSource: strongSelf, decoder: video.decoder, type: .video, duration: video.duration, rotationAngle: video.rotationAngle, aspect: video.aspect, stallDuration: strongSelf.stallDuration, lowWaterDuration: strongSelf.lowWaterDuration, highWaterDuration: strongSelf.highWaterDuration)
                                        // Decode the extra video frames delivered with the seek result;
                                        // frames the decoder returns nil for are skipped.
                                        for videoFrame in streamDescriptions.extraVideoFrames {
                                            if let decodedFrame = video.decoder.decode(frame: videoFrame) {
                                                extraDecodedVideoFrames.append(decodedFrame)
                                            }
                                        }
                                    }

                                    return MediaFrameSourceSeekResult(buffers: MediaPlaybackBuffers(audioBuffer: audioBuffer, videoBuffer: videoBuffer), extraDecodedVideoFrames: extraDecodedVideoFrames, timestamp: timestamp)
                                } else {
                                    // The source is gone by the time the result is generated: return empty buffers.
                                    return MediaFrameSourceSeekResult(buffers: MediaPlaybackBuffers(audioBuffer: nil, videoBuffer: nil), extraDecodedVideoFrames: [], timestamp: timestamp)
                                }
                            }))
                            // The seek is finished; drop the semaphore reference before completing.
                            let _ = currentSemaphore.swap(nil)
                            subscriber.putCompletion()
                        } else {
                            // The context reported no result: fail the signal.
                            let _ = currentSemaphore.swap(nil)
                            subscriber.putError(.generic)
                        }
                    }
                }
            })
        }

        return disposable
    }
}
This post is licensed under CC BY 4.0 by the author.