移动端音视频 VOL.3 – 使用线程读取AVPacket

Apr 28, 2021 舍温 舍温 65 min read 应用开发 iOS , FFmpeg , 音视频

写在前面的话

锵锵锵!我们又迎来了我们的第三期。 本期的内容将会稍长,并将会介绍的内容是,数据包(AVPacket)的读取。 这次小休假,去了杭州吃了臭豆腐,还参加了死党的婚礼。 回去真是很花钱呢!回去真是很花钱呢!回去真是很花钱呢!大出血~~~~~

一个队列数据结构

在正式进行包的读取之前,需要额外做一件事情,我们设计并实现一个数据结构,用来存储读取到的AVPacket(其实就是一个缓存),因为播放音视频的时候,并不是使用的时候才进行读取(这样就来不及了)。 满足先进先出(FIFO)原则的数据结构是队列,并且它需要具有以下功能:

  1. 初始化队列(init);
  2. 放进一个包(put);
  3. 取出一个包(get);
  4. 判断是否需要放进一个包(enough);
  5. 清空队列(flush);
  6. 销毁队列(destory)。

将会使用链表来实现这个先进先出队列,所以它其实本质是一个链表,但是既然ffplay把它命名为“Queue”,接下来我都会称他为队列。此外,为了多线程实行的时候的的安全问题,需要一个锁的机制。 其实作者在ffmepgtutorial中书写的结构类参考了ffplay的很多地方,本文稍后会提到。

基本数据结构

先造一个基本的队列数据结构:

#import <libavformat/avformat.h>
#import <libavcodec/avcodec.h>

typedef struct AVPacketNode{
    // AVPacket数据包
    AVPacket pkt;
    // 下一个节点
    struct AVPacketNode* next;
} AVPacketNode;

typedef struct AVPacketQueue{
    // 头结点&末尾结点
    AVPacketNode *first_node, *last_node;
    
    // 节点数量
    int nb_packets;
    
    // 所有包暂用的内存大小
    int size;
    // 所有包的时长
    int64_t duration;
    
    // 锁
    dispatch_semaphore_t mutex;
    
    // 停止标志
    int request_abort;
} AVPacketQueue;

这里的数据结构非常简单,首先是一个链表节点(AVPacketNode),每个节点中将会存储AVPacket数据包,并且具有一个指向下一个节点的指针。 然后,对于整个链表(AVPacketQueue),我们用两个指针变量分别指向其头部和尾部,同时,包含一些基本变量用来存储当前链表的长度,内容占有的大小,总时长等,最后,还需要一个信号量来完成其锁的机制。

初始化&放入包

初始化这个队列,并且放入包的函数如下:

// 初始化队列
static int av_packet_queue_init(AVPacketQueue* q){
    memset((void*)q, 0, sizeof(AVPacketQueue));
    q->mutex = dispatch_semaphore_create(1);
    
    return 0;
}

// 私有:放入数据包
static int av_packet_queue_put_private(AVPacketQueue* q, AVPacket* packet){
    
    AVPacketNode *packetNode = av_malloc(sizeof(AVPacketNode));
    if(!packetNode){
        return -1;
    }
    
    packetNode->pkt = *packet;
    packetNode->next = NULL;
    if(!q->first_node){
        q->first_node = packetNode;
    }
    else{
        q->last_node->next = packetNode;
    }
    
    q->last_node = packetNode;
    
    q->nb_packets += 1;
    q->size += (packet->size + sizeof(AVPacketNode));
    q->duration += packet->duration;
    
    return 0;
}

// 公开:放入数据包
static int av_packet_queue_put(AVPacketQueue* q, AVPacket* packet){
    int ret = 0;
    
    dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
    ret = av_packet_queue_put_private(q, packet);
    dispatch_semaphore_signal(q->mutex);
    
    return ret;
}

这部分没有需要什么特别说明的,包含了一些基本的链表操作,唯一需要注意的是av_packet_queue_put_private与av_packet_queue_put的区别,前者是基本逻辑,后者是在前者的基础上增加了一道信号锁。

放入包&检测包的数量


