環形緩衝區

圓形緩衝區(circular buffer),也稱作圓形隊列(circular queue),循環緩衝區(cyclic buffer),環形緩衝區(ring buffer),是一種用於表示一個固定尺寸、頭尾相連的緩衝區的數據結構,適合緩存數據流

圓形緩衝區的概念圖示。計算機內存是線性地址空間,因此需要採用下述技術來邏輯實現圓形緩衝區

用法

圓形緩衝區的一個有用特性是:當一個數據元素被用掉後,其餘數據元素不需要移動其存儲位置。相反,一個非圓形緩衝區(例如一個普通的隊列)在用掉一個數據元素後,其餘數據元素需要向前搬移。換句話說,圓形緩衝區適合實現先進先出緩衝區,而非圓形緩衝區適合後進先出緩衝區。


圓形緩衝區適合於事先明確了緩衝區的最大容量的情形。擴展一個圓形緩衝區的容量,需要搬移其中的數據。因此一個緩衝區如果需要經常調整其容量,用鍊表實現更為合適。

寫操作覆蓋圓形緩衝區中未被處理的數據在某些情況下是允許的。特別是在多媒體處理時。例如,音頻的生產者可以覆蓋掉聲卡尚未來得及處理的音頻數據。

工作過程

一個圓形緩衝區最初為空並有預定的長度。例如,這是一個具有七個元素空間的圓形緩衝區,其中底部的單線與箭頭表示「頭尾相接」形成一個圓形地址空間:

 

假定1被寫入緩衝區中部(對於圓形緩衝區來說,最初的寫入位置在哪裡是無關緊要的):

 

再寫入2個元素,分別是2 & 3 — 被追加在1之後:

 

如果兩個元素被處理,那麼是緩衝區中最老的兩個元素被移除。在本例中,1 & 2被移除,緩衝區中只剩下3:

 

如果緩衝區中有7個元素,則是滿的:

 

如果緩衝區是滿的,又要寫入新的數據,一種策略是覆蓋掉最老的數據。此例中,2個新數據— A & B — 寫入,覆蓋了3 & 4:

 

也可以採取其他策略,禁止覆蓋緩衝區的數據,採取返回一個錯誤碼或者拋出異常

最終,如果從緩衝區中移除2個數據,不是3 & 4 而是 5 & 6 。因為 A & B 已經覆蓋了3 & 4:

 

圓形緩衝區工作機制

由於計算機內存是線性地址空間,因此圓形緩衝區需要特別的設計才可以從邏輯上實現。

讀指針與寫指針

一般的,圓形緩衝區需要4個指針:

  • 在內存中實際開始位置;
  • 在內存中實際結束位置,也可以用緩衝區長度代替;
  • 存儲在緩衝區中的有效數據的開始位置(讀指針);
  • 存儲在緩衝區中的有效數據的結尾位置(寫指針)。

讀指針、寫指針可以用整型值來表示。

下例為一個未滿的緩衝區的讀寫指針:

 

下例為一個滿的緩衝區的讀寫指針:

 

區分緩衝區滿或者空

緩衝區是滿、或是空,都有可能出現讀指針與寫指針指向同一位置:

 

有多種策略用於檢測緩衝區是滿、或是空.

總是保持一個存儲單元為空

緩衝區中總是有一個存儲單元保持未使用狀態。緩衝區最多存入 個數據。如果讀寫指針指向同一位置,則緩衝區為空。如果讀指針位於寫指針的相鄰後一個位置,則緩衝區為滿。這種策略的優點是簡單、粗暴;缺點是語義上實際可存數據量與緩衝區容量不一致,測試緩衝區是否滿需要做取餘數計算。

使用數據計數

這種策略不使用顯式的寫指針,而是保持着緩衝區內存儲的數據的計數。因此測試緩衝區是空是滿非常簡單;對性能影響可以忽略。缺點是讀寫操作都需要修改這個存儲數據計數,對於多線程訪問緩衝區需要並發控制

鏡像指示位

