ADD: added test for direct zmq tcp and a first version of the server and client class

This commit is contained in:
Henry Winkel
2023-07-05 17:05:03 +02:00
parent f03c00a1a8
commit 828d038d5f
6 changed files with 289 additions and 11 deletions

View File

@@ -93,6 +93,8 @@ add_library(whisper-com STATIC
src/DirectCommunicationServer.cpp src/DirectCommunicationServer.cpp
src/DirectCommunicationClient.cpp src/DirectCommunicationClient.cpp
) )
@@ -151,5 +153,9 @@ IF (${TEST_WHISPER_COMMUNICATION_LIBRARY})
target_link_libraries(test_InternalUDPListener Catch2::Catch2 whisper-com loguru) target_link_libraries(test_InternalUDPListener Catch2::Catch2 whisper-com loguru)
catch_discover_tests(test_InternalUDPListener) catch_discover_tests(test_InternalUDPListener)
add_executable(test_DirectComms tests/test_DirectComms.cpp)
target_link_libraries(test_DirectComms Catch2::Catch2 whisper-com loguru)
catch_discover_tests(test_DirectComms)
ENDIF() ENDIF()

View File

@@ -1 +1,48 @@
#pragma once #pragma once
#include "WHISPER/threadSafeQueue.hpp"
#include "zmq.hpp"
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <sys/types.h>
#include <thread>
#include <vector>
namespace DirectCommunication
{
class DirectCommunicationClient
{
public:
DirectCommunicationClient(ushort port, std::string ServerAddress);
~DirectCommunicationClient();
void sendMessage(std::string msg);
std::unique_ptr<std::string> getLatestMessage();
private:
ushort port_;
std::string serverAddress_;
zmq::context_t context_;
zmq::socket_t socket_;
void workerFunc_();
std::thread Worker_;
std::atomic_bool stopWorker_;
WHISPER::threadSafeQueue<std::string> receivedMessages_;
};
}

View File

@@ -27,20 +27,22 @@ namespace DirectCommunication
void sendMessage(std::string msg); void sendMessage(std::string msg);
std::unique_ptr<std::string> getLatestMessage(); std::unique_ptr<std::string> getLatestMessage();
int countClients();
private: private:
ushort port_; ushort port_;
zmq::context_t context_; zmq::context_t context_;
zmq::socket_t socket_; zmq::socket_t socket_;
void worker_(); void workerFunc_();
std::thread Worker_; std::thread Worker_;
std::atomic_bool stopWorker_; std::atomic_bool stopWorker_;
WHISPER::threadSafeQueue<std::string> receivedMessages_; WHISPER::threadSafeQueue<std::string> receivedMessages_;
std::vector<std::uint32_t> connectedClients_; std::vector<std::uint32_t> connectedClients_;
bool hasClient(std::uint32_t); bool hasClient(std::uint32_t clientId);
}; };

View File

@@ -1 +1,90 @@
#include <DirectCommunicationClient.hpp> #include <DirectCommunicationClient.hpp>
#include "zmq.hpp"
#include <memory>
#include <string>
#include <sys/types.h>
#include <thread>
#include <loguru.hpp>
namespace DirectCommunication
{
DirectCommunicationClient::DirectCommunicationClient(ushort port, std::string ServerAddress):port_(port),serverAddress_(ServerAddress)
{
context_ = zmq::context_t(1);
socket_ = zmq::socket_t(context_,zmq::socket_type::client);
socket_.connect("tcp://"+serverAddress_+":"+std::to_string(port_));
socket_.set(zmq::sockopt::rcvtimeo,100);
Worker_ = std::thread(&DirectCommunicationClient::workerFunc_,this);
stopWorker_ = false;
}
DirectCommunicationClient::~DirectCommunicationClient()
{
sendMessage("CLOSE");
stopWorker_ = true;
Worker_.join();
socket_.close();
}
void DirectCommunicationClient::sendMessage(std::string msg)
{
zmq::message_t zmqMsg(msg.begin(),msg.end());
socket_.send(zmqMsg,zmq::send_flags::dontwait);
}
std::unique_ptr<std::string> DirectCommunicationClient::getLatestMessage()
{
if (receivedMessages_.size() > 0)
{
std::string msg;
receivedMessages_.get(msg);
return std::make_unique<std::string>(std::move(msg));
}
return nullptr;
}
void DirectCommunicationClient::workerFunc_()
{
while (stopWorker_ == false)
{
zmq::message_t msg;
auto recv = socket_.recv(msg,zmq::recv_flags::none);
if (recv > 0)
{
if (msg.to_string() == "CLOSE")
{
socket_.disconnect("tcp://"+serverAddress_+":"+std::to_string(port_));
}else
{
receivedMessages_.addElement(msg.to_string());
}
}
}
}
}

