From 9b49a79a727b2f82b70cf900c49c0a56195cabea Mon Sep 17 00:00:00 2001 From: B3n30 Date: Sun, 9 Sep 2018 13:08:57 +0200 Subject: [PATCH 1/4] threadsafe_queue: Add WaitIfEmpty and use it in logging --- src/common/logging/backend.cpp | 18 +++++++----------- src/common/logging/backend.h | 1 + src/common/threadsafe_queue.h | 21 +++++++++++++++++++-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index 45f2bbac2..9c3bb6b49 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -38,9 +38,7 @@ public: const Impl& operator=(Impl const&) = delete; void PushEntry(Entry e) { - std::lock_guard lock(message_mutex); message_queue.Push(std::move(e)); - message_cv.notify_one(); } void AddBackend(std::unique_ptr backend) { @@ -83,14 +81,13 @@ private: backend->Write(e); } }; - while (true) { - std::unique_lock lock(message_mutex); - message_cv.wait(lock, [&] { return !running || message_queue.Pop(entry); }); - if (!running) { + while (message_queue.PopWait(entry)) { + if (entry.final_entry) { break; } write_logs(entry); } + // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a case // where a system is repeatedly spamming logs even on close. constexpr int MAX_LOGS_TO_WRITE = 100; @@ -102,14 +99,13 @@ private: } ~Impl() { - running = false; - message_cv.notify_one(); + Entry entry; + entry.final_entry = true; + message_queue.Push(entry); backend_thread.join(); } - std::atomic_bool running{true}; - std::mutex message_mutex, writing_mutex; - std::condition_variable message_cv; + std::mutex writing_mutex; std::thread backend_thread; std::vector> backends; Common::MPSCQueue message_queue; diff --git a/src/common/logging/backend.h b/src/common/logging/backend.h index 01e162bb2..f8b39ad0b 100644 --- a/src/common/logging/backend.h +++ b/src/common/logging/backend.h @@ -28,6 +28,7 @@ struct Entry { unsigned int line_num; std::string function; std::string message; + bool final_entry = false; Entry() = default; Entry(Entry&& o) = default; diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index edf13bc49..2a3d26577 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include "common/common_types.h" @@ -41,7 +42,7 @@ public: template void Push(Arg&& t) { // create the element, add it to the queue - write_ptr->current = std::forward(t); + write_ptr->current = std::move(t); // set the next pointer to a new element ptr // then advance the write pointer ElementPtr* new_ptr = new ElementPtr(); @@ -49,6 +50,7 @@ public: write_ptr = new_ptr; if (NeedSize) size++; + cv.notify_one(); } void Pop() { @@ -66,10 +68,11 @@ public: if (Empty()) return false; + ElementPtr* tmpptr = read_ptr; + if (NeedSize) size--; - ElementPtr* tmpptr = read_ptr; read_ptr = tmpptr->next.load(std::memory_order_acquire); t = std::move(tmpptr->current); tmpptr->next.store(nullptr); @@ -77,6 +80,14 @@ public: return true; } + bool PopWait(T& t) { + if (Empty()) { + std::unique_lock lock(cv_mutex); + cv.wait(lock, [this]() { return !Empty(); }); + } + return Pop(t); + } + // not thread-safe void Clear() { size.store(0); @@ -104,6 +115,8 @@ private: ElementPtr* write_ptr; ElementPtr* read_ptr; std::atomic size; + std::mutex cv_mutex; + std::condition_variable cv; }; // a simple thread-safe, @@ -138,6 +151,10 @@ public: return spsc_queue.Pop(t); } + bool PopWait(T& t) { + return spsc_queue.PopWait(t); + } + // not thread-safe void Clear() { spsc_queue.Clear(); From 7efe60ed239a2cb25b25a0e9b44effe921438e4e Mon Sep 17 00:00:00 2001 From: B3n30 Date: Fri, 5 Oct 2018 11:25:23 +0200 Subject: [PATCH 2/4] Use SPSCQueue::PopWait in Scripting --- src/core/rpc/rpc_server.cpp | 13 +++---------- src/core/rpc/rpc_server.h | 3 --- src/core/rpc/server.cpp | 1 - src/core/rpc/zmq_server.cpp | 3 ++- 4 files changed, 5 insertions(+), 15 deletions(-) diff --git a/src/core/rpc/rpc_server.cpp b/src/core/rpc/rpc_server.cpp index 0a3b046ec..9f156fc22 100644 --- a/src/core/rpc/rpc_server.cpp +++ b/src/core/rpc/rpc_server.cpp @@ -106,10 +106,8 @@ void RPCServer::HandleRequestsLoop() { LOG_INFO(RPC_Server, "Request handler started."); - while (true) { - std::unique_lock lock(request_queue_mutex); - request_queue_cv.wait(lock, [&] { return !running || request_queue.Pop(request_packet); }); - if (!running) { + while (request_queue.PopWait(request_packet)) { + if (!request_packet) { break; } HandleSingleRequest(std::move(request_packet)); @@ -117,23 +115,18 @@ void RPCServer::HandleRequestsLoop() { } void RPCServer::QueueRequest(std::unique_ptr request) { - std::unique_lock lock(request_queue_mutex); request_queue.Push(std::move(request)); - request_queue_cv.notify_one(); } void RPCServer::Start() { - running = true; const auto threadFunction = [this]() { HandleRequestsLoop(); }; request_handler_thread = std::thread(threadFunction); server.Start(); } void RPCServer::Stop() { - running = false; - request_queue_cv.notify_one(); - request_handler_thread.join(); server.Stop(); + request_handler_thread.join(); } }; // namespace RPC diff --git a/src/core/rpc/rpc_server.h b/src/core/rpc/rpc_server.h index 62fdb739c..bb57bcdae 100644 --- a/src/core/rpc/rpc_server.h +++ b/src/core/rpc/rpc_server.h @@ -31,10 +31,7 @@ private: Server server; Common::SPSCQueue> request_queue; - bool running = false; std::thread request_handler_thread; - std::mutex request_queue_mutex; - std::condition_variable request_queue_cv; }; } // namespace RPC diff --git a/src/core/rpc/server.cpp b/src/core/rpc/server.cpp index 950881e9b..0ba052017 100644 --- a/src/core/rpc/server.cpp +++ b/src/core/rpc/server.cpp @@ -1,6 +1,5 @@ #include -#include "common/threadsafe_queue.h" #include "core/core.h" #include "core/rpc/rpc_server.h" #include "core/rpc/server.h" diff --git a/src/core/rpc/zmq_server.cpp b/src/core/rpc/zmq_server.cpp index 4825108d7..47885973c 100644 --- a/src/core/rpc/zmq_server.cpp +++ b/src/core/rpc/zmq_server.cpp @@ -52,7 +52,8 @@ void ZMQServer::WorkerLoop() { LOG_WARNING(RPC_Server, "Failed to receive data on ZeroMQ socket"); } } - + std::unique_ptr end_packet = nullptr; + new_request_callback(std::move(end_packet)); // Destroying the socket must be done by this thread. zmq_socket.reset(); } From ad8b9c0429b2e3cf264fe70698efdf4cb1deec20 Mon Sep 17 00:00:00 2001 From: B3n30 Date: Mon, 8 Oct 2018 23:28:54 +0200 Subject: [PATCH 3/4] Adressed review comments --- src/common/logging/backend.cpp | 3 ++- src/common/threadsafe_queue.h | 15 ++++++++------- src/core/rpc/rpc_server.cpp | 5 +---- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index 9c3bb6b49..2915efd7b 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -81,7 +81,8 @@ private: backend->Write(e); } }; - while (message_queue.PopWait(entry)) { + while (true) { + entry = message_queue.PopWait(); if (entry.final_entry) { break; } diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 2a3d26577..68955d66d 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -42,7 +42,7 @@ public: template void Push(Arg&& t) { // create the element, add it to the queue - write_ptr->current = std::move(t); + write_ptr->current = std::forward(t); // set the next pointer to a new element ptr // then advance the write pointer ElementPtr* new_ptr = new ElementPtr(); @@ -68,11 +68,10 @@ public: if (Empty()) return false; - ElementPtr* tmpptr = read_ptr; - if (NeedSize) size--; + ElementPtr* tmpptr = read_ptr; read_ptr = tmpptr->next.load(std::memory_order_acquire); t = std::move(tmpptr->current); tmpptr->next.store(nullptr); @@ -80,12 +79,14 @@ public: return true; } - bool PopWait(T& t) { + T PopWait() { if (Empty()) { std::unique_lock lock(cv_mutex); cv.wait(lock, [this]() { return !Empty(); }); } - return Pop(t); + T t; + Pop(t); + return t; } // not thread-safe @@ -151,8 +152,8 @@ public: return spsc_queue.Pop(t); } - bool PopWait(T& t) { - return spsc_queue.PopWait(t); + T PopWait() { + return spsc_queue.PopWait(); } // not thread-safe diff --git a/src/core/rpc/rpc_server.cpp b/src/core/rpc/rpc_server.cpp index 9f156fc22..819880196 100644 --- a/src/core/rpc/rpc_server.cpp +++ b/src/core/rpc/rpc_server.cpp @@ -106,10 +106,7 @@ void RPCServer::HandleRequestsLoop() { LOG_INFO(RPC_Server, "Request handler started."); - while (request_queue.PopWait(request_packet)) { - if (!request_packet) { - break; - } + while ((request_packet = request_queue.PopWait())) { HandleSingleRequest(std::move(request_packet)); } } From e267377111291381a761d9af2cc0a5d04948359b Mon Sep 17 00:00:00 2001 From: B3n30 Date: Mon, 8 Oct 2018 23:49:07 +0200 Subject: [PATCH 4/4] More fixes --- src/core/rpc/server.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/core/rpc/server.cpp b/src/core/rpc/server.cpp index 0ba052017..b3e66d6ff 100644 --- a/src/core/rpc/server.cpp +++ b/src/core/rpc/server.cpp @@ -25,9 +25,13 @@ void Server::Stop() { } void Server::NewRequestCallback(std::unique_ptr new_request) { - LOG_INFO(RPC_Server, "Received request version={} id={} type={} size={}", - new_request->GetVersion(), new_request->GetId(), - static_cast(new_request->GetPacketType()), new_request->GetPacketDataSize()); + if (new_request) { + LOG_INFO(RPC_Server, "Received request version={} id={} type={} size={}", + new_request->GetVersion(), new_request->GetId(), + static_cast(new_request->GetPacketType()), new_request->GetPacketDataSize()); + } else { + LOG_INFO(RPC_Server, "Received end packet"); + } rpc_server.QueueRequest(std::move(new_request)); }