From f794f4c5d07f8d855bb86521353b8c379ceaefec Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Wed, 16 Aug 2023 15:01:23 +0200 Subject: [PATCH] ADD: added message callback to udp listener --- include/WHISPER/InternalUDPListener.hpp | 30 ++++++++++++++++--------- include/WHISPER/InternalUDPService.hpp | 1 + src/WHISPER/InternalUDPListener.cpp | 20 ++++++++++++++--- src/main.cpp | 4 ++-- src/mainRecv.cpp | 2 +- tests/test_InternalUDPListener.cpp | 2 +- tests/test_test.cpp | 2 +- 7 files changed, 42 insertions(+), 19 deletions(-) diff --git a/include/WHISPER/InternalUDPListener.hpp b/include/WHISPER/InternalUDPListener.hpp index a91aadf..d5f1a00 100644 --- a/include/WHISPER/InternalUDPListener.hpp +++ b/include/WHISPER/InternalUDPListener.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #define ZMQ_BUILD_DRAFT_API 1 #include @@ -18,6 +19,22 @@ namespace WHISPER { class InternalUDPListener{ + + public: + InternalUDPListener(std::uint16_t port); + ~InternalUDPListener(); + void connect(std::shared_ptr> receiver); + void start(); + void stop(); + + std::uint16_t getPort(); + + void subscribe(WHISPER::MsgTopics topic); + void unsubscribe(WHISPER::MsgTopics topic); + + void registerMessageCallback(std::function MessageHandle); + + private: std::uint16_t port_; //zeromq io contex @@ -38,17 +55,8 @@ namespace WHISPER { std::uint16_t checkPort(std::uint16_t port); - public: - InternalUDPListener(std::uint16_t port); - ~InternalUDPListener(); - void connect(std::shared_ptr> receiver); - void start(); - void stop(); - - std::uint16_t getPort(); - - void subscribe(WHISPER::MsgTopics topic); - void unsubscribe(WHISPER::MsgTopics topic); + std::function MessageHandle_ = nullptr; + std::atomic_bool useHandl_ = false;; diff --git a/include/WHISPER/InternalUDPService.hpp b/include/WHISPER/InternalUDPService.hpp index 5d1a698..70556e6 100644 --- a/include/WHISPER/InternalUDPService.hpp +++ b/include/WHISPER/InternalUDPService.hpp @@ -34,6 +34,7 @@ namespace WHISPER { public: InternalUDPService(std::string id, std::uint16_t port, std::string destinationAdress, std::string myAdress); + ~InternalUDPService(); private: /// ip address of the destination diff --git a/src/WHISPER/InternalUDPListener.cpp b/src/WHISPER/InternalUDPListener.cpp index f4e642a..08250b4 100644 --- a/src/WHISPER/InternalUDPListener.cpp +++ b/src/WHISPER/InternalUDPListener.cpp @@ -57,10 +57,16 @@ void InternalUDPListener::listen() ///res is the size of the received message or error; cant figure out what value is set when ther is an error res = receiverSocket_->recv(msg,zmq::recv_flags::none); - // std::string message = msg.to_string(); - WHISPER::Message receivedMessage(msg.to_string()); + // std::string message = msg.to_string(); - receiverQueue_->addElement(receivedMessage); + if (useHandl_ == true) + { + auto i = std::async(std::launch::async, MessageHandle_, msg.to_string()); + } else + { + WHISPER::Message receivedMessage(msg.to_string()); + receiverQueue_->addElement(receivedMessage); + } } listening_ = false; @@ -118,6 +124,14 @@ void InternalUDPListener::unsubscribe(WHISPER::MsgTopics topic) } } + +void InternalUDPListener::registerMessageCallback(std::function MessageHandle) +{ + this->MessageHandle_ = MessageHandle; + useHandl_ = true; +} + + std::uint16_t InternalUDPListener::checkPort(std::uint16_t port) { std::uint16_t localPort = port; diff --git a/src/main.cpp b/src/main.cpp index 5b8f7d3..7ea7cd4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -44,13 +44,13 @@ int main() // WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178"); auto receiver = std::make_shared>(); - WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178"); + WHISPER::InternalUDPService service("01",8000,"192.168.1.255","192.168.1.178"); service.connect(receiver); service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]); // service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); - WHISPER::StringData data(0,1,WHISPER::SHIP,"hello world"); + WHISPER::StringData data("01","hello world"); service.publish(data.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]); diff --git a/src/mainRecv.cpp b/src/mainRecv.cpp index 652fb43..347a875 100644 --- a/src/mainRecv.cpp +++ b/src/mainRecv.cpp @@ -48,7 +48,7 @@ int main() auto receiver = std::make_shared>(); - WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178"); + WHISPER::InternalUDPService service("02",8000,"192.168.1.255","192.168.1.178"); service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]); diff --git a/tests/test_InternalUDPListener.cpp b/tests/test_InternalUDPListener.cpp index 54d880a..5b31c65 100644 --- a/tests/test_InternalUDPListener.cpp +++ b/tests/test_InternalUDPListener.cpp @@ -47,7 +47,7 @@ SCENARIO("A test scenario","[keywords]") listener->subscribe(WHISPER::DATA); // listener->start(); // listener->stop(); - auto stringMsg = WHISPER::StringData(1,1,WHISPER::SourceType::SENSOR,"hello world"); + auto stringMsg = WHISPER::StringData("11","hello world"); sender->send(stringMsg,WHISPER::DATA); LOG_S(INFO)<<"wating"; std::this_thread::sleep_for(std::chrono::milliseconds(200)); diff --git a/tests/test_test.cpp b/tests/test_test.cpp index 5cd649c..1f56fbc 100644 --- a/tests/test_test.cpp +++ b/tests/test_test.cpp @@ -34,7 +34,7 @@ SCENARIO("A test scenario","[keywords]") { GIVEN("Preliminaries") { - WHISPER::InternalUDPService service(0,1,WHISPER::SourceType::SIMCOMTROLER,8000,"127.0.0.255","127.0.0.1"); + WHISPER::InternalUDPService service("01",8000,"127.0.0.255","127.0.0.1"); auto receiver = std::make_shared>(); service.connect(receiver) ; // service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);