執行緒池
此條目需要擴充。 (2010年4月8日) |
執行緒池(英語:thread pool):一種執行緒使用模式。執行緒過多會帶來排程開銷,進而影響快取局部性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可並行執行的任務。這避免了在處理短時間任務時建立與銷毀執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的並行處理器、處理器核心、主記憶體、網路sockets等的數量。 例如,對於計算密集型任務,執行緒數上限一般取CPU邏輯核心數+2,執行緒數過多會導致額外的執行緒切換開銷。
任務排程以執行執行緒的常見方法是使用同步佇列,稱作任務佇列。池中的執行緒等待佇列中的任務,並把執行完的任務放入完成佇列中。
執行緒池模式一般分為兩種:HS/HA半同步/半非同步模式、L/F領導者與跟隨者模式。
- 半同步/半非同步模式又稱為生產者消費者模式,是比較常見的實現方式,比較簡單。分為同步層、佇列層、非同步層三層。同步層的主執行緒處理工作任務並存入工作佇列,工作執行緒從工作佇列取出任務進行處理,如果工作佇列為空,則取不到任務的工作執行緒進入掛起狀態。由於執行緒間有資料通訊,因此不適於巨量資料量交換的場合。
- 領導者跟隨者模式,線上程池中的執行緒可處在3種狀態之一:領導者leader、追隨者follower或工作者processor。任何時刻執行緒池只有一個領導者執行緒。事件到達時,領導者執行緒負責訊息分離,並從處於追隨者執行緒中選出一個來當繼任領導者,然後將自身設定為工作者狀態去處置該事件。處理完畢後工作者執行緒將自身的狀態置為追隨者。這一模式實現複雜,但避免了執行緒間交換任務資料,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了領導者跟隨者模式實現。
執行緒池的伸縮性對效能有較大的影響。
- 建立太多執行緒,將會浪費一定的資源,有些執行緒未被充分使用。
- 銷毀太多執行緒,將導致之後浪費時間再次建立它們。
- 建立執行緒太慢,將會導致長時間的等待,效能變差。
- 銷毀執行緒太慢,導致其它執行緒資源飢餓。
Windows API的執行緒池函式
Windows作業系統的API提供了一套執行緒池的實現介面。[1]可以方便地建立、使用執行緒池。Windows執行緒池API被設計為一組協同對象, 其中有些對象表示工作單位、計時器、非同步I/O 等等。使用下述使用者模式的對象來管理執行緒池及相關的資料:
- 執行緒池對象(pool object)包含了一組工作執行緒(worker threads)。每個行程可以建立多個執行緒池對象,也可以直接使用行程預設的執行緒池。
- 回呼環境(Threadpool Callback Environment)執行緒池可以關聯多個回呼環境,回呼環境可單獨指定線上程池內的排程優先級。
- 清理群(clean-up group)對象關聯於一個回呼環境。用於追溯(track)執行緒緩衝池回呼。每次建立執行緒池的IO、work、timer、wait等對象(呼叫CreateThreadpool*等函式),會在cleanup group中增加一個成員。關閉執行緒池的IO、work、timer、wait等對象(呼叫CloseThreadpool*等函式),會在cleanup group中刪除相應的成員。
- 執行緒池工作對象(work object) 由一個函式指標和一個被稱為上下文的 void 指標組成,執行緒池每次在執行時將 void 指標送入函式。可非同步多次投寄給執行緒池去執行它的回呼函式。實際上就是以非同步方式呼叫回呼函式。通過
SubmitThreadpoolWork
投寄工作對象到任務佇列。如果不新增的工作對象,則不能更改函式和上下文。 - 執行緒池定時器對象(timer object)在定時器到期時投寄給執行緒池去執行它的回呼函式。 建立(
CreateThreadpoolTimer
)並設定(SetThreadpoolTimer
)定時器對象。當定時器對象到期時,工作執行緒會執行定時器對象的回呼函式。 - 執行緒池等待對象(wait object):在可等待控制代碼(waitable handle,可使用任何核心同步對象的控制代碼)被觸發(signaled)時或逾時到期時,它的回呼函式被投寄給執行緒池去執行。建立(
CreateThreadpoolWait
)並設定(SetThreadpoolWait
)等待對象。執行緒池內部的一個執行緒呼叫WaitForMultipleObjects並傳入由SetThreadpoolWait函式註冊的控制代碼,不斷地組成一個控制代碼組,同時將Wait*函式的bWaitAll設為FALSE,這樣當任何一個控制代碼被觸發,執行緒池就會被喚醒。因WaitForMultipleObjects不允許將同一個控制代碼傳入多次,因此必須確保不會用SetThreadpoolWait來多次註冊同一個控制代碼,但可以呼叫DuplicationHandle複製控制代碼並傳給Set*函式。因WaitForMultipleObjects一次最多只能等待64個核心對象,因此執行緒池實際上為每64個核心對象分配一個執行緒來等待,所以效率比較高。如果要等待超過64個以上的核心對象,系統會每64個核心對象,就開闢一個執行緒來等待這些核心對象。當執行緒池中一個執行緒呼叫了傳入的回呼函式,對應的等待項將進入「不活躍」狀態;這意味著如果在同一個核心對象被觸發後如果想再次呼叫這個回呼函式,需要呼叫SetThreadpoolWait再次註冊;如果傳入的hObject為NULL,將把pWaitItem這個等待項從執行緒中移除。 - 執行緒池I/O對象把檔案控制代碼關聯到執行緒池的I/O完成埠(completion port)。當非同步I/O操作完成,一個工作執行緒取得操作的狀態並呼叫I/O對象的回呼函式。
相關的API函式:[2]
- CreateThreadpool 建立一個執行緒池資料結構
- CloseThreadpool 關閉一個執行緒池
- SetThreadpoolThreadMaximum 設定一個執行緒池的執行緒數量上限。預設執行緒池的最小數量為1,最大數量為500
- SetThreadpoolThreadMinimum 設定一個執行緒池的執行緒數量下限
- InitializeThreadpoolEnvironment 初始化一個回呼環境。作用是回呼環境結構體中的將Version設為1,其餘為0。
- DestroyThreadpoolEnvironment 刪除回呼環境
- SetThreadpoolCallbackPool 把指定的執行緒緩衝池關聯到指定的執行緒池回呼環境。建立執行緒池的IO、work、timer、wait等對象會關聯到一個回呼環境上。如果使用系統預設的回呼環境,需要自行負責回呼函式的DLL保持裝入狀態,自行管理執行緒池內的各種對象的生存期。執行緒池可以關聯多個回呼環境,回呼環境可單獨指定線上程池內的排程優先級。
- SetThreadpoolCallbackLibrary 指示在回呼函式未執行完的時候其所在的DLL庫不能解除安裝
- SetThreadpoolCallbackPriority 設定回呼環境在所屬執行緒池內的優先級
- SetThreadpoolCallbackRunsLong 指示當前回呼函式將執行較長時間。這有助於執行緒池判斷是否新增執行緒
- CreateThreadpoolCleanupGroup 建立一個cleanup group資料結構,關聯到一個回呼環境上
- SetThreadpoolCallbackCleanupGroup 把指定的cleanup group關聯到指定的回呼環境。第三個參數設定CleanupGroupCancelCallback回呼函式。當呼叫清理組的CloseThreadpoolCleanupGroupmembers函式,並為bCancelPendingCallbacks傳入TRUE來清除清理組時,如果此時尚有未被處理的工作項時,則這個回呼函式被呼叫,這個回掉函式的第一個參數pvObjectContext是通過CreateThreadpool*函式傳入的pvContext,第二個參數pvCleanupContext是由CloseThreadpoolCleanupGroupMembers的pvCleanupContext參數傳入的。
- CloseThreadpoolCleanupGroup 關閉一個cleanup group資料結構。cleanup group不能包含任何成員。
- CloseThreadpoolCleanupGroupMembers 把指定cleanup group的所有work、timer、wait等對象釋放掉。對於正在執行的回呼函式,會等待其執行完畢。參數fCancelPendingCallbacks確定是否尚未開始執行的任務執行。
- CreateThreadpoolWork 建立一個執行緒池工作對象的資料結構
- CloseThreadpoolWork 函式通知執行緒池該對象可釋放。
- SubmitThreadpoolWork 投寄(post)一個工作對象到執行緒池中。一個工作執行緒將會呼叫執行這個工作對象的回呼函式。
- TrySubmitThreadpoolCallback 請求執行緒池一個工作執行緒呼叫指定的回呼函式
- CreateThreadpoolTimer 建立一個執行緒池定時器對象的資料結構
- SetThreadpoolTimer 設定一個執行緒定時器對象的參數。當定時器到期時,這個定時器對象的回呼函式被放入任務佇列等待被一個工作執行緒執行。第三個參數設定周期性到時,不論之前放入任務佇列的回呼函式是否執行完畢,總是把回呼函式再一次放入任務佇列等待被執行。如果不希望一個定時器對象的回呼函式的多個實例並行重疊執行,可以不設定周期性到時參數,而在回呼函式末尾呼叫SetThreadpoolTimer重設這個定時器對象,起碼可以簡化除錯。如果呼叫SetThreadpoolTimer時當前定時器對象還沒有到期,則會修改該定時器對象的到期時間等屬性。第二個參數pftDueTime表示首次到時時間,如果為NULL,則取消還未到時的定時器的下次到期,不再向任務佇列放入定時器對象的回呼函式,但已經在任務佇列中(正在被執行)的回呼函式仍會被執行。第四個參數是到時的容限上限,涉及定時器匯聚,如計時器A會在5-7微秒內被觸發,計時器B會在6-8微秒內。因時間有重疊,所以執行緒池只會喚醒一個執行緒來處理這兩個計時器,以減少用兩個執行緒呼叫時產生的額外的執行緒上下文交換的開銷。
- IsThreadpoolTimerSet 判斷給定的定時器對象當前是否被設定
- CreateThreadpoolWait 建立一個執行緒池等待(wait)對象的資料結構
- SetThreadpoolWait 設定一個執行緒等待對象與一個可等待對象(waitable object)的控制代碼繫結並設定逾時時間。當可等待對象的控制代碼被通知(signaled)或指定的逾時到期,這個等待對象的回呼函式會被入列(queue)進入任務佇列中並等待被一個工作執行緒執行。如果可等待對象(即函式的第二個參數)的控制代碼被設定為NULL,(由於可等待對象還沒有signaled所以)執行緒池等待對象的回呼函式還未放入任務佇列的都被取消,但已在任務佇列的回呼函式仍會被工作執行緒執行。注意不要多次使用SetThreadpoolWait來等待同一個hObject。第三個參數pftTimeout指出執行緒池願意花的最長時間來等待核心對象觸發,0是立即返回,負值為相對時間,正值為絕對時間,NULL表示無限等待(執行緒池內部呼叫了WaitForMultipleObjects)。
- CloseThreadpoolWait 關閉一個執行緒等待對象,然後才能關閉可等待對象(waitable object)的控制代碼。
- WaitForThreadpoolWaitCallbacks 等待指定的執行緒池等待對象執行完成或者取消還沒開始執行的執行緒池等待對象。
- WaitForThreadpoolIoCallbacks 等待指定的執行緒池IO對象執行完成或者取消還沒開始執行的執行緒池IO對象。該函式須在另一個執行緒使用,而不能在回呼函式內部使用,因為這會造成死結。如果bCancelPendingCallbacks為TRUE,那麼當請求完成的時候,回呼函式不會被呼叫(如果尚未被呼叫)。
- WaitForThreadpoolTimerCallbacks 等待指定的執行緒池定時器對象執行完成或者取消還沒開始執行的執行緒池定時器對象。
- WaitForThreadpoolWorkCallbacks 等待指定的執行緒池工作對象執行完成或者取消還沒開始執行的執行緒池工作對象。
- CreateThreadpoolIo 建立一個IO完成對象,並繫結一個非同步IO的控制代碼。
- CloseThreadpoolIo 釋放IO完成對象。應等待所有IO操作完成,關閉IO檔案控制代碼後再呼叫本函式。解除IO對象(工作項)與執行緒池的關聯。
- StartThreadpoolIo 初始化非同步IO操作前應先呼叫本函式,否則執行緒池會忽略IO操作的完成並主記憶體洩露。
- CancelThreadpoolIo 在IO失敗或者IO直接同步完成時,取消對執行緒池的通知(notification)
- CallbackMayRunLong 回呼函式通知系統需要執行較長時間。只能在呼叫執行緒的回呼函數裡使用。返回TRUE時,說明執行緒池還有其他執行緒可供使用。FALSE則相反。
- DisassociateCurrentThreadFromCallback 表示與回呼函式關聯的工作項在「邏輯上」完成了,執行當前回呼函式的執行緒與初始化當前回呼函式的對象去關聯。這使得所有等待當前回呼函式完成的執行緒被釋放。任何由於呼叫WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的執行緒能早一些返回,而不必等到執行緒從回呼函式中結束時才返回。
- 下述函式與回呼函式實體指標PTP_CALLBACK_INSTANCE有關,用於在回呼函式內設定當該函式結束時的動作。注意只能有一個動作在回呼函式結束後被執行,不能多次呼叫下述函式,否則最後一次呼叫的函式會覆蓋之前呼叫的下述函式。當執行緒呼叫回呼函式時,Windows會自動傳一個pInstance參數(類型PTP_CALLBACK_INSTANCE)給回呼函式,然後回呼函式將這個參數又傳給如下的函式,執行一些相應的終止操作:
- FreeLibraryWhenCallbackReturns 當回呼函式返回的時候,執行緒池會自動呼叫FreeLibrary,並在參數中傳入指定的HMOUDLE。如果回呼函式是從DLL中載入的,這個函式尤為重要,因為當執行緒執行完畢後,回呼函式不能自己呼叫FreeLibrary,否則回呼函式代碼將從行程中清除,這樣當FreeLibrary試圖返回到回呼函式時,會引發訪問違規
- LeaveCriticalSectionWhenCallbackReturns 當回呼函式返回時,執行緒池會自動呼叫LeavCriticalSection,並在參數中傳入指定的CRITCAL_SECTION結構體。
- ReleaseMutexWhenCallbackReturns 當回呼函式返回時,執行緒池會自動呼叫ReleaseMutex,並在參數中傳入指定的HANDLE
- ReleaseSemaphoreWhenCallbackReturns 當回呼函式返回的時候,執行緒池會自動呼叫ReleaseSemaphore,並在參數中傳入指定的HANDLE
- SetEventWhenCallbackReturns 當回呼函式返回的時候,執行緒池會自動呼叫SetEvent,並在參數中傳入指定的HANDLE
範例
範例程式1如下:
#include <windows.h>
#include <tchar.h>
#include <stdio.h>
//
// Thread pool wait callback function template
//
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
{
// Instance, Parameter, Wait, and WaitResult not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Wait); UNREFERENCED_PARAMETER(WaitResult);
// Do something when the wait is over.
_tprintf(_T("MyWaitCallback: wait is over.\n"));
}
//
// Thread pool timer callback function template
//
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_TIMER Timer)
{
// Instance, Parameter, and Timer not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Timer);
// Do something when the timer fires.
_tprintf(_T("MyTimerCallback: timer has fired.\n"));
}
//
// This is the thread pool work callback function.
//
VOID CALLBACK MyWorkCallback( PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WORK Work)
{
// Instance, Parameter, and Work not used in this example.
UNREFERENCED_PARAMETER(Instance);UNREFERENCED_PARAMETER(Parameter);UNREFERENCED_PARAMETER(Work);
// Do something when the work callback is invoked.
_tprintf(_T("MyWorkCallback: Task performed.\n"));
}
int main(void)
{
PTP_WAIT Wait = NULL;
PTP_WAIT_CALLBACK waitcallback = MyWaitCallback;
HANDLE hEvent = NULL;
UINT i = 0;
UINT rollback = 0;
// Create an auto-reset event and initialized as nonsignaled.
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (NULL == hEvent) {
// Error Handling
return 0;
}
rollback = 1; // CreateEvent succeeded
Wait = CreateThreadpoolWait(waitcallback,
NULL, // 回调函数的输入参数
NULL); // 使用缺省的回调环境
if (NULL == Wait) {
_tprintf(_T("CreateThreadpoolWait failed. LastError: %u\n"),GetLastError());
goto new_wait_cleanup;
}
rollback = 2; // CreateThreadpoolWait succeeded
// must re-register the event with the wait object before signaling it each time to trigger the wait callback
// each time before signaling the event to trigger the wait callback.
for (i = 0; i < 5; i++) {
SetThreadpoolWait(Wait, //线程池等待对象
hEvent, //内核等待对象
NULL); //超时设定
SetEvent(hEvent); //触发内核等待对象
// Delay for the waiter thread to act if necessary.
Sleep(500);
// Block here until the callback function is done executing.
WaitForThreadpoolWaitCallbacks(Wait, FALSE);
}
new_wait_cleanup:
switch (rollback) {
case 2:
// Unregister the wait by setting the event to NULL.
SetThreadpoolWait(Wait, NULL, NULL); //the wait object will cease to queue new callbacks (but callbacks already queued will still occur
// Close the wait.
CloseThreadpoolWait(Wait);
case 1:
// Close the event.
CloseHandle(hEvent);
default:
break;
}
BOOL bRet = FALSE;
PTP_WORK work = NULL;
PTP_TIMER timer = NULL;
PTP_POOL pool = NULL;
PTP_WORK_CALLBACK workcallback = MyWorkCallback;
PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
TP_CALLBACK_ENVIRON CallBackEnviron;
PTP_CLEANUP_GROUP cleanupgroup = NULL;
FILETIME FileDueTime;
ULARGE_INTEGER ulDueTime;
rollback = 0;
InitializeThreadpoolEnvironment(&CallBackEnviron); //不使用缺省的线程池与缺省的回调环境
// Create a custom, dedicated thread pool.
pool = CreateThreadpool(NULL);
if (NULL == pool) {
_tprintf(_T("CreateThreadpool failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 1; // pool creation succeeded
// The thread pool is made persistent simply by setting both the minimum and maximum threads to 1.
SetThreadpoolThreadMaximum(pool, 1);
bRet = SetThreadpoolThreadMinimum(pool, 1);
if (FALSE == bRet) {
_tprintf(_T("SetThreadpoolThreadMinimum failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
// Create a cleanup group for this thread pool.
cleanupgroup = CreateThreadpoolCleanupGroup();
if (NULL == cleanupgroup) {
_tprintf(_T("CreateThreadpoolCleanupGroup failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
rollback = 2; // Cleanup group creation succeeded
// Associate the callback environment with our thread pool.
SetThreadpoolCallbackPool(&CallBackEnviron, pool);
// Associate the cleanup group with our thread pool.
// Objects created with the same callback environment as the cleanup group become members of the cleanup group.
SetThreadpoolCallbackCleanupGroup(&CallBackEnviron, //回调环境
cleanupgroup, //Cleanup Group
NULL); //Cleanup Group的回调函数,当释放其所包含的对象之前先调用该回调函数
// Create work with the callback environment.
work = CreateThreadpoolWork(workcallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == work) {
_tprintf(_T("CreateThreadpoolWork failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 3; // Creation of work succeeded
// Submit the work to the pool. Because this was a pre-allocated work item (using CreateThreadpoolWork), it is guaranteed to execute.
SubmitThreadpoolWork(work);
// Create a timer with the same callback environment.
timer = CreateThreadpoolTimer(timercallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == timer) {
_tprintf(_T("CreateThreadpoolTimer failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 4; // Timer creation succeeded
// Set the timer to fire in one second.
ulDueTime.QuadPart = (ULONGLONG)-(1 * 10 * 1000 * 1000);
FileDueTime.dwHighDateTime = ulDueTime.HighPart;
FileDueTime.dwLowDateTime = ulDueTime.LowPart;
SetThreadpoolTimer(timer, //线程池定时器对象
&FileDueTime, //到期时间
0, //周期时期,为0则表示一次性定时器
0); //操作系统调用回调函数的最大延迟时间
// Delay for the timer to be fired
Sleep(1500);
// Wait for all callbacks to finish.
// CloseThreadpoolCleanupGroupMembers also releases objects that are members of the cleanup group,
// so it is not necessary to call close functions on individual objects after calling CloseThreadpoolCleanupGroupMembers.
CloseThreadpoolCleanupGroupMembers(cleanupgroup, //Cleanup Group
FALSE, //为真则取消还未开始执行的pending的回调函数
NULL); //CleanupGroup回调函数的输入参数
// Already cleaned up the work item with the
// CloseThreadpoolCleanupGroupMembers, so set rollback to 2.
rollback = 2;
goto main_cleanup;
main_cleanup:
// Clean up any individual pieces manually
// Notice the fall-through structure of the switch.
// Clean up in reverse order.
switch (rollback) {
case 4:
case 3:
// Clean up the cleanup group members.
CloseThreadpoolCleanupGroupMembers(cleanupgroup,FALSE, NULL);
case 2:
// Clean up the cleanup group.
CloseThreadpoolCleanupGroup(cleanupgroup);
case 1:
// Clean up the pool.
CloseThreadpool(pool);
default:
break;
}
return 0;
}
關於IO執行緒池的一個範例:
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>
#include <iostream>
#include <limits>
void PressEnterToContinue()
{
std::cout << "Press ENTER to continue... " << std::flush;
std::cin.ignore( (std::numeric_limits<std::streamsize>::max)( ), '\n');
}
//////////////////////////////////////////////////////////////////////////
#define QMLX_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz)
#define QMLX_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define QMLX_SAFEFREE(p) if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}
#define QMLX_ASSERT(s) if(!(s)){DebugBreak();}
#define QMLX_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,\
(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);
//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 2 //每个线程最大写入次数
#define MAXWRITETHREAD 2 //写入线程的数量
#define OP_READ 0x01 //读操作
#define OP_WRITE 0x02 //写操作
//#pragma pack(show)
//单IO数据
typedef struct __declspec(align(16)) _tagPerIoData {
OVERLAPPED m_ol;
HANDLE m_hFile; //操作的文件句柄
DWORD m_dwOp; //操作类型,OP_READ或OP_WRITE
LPVOID m_pData; //操作的数据
UINT m_nLen; //操作的数据长度
DWORD m_dwWrite; //写入的字节数
DWORD m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA, *PPER_IO_DATA;
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio);
//写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam);
//当前操作的文件对象的指针
LARGE_INTEGER g_liFilePointer = { 0 };
//IOCP线程池
PTP_IO g_pThreadpoolIo = NULL;
//////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘\’)
VOID GetAppPath(LPTSTR pszBuffer) {
DWORD dwLen = 0;
if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
return;
for (DWORD i = dwLen; i > 0; i--) {
if ('\\' == pszBuffer[i]) {
pszBuffer[i + 1] = '\0';
break;
}
}
}
int _tmain() {
_tsetlocale(LC_ALL, _T("chs"));
TCHAR pFileName[MAX_PATH] = {};
GetAppPath(pFileName);
StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));
HANDLE ahWThread[MAXWRITETHREAD] = {};
DWORD dwWrited = 0;
//创建文件
HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == hTxtFile) {
_tprintf(_T("CreateFile(%s)失败,错误码:%u\n"), GetLastError());
_tsystem(_T("PAUSE"));
return 0;
}
//初始化线程池回调环境
TP_CALLBACK_ENVIRON poolEnv = {};
InitializeThreadpoolEnvironment(&poolEnv);
//创建IOCP线程池
g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback, hTxtFile, &poolEnv);
//启动IOCP线程池
StartThreadpoolIo(g_pThreadpoolIo);
//写入UNICODE文件的前缀码,以便正确打开
PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = hTxtFile;
pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
QMLX_ASSERT(pIo->m_pData != NULL);
*((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
pIo->m_nLen = sizeof(WORD);
//偏移文件指针
pIo->m_ol.Offset = g_liFilePointer.LowPart;
pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
g_liFilePointer.QuadPart += pIo->m_nLen;
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
//等待IOCP线程池完成操作
WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);
//启动写入线程进行日志写入操作
for (int i = 0; i < MAXWRITETHREAD; i++) {
ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
}
//让主线程等待这些写入线程结束
WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);
for (int i = 0; i < MAXWRITETHREAD; i++) {
CloseHandle(ahWThread[i]);
}
//关闭IOCP线程池
CloseThreadpoolIo(g_pThreadpoolIo);
//关闭日志文件
if (INVALID_HANDLE_VALUE != hTxtFile) {
CloseHandle(hTxtFile);
hTxtFile = INVALID_HANDLE_VALUE;
}
_tsystem(_T("PAUSE"));
return 0;
}
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
if (NO_ERROR != IoResult) {
_tprintf(_T("I/O操作出错,错误码:%u\n"), IoResult);
return;
}
PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
DWORD dwCurTimestamp = GetTickCount();
switch (pIo->m_dwOp)
{
case OP_WRITE://写操作结束
{//写入操作结束
_tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)\n"),
GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);
QMLX_SAFEFREE(pIo->m_pData);
QMLX_SAFEFREE(pIo);
}
break;
case OP_READ: //读操作结束
break;
default:
break;
}
}
//写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
TCHAR pTxtContext[MAX_LOGLEN] = {};
PPER_IO_DATA pIo = NULL;
size_t szLen = 0;
LPTSTR pWriteText = NULL;
StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入\r\n"),
GetCurrentThreadId());
StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);
szLen += 1;
int i = 0;
for (; i < MAXWRITEPERTHREAD; i++) {
pWriteText = (LPTSTR)QMLX_CALLOC(szLen * sizeof(TCHAR));
QMLX_ASSERT(NULL != pWriteText);
StringCchCopy(pWriteText, szLen, pTxtContext);
//为每个操作申请一个“单IO数据”结构体
pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = (HANDLE)lpParam;
pIo->m_pData = pWriteText;
pIo->m_nLen = (szLen - 1) * sizeof(TCHAR);
//这里使用原子操作同步文件指针,写入不会相互覆盖
//这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
//比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
*((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
StartThreadpoolIo(g_pThreadpoolIo);
//写入
WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
if (ERROR_IO_PENDING != GetLastError()) {
CancelThreadpoolIo(g_pThreadpoolIo);
}
}
return i;
}
.NET Framework的執行緒池實現
命名空間System.Threading中的類ThreadPool提供一個執行緒池,該執行緒池可用於執行任務、傳送工作項、處理非同步 I/O、代表其他執行緒等待以及處理計時器。[3]
參看
參考文獻
- ^ MSDN:Using the Thread Pool Functions. [2017-02-22]. (原始內容存檔於2017-07-24).
- ^ MSDN:Thread Pool API. [2017-02-28]. (原始內容存檔於2017-10-04).
- ^ MSDN: ThreadPool 类. [2017-02-22]. (原始內容存檔於2017-02-23).