Merge pull request #431 from yuriks/thread-queue-cleanup

Common: Clean up ThreadQueueList
This commit is contained in:
bunnei 2015-01-07 17:58:31 -05:00
commit e6864a1f41
2 changed files with 82 additions and 152 deletions

View File

@ -4,55 +4,35 @@
#pragma once #pragma once
#include <array>
#include <deque>
#include <boost/range/algorithm_ext/erase.hpp>
#include "common/common.h" #include "common/common.h"
namespace Common { namespace Common {
template<class IdType> template<class T, unsigned int N>
struct ThreadQueueList { struct ThreadQueueList {
// Number of queues (number of priority levels starting at 0.) // TODO(yuriks): If performance proves to be a problem, the std::deques can be replaced with
static const int NUM_QUEUES = 128; // (dynamically resizable) circular buffers to remove their overhead when
// inserting and popping.
// Initial number of threads a single queue can handle. typedef unsigned int Priority;
static const int INITIAL_CAPACITY = 32;
struct Queue { // Number of priority levels. (Valid levels are [0..NUM_QUEUES).)
// Next ever-been-used queue (worse priority.) static const Priority NUM_QUEUES = N;
Queue *next;
// First valid item in data.
int first;
// One after last valid item in data.
int end;
// A too-large array with room on the front and end.
IdType *data;
// Size of data array.
int capacity;
};
ThreadQueueList() { ThreadQueueList() {
memset(queues, 0, sizeof(queues)); first = nullptr;
first = invalid();
}
~ThreadQueueList() {
for (int i = 0; i < NUM_QUEUES; ++i)
{
if (queues[i].data != nullptr)
free(queues[i].data);
}
} }
// Only for debugging, returns priority level. // Only for debugging, returns priority level.
int contains(const IdType uid) { Priority contains(const T& uid) {
for (int i = 0; i < NUM_QUEUES; ++i) for (Priority i = 0; i < NUM_QUEUES; ++i) {
{ Queue& cur = queues[i];
if (queues[i].data == nullptr) if (std::find(cur.data.cbegin(), cur.data.cend(), uid) != cur.data.cend()) {
continue;
Queue *cur = &queues[i];
for (int j = cur->first; j < cur->end; ++j)
{
if (cur->data[j] == uid)
return i; return i;
} }
} }
@ -60,157 +40,107 @@ struct ThreadQueueList {
return -1; return -1;
} }
inline IdType pop_first() { T pop_first() {
Queue *cur = first; Queue *cur = first;
while (cur != invalid()) while (cur != nullptr) {
{ if (!cur->data.empty()) {
if (cur->end - cur->first > 0) auto tmp = std::move(cur->data.front());
return cur->data[cur->first++]; cur->data.pop_front();
cur = cur->next; return tmp;
}
cur = cur->next_nonempty;
} }
//_dbg_assert_msg_(SCEKERNEL, false, "ThreadQueueList should not be empty."); return T();
return 0;
} }
inline IdType pop_first_better(u32 priority) { T pop_first_better(Priority priority) {
Queue *cur = first; Queue *cur = first;
Queue *stop = &queues[priority]; Queue *stop = &queues[priority];
while (cur < stop) while (cur < stop) {
{ if (!cur->data.empty()) {
if (cur->end - cur->first > 0) auto tmp = std::move(cur->data.front());
return cur->data[cur->first++]; cur->data.pop_front();
cur = cur->next; return tmp;
}
cur = cur->next_nonempty;
} }
return 0; return T();
} }
inline void push_front(u32 priority, const IdType threadID) { void push_front(Priority priority, const T& thread_id) {
Queue *cur = &queues[priority]; Queue *cur = &queues[priority];
cur->data[--cur->first] = threadID; cur->data.push_front(thread_id);
if (cur->first == 0)
rebalance(priority);
} }
inline void push_back(u32 priority, const IdType threadID) { void push_back(Priority priority, const T& thread_id) {
Queue *cur = &queues[priority]; Queue *cur = &queues[priority];
cur->data[cur->end++] = threadID; cur->data.push_back(thread_id);
if (cur->end == cur->capacity)
rebalance(priority);
} }
inline void remove(u32 priority, const IdType threadID) { void remove(Priority priority, const T& thread_id) {
Queue *cur = &queues[priority]; Queue *cur = &queues[priority];
//_dbg_assert_msg_(SCEKERNEL, cur->next != NULL, "ThreadQueueList::Queue should already be linked up."); boost::remove_erase(cur->data, thread_id);
for (int i = cur->first; i < cur->end; ++i)
{
if (cur->data[i] == threadID)
{
int remaining = --cur->end - i;
if (remaining > 0)
memmove(&cur->data[i], &cur->data[i + 1], remaining * sizeof(IdType));
return;
}
} }
// Wasn't there. void rotate(Priority priority) {
}
inline void rotate(u32 priority) {
Queue *cur = &queues[priority]; Queue *cur = &queues[priority];
//_dbg_assert_msg_(SCEKERNEL, cur->next != NULL, "ThreadQueueList::Queue should already be linked up.");
if (cur->end - cur->first > 1) if (cur->data.size() > 1) {
{ cur->data.push_back(std::move(cur->data.front()));
cur->data[cur->end++] = cur->data[cur->first++]; cur->data.pop_front();
if (cur->end == cur->capacity)
rebalance(priority);
} }
} }
inline void clear() { void clear() {
for (int i = 0; i < NUM_QUEUES; ++i) queues.fill(Queue());
{ first = nullptr;
if (queues[i].data != nullptr)
free(queues[i].data);
}
memset(queues, 0, sizeof(queues));
first = invalid();
} }
inline bool empty(u32 priority) const { bool empty(Priority priority) const {
const Queue *cur = &queues[priority]; const Queue *cur = &queues[priority];
return cur->first == cur->end; return cur->data.empty();
} }
inline void prepare(u32 priority) { void prepare(Priority priority) {
Queue* cur = &queues[priority]; Queue* cur = &queues[priority];
if (cur->next == nullptr) if (cur->next_nonempty == UnlinkedTag())
link(priority, INITIAL_CAPACITY); link(priority);
} }
private: private:
Queue *invalid() const { struct Queue {
return (Queue *) -1; // Points to the next active priority, skipping over ones that have never been used.
Queue* next_nonempty = UnlinkedTag();
// Double-ended queue of threads in this priority level
std::deque<T> data;
};
/// Special tag used to mark priority levels that have never been used.
static Queue* UnlinkedTag() {
return reinterpret_cast<Queue*>(1);
} }
void link(u32 priority, int size) { void link(Priority priority) {
//_dbg_assert_msg_(SCEKERNEL, queues[priority].data == NULL, "ThreadQueueList::Queue should only be initialized once.");
if (size <= INITIAL_CAPACITY)
size = INITIAL_CAPACITY;
else
{
int goal = size;
size = INITIAL_CAPACITY;
while (size < goal)
size *= 2;
}
Queue *cur = &queues[priority]; Queue *cur = &queues[priority];
cur->data = (IdType *) malloc(sizeof(IdType) * size);
cur->capacity = size;
cur->first = size / 2;
cur->end = size / 2;
for (int i = (int) priority - 1; i >= 0; --i) for (int i = priority - 1; i >= 0; --i) {
{ if (queues[i].next_nonempty != UnlinkedTag()) {
if (queues[i].next != nullptr) cur->next_nonempty = queues[i].next_nonempty;
{ queues[i].next_nonempty = cur;
cur->next = queues[i].next;
queues[i].next = cur;
return; return;
} }
} }
cur->next = first; cur->next_nonempty = first;
first = cur; first = cur;
} }
void rebalance(u32 priority) {
Queue *cur = &queues[priority];
int size = cur->end - cur->first;
if (size >= cur->capacity - 2) {
IdType *new_data = (IdType *)realloc(cur->data, cur->capacity * 2 * sizeof(IdType));
if (new_data != nullptr) {
cur->capacity *= 2;
cur->data = new_data;
}
}
int newFirst = (cur->capacity - size) / 2;
if (newFirst != cur->first) {
memmove(&cur->data[newFirst], &cur->data[cur->first], size * sizeof(IdType));
cur->first = newFirst;
cur->end = newFirst + size;
}
}
// The first queue that's ever been used. // The first queue that's ever been used.
Queue* first; Queue* first;
// The priority level queues of thread ids. // The priority level queues of thread ids.
Queue queues[NUM_QUEUES]; std::array<Queue, NUM_QUEUES> queues;
}; };
} // namespace } // namespace

View File

@ -75,7 +75,7 @@ public:
static std::vector<Handle> thread_queue; static std::vector<Handle> thread_queue;
// Lists only ready thread ids. // Lists only ready thread ids.
static Common::ThreadQueueList<Handle> thread_ready_queue; static Common::ThreadQueueList<Handle, THREADPRIO_LOWEST+1> thread_ready_queue;
static Handle current_thread_handle; static Handle current_thread_handle;
static Thread* current_thread; static Thread* current_thread;