线程池
此条目需要扩展。 (2010年4月8日) |
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数上限一般取CPU逻辑核心数+2,线程数过多会导致额外的线程切换开销。
任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。
线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。
- 半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。
- 领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。
线程池的伸缩性对性能有较大的影响。
- 创建太多线程,将会浪费一定的资源,有些线程未被充分使用。
- 销毁太多线程,将导致之后浪费时间再次创建它们。
- 创建线程太慢,将会导致长时间的等待,性能变差。
- 销毁线程太慢,导致其它线程资源饥饿。
Windows API的线程池函数
Windows操作系统的API提供了一套线程池的实现接口。[1]可以方便地创建、使用线程池。Windows线程池API被设计为一组协同对象, 其中有些对象表示工作单位、计时器、异步I/O 等等。使用下述用户模式的对象来管理线程池及相关的数据:
- 线程池对象(pool object)包含了一组工作线程(worker threads)。每个进程可以创建多个线程池对象,也可以直接使用进程缺省的线程池。
- 回调环境(Threadpool Callback Environment)线程池可以关联多个回调环境,回调环境可单独指定在线程池内的调度优先级。
- 清理群(clean-up group)对象关联于一个回调环境。用于追溯(track)线程缓冲池回调。每次创建线程池的IO、work、timer、wait等对象(调用CreateThreadpool*等函数),会在cleanup group中增加一个成员。关闭线程池的IO、work、timer、wait等对象(调用CloseThreadpool*等函数),会在cleanup group中删除相应的成员。
- 线程池工作对象(work object) 由一个函数指针和一个被称为上下文的 void 指针组成,线程池每次在执行时将 void 指针送入函数。可异步多次投寄给线程池去执行它的回调函数。实际上就是以异步方式调用回调函数。通过
SubmitThreadpoolWork
投寄工作对象到任务队列。如果不创建新的工作对象,则不能更改函数和上下文。 - 线程池定时器对象(timer object)在定时器到期时投寄给线程池去执行它的回调函数。 创建(
CreateThreadpoolTimer
)并设定(SetThreadpoolTimer
)定时器对象。当定时器对象到期时,工作线程会执行定时器对象的回调函数。 - 线程池等待对象(wait object):在可等待句柄(waitable handle,可使用任何内核同步对象的句柄)被触发(signaled)时或超时到期时,它的回调函数被投寄给线程池去执行。创建(
CreateThreadpoolWait
)并设定(SetThreadpoolWait
)等待对象。线程池内部的一个线程调用WaitForMultipleObjects并传入由SetThreadpoolWait函数注册的句柄,不断地组成一个句柄组,同时将Wait*函数的bWaitAll设为FALSE,这样当任何一个句柄被触发,线程池就会被唤醒。因WaitForMultipleObjects不允许将同一个句柄传入多次,因此必须确保不会用SetThreadpoolWait来多次注册同一个句柄,但可以调用DuplicationHandle复制句柄并传给Set*函数。因WaitForMultipleObjects一次最多只能等待64个内核对象,因此线程池实际上为每64个内核对象分配一个线程来等待,所以效率比较高。如果要等待超过64个以上的内核对象,系统会每64个内核对象,就开辟一个线程来等待这些内核对象。当线程池中一个线程调用了传入的回调函数,对应的等待项将进入“不活跃”状态;这意味着如果在同一个内核对象被触发后如果想再次调用这个回调函数,需要调用SetThreadpoolWait再次注册;如果传入的hObject为NULL,将把pWaitItem这个等待项从线程中移除。 - 线程池I/O对象把文件句柄关联到线程池的I/O完成端口(completion port)。当异步I/O操作完成,一个工作线程获取操作的状态并调用I/O对象的回调函数。
相关的API函数:[2]
- CreateThreadpool 创建一个线程池数据结构
- CloseThreadpool 关闭一个线程池
- SetThreadpoolThreadMaximum 设置一个线程池的线程数量上限。默认线程池的最小数量为1,最大数量为500
- SetThreadpoolThreadMinimum 设置一个线程池的线程数量下限
- InitializeThreadpoolEnvironment 初始化一个回调环境。作用是回调环境结构体中的将Version设为1,其余为0。
- DestroyThreadpoolEnvironment 删除回调环境
- SetThreadpoolCallbackPool 把指定的线程缓冲池关联到指定的线程池回调环境。创建线程池的IO、work、timer、wait等对象会关联到一个回调环境上。如果使用系统默认的回调环境,需要自行负责回调函数的DLL保持装入状态,自行管理线程池内的各种对象的生存期。线程池可以关联多个回调环境,回调环境可单独指定在线程池内的调度优先级。
- SetThreadpoolCallbackLibrary 指示在回调函数未执行完的时候其所在的DLL库不能卸载
- SetThreadpoolCallbackPriority 设定回调环境在所属线程池内的优先级
- SetThreadpoolCallbackRunsLong 指示当前回调函数将运行较长时间。这有助于线程池判断是否创建新线程
- CreateThreadpoolCleanupGroup 创建一个cleanup group数据结构,关联到一个回调环境上
- SetThreadpoolCallbackCleanupGroup 把指定的cleanup group关联到指定的回调环境。第三个参数设置CleanupGroupCancelCallback回调函数。当调用清理组的CloseThreadpoolCleanupGroupmembers函数,并为bCancelPendingCallbacks传入TRUE来清除清理组时,如果此时尚有未被处理的工作项时,则这个回调函数被调用,这个回掉函数的第一个参数pvObjectContext是通过CreateThreadpool*函数传入的pvContext,第二个参数pvCleanupContext是由CloseThreadpoolCleanupGroupMembers的pvCleanupContext参数传入的。
- CloseThreadpoolCleanupGroup 关闭一个cleanup group数据结构。cleanup group不能包含任何成员。
- CloseThreadpoolCleanupGroupMembers 把指定cleanup group的所有work、timer、wait等对象释放掉。对于正在执行的回调函数,会等待其执行完毕。参数fCancelPendingCallbacks确定是否尚未开始执行的任务执行。
- CreateThreadpoolWork 创建一个线程池工作对象的数据结构
- CloseThreadpoolWork 函数通知线程池该对象可释放。
- SubmitThreadpoolWork 投寄(post)一个工作对象到线程池中。一个工作线程将会调用执行这个工作对象的回调函数。
- TrySubmitThreadpoolCallback 请求线程池一个工作线程调用指定的回调函数
- CreateThreadpoolTimer 创建一个线程池定时器对象的数据结构
- SetThreadpoolTimer 设定一个线程定时器对象的参数。当定时器到期时,这个定时器对象的回调函数被放入任务队列等待被一个工作线程执行。第三个参数设定周期性到时,不论之前放入任务队列的回调函数是否执行完毕,总是把回调函数再一次放入任务队列等待被执行。如果不希望一个定时器对象的回调函数的多个实例并发重叠执行,可以不设定周期性到时参数,而在回调函数末尾调用SetThreadpoolTimer重置这个定时器对象,起码可以简化调试。如果调用SetThreadpoolTimer时当前定时器对象还没有到期,则会修改该定时器对象的到期时间等属性。第二个参数pftDueTime表示首次到时时间,如果为NULL,则取消还未到时的定时器的下次到期,不再向任务队列放入定时器对象的回调函数,但已经在任务队列中(正在被执行)的回调函数仍会被执行。第四个参数是到时的容限上限,涉及定时器汇聚,如计时器A会在5-7微秒内被触发,计时器B会在6-8微秒内。因时间有重叠,所以线程池只会唤醒一个线程来处理这两个计时器,以减少用两个线程调用时产生的额外的线程上下文切换的开销。
- IsThreadpoolTimerSet 判断给定的定时器对象当前是否被设置
- CreateThreadpoolWait 创建一个线程池等待(wait)对象的数据结构
- SetThreadpoolWait 设定一个线程等待对象与一个可等待对象(waitable object)的句柄绑定并设定超时时间。当可等待对象的句柄被通知(signaled)或指定的超时到期,这个等待对象的回调函数会被入列(queue)进入任务队列中并等待被一个工作线程执行。如果可等待对象(即函数的第二个参数)的句柄被设置为NULL,(由于可等待对象还没有signaled所以)线程池等待对象的回调函数还未放入任务队列的都被取消,但已在任务队列的回调函数仍会被工作线程执行。注意不要多次使用SetThreadpoolWait来等待同一个hObject。第三个参数pftTimeout指出线程池愿意花的最长时间来等待内核对象触发,0是立即返回,负值为相对时间,正值为绝对时间,NULL表示无限等待(线程池内部调用了WaitForMultipleObjects)。
- CloseThreadpoolWait 关闭一个线程等待对象,然后才能关闭可等待对象(waitable object)的句柄。
- WaitForThreadpoolWaitCallbacks 等待指定的线程池等待对象执行完成或者取消还没开始执行的线程池等待对象。
- WaitForThreadpoolIoCallbacks 等待指定的线程池IO对象执行完成或者取消还没开始执行的线程池IO对象。该函数须在另一个线程使用,而不能在回调函数内部使用,因为这会造成死锁。如果bCancelPendingCallbacks为TRUE,那么当请求完成的时候,回调函数不会被调用(如果尚未被调用)。
- WaitForThreadpoolTimerCallbacks 等待指定的线程池定时器对象执行完成或者取消还没开始执行的线程池定时器对象。
- WaitForThreadpoolWorkCallbacks 等待指定的线程池工作对象执行完成或者取消还没开始执行的线程池工作对象。
- CreateThreadpoolIo 创建一个IO完成对象,并绑定一个异步IO的句柄。
- CloseThreadpoolIo 释放IO完成对象。应等待所有IO操作完成,关闭IO文件句柄后再调用本函数。解除IO对象(工作项)与线程池的关联。
- StartThreadpoolIo 初始化异步IO操作前应先调用本函数,否则线程池会忽略IO操作的完成并内存泄露。
- CancelThreadpoolIo 在IO失败或者IO直接同步完成时,取消对线程池的通知(notification)
- CallbackMayRunLong 回调函数通知系统需要执行较长时间。只能在调用线程的回调函数里使用。返回TRUE时,说明线程池还有其他线程可供使用。FALSE则相反。
- DisassociateCurrentThreadFromCallback 表示与回调函数关联的工作项在“逻辑上”完成了,执行当前回调函数的线程与初始化当前回调函数的对象去关联。这使得所有等待当前回调函数完成的线程被释放。任何由于调用WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的线程能早一些返回,而不必等到线程从回调函数中结束时才返回。
- 下述函数与回调函数实体指针PTP_CALLBACK_INSTANCE有关,用于在回调函数内设置当该函数结束时的动作。注意只能有一个动作在回调函数结束后被执行,不能多次调用下述函数,否则最后一次调用的函数会覆盖之前调用的下述函数。当线程调用回调函数时,Windows会自动传一个pInstance参数(类型PTP_CALLBACK_INSTANCE)给回调函数,然后回调函数将这个参数又传给如下的函数,执行一些相应的终止操作:
- FreeLibraryWhenCallbackReturns 当回调函数返回的时候,线程池会自动调用FreeLibrary,并在参数中传入指定的HMOUDLE。如果回调函数是从DLL中加载的,这个函数尤为重要,因为当线程执行完毕后,回调函数不能自己调用FreeLibrary,否则回调函数代码将从进程中清除,这样当FreeLibrary试图返回到回调函数时,会引发访问违规
- LeaveCriticalSectionWhenCallbackReturns 当回调函数返回时,线程池会自动调用LeavCriticalSection,并在参数中传入指定的CRITCAL_SECTION结构体。
- ReleaseMutexWhenCallbackReturns 当回调函数返回时,线程池会自动调用ReleaseMutex,并在参数中传入指定的HANDLE
- ReleaseSemaphoreWhenCallbackReturns 当回调函数返回的时候,线程池会自动调用ReleaseSemaphore,并在参数中传入指定的HANDLE
- SetEventWhenCallbackReturns 当回调函数返回的时候,线程池会自动调用SetEvent,并在参数中传入指定的HANDLE
示例
示例程序1如下:
#include <windows.h>
#include <tchar.h>
#include <stdio.h>
//
// Thread pool wait callback function template
//
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
{
// Instance, Parameter, Wait, and WaitResult not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Wait); UNREFERENCED_PARAMETER(WaitResult);
// Do something when the wait is over.
_tprintf(_T("MyWaitCallback: wait is over.\n"));
}
//
// Thread pool timer callback function template
//
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_TIMER Timer)
{
// Instance, Parameter, and Timer not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Timer);
// Do something when the timer fires.
_tprintf(_T("MyTimerCallback: timer has fired.\n"));
}
//
// This is the thread pool work callback function.
//
VOID CALLBACK MyWorkCallback( PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WORK Work)
{
// Instance, Parameter, and Work not used in this example.
UNREFERENCED_PARAMETER(Instance);UNREFERENCED_PARAMETER(Parameter);UNREFERENCED_PARAMETER(Work);
// Do something when the work callback is invoked.
_tprintf(_T("MyWorkCallback: Task performed.\n"));
}
int main(void)
{
PTP_WAIT Wait = NULL;
PTP_WAIT_CALLBACK waitcallback = MyWaitCallback;
HANDLE hEvent = NULL;
UINT i = 0;
UINT rollback = 0;
// Create an auto-reset event and initialized as nonsignaled.
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (NULL == hEvent) {
// Error Handling
return 0;
}
rollback = 1; // CreateEvent succeeded
Wait = CreateThreadpoolWait(waitcallback,
NULL, // 回调函数的输入参数
NULL); // 使用缺省的回调环境
if (NULL == Wait) {
_tprintf(_T("CreateThreadpoolWait failed. LastError: %u\n"),GetLastError());
goto new_wait_cleanup;
}
rollback = 2; // CreateThreadpoolWait succeeded
// must re-register the event with the wait object before signaling it each time to trigger the wait callback
// each time before signaling the event to trigger the wait callback.
for (i = 0; i < 5; i++) {
SetThreadpoolWait(Wait, //线程池等待对象
hEvent, //内核等待对象
NULL); //超时设定
SetEvent(hEvent); //触发内核等待对象
// Delay for the waiter thread to act if necessary.
Sleep(500);
// Block here until the callback function is done executing.
WaitForThreadpoolWaitCallbacks(Wait, FALSE);
}
new_wait_cleanup:
switch (rollback) {
case 2:
// Unregister the wait by setting the event to NULL.
SetThreadpoolWait(Wait, NULL, NULL); //the wait object will cease to queue new callbacks (but callbacks already queued will still occur
// Close the wait.
CloseThreadpoolWait(Wait);
case 1:
// Close the event.
CloseHandle(hEvent);
default:
break;
}
BOOL bRet = FALSE;
PTP_WORK work = NULL;
PTP_TIMER timer = NULL;
PTP_POOL pool = NULL;
PTP_WORK_CALLBACK workcallback = MyWorkCallback;
PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
TP_CALLBACK_ENVIRON CallBackEnviron;
PTP_CLEANUP_GROUP cleanupgroup = NULL;
FILETIME FileDueTime;
ULARGE_INTEGER ulDueTime;
rollback = 0;
InitializeThreadpoolEnvironment(&CallBackEnviron); //不使用缺省的线程池与缺省的回调环境
// Create a custom, dedicated thread pool.
pool = CreateThreadpool(NULL);
if (NULL == pool) {
_tprintf(_T("CreateThreadpool failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 1; // pool creation succeeded
// The thread pool is made persistent simply by setting both the minimum and maximum threads to 1.
SetThreadpoolThreadMaximum(pool, 1);
bRet = SetThreadpoolThreadMinimum(pool, 1);
if (FALSE == bRet) {
_tprintf(_T("SetThreadpoolThreadMinimum failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
// Create a cleanup group for this thread pool.
cleanupgroup = CreateThreadpoolCleanupGroup();
if (NULL == cleanupgroup) {
_tprintf(_T("CreateThreadpoolCleanupGroup failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
rollback = 2; // Cleanup group creation succeeded
// Associate the callback environment with our thread pool.
SetThreadpoolCallbackPool(&CallBackEnviron, pool);
// Associate the cleanup group with our thread pool.
// Objects created with the same callback environment as the cleanup group become members of the cleanup group.
SetThreadpoolCallbackCleanupGroup(&CallBackEnviron, //回调环境
cleanupgroup, //Cleanup Group
NULL); //Cleanup Group的回调函数,当释放其所包含的对象之前先调用该回调函数
// Create work with the callback environment.
work = CreateThreadpoolWork(workcallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == work) {
_tprintf(_T("CreateThreadpoolWork failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 3; // Creation of work succeeded
// Submit the work to the pool. Because this was a pre-allocated work item (using CreateThreadpoolWork), it is guaranteed to execute.
SubmitThreadpoolWork(work);
// Create a timer with the same callback environment.
timer = CreateThreadpoolTimer(timercallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == timer) {
_tprintf(_T("CreateThreadpoolTimer failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 4; // Timer creation succeeded
// Set the timer to fire in one second.
ulDueTime.QuadPart = (ULONGLONG)-(1 * 10 * 1000 * 1000);
FileDueTime.dwHighDateTime = ulDueTime.HighPart;
FileDueTime.dwLowDateTime = ulDueTime.LowPart;
SetThreadpoolTimer(timer, //线程池定时器对象
&FileDueTime, //到期时间
0, //周期时期,为0则表示一次性定时器
0); //操作系统调用回调函数的最大延迟时间
// Delay for the timer to be fired
Sleep(1500);
// Wait for all callbacks to finish.
// CloseThreadpoolCleanupGroupMembers also releases objects that are members of the cleanup group,
// so it is not necessary to call close functions on individual objects after calling CloseThreadpoolCleanupGroupMembers.
CloseThreadpoolCleanupGroupMembers(cleanupgroup, //Cleanup Group
FALSE, //为真则取消还未开始执行的pending的回调函数
NULL); //CleanupGroup回调函数的输入参数
// Already cleaned up the work item with the
// CloseThreadpoolCleanupGroupMembers, so set rollback to 2.
rollback = 2;
goto main_cleanup;
main_cleanup:
// Clean up any individual pieces manually
// Notice the fall-through structure of the switch.
// Clean up in reverse order.
switch (rollback) {
case 4:
case 3:
// Clean up the cleanup group members.
CloseThreadpoolCleanupGroupMembers(cleanupgroup,FALSE, NULL);
case 2:
// Clean up the cleanup group.
CloseThreadpoolCleanupGroup(cleanupgroup);
case 1:
// Clean up the pool.
CloseThreadpool(pool);
default:
break;
}
return 0;
}
关于IO线程池的一个示例:
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>
#include <iostream>
#include <limits>
void PressEnterToContinue()
{
std::cout << "Press ENTER to continue... " << std::flush;
std::cin.ignore( (std::numeric_limits<std::streamsize>::max)( ), '\n');
}
//////////////////////////////////////////////////////////////////////////
#define QMLX_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz)
#define QMLX_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define QMLX_SAFEFREE(p) if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}
#define QMLX_ASSERT(s) if(!(s)){DebugBreak();}
#define QMLX_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,\
(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);
//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 2 //每个线程最大写入次数
#define MAXWRITETHREAD 2 //写入线程的数量
#define OP_READ 0x01 //读操作
#define OP_WRITE 0x02 //写操作
//#pragma pack(show)
//单IO数据
typedef struct __declspec(align(16)) _tagPerIoData {
OVERLAPPED m_ol;
HANDLE m_hFile; //操作的文件句柄
DWORD m_dwOp; //操作类型,OP_READ或OP_WRITE
LPVOID m_pData; //操作的数据
UINT m_nLen; //操作的数据长度
DWORD m_dwWrite; //写入的字节数
DWORD m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA, *PPER_IO_DATA;
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio);
//写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam);
//当前操作的文件对象的指针
LARGE_INTEGER g_liFilePointer = { 0 };
//IOCP线程池
PTP_IO g_pThreadpoolIo = NULL;
//////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘\’)
VOID GetAppPath(LPTSTR pszBuffer) {
DWORD dwLen = 0;
if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
return;
for (DWORD i = dwLen; i > 0; i--) {
if ('\\' == pszBuffer[i]) {
pszBuffer[i + 1] = '\0';
break;
}
}
}
int _tmain() {
_tsetlocale(LC_ALL, _T("chs"));
TCHAR pFileName[MAX_PATH] = {};
GetAppPath(pFileName);
StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));
HANDLE ahWThread[MAXWRITETHREAD] = {};
DWORD dwWrited = 0;
//创建文件
HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == hTxtFile) {
_tprintf(_T("CreateFile(%s)失败,错误码:%u\n"), GetLastError());
_tsystem(_T("PAUSE"));
return 0;
}
//初始化线程池回调环境
TP_CALLBACK_ENVIRON poolEnv = {};
InitializeThreadpoolEnvironment(&poolEnv);
//创建IOCP线程池
g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback, hTxtFile, &poolEnv);
//启动IOCP线程池
StartThreadpoolIo(g_pThreadpoolIo);
//写入UNICODE文件的前缀码,以便正确打开
PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = hTxtFile;
pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
QMLX_ASSERT(pIo->m_pData != NULL);
*((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
pIo->m_nLen = sizeof(WORD);
//偏移文件指针
pIo->m_ol.Offset = g_liFilePointer.LowPart;
pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
g_liFilePointer.QuadPart += pIo->m_nLen;
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
//等待IOCP线程池完成操作
WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);
//启动写入线程进行日志写入操作
for (int i = 0; i < MAXWRITETHREAD; i++) {
ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
}
//让主线程等待这些写入线程结束
WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);
for (int i = 0; i < MAXWRITETHREAD; i++) {
CloseHandle(ahWThread[i]);
}
//关闭IOCP线程池
CloseThreadpoolIo(g_pThreadpoolIo);
//关闭日志文件
if (INVALID_HANDLE_VALUE != hTxtFile) {
CloseHandle(hTxtFile);
hTxtFile = INVALID_HANDLE_VALUE;
}
_tsystem(_T("PAUSE"));
return 0;
}
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
if (NO_ERROR != IoResult) {
_tprintf(_T("I/O操作出错,错误码:%u\n"), IoResult);
return;
}
PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
DWORD dwCurTimestamp = GetTickCount();
switch (pIo->m_dwOp)
{
case OP_WRITE://写操作结束
{//写入操作结束
_tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)\n"),
GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);
QMLX_SAFEFREE(pIo->m_pData);
QMLX_SAFEFREE(pIo);
}
break;
case OP_READ: //读操作结束
break;
default:
break;
}
}
//写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
TCHAR pTxtContext[MAX_LOGLEN] = {};
PPER_IO_DATA pIo = NULL;
size_t szLen = 0;
LPTSTR pWriteText = NULL;
StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入\r\n"),
GetCurrentThreadId());
StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);
szLen += 1;
int i = 0;
for (; i < MAXWRITEPERTHREAD; i++) {
pWriteText = (LPTSTR)QMLX_CALLOC(szLen * sizeof(TCHAR));
QMLX_ASSERT(NULL != pWriteText);
StringCchCopy(pWriteText, szLen, pTxtContext);
//为每个操作申请一个“单IO数据”结构体
pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = (HANDLE)lpParam;
pIo->m_pData = pWriteText;
pIo->m_nLen = (szLen - 1) * sizeof(TCHAR);
//这里使用原子操作同步文件指针,写入不会相互覆盖
//这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
//比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
*((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
StartThreadpoolIo(g_pThreadpoolIo);
//写入
WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
if (ERROR_IO_PENDING != GetLastError()) {
CancelThreadpoolIo(g_pThreadpoolIo);
}
}
return i;
}
.NET Framework的线程池实现
命名空间System.Threading中的类ThreadPool提供一个线程池,该线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。[3]
参看
参考文献
- ^ MSDN:Using the Thread Pool Functions. [2017-02-22]. (原始内容存档于2017-07-24).
- ^ MSDN:Thread Pool API. [2017-02-28]. (原始内容存档于2017-10-04).
- ^ MSDN: ThreadPool 类. [2017-02-22]. (原始内容存档于2017-02-23).