/*
* Process Hacker -
* thread pool / work queue
*
* Copyright (C) 2009-2016 wj32
*
* This file is part of Process Hacker.
*
* Process Hacker is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Process Hacker is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Process Hacker. If not, see .
*/
#include
#include
#include
#include
static PH_INITONCE PhWorkQueueInitOnce = PH_INITONCE_INIT;
static PH_FREE_LIST PhWorkQueueItemFreeList;
static PH_INITONCE PhGlobalWorkQueueInitOnce = PH_INITONCE_INIT;
static PH_WORK_QUEUE PhGlobalWorkQueue;
#ifdef DEBUG
PPH_LIST PhDbgWorkQueueList;
PH_QUEUED_LOCK PhDbgWorkQueueListLock = PH_QUEUED_LOCK_INIT;
#endif
/**
* Initializes a work queue.
*
* \param WorkQueue A work queue object.
* \param MinimumThreads The suggested minimum number of threads to keep alive, even when there is
* no work to be performed.
* \param MaximumThreads The suggested maximum number of threads to create.
* \param NoWorkTimeout The number of milliseconds after which threads without work will terminate.
*/
VOID PhInitializeWorkQueue(
_Out_ PPH_WORK_QUEUE WorkQueue,
_In_ ULONG MinimumThreads,
_In_ ULONG MaximumThreads,
_In_ ULONG NoWorkTimeout
)
{
if (PhBeginInitOnce(&PhWorkQueueInitOnce))
{
PhInitializeFreeList(&PhWorkQueueItemFreeList, sizeof(PH_WORK_QUEUE_ITEM), 32);
#ifdef DEBUG
PhDbgWorkQueueList = PhCreateList(4);
#endif
PhEndInitOnce(&PhWorkQueueInitOnce);
}
PhInitializeRundownProtection(&WorkQueue->RundownProtect);
WorkQueue->Terminating = FALSE;
InitializeListHead(&WorkQueue->QueueListHead);
PhInitializeQueuedLock(&WorkQueue->QueueLock);
PhInitializeCondition(&WorkQueue->QueueEmptyCondition);
WorkQueue->MinimumThreads = MinimumThreads;
WorkQueue->MaximumThreads = MaximumThreads;
WorkQueue->NoWorkTimeout = NoWorkTimeout;
PhInitializeQueuedLock(&WorkQueue->StateLock);
WorkQueue->SemaphoreHandle = NULL;
WorkQueue->CurrentThreads = 0;
WorkQueue->BusyCount = 0;
#ifdef DEBUG
PhAcquireQueuedLockExclusive(&PhDbgWorkQueueListLock);
PhAddItemList(PhDbgWorkQueueList, WorkQueue);
PhReleaseQueuedLockExclusive(&PhDbgWorkQueueListLock);
#endif
}
/**
* Frees resources used by a work queue.
*
* \param WorkQueue A work queue object.
*/
VOID PhDeleteWorkQueue(
_Inout_ PPH_WORK_QUEUE WorkQueue
)
{
PLIST_ENTRY listEntry;
PPH_WORK_QUEUE_ITEM workQueueItem;
#ifdef DEBUG
ULONG index;
#endif
#ifdef DEBUG
PhAcquireQueuedLockExclusive(&PhDbgWorkQueueListLock);
if ((index = PhFindItemList(PhDbgWorkQueueList, WorkQueue)) != -1)
PhRemoveItemList(PhDbgWorkQueueList, index);
PhReleaseQueuedLockExclusive(&PhDbgWorkQueueListLock);
#endif
// Wait for all worker threads to exit.
WorkQueue->Terminating = TRUE;
MemoryBarrier();
if (WorkQueue->SemaphoreHandle)
NtReleaseSemaphore(WorkQueue->SemaphoreHandle, WorkQueue->CurrentThreads, NULL);
PhWaitForRundownProtection(&WorkQueue->RundownProtect);
// Free all un-executed work items.
listEntry = WorkQueue->QueueListHead.Flink;
while (listEntry != &WorkQueue->QueueListHead)
{
workQueueItem = CONTAINING_RECORD(listEntry, PH_WORK_QUEUE_ITEM, ListEntry);
listEntry = listEntry->Flink;
PhpDestroyWorkQueueItem(workQueueItem);
}
if (WorkQueue->SemaphoreHandle)
NtClose(WorkQueue->SemaphoreHandle);
}
/**
* Waits for all queued work items to be executed.
*
* \param WorkQueue A work queue object.
*/
VOID PhWaitForWorkQueue(
_Inout_ PPH_WORK_QUEUE WorkQueue
)
{
PhAcquireQueuedLockExclusive(&WorkQueue->QueueLock);
while (!IsListEmpty(&WorkQueue->QueueListHead))
PhWaitForCondition(&WorkQueue->QueueEmptyCondition, &WorkQueue->QueueLock, NULL);
PhReleaseQueuedLockExclusive(&WorkQueue->QueueLock);
}
/**
* Queues a work item to a work queue.
*
* \param WorkQueue A work queue object.
* \param Function A function to execute.
* \param Context A user-defined value to pass to the function.
*/
VOID PhQueueItemWorkQueue(
_Inout_ PPH_WORK_QUEUE WorkQueue,
_In_ PUSER_THREAD_START_ROUTINE Function,
_In_opt_ PVOID Context
)
{
PhQueueItemWorkQueueEx(WorkQueue, Function, Context, NULL, NULL);
}
/**
* Queues a work item to a work queue.
*
* \param WorkQueue A work queue object.
* \param Function A function to execute.
* \param Context A user-defined value to pass to the function.
* \param DeleteFunction A callback function that is executed when the work queue item is about to
* be freed.
* \param Environment Execution environment parameters (e.g. priority).
*/
VOID PhQueueItemWorkQueueEx(
_Inout_ PPH_WORK_QUEUE WorkQueue,
_In_ PUSER_THREAD_START_ROUTINE Function,
_In_opt_ PVOID Context,
_In_opt_ PPH_WORK_QUEUE_ITEM_DELETE_FUNCTION DeleteFunction,
_In_opt_ PPH_WORK_QUEUE_ENVIRONMENT Environment
)
{
PPH_WORK_QUEUE_ITEM workQueueItem;
workQueueItem = PhpCreateWorkQueueItem(Function, Context, DeleteFunction, Environment);
// Enqueue the work item.
PhAcquireQueuedLockExclusive(&WorkQueue->QueueLock);
InsertTailList(&WorkQueue->QueueListHead, &workQueueItem->ListEntry);
_InterlockedIncrement(&WorkQueue->BusyCount);
PhReleaseQueuedLockExclusive(&WorkQueue->QueueLock);
// Signal the semaphore once to let a worker thread continue.
NtReleaseSemaphore(PhpGetSemaphoreWorkQueue(WorkQueue), 1, NULL);
PHLIB_INC_STATISTIC(WqWorkItemsQueued);
// Check if all worker threads are currently busy, and if we can create more threads.
if (WorkQueue->BusyCount >= WorkQueue->CurrentThreads &&
WorkQueue->CurrentThreads < WorkQueue->MaximumThreads)
{
// Lock and re-check.
PhAcquireQueuedLockExclusive(&WorkQueue->StateLock);
if (WorkQueue->CurrentThreads < WorkQueue->MaximumThreads)
PhpCreateWorkQueueThread(WorkQueue);
PhReleaseQueuedLockExclusive(&WorkQueue->StateLock);
}
}
VOID PhInitializeWorkQueueEnvironment(
_Out_ PPH_WORK_QUEUE_ENVIRONMENT Environment
)
{
PhpGetDefaultWorkQueueEnvironment(Environment);
}
/** Returns a pointer to the default shared work queue. */
PPH_WORK_QUEUE PhGetGlobalWorkQueue(
VOID
)
{
if (PhBeginInitOnce(&PhGlobalWorkQueueInitOnce))
{
PhInitializeWorkQueue(
&PhGlobalWorkQueue,
0,
3,
1000
);
PhEndInitOnce(&PhGlobalWorkQueueInitOnce);
}
return &PhGlobalWorkQueue;
}
VOID PhpGetDefaultWorkQueueEnvironment(
_Out_ PPH_WORK_QUEUE_ENVIRONMENT Environment
)
{
memset(Environment, 0, sizeof(PH_WORK_QUEUE_ENVIRONMENT));
Environment->BasePriority = 0;
Environment->IoPriority = IoPriorityNormal;
Environment->PagePriority = MEMORY_PRIORITY_NORMAL;
Environment->ForceUpdate = FALSE;
}
VOID PhpUpdateWorkQueueEnvironment(
_Inout_ PPH_WORK_QUEUE_ENVIRONMENT CurrentEnvironment,
_In_ PPH_WORK_QUEUE_ENVIRONMENT NewEnvironment
)
{
if (CurrentEnvironment->BasePriority != NewEnvironment->BasePriority || NewEnvironment->ForceUpdate)
{
LONG increment;
increment = NewEnvironment->BasePriority;
if (NT_SUCCESS(NtSetInformationThread(NtCurrentThread(), ThreadBasePriority,
&increment, sizeof(LONG))))
{
CurrentEnvironment->BasePriority = NewEnvironment->BasePriority;
}
}
if (WindowsVersion >= WINDOWS_VISTA)
{
if (CurrentEnvironment->IoPriority != NewEnvironment->IoPriority || NewEnvironment->ForceUpdate)
{
IO_PRIORITY_HINT ioPriority;
ioPriority = NewEnvironment->IoPriority;
if (NT_SUCCESS(NtSetInformationThread(NtCurrentThread(), ThreadIoPriority,
&ioPriority, sizeof(IO_PRIORITY_HINT))))
{
CurrentEnvironment->IoPriority = NewEnvironment->IoPriority;
}
}
if (CurrentEnvironment->PagePriority != NewEnvironment->PagePriority || NewEnvironment->ForceUpdate)
{
ULONG pagePriority;
pagePriority = NewEnvironment->PagePriority;
if (NT_SUCCESS(NtSetInformationThread(NtCurrentThread(), ThreadPagePriority,
&pagePriority, sizeof(ULONG))))
{
CurrentEnvironment->PagePriority = NewEnvironment->PagePriority;
}
}
}
}
PPH_WORK_QUEUE_ITEM PhpCreateWorkQueueItem(
_In_ PUSER_THREAD_START_ROUTINE Function,
_In_opt_ PVOID Context,
_In_opt_ PPH_WORK_QUEUE_ITEM_DELETE_FUNCTION DeleteFunction,
_In_opt_ PPH_WORK_QUEUE_ENVIRONMENT Environment
)
{
PPH_WORK_QUEUE_ITEM workQueueItem;
workQueueItem = PhAllocateFromFreeList(&PhWorkQueueItemFreeList);
workQueueItem->Function = Function;
workQueueItem->Context = Context;
workQueueItem->DeleteFunction = DeleteFunction;
if (Environment)
workQueueItem->Environment = *Environment;
else
PhpGetDefaultWorkQueueEnvironment(&workQueueItem->Environment);
return workQueueItem;
}
VOID PhpDestroyWorkQueueItem(
_In_ PPH_WORK_QUEUE_ITEM WorkQueueItem
)
{
if (WorkQueueItem->DeleteFunction)
WorkQueueItem->DeleteFunction(WorkQueueItem->Function, WorkQueueItem->Context);
PhFreeToFreeList(&PhWorkQueueItemFreeList, WorkQueueItem);
}
VOID PhpExecuteWorkQueueItem(
_Inout_ PPH_WORK_QUEUE_ITEM WorkQueueItem
)
{
WorkQueueItem->Function(WorkQueueItem->Context);
}
HANDLE PhpGetSemaphoreWorkQueue(
_Inout_ PPH_WORK_QUEUE WorkQueue
)
{
HANDLE semaphoreHandle;
semaphoreHandle = WorkQueue->SemaphoreHandle;
if (!semaphoreHandle)
{
NtCreateSemaphore(&semaphoreHandle, SEMAPHORE_ALL_ACCESS, NULL, 0, MAXLONG);
assert(semaphoreHandle);
if (_InterlockedCompareExchangePointer(
&WorkQueue->SemaphoreHandle,
semaphoreHandle,
NULL
) != NULL)
{
// Someone else created the semaphore before we did.
NtClose(semaphoreHandle);
semaphoreHandle = WorkQueue->SemaphoreHandle;
}
}
return semaphoreHandle;
}
BOOLEAN PhpCreateWorkQueueThread(
_Inout_ PPH_WORK_QUEUE WorkQueue
)
{
HANDLE threadHandle;
// Make sure the structure doesn't get deleted while the thread is running.
if (!PhAcquireRundownProtection(&WorkQueue->RundownProtect))
return FALSE;
threadHandle = PhCreateThread(0, PhpWorkQueueThreadStart, WorkQueue);
if (threadHandle)
{
PHLIB_INC_STATISTIC(WqWorkQueueThreadsCreated);
WorkQueue->CurrentThreads++;
NtClose(threadHandle);
return TRUE;
}
else
{
PHLIB_INC_STATISTIC(WqWorkQueueThreadsCreateFailed);
PhReleaseRundownProtection(&WorkQueue->RundownProtect);
return FALSE;
}
}
NTSTATUS PhpWorkQueueThreadStart(
_In_ PVOID Parameter
)
{
PH_AUTO_POOL autoPool;
PPH_WORK_QUEUE workQueue = (PPH_WORK_QUEUE)Parameter;
PH_WORK_QUEUE_ENVIRONMENT currentEnvironment;
PhInitializeAutoPool(&autoPool);
PhpGetDefaultWorkQueueEnvironment(¤tEnvironment);
while (TRUE)
{
NTSTATUS status;
HANDLE semaphoreHandle;
LARGE_INTEGER timeout;
PPH_WORK_QUEUE_ITEM workQueueItem = NULL;
// Check if we have more threads than the limit.
if (workQueue->CurrentThreads > workQueue->MaximumThreads)
{
BOOLEAN terminate = FALSE;
// Lock and re-check.
PhAcquireQueuedLockExclusive(&workQueue->StateLock);
// Check the minimum as well.
if (workQueue->CurrentThreads > workQueue->MaximumThreads &&
workQueue->CurrentThreads > workQueue->MinimumThreads)
{
workQueue->CurrentThreads--;
terminate = TRUE;
}
PhReleaseQueuedLockExclusive(&workQueue->StateLock);
if (terminate)
break;
}
semaphoreHandle = PhpGetSemaphoreWorkQueue(workQueue);
if (!workQueue->Terminating)
{
// Wait for work.
status = NtWaitForSingleObject(
semaphoreHandle,
FALSE,
PhTimeoutFromMilliseconds(&timeout, workQueue->NoWorkTimeout)
);
}
else
{
status = STATUS_UNSUCCESSFUL;
}
if (status == STATUS_WAIT_0 && !workQueue->Terminating)
{
PLIST_ENTRY listEntry;
// Dequeue the work item.
PhAcquireQueuedLockExclusive(&workQueue->QueueLock);
listEntry = RemoveHeadList(&workQueue->QueueListHead);
if (IsListEmpty(&workQueue->QueueListHead))
PhPulseCondition(&workQueue->QueueEmptyCondition);
PhReleaseQueuedLockExclusive(&workQueue->QueueLock);
// Make sure we got work.
if (listEntry != &workQueue->QueueListHead)
{
workQueueItem = CONTAINING_RECORD(listEntry, PH_WORK_QUEUE_ITEM, ListEntry);
PhpUpdateWorkQueueEnvironment(¤tEnvironment, &workQueueItem->Environment);
PhpExecuteWorkQueueItem(workQueueItem);
_InterlockedDecrement(&workQueue->BusyCount);
PhpDestroyWorkQueueItem(workQueueItem);
}
}
else
{
BOOLEAN terminate = FALSE;
// No work arrived before the timeout passed, or we are terminating, or some error
// occurred. Terminate the thread.
PhAcquireQueuedLockExclusive(&workQueue->StateLock);
if (workQueue->Terminating || workQueue->CurrentThreads > workQueue->MinimumThreads)
{
workQueue->CurrentThreads--;
terminate = TRUE;
}
PhReleaseQueuedLockExclusive(&workQueue->StateLock);
if (terminate)
break;
}
PhDrainAutoPool(&autoPool);
}
PhReleaseRundownProtection(&workQueue->RundownProtect);
PhDeleteAutoPool(&autoPool);
return STATUS_SUCCESS;
}