移动端音视频 VOL.3 – 使用线程读取AVPacket
写在前面的话
锵锵锵!我们又迎来了我们的第三期。 本期的内容将会稍长,并将会介绍的内容是,数据包(AVPacket)的读取。 这次小休假,去了杭州吃了臭豆腐,还参加了死党的婚礼。 回去真是很花钱呢!回去真是很花钱呢!回去真是很花钱呢!大出血~~~~~
一个队列数据结构
在正式进行包的读取之前,需要额外做一件事情,我们设计并实现一个数据结构,用来存储读取到的AVPacket(其实就是一个缓存),因为播放音视频的时候,并不是使用的时候才进行读取(这样就来不及了)。 满足先进先出(FIFO)原则的数据结构是队列,并且它需要具有以下功能:
- 初始化队列(init);
- 放进一个包(put);
- 取出一个包(get);
- 判断是否需要放进一个包(enough);
- 清空队列(flush);
- 销毁队列(destory)。
将会使用链表来实现这个先进先出队列,所以它其实本质是一个链表,但是既然ffplay把它命名为“Queue”,接下来我都会称他为队列。此外,为了多线程实行的时候的的安全问题,需要一个锁的机制。 其实作者在ffmepgtutorial中书写的结构类参考了ffplay的很多地方,本文稍后会提到。
基本数据结构
先造一个基本的队列数据结构:
#import <libavformat/avformat.h>
#import <libavcodec/avcodec.h>
typedef struct AVPacketNode{
// AVPacket数据包
AVPacket pkt;
// 下一个节点
struct AVPacketNode* next;
} AVPacketNode;
typedef struct AVPacketQueue{
// 头结点&末尾结点
AVPacketNode *first_node, *last_node;
// 节点数量
int nb_packets;
// 所有包暂用的内存大小
int size;
// 所有包的时长
int64_t duration;
// 锁
dispatch_semaphore_t mutex;
// 停止标志
int request_abort;
} AVPacketQueue;
这里的数据结构非常简单,首先是一个链表节点(AVPacketNode),每个节点中将会存储AVPacket数据包,并且具有一个指向下一个节点的指针。 然后,对于整个链表(AVPacketQueue),我们用两个指针变量分别指向其头部和尾部,同时,包含一些基本变量用来存储当前链表的长度,内容占有的大小,总时长等,最后,还需要一个信号量来完成其锁的机制。
初始化&放入包
初始化这个队列,并且放入包的函数如下:
// 初始化队列
static int av_packet_queue_init(AVPacketQueue* q){
memset((void*)q, 0, sizeof(AVPacketQueue));
q->mutex = dispatch_semaphore_create(1);
return 0;
}
// 私有:放入数据包
static int av_packet_queue_put_private(AVPacketQueue* q, AVPacket* packet){
AVPacketNode *packetNode = av_malloc(sizeof(AVPacketNode));
if(!packetNode){
return -1;
}
packetNode->pkt = *packet;
packetNode->next = NULL;
if(!q->first_node){
q->first_node = packetNode;
}
else{
q->last_node->next = packetNode;
}
q->last_node = packetNode;
q->nb_packets += 1;
q->size += (packet->size + sizeof(AVPacketNode));
q->duration += packet->duration;
return 0;
}
// 公开:放入数据包
static int av_packet_queue_put(AVPacketQueue* q, AVPacket* packet){
int ret = 0;
dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
ret = av_packet_queue_put_private(q, packet);
dispatch_semaphore_signal(q->mutex);
return ret;
}
这部分没有需要什么特别说明的,包含了一些基本的链表操作,唯一需要注意的是av_packet_queue_put_private与av_packet_queue_put的区别,前者是基本逻辑,后者是在前者的基础上增加了一道信号锁。
放入包&检测包的数量
static int av_packet_queue_put_nullpacket(AVPacketQueue* q, int stream_index){
AVPacket *packet = NULL;
av_init_packet(packet);
packet->data = NULL;
packet->size = 0;
packet->stream_index = stream_index;
return av_packet_queue_put(q, packet);
}
static int av_stream_has_enough_packets(AVPacketQueue *q, AVStream* stream, int stream_id){
return stream_id < 0
stream->disposition == AV_DISPOSITION_ATTACHED_PIC
(q->nb_packets > MIN_FRAMES && (!q->duration av_q2d(stream->time_base) * q->duration > 1.0));
}
av_packet_queue_put_nullpacket用来放入一个空的数据包,av_stream_has_enough_packets用来判读某个数据流的数据包已经足够了,是否需要读取新的数据包到队列。 观察一下av_stream_has_enough_packets的判断条件,经过调查,ffplay的源码中也是这样考虑的,ffplay的源码在这里。 这篇文章也稍微讲述了一些ffplay的该部分内容:ffplay read线程分析 —— 知乎。 这三个条件分别为:
- 是否读取(打开)了流,stream_id < 0为没有读取(打开)到流;
- 是否该流是封面图,有些音频文件会包含一路流用来存储那种专辑封面图,相信读者都看见过以前的mp3文件总是包含这种功能,AV_DISPOSITION_ATTACHED_PIC意味着这路流是一张图片;
- 是否帧数(数据包)大于MIN_FRAMES(这里设置为25)并且时长已经大于1s了,这里笔者有点不理解的是“!q->duration”这个表达式,它与后者用“或”相连接,也就是说这里一旦duration的值为0也被允许,不知何意。
取得包
static int av_packet_queue_get(AVPacketQueue* q, AVPacket* packet, int block){
int ret;
dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
while(1){
if(q->request_abort){
ret = -1;
break;
}
AVPacketNode* node = q->first_node;
if(node){
q->first_node = q->first_node->next;
if(!q->first_node){
q->last_node = NULL;
}
q->nb_packets -= 1;
q->size -= (node->pkt.size + sizeof(node));
q->duration -= node->pkt.duration;
if(packet){
*packet = node->pkt;
}
ret = 1;
break;
}
else if(!block){
ret = 0;
break;
}
else{
dispatch_semaphore_signal(q->mutex);
usleep(10000);
dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
}
}
dispatch_semaphore_signal(q->mutex);
return ret;
}
av_packet_queue_get用来取得数据包,形参中的packet就是输出,而block则用来标记 —— 如果当前队列为空,是否需要循环等待到直到获得一个数据包。 该函数也采用了简单的信号锁的机制来保证线程安全问题。
销毁队列
static void av_packet_queue_flush(AVPacketQueue* q){
AVPacketNode *currentNode, *nextNode;
dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
for(currentNode = q->first_node;currentNode;currentNode = nextNode){
nextNode = currentNode->next;
av_packet_unref(¤tNode->pkt);
av_freep(¤tNode);
}
q->first_node = NULL;
q->last_node = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
dispatch_semaphore_signal(q->mutex);
}
static void av_packet_destory(AVPacketQueue* q){
av_packet_queue_flush(q);
}
上面是销毁队列(其实是链表啦)用的函数,简单循环,一次完成。
读取包的工作 —— 多线程操作
在这再说明一次,上面的所有内容都围绕着AVPacketQueue这个数据结构,这个数据结构形如链表,内部包含多个AVPacketNode节点,而每个节点中都包含一个AVPacket,其他的所有函数都是用来操作这个数据结构的。当然你也可以使用自己设计的数据结构来保存一系列AVPacket。 接下来就是正式的读包步骤了,来理一理需要做哪几件事。 我们需要一个一直存在的线程,负责从最开始的AVFormatContext中获得数据包,它应当每隔一小段时间检查一下AVPacketQueue是否是满的,如果是满的,就什么也不做,如果是不满的,就读取一些数据包到AVPacketQueue,直到所以数据包都读完。
分开的队列&初始化
分别为音频流和视频流建立变量。注意!AVPacketQueue是一个结构体。 同时,还有一些基本的属性接下来会用到。
@interface AVPacketTestUnit()
@property(nonatomic, strong) MRThread* readThread;
@property(nonatomic, copy) NSString* contentPath;
@property(nonatomic, assign) BOOL packetBufferIsFull;
@property(nonatomic, assign) BOOL packetBufferIsEmpty;
@property(nonatomic, strong) dispatch_block_t onPacketBufferFullBlock;
@property(nonatomic, strong) dispatch_block_t onPacketBufferEmptyBlock;
@property(nonatomic, strong) NSTimer *timer;
@property(nonatomic, assign) NSInteger second;
@end
@implementation AVPacketTestUnit{
AVPacketQueue audioq;
AVPacketQueue videoq;
int audio_stream_index;
int video_stream_index;
AVStream* audioStream;
AVStream* videoStream;
int eof;
}
待会线程会把一个一个的数据包分别读进audioq与videoq里。 然后是一些基本的初始化步骤。
- (void)prepareToRead{
if(self.readThread){
NSLog(@"thread has been created.");
}
av_register_all();
av_packet_queue_init(&audioq);
av_packet_queue_init(&videoq);
audio_stream_index = -1;
video_stream_index = -1;
self.readThread = [[MRThread alloc] initWithTarget:self selector:@selector(readPacketFunc) object:nil];
self.readThread.name = @"readPacketThread";
}
还记得我们的番外篇文章吗?这里的线程类用到的就是ffmpegTutorial的作者书写的MRThread,我们之前文章中的线程类就是他书写的。 至此,用来缓存数据包的类,用来执行读取数据包函数的线程,以及一些必要的属性都已经全部准备好了。
打开视频文件
线程需要做哪些事情?哪些事情是执行一次的?哪些事情是循环的? 显而易见,参照我们的上一节内容,首先还是要按部就班的打开一个视频文件,读取流信息…等等。
- (void)readPacketFunc{
self.contentPath = [[NSBundle mainBundle] pathForResource:@"sm25392237" ofType:@".mp4"];
const char* clangContentPath = [self.contentPath cStringUsingEncoding:kCFStringEncodingUTF8];
AVFormatContext* formatCtx = avformat_alloc_context();
if(!formatCtx){
NSLog(@"alloc AVFormatContext failed.");
return;
}
// 其实上面这步不做也可以
// AVFormatContext* formatCtx = NULL;
if(0 != avformat_open_input(&formatCtx, clangContentPath, NULL, NULL)){
avformat_close_input(&formatCtx);
NSLog(@"open file failed.");
return;
}
formatCtx->probesize = 512 * 1024;
formatCtx->max_analyze_duration = 5 * AV_TIME_BASE;
if(0 != avformat_find_stream_info(formatCtx, NULL)){
avformat_close_input(&formatCtx);
NSLog(@"can not find any stream.");
return;
}
for(int i = 0; i < formatCtx->nb_streams;i ++){
AVStream* stream = formatCtx->streams[i];
AVCodecContext* codecCtx = avcodec_alloc_context3(NULL);
if(!codecCtx){
continue;
}
int ret = avcodec_parameters_to_context(codecCtx, stream->codecpar);
if(ret < 0){
return;
}
enum AVMediaType mediaType = codecCtx->codec_type;
switch (mediaType) {
case AVMEDIA_TYPE_AUDIO:
audioStream = stream;
audio_stream_index = i;
break;
case AVMEDIA_TYPE_VIDEO:
videoStream = stream;
video_stream_index = i;
break;
case AVMEDIA_TYPE_ATTACHMENT:
NSLog(@"attachment stream.");
break;
default:
NSLog(@"other streams.");
break;
}
}
[self readPacketLoop:formatCtx];
avformat_close_input(&formatCtx);
}
注意第6行的内容,本次新发现了一个不一样的点。源代码中,有这样的描述: 在执行avformat_open_input方法时,传入的AVFormatContext(二级指针)参数可以有两个情况:要么这是一个空指针,要么这是一个被分配内存的指针。所以这两种情况都可以。 所以一般打开一个视频文件的步骤是这样的:
- 初始化AVFormateContext;
- 调用avformate_open_input打开视频文件;
- 设定AVFormatContext的probesize和max_analyze_duration;
- 调用avformat_find_stream_info确认是否找到流;
- 枚举每一个AVStream,AVStream保存在AVFormatContext的streams中,数量为nb_streams个;
- Done!
循环读包
然后会执行的是一个一直循环的函数,用来进行数据包的读取。
- (void)run{
self.onPacketBufferFullBlock = ^(){
};
self.onPacketBufferEmptyBlock = ^(){
};
[self prepareToRead];
[self readPacket];
self.timer = [NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(onTimer) userInfo:nil repeats:YES];
self.second = 0;
[[NSRunLoop mainRunLoop] addTimer:self.timer forMode:NSRunLoopCommonModes];
}
- (void)onTimer{
// 每过3秒消耗一次数据包
// 随机决定“消耗一个数据包”还是“消耗所有数据包”
if(self.second % 3 == 0){
int randomNumber = arc4random_uniform(2);
if(randomNumber == 0){
// 消耗所有数据包
av_packet_queue_flush(&audioq);
av_packet_queue_flush(&videoq);
self.packetBufferIsFull = NO;
self.packetBufferIsEmpty = YES;
NSLog(@"# %ds - %@full - comsume all packets,now audioq = %d, videoq = %d", (int)self.second, self.packetBufferIsFull ? @"":@"not ", audioq.nb_packets, videoq.nb_packets);
}
else{
// 消耗一个数据包
AVPacket audioPkt;
int audioqNotEmpty = av_packet_queue_get(&audioq, &audioPkt, 0);
if(audioqNotEmpty){
av_packet_unref(&audioPkt);
}
AVPacket videoPkt;
int videoqNotEmpty = av_packet_queue_get(&videoq, &videoPkt, 0);
if(videoqNotEmpty){
av_packet_unref(&videoPkt);
}
self.packetBufferIsFull = NO;
if(!audioqNotEmpty && !videoqNotEmpty){
self.packetBufferIsEmpty = YES;
}
NSLog(@"# %ds - %@full - comsume one packet, now audioq = %d, videoq = %d", (int)self.second, self.packetBufferIsFull ? @"":@"not ", audioq.nb_packets, videoq.nb_packets);
}
}
else{
NSLog(@"# %ds - %@full - now audioq = %d, videoq = %d",(int)self.second, self.packetBufferIsFull ? @"":@"not ", audioq.nb_packets, videoq.nb_packets);
}
self.second += 1;
}
基本想法是这样,创建一个死循环,每隔一定时间(10ms)尝试读取一次数据包放入队列中,如果队列没满,那么读取,如果队列满了,那么再等待一定时间。注意到首先的代码块是判断是否队列是否满了(第8行到第19行),然后才是真正的读取数据包并放入队列的操作。 接下来是av_read_frame函数的调用,这是真正从AVFormatContext中读取数据包的函数,我打算下一节内容详细写一写关于这个函数的内容(大概。 再读取一个数据包时,进行了是否到文件末尾的判断,这里的这个if判断其实也来自于ffplay的源代码。 至此,我们的读包行为已经完备了,以上的全部就是如何读取一个数据包的所应该包含的数据结构+流程函数。
测试代码
新建一个测试单元来测试一下我们的代码是否正确。 不停的读取数据包放入队列,然后使用一个简单的NSTimer来定时输出状态,并且每过3秒会消耗一次数据包。 Debug区域的输出信息如下: Bingo!结果符合代码的想法。 至此,我们的基本的读包流程就全部结束了。稍等我阅读时间完下一章节后,再来书写移动端音视频的第四期内容。 感谢各位!