ADD: added usage without a receiver queue

This commit is contained in:
hwinkel
2023-09-26 11:05:50 +02:00
parent 290f7a9540
commit f17566cba0
2 changed files with 38 additions and 9 deletions

View File

@@ -23,7 +23,12 @@ namespace WHISPER {
public: public:
InternalUDPListener(std::uint16_t port, std::string address = "*"); InternalUDPListener(std::uint16_t port, std::string address = "*");
~InternalUDPListener(); ~InternalUDPListener();
[[DEPRECATED]]
void connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver); void connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver);
void connect();
void addReceiverQueue(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver);
void start(); void start();
void stop(); void stop();

View File

@@ -33,7 +33,7 @@ void InternalUDPListener::connect(std::shared_ptr<threadSafeQueue<WHISPER::Messa
if (receiverSocket_ == nullptr) if (receiverSocket_ == nullptr)
{ {
ctx = zmq::context_t(2); ctx = zmq::context_t();
receiverSocket_ = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::dish); receiverSocket_ = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::dish);
std::string portAsString = std::to_string(port_); std::string portAsString = std::to_string(port_);
@@ -46,6 +46,30 @@ void InternalUDPListener::connect(std::shared_ptr<threadSafeQueue<WHISPER::Messa
start(); start();
} }
void InternalUDPListener::connect()
{
if (receiverSocket_ == nullptr)
{
ctx = zmq::context_t();
receiverSocket_ = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::dish);
std::string portAsString = std::to_string(port_);
receiverSocket_->bind("udp://"+address_+":"+portAsString);
///used to set a custom time out to the socket
receiverSocket_->set(zmq::sockopt::rcvtimeo,100);
}
subscribe(WHISPER::MsgTopics::MANAGEMENT);
start();
}
void InternalUDPListener::addReceiverQueue(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver)
{
receiverQueue_ = receiver;
}
void InternalUDPListener::listen() void InternalUDPListener::listen()
{ {
listening_ = true; listening_ = true;
@@ -59,14 +83,14 @@ void InternalUDPListener::listen()
res = receiverSocket_->recv(msg,zmq::recv_flags::none); res = receiverSocket_->recv(msg,zmq::recv_flags::none);
// std::string message = msg.to_string(); // std::string message = msg.to_string();
if (useHandl_ == true) if (useHandl_ == true)
{ {
auto i = std::async(std::launch::async, MessageHandle_, msg.to_string()); auto i = std::async(std::launch::async, MessageHandle_, msg.to_string());
} else } else if(receiverQueue_ != nullptr)
{ {
WHISPER::Message receivedMessage(msg.to_string()); WHISPER::Message receivedMessage(msg.to_string());
receiverQueue_->addElement(receivedMessage); receiverQueue_->addElement(receivedMessage);
} }
} }
listening_ = false; listening_ = false;