static int av_packet_queue_put_nullpacket(AVPacketQueue* q, int stream_index){
    
    AVPacket *packet = NULL;
    av_init_packet(packet);
    
    packet->data = NULL;
    packet->size = 0;
    packet->stream_index = stream_index;
    
    return av_packet_queue_put(q, packet);
}

static int av_stream_has_enough_packets(AVPacketQueue *q, AVStream* stream, int stream_id){
    return stream_id < 0 
    stream->disposition == AV_DISPOSITION_ATTACHED_PIC 
    (q->nb_packets > MIN_FRAMES && (!q->duration  av_q2d(stream->time_base) * q->duration > 1.0));
}

av_packet_queue_put_nullpacket用来放入一个空的数据包,av_stream_has_enough_packets用来判读某个数据流的数据包已经足够了,是否需要读取新的数据包到队列。 观察一下av_stream_has_enough_packets的判断条件,经过调查,ffplay的源码中也是这样考虑的,ffplay的源码在这里。 这篇文章也稍微讲述了一些ffplay的该部分内容:ffplay read线程分析 —— 知乎。 这三个条件分别为:

  1. 是否读取(打开)了流,stream_id < 0为没有读取(打开)到流;
  2. 是否该流是封面图,有些音频文件会包含一路流用来存储那种专辑封面图,相信读者都看见过以前的mp3文件总是包含这种功能,AV_DISPOSITION_ATTACHED_PIC意味着这路流是一张图片;
  3. 是否帧数(数据包)大于MIN_FRAMES(这里设置为25)并且时长已经大于1s了,这里笔者有点不理解的是“!q->duration”这个表达式,它与后者用“或”相连接,也就是说这里一旦duration的值为0也被允许,不知何意。

取得包

static int av_packet_queue_get(AVPacketQueue* q, AVPacket* packet, int block){
    int ret;
    
    dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
    while(1){
        
        if(q->request_abort){
            ret = -1;
            break;
        }
        
        AVPacketNode* node = q->first_node;
        if(node){
            q->first_node = q->first_node->next;
            if(!q->first_node){
                q->last_node = NULL;
            }
            
            q->nb_packets -= 1;
            q->size -= (node->pkt.size + sizeof(node));
            q->duration -= node->pkt.duration;
            
            if(packet){
                *packet = node->pkt;
            }
            
            ret = 1;
            break;
        }
        else if(!block){
            ret = 0;
            break;
        }
        else{
            dispatch_semaphore_signal(q->mutex);
            usleep(10000);
            dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
        }
        
    }
    
    dispatch_semaphore_signal(q->mutex);
    
    return ret;
}

av_packet_queue_get用来取得数据包,形参中的packet就是输出,而block则用来标记 —— 如果当前队列为空,是否需要循环等待到直到获得一个数据包。 该函数也采用了简单的信号锁的机制来保证线程安全问题。

销毁队列


static void av_packet_queue_flush(AVPacketQueue* q){
        
    AVPacketNode *currentNode, *nextNode;
    
    dispatch_semaphore_wait(q->mutex, DISPATCH_TIME_FOREVER);
    
    for(currentNode = q->first_node;currentNode;currentNode = nextNode){
        nextNode = currentNode->next;
        
        av_packet_unref(&currentNode->pkt);
        av_freep(&currentNode);
    }
    
    q->first_node = NULL;
    q->last_node = NULL;
    q->nb_packets = 0;
    q->size = 0;
    q->duration = 0;
    
    dispatch_semaphore_signal(q->mutex);
}

static void av_packet_destory(AVPacketQueue* q){
    av_packet_queue_flush(q);
}

上面是销毁队列(其实是链表啦)用的函数,简单循环,一次完成。

读取包的工作 —— 多线程操作

在这再说明一次,上面的所有内容都围绕着AVPacketQueue这个数据结构,这个数据结构形如链表,内部包含多个AVPacketNode节点,而每个节点中都包含一个AVPacket,其他的所有函数都是用来操作这个数据结构的。当然你也可以使用自己设计的数据结构来保存一系列AVPacket。 接下来就是正式的读包步骤了,来理一理需要做哪几件事。 我们需要一个一直存在的线程,负责从最开始的AVFormatContext中获得数据包,它应当每隔一小段时间检查一下AVPacketQueue是否是满的,如果是满的,就什么也不做,如果是不满的,就读取一些数据包到AVPacketQueue,直到所以数据包都读完。

