ADD: added message callback to udp listener
This commit is contained in:
@@ -9,6 +9,7 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <WHISPER/localClients.hpp>
|
#include <WHISPER/localClients.hpp>
|
||||||
|
#include <future>
|
||||||
|
|
||||||
#define ZMQ_BUILD_DRAFT_API 1
|
#define ZMQ_BUILD_DRAFT_API 1
|
||||||
#include <zmq.hpp>
|
#include <zmq.hpp>
|
||||||
@@ -18,6 +19,22 @@ namespace WHISPER {
|
|||||||
|
|
||||||
class InternalUDPListener{
|
class InternalUDPListener{
|
||||||
|
|
||||||
|
|
||||||
|
public:
|
||||||
|
InternalUDPListener(std::uint16_t port);
|
||||||
|
~InternalUDPListener();
|
||||||
|
void connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver);
|
||||||
|
void start();
|
||||||
|
void stop();
|
||||||
|
|
||||||
|
std::uint16_t getPort();
|
||||||
|
|
||||||
|
void subscribe(WHISPER::MsgTopics topic);
|
||||||
|
void unsubscribe(WHISPER::MsgTopics topic);
|
||||||
|
|
||||||
|
void registerMessageCallback(std::function<void(std::string)> MessageHandle);
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::uint16_t port_;
|
std::uint16_t port_;
|
||||||
//zeromq io contex
|
//zeromq io contex
|
||||||
@@ -38,17 +55,8 @@ namespace WHISPER {
|
|||||||
|
|
||||||
std::uint16_t checkPort(std::uint16_t port);
|
std::uint16_t checkPort(std::uint16_t port);
|
||||||
|
|
||||||
public:
|
std::function<void(std::string)> MessageHandle_ = nullptr;
|
||||||
InternalUDPListener(std::uint16_t port);
|
std::atomic_bool useHandl_ = false;;
|
||||||
~InternalUDPListener();
|
|
||||||
void connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver);
|
|
||||||
void start();
|
|
||||||
void stop();
|
|
||||||
|
|
||||||
std::uint16_t getPort();
|
|
||||||
|
|
||||||
void subscribe(WHISPER::MsgTopics topic);
|
|
||||||
void unsubscribe(WHISPER::MsgTopics topic);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ namespace WHISPER {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
InternalUDPService(std::string id, std::uint16_t port, std::string destinationAdress, std::string myAdress);
|
InternalUDPService(std::string id, std::uint16_t port, std::string destinationAdress, std::string myAdress);
|
||||||
|
|
||||||
~InternalUDPService();
|
~InternalUDPService();
|
||||||
private:
|
private:
|
||||||
/// ip address of the destination
|
/// ip address of the destination
|
||||||
|
|||||||
@@ -58,9 +58,15 @@ void InternalUDPListener::listen()
|
|||||||
///res is the size of the received message or error; cant figure out what value is set when ther is an error
|
///res is the size of the received message or error; cant figure out what value is set when ther is an error
|
||||||
res = receiverSocket_->recv(msg,zmq::recv_flags::none);
|
res = receiverSocket_->recv(msg,zmq::recv_flags::none);
|
||||||
// std::string message = msg.to_string();
|
// std::string message = msg.to_string();
|
||||||
WHISPER::Message receivedMessage(msg.to_string());
|
|
||||||
|
|
||||||
receiverQueue_->addElement(receivedMessage);
|
if (useHandl_ == true)
|
||||||
|
{
|
||||||
|
auto i = std::async(std::launch::async, MessageHandle_, msg.to_string());
|
||||||
|
} else
|
||||||
|
{
|
||||||
|
WHISPER::Message receivedMessage(msg.to_string());
|
||||||
|
receiverQueue_->addElement(receivedMessage);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
listening_ = false;
|
listening_ = false;
|
||||||
@@ -118,6 +124,14 @@ void InternalUDPListener::unsubscribe(WHISPER::MsgTopics topic)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void InternalUDPListener::registerMessageCallback(std::function<void(std::string)> MessageHandle)
|
||||||
|
{
|
||||||
|
this->MessageHandle_ = MessageHandle;
|
||||||
|
useHandl_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
std::uint16_t InternalUDPListener::checkPort(std::uint16_t port)
|
std::uint16_t InternalUDPListener::checkPort(std::uint16_t port)
|
||||||
{
|
{
|
||||||
std::uint16_t localPort = port;
|
std::uint16_t localPort = port;
|
||||||
|
|||||||
@@ -44,13 +44,13 @@ int main()
|
|||||||
// WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
|
// WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
|
||||||
|
|
||||||
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
|
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
|
||||||
WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178");
|
WHISPER::InternalUDPService service("01",8000,"192.168.1.255","192.168.1.178");
|
||||||
service.connect(receiver);
|
service.connect(receiver);
|
||||||
|
|
||||||
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
|
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
|
||||||
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
|
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
|
||||||
|
|
||||||
WHISPER::StringData data(0,1,WHISPER::SHIP,"hello world");
|
WHISPER::StringData data("01","hello world");
|
||||||
service.publish(data.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
|
service.publish(data.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ int main()
|
|||||||
|
|
||||||
|
|
||||||
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
|
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
|
||||||
WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178");
|
WHISPER::InternalUDPService service("02",8000,"192.168.1.255","192.168.1.178");
|
||||||
|
|
||||||
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
|
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
|
||||||
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
|
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ SCENARIO("A test scenario","[keywords]")
|
|||||||
listener->subscribe(WHISPER::DATA);
|
listener->subscribe(WHISPER::DATA);
|
||||||
// listener->start();
|
// listener->start();
|
||||||
// listener->stop();
|
// listener->stop();
|
||||||
auto stringMsg = WHISPER::StringData(1,1,WHISPER::SourceType::SENSOR,"hello world");
|
auto stringMsg = WHISPER::StringData("11","hello world");
|
||||||
sender->send(stringMsg,WHISPER::DATA);
|
sender->send(stringMsg,WHISPER::DATA);
|
||||||
LOG_S(INFO)<<"wating";
|
LOG_S(INFO)<<"wating";
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ SCENARIO("A test scenario","[keywords]")
|
|||||||
{
|
{
|
||||||
GIVEN("Preliminaries")
|
GIVEN("Preliminaries")
|
||||||
{
|
{
|
||||||
WHISPER::InternalUDPService service(0,1,WHISPER::SourceType::SIMCOMTROLER,8000,"127.0.0.255","127.0.0.1");
|
WHISPER::InternalUDPService service("01",8000,"127.0.0.255","127.0.0.1");
|
||||||
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
|
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
|
||||||
service.connect(receiver) ;
|
service.connect(receiver) ;
|
||||||
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
|
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
|
||||||
|
|||||||
Reference in New Issue
Block a user