緩衝區的長度如果是n,邏輯地址空間則為0至n-1;那麼,規定n至2n-1為鏡像邏輯地址空間。本策略規定讀寫指針的地址空間為0至2n-1,其中低半部分對應於常規的邏輯地址空間,高半部分對應於鏡像邏輯地址空間。當指針值大於等於2n時,使其折返(wrapped)到ptr-2n。使用一位表示寫指針或讀指針是否進入了虛擬的鏡像存儲區:置位表示進入,不置位表示沒進入還在基本存儲區。

 

在讀寫指針的值相同情況下,如果二者的指示位相同,說明緩衝區為空;如果二者的指示位不同,說明緩衝區為滿。這種方法優點是測試緩衝區滿/空很簡單;不需要做取餘數操作;讀寫線程可以分別設計專用算法策略,能實現精緻的並發控制。 缺點是讀寫指針各需要額外的一位作為指示位。

如果緩衝區長度是2的,則本方法可以省略鏡像指示位。如果讀寫指針的值相等,則緩衝區為空;如果讀寫指針相差n,則緩衝區為滿,這可以用條件表達式(寫指針 == (讀指針 異或 緩衝區長度))來判斷。

// This approach adds one bit to end and start pointers
// Circular buffer object
typedef struct {
    int size; // maximum number of elements
    int start; // index of oldest element
    int end; // index at which to write new element
    ElemType *elems; // vector of elements
} CircularBuffer;

void cbInit(CircularBuffer *cb, int size) {
    cb->size = size;
    cb->start = 0;
    cb->end = 0;
    cb->elems = (ElemType *)calloc(cb->size, sizeof(ElemType));
}

void cbPrint(CircularBuffer *cb) {
    printf("size = 0x%x, start = %d, end = %d\n", cb->size, cb->start, cb->end);
}

int cbIsFull(CircularBuffer *cb) {
    return cb->end == (cb->start ^ cb->size); // This inverts the most significant bit of start before comparison
}

int cbIsEmpty(CircularBuffer *cb) {
    return cb->end == cb->start;
}

int cbIncr(CircularBuffer *cb, int p) {
    return (p + 1) & (2 * cb->size - 1); // start and end pointers incrementation is done modulo 2*size
}

void cbWrite(CircularBuffer *cb, ElemType *elem) {
    cb->elems[cb->end & (cb->size - 1)] = *elem;
    if (cbIsFull(cb)) // full, overwrite moves start pointer
        cb->start = cbIncr(cb, cb->start);
    cb->end = cbIncr(cb, cb->end);
}

void cbRead(CircularBuffer *cb, ElemType *elem) {
    *elem = cb->elems[cb->start & (cb->size - 1)];
    cb->start = cbIncr(cb, cb->start);
}

讀/寫 計數

用兩個有符號整型變量分別保存寫入、讀出緩衝區的數據數量。其差值就是緩衝區中尚未被處理的有效數據的數量。這種方法的優點是讀線程、寫線程互不干擾;缺點是需要額外兩個變量。

記錄最後的操作

使用一位記錄最後一次操作是讀還是寫。讀寫指針值相等情況下,如果最後一次操作為寫入,那麼緩衝區是滿的;如果最後一次操作為讀出,那麼緩衝區是空。 這種策略的缺點是讀寫操作共享一個標誌位,多線程時需要並發控制。

POSIX優化實現

#include <sys/mman.h>
#include <stdlib.h>
#include <unistd.h>

#define report_exceptional_condition() abort ()

struct ring_buffer {
    void *address;
    unsigned long count_bytes;
    unsigned long write_offset_bytes;
    unsigned long read_offset_bytes;
};

