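Circular Buffer (Ring Buffer) for C/C++
What is a circular buffer
A circular buffer (also called a ring buffer) is a fixed-size buffer that behaves as if its memory were contiguous and wrapped around end to end. As data is produced and consumed, the existing contents never need to be moved or cleared; only the head and tail pointers are adjusted. Adding data advances the head pointer, and consuming data advances the tail pointer. When either pointer reaches the end of the buffer, it wraps back to the start.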
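The wrap-around step can be sketched in a few lines of C. The helper name `ring_advance` is hypothetical and is not part of the API built later in this post; it only illustrates how an index returns to 0 after the last slot.

```c
#include <assert.h>
#include <stddef.h>

/* Advance an index by one slot, wrapping to 0 after the last slot.
 * `capacity` is the number of slots in the buffer and must be non-zero. */
size_t ring_advance(size_t index, size_t capacity)
{
    assert(capacity > 0);
    return (index + 1) % capacity;
}
```

With a 4-slot buffer, index 0 advances to 1 and index 3 wraps back to 0. Both the head and the tail pointer would move this way.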
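Why use a circular buffer
Circular buffers are commonly used as fixed-size queues. A fixed-size queue suits embedded development well, because developers there usually prefer statically allocated storage over dynamic allocation.
The following is quoted from the article "Ring buffer basics":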
The ring buffer's first-in first-out data structure is useful tool for transmitting data between asynchronous processes. Here's how to bit bang one in C without C++'s Standard Template Library.
The ring buffer is a circular software queue. This queue has a first-in-first-out (FIFO) data characteristic. These buffers are quite common and are found in many embedded systems. Usually, most developers write these constructs from scratch on an as-needed basis.
The C++ language has the Standard Template Library (STL), which has a very easy-to-use set of class templates. This library enables the developer to create the queue and other lists relatively easily. For the purposes of this article, however, I am assuming that we do not have access to the C++ language.
The ring buffer usually has two indices to the elements within the buffer. The distance between the indices can range from zero (0) to the total number of elements within the buffer. The use of the dual indices means the queue length can shrink to zero, (empty), to the total number of elements, (full). Figure 1 shows the ring structure of the ring buffer, (FIFO) queue.
Figure 1: Structure of a ring buffer.
The data gets PUT at the head index, and the data is read from the tail index. In essence, the newest data “grows” from the head index. The oldest data gets retrieved from the tail index. Figure 2 shows how the head and tail index varies in time using a linear array of elements for the buffer.
Figure 2: Linear buffer implementation of the ring buffer.
Use cases
Single process to single process
In general, the queue is used to serialize data from one process to another process. The serialization allows some elasticity in time between the processes. In many cases, the queue is used as a data buffer in some hardware interrupt service routine. This buffer will collect the data so that at some later time another process can fetch the data for further processing. This use case is the single process to process buffering case.
This use case is typically found as an interface between some very high priority hardware service buffering data to some lower priority service running in some background loop. This simple buffering use case is shown in Figure 3.
Figure 3: A single process to process buffer use case
In many cases, there will be a need for two queues for a single interrupt service. Using multiple queues is quite common for device drivers for serial devices such as RS-232, I2C or USB drivers.
Multiple processes to single process
A little less common is the requirement to serialize many data streams into one receiving streams. These use cases are quite common in multi-threaded operating systems. In this case, there are many client threads requesting some type of serialization from some server or broker thread. The requests or messages are serialized into a single queue which is received by a single process. Figure 4 shows this use case.
Figure 4: Multiple processes to process use case.
Single process to multiple processes
The least common use case is the single process to multiple processes case. The difficulty here is to determine where to steer the output in real time. Usually, this is done by tagging the data elements in such a way that a broker can steer the data in some meaningful way. Figure 5 shows the single process to multiple processes use case. Since queues can be readily created, it is usually better to create multiple queues to solve this use case than it would be to use a single queue.
Figure 5: Single process to multiple processes use case.
Figure 6 shows how to reorganize the single process to multiple process use case using a set of cascaded queues. In this case, we have inserted an Rx / Tx Broker Dispatcher service, which will parse the incoming requests to each of the individual process queues.
Figure 6: Single process to multiple process use case using a dispatcher and multiple queues.
Managing overflow
One must be able to handle the case where the queue is full and there is still incoming data. This case is known as the overflow condition. There are two methods which handle this case. They are to drop the latest data or to overwrite the oldest data. Either style may be implemented.
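Buffer implementation
Data structure
- The underlying data buffer
- The maximum size of the buffer
- The current position of the “head” pointer (incremented when an element is added)
- The current position of the “tail” pointer (incremented when an element is read)
- A flag indicating whether the buffer is full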
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// The hidden definition of our circular buffer structure
typedef struct circular_buf_t
{
    uint8_t *buffer;
    size_t head;
    size_t tail;
    size_t max; // maximum size of the buffer
    bool full;
} *cbuf_handle_t;
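Determining whether the buffer is full
When head == tail, the “full” and “empty” states of a circular buffer look identical, because the two pointers are equal in both. There are two ways to tell full from empty:
“Waste” one slot in the buffer:
- Full: (head + 1) % max == tail
- Empty: head == tail
Use a bool flag plus additional logic:
- Full: full
- Empty: (head == tail) && (!full)
The data structure above uses the flag approach, so the flag has to be kept up to date in the get and put operations.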
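For comparison, the slot-wasting alternative, which this post does not use, might look like the sketch below. The type `slot_ring_t` and the `slot_ring_*` functions are hypothetical names introduced only for illustration. Note that a buffer with `max` slots can hold at most `max - 1` elements under this scheme.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical slot-wasting variant: no `full` flag is stored. */
typedef struct {
    uint8_t *buffer;
    size_t head;   /* next slot to write */
    size_t tail;   /* next slot to read */
    size_t max;    /* number of slots; only max - 1 are usable */
} slot_ring_t;

bool slot_ring_empty(const slot_ring_t *r)
{
    return r->head == r->tail;
}

bool slot_ring_full(const slot_ring_t *r)
{
    assert(r->max > 1);
    /* Full when advancing head would make it collide with tail. */
    return (r->head + 1) % r->max == r->tail;
}

/* Store one byte; returns 0 on success, -1 if the buffer is full. */
int slot_ring_put(slot_ring_t *r, uint8_t data)
{
    if (slot_ring_full(r))
        return -1;
    r->buffer[r->head] = data;
    r->head = (r->head + 1) % r->max;
    return 0;
}
```

The trade-off is one unused slot in exchange for not having to maintain a separate flag.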
Initialization and reset
#include <assert.h>
#include <stdlib.h>

/// Pass in a storage buffer and size
/// Returns a circular buffer handle
cbuf_handle_t circular_buf_init(uint8_t* buffer, size_t size)
{
    assert(buffer && size);
    // sizeof(*cbuf) is the size of the struct; cbuf_handle_t itself is only a pointer type
    cbuf_handle_t cbuf = malloc(sizeof(*cbuf));
    assert(cbuf);
    cbuf->buffer = buffer;
    cbuf->max = size;
    circular_buf_reset(cbuf);
    assert(circular_buf_empty(cbuf));
    return cbuf;
}

/// Reset the circular buffer to empty, head == tail
void circular_buf_reset(cbuf_handle_t cbuf)
{
    assert(cbuf);
    cbuf->head = 0;
    cbuf->tail = 0;
    cbuf->full = false;
}

/// Free a circular buffer structure.
/// Does not free data buffer; owner is responsible for that
void circular_buf_free(cbuf_handle_t cbuf)
{
    assert(cbuf);
    free(cbuf);
}
State checks
bool circular_buf_full(cbuf_handle_t cbuf)
{
    assert(cbuf);
    return cbuf->full;
}

bool circular_buf_empty(cbuf_handle_t cbuf)
{
    assert(cbuf);
    return (!cbuf->full && (cbuf->head == cbuf->tail));
}

size_t circular_buf_capacity(cbuf_handle_t cbuf)
{
    assert(cbuf);
    return cbuf->max;
}

size_t circular_buf_size(cbuf_handle_t cbuf)
{
    assert(cbuf);
    size_t size = cbuf->max;
    if(!cbuf->full)
    {
        if(cbuf->head >= cbuf->tail)
        {
            size = (cbuf->head - cbuf->tail);
        }
        else
        {
            size = (cbuf->max + cbuf->head - cbuf->tail);
        }
    }
    return size;
}
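A few concrete values make the wrapped case in circular_buf_size easier to check. The sketch below restates the same arithmetic as a standalone function; `ring_count` is a hypothetical name that takes the fields as plain arguments so it can be tried without the rest of the code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Number of stored elements, given the raw head/tail indices. */
size_t ring_count(size_t head, size_t tail, size_t max, bool full)
{
    assert(max > 0);
    if (full)
        return max;
    if (head >= tail)
        return head - tail;
    /* head has wrapped past the end while tail has not */
    return max + head - tail;
}
```

For a 16-slot buffer, head = 5 and tail = 2 give 3 elements, while head = 2 and tail = 14 (head already wrapped) give 16 + 2 - 14 = 4. When head == tail the result is 16 if the full flag is set and 0 otherwise.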
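Adding and removing data
Reading removes data from the buffer: the value at the tail pointer is fetched and the tail pointer is advanced. If the buffer is empty, the pointers are left unchanged and the error value -1 is returned.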
int circular_buf_get(cbuf_handle_t cbuf, uint8_t * data)
{
    assert(cbuf && data && cbuf->buffer);
    int r = -1;
    if(!circular_buf_empty(cbuf))
    {
        *data = cbuf->buffer[cbuf->tail];
        // Removing data clears the full flag and advances the tail pointer by one
        cbuf->full = false;
        cbuf->tail = (cbuf->tail + 1) % cbuf->max;
        r = 0;
    }
    return r;
}
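When reads cannot keep up with writes, there are two options: overwrite the oldest data, or leave the buffer untouched and either block or drop the new data.
Block or drop new data
Taking the drop-new-data policy as the example: if the buffer is already full (circular_buf_full returns true), nothing is written and -1 is returned.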
int circular_buf_put_block(cbuf_handle_t cbuf, uint8_t data)
{
    int r = -1;
    assert(cbuf && cbuf->buffer);
    if(!circular_buf_full(cbuf))
    {
        cbuf->buffer[cbuf->head] = data;
        // Advance the write position
        cbuf->head = (cbuf->head + 1) % cbuf->max;
        cbuf->full = (cbuf->head == cbuf->tail);
        r = 0;
    }
    return r;
}
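Despite its name, the function above drops the byte and returns immediately. A writer that really waits for free space could be sketched with a POSIX condition variable, as below. This is an illustration under stated assumptions, not the code used in this post: `blocking_ring_t` and the `blocking_ring_*` functions are hypothetical names, and the mutex lives inside the structure instead of being a separate global.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical ring buffer whose put() waits instead of dropping data. */
typedef struct {
    uint8_t *buffer;
    size_t head, tail, max;
    bool full;
    pthread_mutex_t lock;
    pthread_cond_t not_full;   /* signalled whenever a slot is freed */
} blocking_ring_t;

void blocking_ring_init(blocking_ring_t *r, uint8_t *storage, size_t size)
{
    assert(r && storage && size);
    r->buffer = storage;
    r->head = r->tail = 0;
    r->max = size;
    r->full = false;
    pthread_mutex_init(&r->lock, NULL);
    pthread_cond_init(&r->not_full, NULL);
}

/* Waits until a slot is free, then stores one byte. */
void blocking_ring_put(blocking_ring_t *r, uint8_t data)
{
    pthread_mutex_lock(&r->lock);
    while (r->full)                        /* loop guards against spurious wakeups */
        pthread_cond_wait(&r->not_full, &r->lock);
    r->buffer[r->head] = data;
    r->head = (r->head + 1) % r->max;
    r->full = (r->head == r->tail);
    pthread_mutex_unlock(&r->lock);
}

/* Non-blocking read; returns 0 on success, -1 if the buffer is empty. */
int blocking_ring_get(blocking_ring_t *r, uint8_t *data)
{
    int rc = -1;
    pthread_mutex_lock(&r->lock);
    if (r->full || r->head != r->tail) {
        *data = r->buffer[r->tail];
        r->tail = (r->tail + 1) % r->max;
        r->full = false;
        pthread_cond_signal(&r->not_full); /* wake one waiting writer, if any */
        rc = 0;
    }
    pthread_mutex_unlock(&r->lock);
    return rc;
}
```

Blocking is only appropriate when the writer is an ordinary thread. An interrupt service routine cannot wait, so there the drop or overwrite policies remain the practical choices.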
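Demo
One byte is written every 100 ms and one is read every 400 ms, with a 16-byte buffer. The buffer therefore fills up, after which the writer is held back: new bytes are dropped, and a write only succeeds once a read has freed a slot.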
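#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// Not shown in the original listing; assumed to be a global mutex shared by both threads
static pthread_mutex_t rw_mutex = PTHREAD_MUTEX_INITIALIZER;

void *read_thread(void *arg)
{
    uint8_t data = 0; // stays 0 if the first read finds the buffer empty
    printf("%s %d\n", __FUNCTION__, __LINE__);
    while (1)
    {
        pthread_mutex_lock(&rw_mutex);
        circular_buf_get(arg, &data);
        pthread_mutex_unlock(&rw_mutex);
        printf("%s %d ===>data:0x%x\n", __FUNCTION__, __LINE__, data);
        usleep(400*1000);
    }
    return NULL;
}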
int main()
{
    pthread_t ntid;
    uint8_t data = 0;
    uint8_t buff[16] = {0};
    cbuf_handle_t handle = circular_buf_init(buff, sizeof(buff));
    int ret;
    pthread_create(&ntid, NULL, read_thread, handle);
    printf("%s %d data:0x%x\n", __FUNCTION__, __LINE__, data);
    while (1)
    {
        usleep(100*1000);
        data = rand()%0xff;
        pthread_mutex_lock(&rw_mutex);
        ret = circular_buf_put_block(handle, data);
        pthread_mutex_unlock(&rw_mutex);
        if(!ret)
            printf("%s %d write data:0x%x\n", __FUNCTION__, __LINE__, data);
    }
    // Not reached, because the loop above never exits.
    // buff is a stack array, so it must not be passed to free().
    circular_buf_free(handle);
    return 0;
}
Printed output:
main 201 data:0x0
read_thread 179
read_thread 185 ===>data:0x0
main 210 write data:0x29
main 210 write data:0x6b
main 210 write data:0xd6
read_thread 185 ===>data:0x29
main 210 write data:0xeb
main 210 write data:0x2c
main 210 write data:0xa9
main 210 write data:0x3
read_thread 185 ===>data:0x6b
main 210 write data:0x21
main 210 write data:0xbb
main 210 write data:0xef
main 210 write data:0x5f
read_thread 185 ===>data:0xd6
main 210 write data:0x5f
main 210 write data:0x4c
main 210 write data:0xfc
main 210 write data:0x10
read_thread 185 ===>data:0xeb
main 210 write data:0xec
main 210 write data:0xbe
main 210 write data:0xd4
main 210 write data:0xed
read_thread 185 ===>data:0x2c
main 210 write data:0x51
main 210 write data:0x6
read_thread 185 ===>data:0xa9
main 210 write data:0x99
read_thread 185 ===>data:0x3
main 210 write data:0x65
read_thread 185 ===>data:0x21
main 210 write data:0x33
Overwrite old data
int circular_buf_put_overwrite(cbuf_handle_t cbuf, uint8_t data)
{
    assert(cbuf && cbuf->buffer);
    cbuf->buffer[cbuf->head] = data;
    // If the buffer is full, the oldest unread byte has just been overwritten,
    // so the read position must advance as well
    if (cbuf->full)
    {
        cbuf->tail = (cbuf->tail + 1) % cbuf->max;
    }
    cbuf->head = (cbuf->head + 1) % cbuf->max;
    cbuf->full = (cbuf->head == cbuf->tail);
    return 0;
}
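The same demo again
One byte is written every 100 ms and one is read every 400 ms, with a 16-byte buffer. The write call is replaced by circular_buf_put_overwrite so that old data is overwritten.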
int main()
{
    pthread_t ntid;
    uint8_t data = 0;
    uint8_t buff[16] = {0};
    cbuf_handle_t handle = circular_buf_init(buff, sizeof(buff));
    int ret;
    pthread_create(&ntid, NULL, read_thread, handle);
    printf("%s %d data:0x%x\n", __FUNCTION__, __LINE__, data);
    while (1)
    {
        usleep(100*1000);
        data = rand()%0xff;
        pthread_mutex_lock(&rw_mutex);
        ret = circular_buf_put_overwrite(handle, data);
        pthread_mutex_unlock(&rw_mutex);
        if(!ret)
            printf("%s %d write data:0x%x\n", __FUNCTION__, __LINE__, data);
    }
    // Not reached, because the loop above never exits.
    // buff is a stack array, so it must not be passed to free().
    circular_buf_free(handle);
    return 0;
}
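With a write every 100 ms, a read every 400 ms and a 16-byte buffer, overwriting means that once the buffer is full, every read returns the oldest byte still stored, that is, the first of the 16 most recent writes.
Output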
main 227 write data:0x33
main 227 write data:0xec
main 227 write data:0x3f
main 227 write data:0x54
read_thread 202 ===>data:0x51
main 227 write data:0x16
main 227 write data:0xa7
main 227 write data:0x22
main 227 write data:0xcd
read_thread 202 ===>data:0x99
main 227 write data:0xcc
main 227 write data:0x8f
main 227 write data:0x60
main 227 write data:0xd4
read_thread 202 ===>data:0x65
main 227 write data:0xf3
main 227 write data:0x4e
main 227 write data:0x4a
main 227 write data:0x60
read_thread 202 ===>data:0x33 // this value was written exactly 16 writes before this read
main 227 write data:0x3d
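The pattern in this log can be reproduced without threads or timing. The sketch below is a hypothetical single-threaded simulation, not part of the demo: it writes the consecutive values 0, 1, 2, ... into a 16-slot buffer with the same overwrite logic and reports which value the next read would return.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SIM_SLOTS 16

/* Write `writes` consecutive values (0, 1, 2, ...) using the overwrite
 * policy, then return the value the next read would yield, i.e. the
 * oldest value still stored. Requires 1 <= writes <= 256. */
uint8_t oldest_after_writes(unsigned writes)
{
    uint8_t buf[SIM_SLOTS] = {0};
    size_t head = 0, tail = 0;
    bool full = false;

    assert(writes >= 1 && writes <= 256);
    for (unsigned i = 0; i < writes; i++) {
        buf[head] = (uint8_t)i;
        if (full)                          /* oldest byte was just overwritten */
            tail = (tail + 1) % SIM_SLOTS;
        head = (head + 1) % SIM_SLOTS;
        full = (head == tail);
    }
    return buf[tail];
}
```

After 16 writes the next read yields value 0, and after 20 writes it yields value 4: the four oldest values have been overwritten, and the reader sees the first of the last 16 writes.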