From 01e61cd482ffa03548301e6db0fe275b841e4787 Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Wed, 15 Feb 2023 11:15:28 +0100 Subject: [PATCH] ADD: added Listener and Sender --- CMakeLists.txt | 10 ++ include/WHISPER/InternalUDPListener.hpp | 56 +++++++ include/WHISPER/InternalUDPSender.hpp | 39 +++++ src/WHISPER/InternalUDPListener.cpp | 185 ++++++++++++++++++++++++ src/WHISPER/InternalUDPSender.cpp | 41 ++++++ src/WHISPER/InternalUDPService.cpp | 52 ++++--- tests/test_InternalUDPListener.cpp | 72 +++++++++ tests/test_test.cpp | 2 +- 8 files changed, 437 insertions(+), 20 deletions(-) create mode 100644 include/WHISPER/InternalUDPListener.hpp create mode 100644 include/WHISPER/InternalUDPSender.hpp create mode 100644 src/WHISPER/InternalUDPListener.cpp create mode 100644 src/WHISPER/InternalUDPSender.cpp create mode 100644 tests/test_InternalUDPListener.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c1ec89..558b952 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,12 @@ add_library(whisper-com STATIC include/WHISPER/InternalUDPService.hpp src/WHISPER/InternalUDPService.cpp + include/WHISPER/InternalUDPListener.hpp + src/WHISPER/InternalUDPListener.cpp + + include/WHISPER/InternalUDPSender.hpp + src/WHISPER/InternalUDPSender.cpp + include/WHISPER/localClients.hpp src/WHISPER/localClients.cpp @@ -136,5 +142,9 @@ IF (${TEST_WHISPER_COMMUNICATION_LIBRARY}) target_link_libraries(test_test Catch2::Catch2 whisper-com loguru) catch_discover_tests(test_test) + add_executable(test_InternalUDPListener tests/test_InternalUDPListener.cpp) + target_link_libraries(test_InternalUDPListener Catch2::Catch2 whisper-com loguru) + catch_discover_tests(test_InternalUDPListener) + ENDIF() diff --git a/include/WHISPER/InternalUDPListener.hpp b/include/WHISPER/InternalUDPListener.hpp new file mode 100644 index 0000000..a91aadf --- /dev/null +++ b/include/WHISPER/InternalUDPListener.hpp @@ -0,0 +1,56 @@ +#pragma once + + +#include "WHISPER/Messages/Message.hpp" +#include "WHISPER/whisper.hpp" +#include +#include +#include +#include +#include +#include + +#define ZMQ_BUILD_DRAFT_API 1 +#include +#include + +namespace WHISPER { + + class InternalUDPListener{ + + private: + std::uint16_t port_; + //zeromq io contex + zmq::context_t ctx; + ///shared pointer to receiving socket + std::shared_ptr receiverSocket_ = nullptr; + std::thread listeningThread; + + std::atomic listening_ = false; + std::atomic stoplistening_ = false; + + std::vector subscribedTopics; + + std::shared_ptr> receiverQueue_; + + + void listen(); + + std::uint16_t checkPort(std::uint16_t port); + + public: + InternalUDPListener(std::uint16_t port); + ~InternalUDPListener(); + void connect(std::shared_ptr> receiver); + void start(); + void stop(); + + std::uint16_t getPort(); + + void subscribe(WHISPER::MsgTopics topic); + void unsubscribe(WHISPER::MsgTopics topic); + + + + }; +} diff --git a/include/WHISPER/InternalUDPSender.hpp b/include/WHISPER/InternalUDPSender.hpp new file mode 100644 index 0000000..e0186a3 --- /dev/null +++ b/include/WHISPER/InternalUDPSender.hpp @@ -0,0 +1,39 @@ +#pragma once + + +#include "WHISPER/Messages/Message.hpp" +#include "WHISPER/whisper.hpp" +#include +#include +#include +#include +#include +#include + +#define ZMQ_BUILD_DRAFT_API 1 +#include +#include + +namespace WHISPER { + + class InternalUDPSender{ + + private: + /// address to send to + std::string address_; + /// port to send to + std::uint16_t port_; + /// zeromq io contex + zmq::context_t ctx; + /// shared pointer to receiving socket + std::shared_ptr senderSocket_ = nullptr; + + public: + InternalUDPSender(std::string address, std::uint16_t port); + ~InternalUDPSender(); + + void send(WHISPER::Message msg,WHISPER::MsgTopics topic = WHISPER::MANAGEMENT); + + + }; +} diff --git a/src/WHISPER/InternalUDPListener.cpp b/src/WHISPER/InternalUDPListener.cpp new file mode 100644 index 0000000..f4d29e6 --- /dev/null +++ b/src/WHISPER/InternalUDPListener.cpp @@ -0,0 +1,185 @@ +#include "WHISPER/Messages/Message.hpp" +#include +#include +#include +#include +#include +#include +#include + +namespace WHISPER { + +InternalUDPListener::InternalUDPListener(std::uint16_t port) +{ + port_ = checkPort(port); +} + +InternalUDPListener::~InternalUDPListener() +{ + unsubscribe(WHISPER::MsgTopics::MANAGEMENT); + while (subscribedTopics.size() > 0) { + unsubscribe(subscribedTopics.front()); + } + + if (listening_ == true) { + stop(); + } + receiverSocket_->close(); +} + +void InternalUDPListener::connect(std::shared_ptr> receiver) +{ + receiverQueue_ = receiver; + + if (receiverSocket_ == nullptr) + { + ctx = zmq::context_t(2); + receiverSocket_ = std::make_shared(ctx,zmq::socket_type::dish); + + std::string portAsString = std::to_string(port_); + receiverSocket_->bind("udp://*:"+portAsString); + ///used to set a custom time out to the socket + receiverSocket_->set(zmq::sockopt::rcvtimeo,100); + } + subscribe(WHISPER::MsgTopics::MANAGEMENT); + + start(); +} + +void InternalUDPListener::listen() +{ + listening_ = true; + + while (stoplistening_ == false) + { + zmq::recv_result_t res; + zmq::message_t msg; + + ///res is the size of the received message or error; cant figure out what value is set when ther is an error + res = receiverSocket_->recv(msg,zmq::recv_flags::none); + // std::string message = msg.to_string(); + WHISPER::Message receivedMessage(msg.to_string()); + + receiverQueue_->addElement(receivedMessage); + + } + listening_ = false; +} +void InternalUDPListener::start() +{ + if (receiverSocket_ != nullptr) + { + if (listening_ != true) + { + stoplistening_ = false; + listeningThread = std::thread(&InternalUDPListener::listen,this); + while (listening_ != true) + { + } + } + } +} + +void InternalUDPListener::stop() +{ + stoplistening_ = true; + if (listeningThread.joinable()) + { + listeningThread.join(); + } + +} + +void InternalUDPListener::subscribe(WHISPER::MsgTopics topic) +{ + subscribedTopics.push_back(topic); + receiverSocket_->join(WHISPER::MsgTopicsMap[topic].c_str()); + +} + +void InternalUDPListener::unsubscribe(WHISPER::MsgTopics topic) +{ + try + { + receiverSocket_->leave(WHISPER::MsgTopicsMap[topic].c_str()); + for (auto it = subscribedTopics.begin(); it != subscribedTopics.end(); ) + { + if (*it == topic) + { + it = subscribedTopics.erase(it); + }else + { + it++; + } + } + } catch(const std::exception& e) + { + LOG_S(ERROR)< +#include +#include + +namespace WHISPER { + + + /// address to send to + std::string address_; + /// port to send to + std::uint16_t port_; + /// zeromq io contex + zmq::context_t ctx; + /// shared pointer to receiving socket + std::shared_ptr senderSocket_ = nullptr; + + InternalUDPSender::InternalUDPSender(std::string address, std::uint16_t port):address_(address),port_(port) + { + ctx = zmq::context_t(1); + senderSocket_ = std::make_shared(ctx,zmq::socket_type::radio); + senderSocket_->connect("udp://"+address_+":"+std::to_string(port_)); + } + + InternalUDPSender::~InternalUDPSender() + { + senderSocket_->close(); + } + + void InternalUDPSender::send(WHISPER::Message msg,WHISPER::MsgTopics topic) + { + std::string MessageString = msg.serialize(); + zmq::message_t zmqMsg(MessageString.begin(),MessageString.end()); + zmqMsg.set_group(WHISPER::MsgTopicsMap[topic].c_str()); + senderSocket_->send(zmqMsg,zmq::send_flags::none); + + } + + +} \ No newline at end of file diff --git a/src/WHISPER/InternalUDPService.cpp b/src/WHISPER/InternalUDPService.cpp index b788682..2c98860 100644 --- a/src/WHISPER/InternalUDPService.cpp +++ b/src/WHISPER/InternalUDPService.cpp @@ -39,7 +39,8 @@ namespace WHISPER { InternalUDPService::~InternalUDPService() { - if (this->isConnected() == true) { + if (this->isConnected() == true) + { disconnect(); } @@ -55,9 +56,11 @@ namespace WHISPER { std::string portAsString = std::to_string(ownReceivingPort_); receiver->bind("udp://*:"+portAsString); - }else if (ownReceivingPort_ == 0) { + }else if (ownReceivingPort_ == 0) + { throw std::invalid_argument( " receiver cant bind to port " ); - }else { + }else + { loopbackSocket = std::make_shared(ctx,zmq::socket_type::radio); loopbackSocket->connect("udp://127.0.0.255:"+sendingPort); @@ -115,9 +118,8 @@ namespace WHISPER { { zmq::message_t tmpmsg(msg.begin(),msg.end()); tmpmsg.set_group(topic.c_str()); - - sender.send(tmpmsg,zmq::send_flags::none); + zmq::message_t localmsg(msg.begin(),msg.end()); tmpmsg.set_group(topic.c_str()); @@ -148,11 +150,12 @@ namespace WHISPER { sendToLocalClients(receivedMessage.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)receivedMessage.topic_]); - if (receivedMessage.deviceId_ != getOwnID()) { - + if (receivedMessage.deviceId_ != getOwnID()) + { int msgType = receivedMessage.msgType_; - if (msgType == WHISPER::JOIN) { + if (msgType == WHISPER::JOIN) + { WHISPER::Join join(message); // LOG_S(INFO)<<"joined id "<myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") { LOG_S(INFO)<<"go for it"; bool clientAllreadyIn = false; - if (localclients.size() > 0) { + if (localclients.size() > 0) + { for (auto it = localclients.begin(); it != localclients.end();it++) { - if (it->get()->getPort() == join.port || it->get()->getID() == join.deviceId_) { + if (it->get()->getPort() == join.port || it->get()->getID() == join.deviceId_) + { clientAllreadyIn = true; } } } - if (clientAllreadyIn == false) { + if (clientAllreadyIn == false) + { // localClient client; auto tmpAddr = "udp://"+join.sourceAddr+":" + std::to_string(join.port); @@ -201,7 +207,8 @@ namespace WHISPER { if (localclients.size() > 0) { for (auto it = localclients.begin(); it != localclients.end();it++) { - if (it->get()->getPort() == Leave.port || (it->get()->getID() == Leave.deviceId_ && it->get()->getParentid() == Leave.deviceId_)) { + if (it->get()->getPort() == Leave.port || (it->get()->getID() == Leave.deviceId_ && it->get()->getParentid() == Leave.deviceId_)) + { it->get()->getClientSocket()->close(); it = localclients.erase(it); LOG_S(INFO)<<"client left"; @@ -226,11 +233,13 @@ namespace WHISPER { { WHISPER::Pong pong(message); - if (localclients.size() > 0) { + if (localclients.size() > 0) + { for (auto it = localclients.begin(); it != localclients.end();it++) { - if (pong.deviceId_ == it->get()->getID() && pong.parentId_ == it->get()->getParentid()) { + if (pong.deviceId_ == it->get()->getID() && pong.parentId_ == it->get()->getParentid()) + { it->get()->setLastResponse(std::time(nullptr)); } } @@ -248,11 +257,13 @@ namespace WHISPER { void InternalUDPService::sendToLocalClients(std::string msg,std::string topic) { - if (localclients.size() > 0 && isGateway() == true) { + if (localclients.size() > 0 && isGateway() == true) + { auto tmpMsg = WHISPER::Message(msg); for (auto it = localclients.begin(); it != localclients.end();it++) { - if (tmpMsg.deviceId_ != it->get()->getID()) { + if (tmpMsg.deviceId_ != it->get()->getID()) + { zmq::message_t tmp(msg.begin(),msg.end()); tmp.set_group(topic.c_str()); it->get()->getClientSocket()->send(tmp,zmq::send_flags::none); @@ -286,11 +297,13 @@ namespace WHISPER { } void InternalUDPService::stopChecker() { - while (clientCheckerisRunning == true ) { + while (clientCheckerisRunning == true ) + { stopClientChecker = true; // std::this_thread::sleep_for(std::chrono::milliseconds(100)); LOG_S(INFO)<<"waiting for checker thread"; - if (clientsCheckerThread.joinable() == true ) { + if (clientsCheckerThread.joinable() == true ) + { clientsCheckerThread.join(); } } @@ -302,7 +315,8 @@ namespace WHISPER { while(stopClientChecker == false) { - if (localclients.size() > 0) { + if (localclients.size() > 0) + { WHISPER::Ping ping(getParentID(),getOwnID(),getOwnDeviceType(),port_); sendToLocalClients(ping.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); diff --git a/tests/test_InternalUDPListener.cpp b/tests/test_InternalUDPListener.cpp new file mode 100644 index 0000000..54d880a --- /dev/null +++ b/tests/test_InternalUDPListener.cpp @@ -0,0 +1,72 @@ +/* +* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at https://mozilla.org/MPL/2.0/. +*/ + +/** +* @file +* @copyright 2022 MPLv2 +*/ + +/** +* @defgroup tests +*/ + + +#include "WHISPER/InternalUDPSender.hpp" +#include "WHISPER/Messages/stringData.hpp" +#include +#define CATCH_CONFIG_MAIN + +#include + +#include "WHISPER/InternalUDPListener.hpp" +#include "WHISPER/Messages/Message.hpp" +#include "zmq.hpp" + #include "google/protobuf/message.h" + + +#include + +/** +* @brief brief test description +* @ingroup group +*/ +SCENARIO("A test scenario","[keywords]") +{ + GIVEN("Preliminaries") + { + auto *listener = new WHISPER::InternalUDPListener(8000); + auto sender = std::make_shared("127.0.0.1",8000); + + WHEN("doing something") + { + auto receiver = std::make_shared>(); + listener->connect(receiver) ; + listener->subscribe(WHISPER::DATA); + // listener->start(); + // listener->stop(); + auto stringMsg = WHISPER::StringData(1,1,WHISPER::SourceType::SENSOR,"hello world"); + sender->send(stringMsg,WHISPER::DATA); + LOG_S(INFO)<<"wating"; + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + + LOG_S(INFO)<<"closing"; + + THEN("expecting something to happen") + { + REQUIRE(receiver->size() > 0); + WHISPER::Message msg; + receiver->get(msg); + LOG_S(INFO)<< msg.msgType_; + WHISPER::StringData data(msg.serialize()); + REQUIRE(data.data_ == "hello world"); + + delete listener; + REQUIRE(receiver.unique() == true ); + } // THEN + } // WHEN + } // GIVEN +} // SCENARIO diff --git a/tests/test_test.cpp b/tests/test_test.cpp index bdd515c..5cd649c 100644 --- a/tests/test_test.cpp +++ b/tests/test_test.cpp @@ -36,7 +36,7 @@ SCENARIO("A test scenario","[keywords]") { WHISPER::InternalUDPService service(0,1,WHISPER::SourceType::SIMCOMTROLER,8000,"127.0.0.255","127.0.0.1"); auto receiver = std::make_shared>(); - service.connect(receiver) ; + service.connect(receiver) ; // service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); int i = 0;