// Warning order should be at least 12 for Linux
void ring_buffer_create (struct ring_buffer *buffer, unsigned long order) {
    char path[] = "/dev/shm/ring-buffer-XXXXXX";
    int file_descriptor;
    void *address;
    int status;
    file_descriptor = mkstemp(path);
    if (file_descriptor < 0)
        report_exceptional_condition();
    status = unlink(path);
    if (status)
        report_exceptional_condition();
    buffer->count_bytes = 1UL << order;
    buffer->write_offset_bytes = 0;
    buffer->read_offset_bytes = 0;
    status = ftruncate(file_descriptor, buffer->count_bytes);
    if (status)
        report_exceptional_condition();
    buffer->address = mmap (NULL, buffer->count_bytes << 1, PROT_NONE,
                            MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (buffer->address == MAP_FAILED)
        report_exceptional_condition();
    address =
        mmap(buffer->address, buffer->count_bytes, PROT_READ | PROT_WRITE,
             MAP_FIXED | MAP_SHARED, file_descriptor, 0);
    if (address != buffer->address)
        report_exceptional_condition();
    address = mmap(buffer->address + buffer->count_bytes,
                   buffer->count_bytes, PROT_READ | PROT_WRITE,
                   MAP_FIXED | MAP_SHARED, file_descriptor, 0);
    if (address != buffer->address + buffer->count_bytes)
        report_exceptional_condition();
    status = close(file_descriptor);
    if (status)
        report_exceptional_condition();
}

void ring_buffer_free(struct ring_buffer *buffer) {
    int status;
    status = munmap(buffer->address, buffer->count_bytes << 1);
    if (status)
        report_exceptional_condition ();
}

void *ring_buffer_write_address(struct ring_buffer *buffer) {
    // void pointer arithmetic is a constraint violation.
    return buffer->address + buffer->write_offset_bytes;
}

void ring_buffer_write_advance(struct ring_buffer *buffer, unsigned long count_bytes) {
    buffer->write_offset_bytes += count_bytes;
}

void *ring_buffer_read_address(struct ring_buffer *buffer) {
    return buffer->address + buffer->read_offset_bytes;
}

void ring_buffer_read_advance(struct ring_buffer *buffer, unsigned long count_bytes) {
    buffer->read_offset_bytes += count_bytes;
    if (buffer->read_offset_bytes >= buffer->count_bytes) {
        // 如果读指针大于等于缓冲区长度,那些读写指针同时折返回[0, buffer_size]范围内
        buffer->read_offset_bytes -= buffer->count_bytes;
        buffer->write_offset_bytes -= buffer->count_bytes;
    }
}

unsigned long ring_buffer_count_bytes(struct ring_buffer *buffer) {
    return buffer->write_offset_bytes - buffer->read_offset_bytes;
}

unsigned long ring_buffer_count_free_bytes(struct ring_buffer *buffer) {
    return buffer->count_bytes - ring_buffer_count_bytes (buffer);
}

void ring_buffer_clear(struct ring_buffer *buffer) {
    buffer->write_offset_bytes = 0;
    buffer->read_offset_bytes = 0;
}

/*  Note, that initial anonymous mmap() can be avoided - after initial mmap() for descriptor fd,
    you can try mmap() with hinted address as (buffer->address + buffer->count_bytes) and if it fails -
    another one with hinted address as (buffer->address - buffer->count_bytes).
    Make sure MAP_FIXED is not used in such case, as under certain situations it could end with segfault.
    The advantage of such approach is, that it avoids requirement to map twice the amount you need initially
    (especially useful e.g. if you want to use hugetlbfs and the allowed amount is limited)
    and in context of gcc/glibc - you can avoid certain feature macros
    (MAP_ANONYMOUS usually requires one of: _BSD_SOURCE, _SVID_SOURCE or _GNU_SOURCE). */

Linux內核的kfifo

在Linux內核文件kfifo.h和kfifo.c中,定義了一個先進先出圓形緩衝區實現。如果只有一個讀線程、一個寫線程,二者沒有共享的被修改的控制變量,那麼可以證明這種情況下不需要並發控制。kfifo就滿足上述條件。kfifo要求緩衝區長度必須為2的冪。讀、寫指針分別是無符號整型變量。把讀寫指針變換為緩衝區內的索引值,僅需要「按位與」操作:(指針值 按位與 (緩衝區長度-1))。這避免了計算代價高昂的「求余」操作。且下述關係總是成立:

讀指針 + 緩衝區存儲的數據長度 == 寫指針

即使在寫指針達到了無符號整型的上界,上溢出後寫指針的值小於讀指針的值,上述關係仍然保持成立(這是因為無符號整型加法的性質)。 kfifo的寫操作,首先計算緩衝區中當前可寫入存儲空間的數據長度:

len = min{待寫入數據長度, 緩衝區長度 - (寫指針 - 讀指針)}

然後,分兩段寫入數據。第一段是從寫指針開始向緩衝區末尾方向;第二段是從緩衝區起始處寫入餘下的可寫入數據,這部分可能數據長度為0即並無實際數據寫入。

外部連結