View File

@@ -1,6 +1,10 @@
#include "zmq.hpp" #include "zmq.hpp"
#include <DirectCommunicationServer.hpp> #include <DirectCommunicationServer.hpp>
#include <memory>
#include <string>
#include <sys/types.h> #include <sys/types.h>
#include <thread>
#include <loguru.hpp>
namespace DirectCommunication namespace DirectCommunication
@@ -9,10 +13,13 @@ namespace DirectCommunication
{ {
context_ = zmq::context_t(1); context_ = zmq::context_t(1);
socket_ = zmq::socket_t(context_,zmq::socket_type::server); socket_ = zmq::socket_t(context_,zmq::socket_type::server);
socket_.bind("tcp://*:"+std::to_string(port_));
socket_.set(zmq::sockopt::rcvtimeo,100);
Worker_ = std::thread(&DirectCommunicationServer::workerFunc_,this);
stopWorker_ = false;
} }
@@ -20,29 +27,82 @@ namespace DirectCommunication
DirectCommunicationServer::~DirectCommunicationServer() DirectCommunicationServer::~DirectCommunicationServer()
{ {
sendMessage("CLOSE");
stopWorker_ = true;
Worker_.join();
socket_.close();
} }
void DirectCommunicationServer::sendMessage(std::string msg) void DirectCommunicationServer::sendMessage(std::string msg)
{ {
if (connectedClients_.size() > 0)
{
for (auto it = connectedClients_.begin(); it != connectedClients_.end(); ++it) {
zmq::message_t zmqMsg(msg.begin(),msg.end());
zmqMsg.set_routing_id(*it);
socket_.send(zmqMsg,zmq::send_flags::dontwait);
}
}
} }
std::unique_ptr<std::string> DirectCommunicationServer::getLatestMessage() std::unique_ptr<std::string> DirectCommunicationServer::getLatestMessage()
{ {
if (receivedMessages_.size() > 0)
}
void DirectCommunicationServer::worker_()
{ {
std::string msg;
receivedMessages_.get(msg);
return std::make_unique<std::string>(std::move(msg));
}
return nullptr;
} }
int DirectCommunicationServer::countClients()
bool DirectCommunicationServer::hasClient(std::uint32_t)
{ {
return connectedClients_.size();
}
void DirectCommunicationServer::workerFunc_()
{
LOG_S(INFO)<<"worker started";
while (stopWorker_ == false)
{
zmq::message_t msg;
auto recv = socket_.recv(msg,zmq::recv_flags::none);
if (recv > 0)
{
if (!hasClient(msg.routing_id()))
{
connectedClients_.push_back(msg.routing_id());
LOG_S(INFO)<< "new Client connected ID: " <<msg.routing_id();
}
if (msg.to_string() == "CLOSE")
{
auto it = std::find(connectedClients_.begin(), connectedClients_.end(), msg.routing_id());
connectedClients_.erase(it);
}else
{
receivedMessages_.addElement(msg.to_string());
}
}
}
}
bool DirectCommunicationServer::hasClient(std::uint32_t clientId)
{
if ( std::find(connectedClients_.begin(), connectedClients_.end(), clientId) != connectedClients_.end() )
{
return true;
}
else
{
return false;
}
} }

View File

@@ -0,0 +1,74 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
/**
* @file
* @copyright 2022 MPLv2
*/
/**
* @defgroup tests
*/
#include <string>
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
#include <DirectCommunicationClient.hpp>
#include <DirectCommunicationServer.hpp>
#include <loguru.hpp>
#include <memory>
/**
* @brief brief test description
* @ingroup group
*/
SCENARIO("A test scenario","[keywords]")
{
GIVEN("Preliminaries")
{
DirectCommunication::DirectCommunicationServer server(5555);
// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
DirectCommunication::DirectCommunicationClient client(5555,"127.0.0.1");
std::string msg1 = "hello client";
std::string msg2 = "hello client again";
WHEN("doing something")
{
client.sendMessage("hello server");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
auto msg1FromClient = server.getLatestMessage();
LOG_S(INFO)<< *msg1FromClient.get();
LOG_S(INFO)<< server.countClients();
THEN("expecting something to happen")
{
REQUIRE(server.countClients() == 1 );
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
REQUIRE(*msg1FromClient.get() == "hello server");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
server.sendMessage(msg1);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(*client.getLatestMessage().get() == msg1);
server.sendMessage(msg2);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
REQUIRE(*client.getLatestMessage().get() == msg2);
} // THEN
} // WHEN
} // GIVEN
} // SCENARIO