diff --git a/CMakeLists.txt b/CMakeLists.txt index 68cb879..9917cf8 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -93,6 +93,8 @@ add_library(whisper-com STATIC src/DirectCommunicationServer.cpp src/DirectCommunicationClient.cpp + + ) @@ -151,5 +153,9 @@ IF (${TEST_WHISPER_COMMUNICATION_LIBRARY}) target_link_libraries(test_InternalUDPListener Catch2::Catch2 whisper-com loguru) catch_discover_tests(test_InternalUDPListener) + add_executable(test_DirectComms tests/test_DirectComms.cpp) + target_link_libraries(test_DirectComms Catch2::Catch2 whisper-com loguru) + catch_discover_tests(test_DirectComms) + ENDIF() diff --git a/include/DirectCommunicationClient.hpp b/include/DirectCommunicationClient.hpp index 7b9637e..e1c61dc 100644 --- a/include/DirectCommunicationClient.hpp +++ b/include/DirectCommunicationClient.hpp @@ -1 +1,48 @@ -#pragma once \ No newline at end of file +#pragma once + + + + +#include "WHISPER/threadSafeQueue.hpp" +#include "zmq.hpp" +#include +#include +#include +#include +#include +#include +#include + + + +namespace DirectCommunication +{ + + class DirectCommunicationClient + { + public: + DirectCommunicationClient(ushort port, std::string ServerAddress); + ~DirectCommunicationClient(); + + void sendMessage(std::string msg); + std::unique_ptr getLatestMessage(); + + private: + ushort port_; + std::string serverAddress_; + + zmq::context_t context_; + zmq::socket_t socket_; + + void workerFunc_(); + std::thread Worker_; + std::atomic_bool stopWorker_; + + WHISPER::threadSafeQueue receivedMessages_; + + + + }; + + +} \ No newline at end of file diff --git a/include/DirectCommunicationServer.hpp b/include/DirectCommunicationServer.hpp index b921c32..c3c315b 100644 --- a/include/DirectCommunicationServer.hpp +++ b/include/DirectCommunicationServer.hpp @@ -27,20 +27,22 @@ namespace DirectCommunication void sendMessage(std::string msg); std::unique_ptr getLatestMessage(); + int countClients(); + private: ushort port_; zmq::context_t context_; zmq::socket_t socket_; - void worker_(); + void workerFunc_(); std::thread Worker_; std::atomic_bool stopWorker_; WHISPER::threadSafeQueue receivedMessages_; std::vector connectedClients_; - bool hasClient(std::uint32_t); + bool hasClient(std::uint32_t clientId); }; diff --git a/src/DirectCommunicationClient.cpp b/src/DirectCommunicationClient.cpp index 8308dc1..d4cc912 100644 --- a/src/DirectCommunicationClient.cpp +++ b/src/DirectCommunicationClient.cpp @@ -1 +1,90 @@ -#include \ No newline at end of file +#include + + +#include "zmq.hpp" +#include +#include +#include +#include +#include + + +namespace DirectCommunication +{ + DirectCommunicationClient::DirectCommunicationClient(ushort port, std::string ServerAddress):port_(port),serverAddress_(ServerAddress) + { + + context_ = zmq::context_t(1); + socket_ = zmq::socket_t(context_,zmq::socket_type::client); + + socket_.connect("tcp://"+serverAddress_+":"+std::to_string(port_)); + socket_.set(zmq::sockopt::rcvtimeo,100); + + Worker_ = std::thread(&DirectCommunicationClient::workerFunc_,this); + stopWorker_ = false; + + + } + + + DirectCommunicationClient::~DirectCommunicationClient() + { + sendMessage("CLOSE"); + stopWorker_ = true; + Worker_.join(); + + socket_.close(); + } + + void DirectCommunicationClient::sendMessage(std::string msg) + { + + zmq::message_t zmqMsg(msg.begin(),msg.end()); + + socket_.send(zmqMsg,zmq::send_flags::dontwait); + + + } + + std::unique_ptr DirectCommunicationClient::getLatestMessage() + { + if (receivedMessages_.size() > 0) + { + std::string msg; + receivedMessages_.get(msg); + return std::make_unique(std::move(msg)); + } + return nullptr; + } + + void DirectCommunicationClient::workerFunc_() + { + + while (stopWorker_ == false) + { + zmq::message_t msg; + auto recv = socket_.recv(msg,zmq::recv_flags::none); + + if (recv > 0) + { + + if (msg.to_string() == "CLOSE") + { + + socket_.disconnect("tcp://"+serverAddress_+":"+std::to_string(port_)); + + }else + { + receivedMessages_.addElement(msg.to_string()); + } + + + } + } + + } + + + + +} \ No newline at end of file diff --git a/src/DirectCommunicationServer.cpp b/src/DirectCommunicationServer.cpp index 95521ba..9286532 100644 --- a/src/DirectCommunicationServer.cpp +++ b/src/DirectCommunicationServer.cpp @@ -1,6 +1,10 @@ #include "zmq.hpp" #include +#include +#include #include +#include +#include namespace DirectCommunication @@ -9,10 +13,13 @@ namespace DirectCommunication { context_ = zmq::context_t(1); - socket_ = zmq::socket_t(context_,zmq::socket_type::server); - + socket_.bind("tcp://*:"+std::to_string(port_)); + socket_.set(zmq::sockopt::rcvtimeo,100); + + Worker_ = std::thread(&DirectCommunicationServer::workerFunc_,this); + stopWorker_ = false; } @@ -20,29 +27,82 @@ namespace DirectCommunication DirectCommunicationServer::~DirectCommunicationServer() { + sendMessage("CLOSE"); + stopWorker_ = true; + Worker_.join(); + + socket_.close(); } void DirectCommunicationServer::sendMessage(std::string msg) { - + if (connectedClients_.size() > 0) + { + for (auto it = connectedClients_.begin(); it != connectedClients_.end(); ++it) { + zmq::message_t zmqMsg(msg.begin(),msg.end()); + zmqMsg.set_routing_id(*it); + socket_.send(zmqMsg,zmq::send_flags::dontwait); + } + } } std::unique_ptr DirectCommunicationServer::getLatestMessage() { - + if (receivedMessages_.size() > 0) + { + std::string msg; + receivedMessages_.get(msg); + return std::make_unique(std::move(msg)); + } + return nullptr; } - void DirectCommunicationServer::worker_() + int DirectCommunicationServer::countClients() { + return connectedClients_.size(); + } + void DirectCommunicationServer::workerFunc_() + { + LOG_S(INFO)<<"worker started"; + while (stopWorker_ == false) + { + zmq::message_t msg; + auto recv = socket_.recv(msg,zmq::recv_flags::none); + + if (recv > 0) + { + if (!hasClient(msg.routing_id())) + { + connectedClients_.push_back(msg.routing_id()); + LOG_S(INFO)<< "new Client connected ID: " < +#define CATCH_CONFIG_MAIN + +#include + +#include +#include +#include + +#include + +/** +* @brief brief test description +* @ingroup group +*/ +SCENARIO("A test scenario","[keywords]") +{ + GIVEN("Preliminaries") + { + DirectCommunication::DirectCommunicationServer server(5555); + // std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + DirectCommunication::DirectCommunicationClient client(5555,"127.0.0.1"); + + std::string msg1 = "hello client"; + std::string msg2 = "hello client again"; + + + WHEN("doing something") + { + client.sendMessage("hello server"); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + auto msg1FromClient = server.getLatestMessage(); + LOG_S(INFO)<< *msg1FromClient.get(); + LOG_S(INFO)<< server.countClients(); + THEN("expecting something to happen") + { + REQUIRE(server.countClients() == 1 ); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + REQUIRE(*msg1FromClient.get() == "hello server"); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + server.sendMessage(msg1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + REQUIRE(*client.getLatestMessage().get() == msg1); + + server.sendMessage(msg2); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + REQUIRE(*client.getLatestMessage().get() == msg2); + + + } // THEN + } // WHEN + } // GIVEN +} // SCENARIO