分开的队列&初始化

分别为音频流和视频流建立变量。注意!AVPacketQueue是一个结构体。 同时,还有一些基本的属性接下来会用到。

@interface AVPacketTestUnit()

@property(nonatomic, strong) MRThread* readThread;
@property(nonatomic, copy) NSString* contentPath;

@property(nonatomic, assign) BOOL packetBufferIsFull;
@property(nonatomic, assign) BOOL packetBufferIsEmpty;
@property(nonatomic, strong) dispatch_block_t onPacketBufferFullBlock;
@property(nonatomic, strong) dispatch_block_t onPacketBufferEmptyBlock;

@property(nonatomic, strong) NSTimer *timer;
@property(nonatomic, assign) NSInteger second;

@end

@implementation AVPacketTestUnit{
    AVPacketQueue audioq;
    AVPacketQueue videoq;
    
    int audio_stream_index;
    int video_stream_index;
    
    AVStream* audioStream;
    AVStream* videoStream;
    
    int eof;
}

待会线程会把一个一个的数据包分别读进audioq与videoq里。 然后是一些基本的初始化步骤。

- (void)prepareToRead{
    if(self.readThread){
        NSLog(@"thread has been created.");
    }
    
    av_register_all();
    
    av_packet_queue_init(&audioq);
    av_packet_queue_init(&videoq);
    
    audio_stream_index = -1;
    video_stream_index = -1;
    
    self.readThread = [[MRThread alloc] initWithTarget:self selector:@selector(readPacketFunc) object:nil];
    self.readThread.name = @"readPacketThread";
}

还记得我们的番外篇文章吗?这里的线程类用到的就是ffmpegTutorial的作者书写的MRThread,我们之前文章中的线程类就是他书写的。 至此,用来缓存数据包的类,用来执行读取数据包函数的线程,以及一些必要的属性都已经全部准备好了。

打开视频文件

线程需要做哪些事情?哪些事情是执行一次的?哪些事情是循环的? 显而易见,参照我们的上一节内容,首先还是要按部就班的打开一个视频文件,读取流信息…等等。

- (void)readPacketFunc{
    
    self.contentPath = [[NSBundle mainBundle] pathForResource:@"sm25392237" ofType:@".mp4"];
    const char* clangContentPath = [self.contentPath cStringUsingEncoding:kCFStringEncodingUTF8];
    
    AVFormatContext* formatCtx = avformat_alloc_context();
    if(!formatCtx){
        NSLog(@"alloc AVFormatContext failed.");
        return;
    }
    
    // 其实上面这步不做也可以
    // AVFormatContext* formatCtx = NULL;
    if(0 != avformat_open_input(&formatCtx, clangContentPath, NULL, NULL)){
        
        avformat_close_input(&formatCtx);
        NSLog(@"open file failed.");
        return;
    }
    
    formatCtx->probesize = 512 * 1024;
    formatCtx->max_analyze_duration = 5 * AV_TIME_BASE;
    
    if(0 != avformat_find_stream_info(formatCtx, NULL)){
        
        avformat_close_input(&formatCtx);
        NSLog(@"can not find any stream.");
        return;
    }
    
    for(int i = 0; i < formatCtx->nb_streams;i ++){
        AVStream* stream  = formatCtx->streams[i];
        
        AVCodecContext* codecCtx = avcodec_alloc_context3(NULL);
        if(!codecCtx){
            continue;
        }
        
        int ret = avcodec_parameters_to_context(codecCtx, stream->codecpar);
        if(ret < 0){
            return;
        }
        
        enum AVMediaType mediaType = codecCtx->codec_type;
        switch (mediaType) {
            case AVMEDIA_TYPE_AUDIO:
                audioStream = stream;
                audio_stream_index = i;
                break;
            case AVMEDIA_TYPE_VIDEO:
                videoStream = stream;
                video_stream_index = i;
                break;
            case AVMEDIA_TYPE_ATTACHMENT:
                NSLog(@"attachment stream.");
                break;
            default:
                NSLog(@"other streams.");
                break;
        }
    }
        
    [self readPacketLoop:formatCtx];
    avformat_close_input(&formatCtx);
}

注意第6行的内容,本次新发现了一个不一样的点。源代码中,有这样的描述: 在执行avformat_open_input方法时,传入的AVFormatContext(二级指针)参数可以有两个情况:要么这是一个空指针,要么这是一个被分配内存的指针。所以这两种情况都可以。 所以一般打开一个视频文件的步骤是这样的:

  1. 初始化AVFormateContext;
  2. 调用avformate_open_input打开视频文件;
  3. 设定AVFormatContext的probesize和max_analyze_duration;
  4. 调用avformat_find_stream_info确认是否找到流;
  5. 枚举每一个AVStream,AVStream保存在AVFormatContext的streams中,数量为nb_streams个;
  6. Done!

循环读包

然后会执行的是一个一直循环的函数,用来进行数据包的读取。

- (void)run{
    self.onPacketBufferFullBlock = ^(){
        
    };
    self.onPacketBufferEmptyBlock = ^(){
        
    };
    
    [self prepareToRead];
    [self readPacket];
    
    self.timer = [NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(onTimer) userInfo:nil repeats:YES];
    self.second = 0;
    
    [[NSRunLoop mainRunLoop] addTimer:self.timer forMode:NSRunLoopCommonModes];
}

- (void)onTimer{
    
    // 每过3秒消耗一次数据包
    // 随机决定“消耗一个数据包”还是“消耗所有数据包”
    if(self.second % 3 == 0){
        int randomNumber = arc4random_uniform(2);
        if(randomNumber == 0){
            // 消耗所有数据包
            av_packet_queue_flush(&audioq);
            av_packet_queue_flush(&videoq);
            
            self.packetBufferIsFull = NO;
            self.packetBufferIsEmpty = YES;
            
            NSLog(@"# %ds - %@full - comsume all packets,now audioq = %d, videoq = %d", (int)self.second, self.packetBufferIsFull ? @"":@"not ", audioq.nb_packets, videoq.nb_packets);
        }
        else{
            // 消耗一个数据包
            AVPacket audioPkt;
            int audioqNotEmpty = av_packet_queue_get(&audioq, &audioPkt, 0);
            if(audioqNotEmpty){
                av_packet_unref(&audioPkt);
            }
            
            AVPacket videoPkt;
            int videoqNotEmpty = av_packet_queue_get(&videoq, &videoPkt, 0);
            if(videoqNotEmpty){
                av_packet_unref(&videoPkt);
            }
            
            self.packetBufferIsFull = NO;
            if(!audioqNotEmpty && !videoqNotEmpty){
                self.packetBufferIsEmpty = YES;
            }
            
            NSLog(@"# %ds - %@full - comsume one packet, now audioq = %d, videoq = %d", (int)self.second, self.packetBufferIsFull ? @"":@"not ", audioq.nb_packets, videoq.nb_packets);
        }
    }
    else{
        NSLog(@"# %ds - %@full - now audioq = %d, videoq = %d",(int)self.second, self.packetBufferIsFull ? @"":@"not ", audioq.nb_packets, videoq.nb_packets);
    }
    self.second += 1;
}

基本想法是这样,创建一个死循环,每隔一定时间(10ms)尝试读取一次数据包放入队列中,如果队列没满,那么读取,如果队列满了,那么再等待一定时间。注意到首先的代码块是判断是否队列是否满了(第8行到第19行),然后才是真正的读取数据包并放入队列的操作。 接下来是av_read_frame函数的调用,这是真正从AVFormatContext中读取数据包的函数,我打算下一节内容详细写一写关于这个函数的内容(大概。 再读取一个数据包时,进行了是否到文件末尾的判断,这里的这个if判断其实也来自于ffplay的源代码。 至此,我们的读包行为已经完备了,以上的全部就是如何读取一个数据包的所应该包含的数据结构+流程函数。

测试代码

新建一个测试单元来测试一下我们的代码是否正确。 不停的读取数据包放入队列,然后使用一个简单的NSTimer来定时输出状态,并且每过3秒会消耗一次数据包。 Debug区域的输出信息如下: Bingo!结果符合代码的想法。 至此,我们的基本的读包流程就全部结束了。稍等我阅读时间完下一章节后,再来书写移动端音视频的第四期内容。 感谢各位!

Last updated on