ADD: added Listener and Sender

This commit is contained in:
Henry Winkel
2023-02-15 11:15:28 +01:00
parent 97d3a10e8c
commit 01e61cd482
8 changed files with 437 additions and 20 deletions

View File

@@ -46,6 +46,12 @@ add_library(whisper-com STATIC
include/WHISPER/InternalUDPService.hpp include/WHISPER/InternalUDPService.hpp
src/WHISPER/InternalUDPService.cpp src/WHISPER/InternalUDPService.cpp
include/WHISPER/InternalUDPListener.hpp
src/WHISPER/InternalUDPListener.cpp
include/WHISPER/InternalUDPSender.hpp
src/WHISPER/InternalUDPSender.cpp
include/WHISPER/localClients.hpp include/WHISPER/localClients.hpp
src/WHISPER/localClients.cpp src/WHISPER/localClients.cpp
@@ -136,5 +142,9 @@ IF (${TEST_WHISPER_COMMUNICATION_LIBRARY})
target_link_libraries(test_test Catch2::Catch2 whisper-com loguru) target_link_libraries(test_test Catch2::Catch2 whisper-com loguru)
catch_discover_tests(test_test) catch_discover_tests(test_test)
add_executable(test_InternalUDPListener tests/test_InternalUDPListener.cpp)
target_link_libraries(test_InternalUDPListener Catch2::Catch2 whisper-com loguru)
catch_discover_tests(test_InternalUDPListener)
ENDIF() ENDIF()

View File

@@ -0,0 +1,56 @@
#pragma once
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/whisper.hpp"
#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <vector>
#include <WHISPER/localClients.hpp>
#define ZMQ_BUILD_DRAFT_API 1
#include <zmq.hpp>
#include <zmq_addon.hpp>
namespace WHISPER {
class InternalUDPListener{
private:
std::uint16_t port_;
//zeromq io contex
zmq::context_t ctx;
///shared pointer to receiving socket
std::shared_ptr<zmq::socket_t> receiverSocket_ = nullptr;
std::thread listeningThread;
std::atomic<bool> listening_ = false;
std::atomic<bool> stoplistening_ = false;
std::vector<WHISPER::MsgTopics> subscribedTopics;
std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiverQueue_;
void listen();
std::uint16_t checkPort(std::uint16_t port);
public:
InternalUDPListener(std::uint16_t port);
~InternalUDPListener();
void connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver);
void start();
void stop();
std::uint16_t getPort();
void subscribe(WHISPER::MsgTopics topic);
void unsubscribe(WHISPER::MsgTopics topic);
};
}

View File

@@ -0,0 +1,39 @@
#pragma once
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/whisper.hpp"
#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <vector>
#include <WHISPER/localClients.hpp>
#define ZMQ_BUILD_DRAFT_API 1
#include <zmq.hpp>
#include <zmq_addon.hpp>
namespace WHISPER {
class InternalUDPSender{
private:
/// address to send to
std::string address_;
/// port to send to
std::uint16_t port_;
/// zeromq io contex
zmq::context_t ctx;
/// shared pointer to receiving socket
std::shared_ptr<zmq::socket_t> senderSocket_ = nullptr;
public:
InternalUDPSender(std::string address, std::uint16_t port);
~InternalUDPSender();
void send(WHISPER::Message msg,WHISPER::MsgTopics topic = WHISPER::MANAGEMENT);
};
}

View File

@@ -0,0 +1,185 @@
#include "WHISPER/Messages/Message.hpp"
#include <WHISPER/InternalUDPListener.hpp>
#include <memory>
#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
namespace WHISPER {
InternalUDPListener::InternalUDPListener(std::uint16_t port)
{
port_ = checkPort(port);
}
InternalUDPListener::~InternalUDPListener()
{
unsubscribe(WHISPER::MsgTopics::MANAGEMENT);
while (subscribedTopics.size() > 0) {
unsubscribe(subscribedTopics.front());
}
if (listening_ == true) {
stop();
}
receiverSocket_->close();
}
void InternalUDPListener::connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver)
{
receiverQueue_ = receiver;
if (receiverSocket_ == nullptr)
{
ctx = zmq::context_t(2);
receiverSocket_ = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::dish);
std::string portAsString = std::to_string(port_);
receiverSocket_->bind("udp://*:"+portAsString);
///used to set a custom time out to the socket
receiverSocket_->set(zmq::sockopt::rcvtimeo,100);
}
subscribe(WHISPER::MsgTopics::MANAGEMENT);
start();
}
void InternalUDPListener::listen()
{
listening_ = true;
while (stoplistening_ == false)
{
zmq::recv_result_t res;
zmq::message_t msg;
///res is the size of the received message or error; cant figure out what value is set when ther is an error
res = receiverSocket_->recv(msg,zmq::recv_flags::none);
// std::string message = msg.to_string();
WHISPER::Message receivedMessage(msg.to_string());
receiverQueue_->addElement(receivedMessage);
}
listening_ = false;
}
void InternalUDPListener::start()
{
if (receiverSocket_ != nullptr)
{
if (listening_ != true)
{
stoplistening_ = false;
listeningThread = std::thread(&InternalUDPListener::listen,this);
while (listening_ != true)
{
}
}
}
}
void InternalUDPListener::stop()
{
stoplistening_ = true;
if (listeningThread.joinable())
{
listeningThread.join();
}
}
void InternalUDPListener::subscribe(WHISPER::MsgTopics topic)
{
subscribedTopics.push_back(topic);
receiverSocket_->join(WHISPER::MsgTopicsMap[topic].c_str());
}
void InternalUDPListener::unsubscribe(WHISPER::MsgTopics topic)
{
try
{
receiverSocket_->leave(WHISPER::MsgTopicsMap[topic].c_str());
for (auto it = subscribedTopics.begin(); it != subscribedTopics.end(); )
{
if (*it == topic)
{
it = subscribedTopics.erase(it);
}else
{
it++;
}
}
} catch(const std::exception& e)
{
LOG_S(ERROR)<<e.what();
}
}
std::uint16_t InternalUDPListener::checkPort(std::uint16_t port)
{
std::uint16_t localPort = port;
struct sockaddr_in local;
socklen_t localLen;
int sockfd;
// Creating socket file descriptor
if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
{
perror("socket creation failed");
exit(EXIT_FAILURE);
}
local.sin_family = AF_INET; // IPv4
local.sin_addr.s_addr = INADDR_ANY;
local.sin_port = htons(port);
int err = bind(sockfd, (const struct sockaddr *)&local, sizeof(local));
if (err == -1 && errno == EADDRINUSE)
{
// set port to auto on binding
local.sin_port = 0;
err = bind(sockfd, (const struct sockaddr *) &local, sizeof(local));
if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket "));
}
// if we could bind to a port we have to get the port value from the socket
memset((char *) &local, 0, sizeof(local));
localLen=sizeof(local);
err=getsockname(sockfd, (struct sockaddr *) &local, &localLen);
if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr errno=") + std::to_string(errno));
}
localPort=ntohs(local.sin_port);
if (localPort==0)
{
err=getsockname(sockfd, (struct sockaddr *) &local, &localLen);
if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr "));
}
localPort=ntohs(local.sin_port);
}
}
else if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket"));
}
close(sockfd);
return localPort;
}
}

View File

@@ -0,0 +1,41 @@
#include "WHISPER/Messages/Message.hpp"
#include "zmq.hpp"
#include <WHISPER/InternalUDPSender.hpp>
#include <memory>
#include <string>
namespace WHISPER {
/// address to send to
std::string address_;
/// port to send to
std::uint16_t port_;
/// zeromq io contex
zmq::context_t ctx;
/// shared pointer to receiving socket
std::shared_ptr<zmq::socket_t> senderSocket_ = nullptr;
InternalUDPSender::InternalUDPSender(std::string address, std::uint16_t port):address_(address),port_(port)
{
ctx = zmq::context_t(1);
senderSocket_ = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::radio);
senderSocket_->connect("udp://"+address_+":"+std::to_string(port_));
}
InternalUDPSender::~InternalUDPSender()
{
senderSocket_->close();
}
void InternalUDPSender::send(WHISPER::Message msg,WHISPER::MsgTopics topic)
{
std::string MessageString = msg.serialize();
zmq::message_t zmqMsg(MessageString.begin(),MessageString.end());
zmqMsg.set_group(WHISPER::MsgTopicsMap[topic].c_str());
senderSocket_->send(zmqMsg,zmq::send_flags::none);
}
}

View File

@@ -39,7 +39,8 @@ namespace WHISPER {
InternalUDPService::~InternalUDPService() InternalUDPService::~InternalUDPService()
{ {
if (this->isConnected() == true) { if (this->isConnected() == true)
{
disconnect(); disconnect();
} }
@@ -55,9 +56,11 @@ namespace WHISPER {
std::string portAsString = std::to_string(ownReceivingPort_); std::string portAsString = std::to_string(ownReceivingPort_);
receiver->bind("udp://*:"+portAsString); receiver->bind("udp://*:"+portAsString);
}else if (ownReceivingPort_ == 0) { }else if (ownReceivingPort_ == 0)
{
throw std::invalid_argument( " receiver cant bind to port " ); throw std::invalid_argument( " receiver cant bind to port " );
}else { }else
{
loopbackSocket = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::radio); loopbackSocket = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::radio);
loopbackSocket->connect("udp://127.0.0.255:"+sendingPort); loopbackSocket->connect("udp://127.0.0.255:"+sendingPort);
@@ -115,9 +118,8 @@ namespace WHISPER {
{ {
zmq::message_t tmpmsg(msg.begin(),msg.end()); zmq::message_t tmpmsg(msg.begin(),msg.end());
tmpmsg.set_group(topic.c_str()); tmpmsg.set_group(topic.c_str());
sender.send(tmpmsg,zmq::send_flags::none); sender.send(tmpmsg,zmq::send_flags::none);
zmq::message_t localmsg(msg.begin(),msg.end()); zmq::message_t localmsg(msg.begin(),msg.end());
tmpmsg.set_group(topic.c_str()); tmpmsg.set_group(topic.c_str());
@@ -148,11 +150,12 @@ namespace WHISPER {
sendToLocalClients(receivedMessage.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)receivedMessage.topic_]); sendToLocalClients(receivedMessage.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)receivedMessage.topic_]);
if (receivedMessage.deviceId_ != getOwnID()) { if (receivedMessage.deviceId_ != getOwnID())
{
int msgType = receivedMessage.msgType_; int msgType = receivedMessage.msgType_;
if (msgType == WHISPER::JOIN) { if (msgType == WHISPER::JOIN)
{
WHISPER::Join join(message); WHISPER::Join join(message);
// LOG_S(INFO)<<"joined id "<<join.deviceId_<< " joined addr: "<<join.sourceAddr << " joined port: " << join.port; // LOG_S(INFO)<<"joined id "<<join.deviceId_<< " joined addr: "<<join.sourceAddr << " joined port: " << join.port;
@@ -161,15 +164,18 @@ namespace WHISPER {
if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") { if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") {
LOG_S(INFO)<<"go for it"; LOG_S(INFO)<<"go for it";
bool clientAllreadyIn = false; bool clientAllreadyIn = false;
if (localclients.size() > 0) { if (localclients.size() > 0)
{
for (auto it = localclients.begin(); it != localclients.end();it++) for (auto it = localclients.begin(); it != localclients.end();it++)
{ {
if (it->get()->getPort() == join.port || it->get()->getID() == join.deviceId_) { if (it->get()->getPort() == join.port || it->get()->getID() == join.deviceId_)
{
clientAllreadyIn = true; clientAllreadyIn = true;
} }
} }
} }
if (clientAllreadyIn == false) { if (clientAllreadyIn == false)
{
// localClient client; // localClient client;
auto tmpAddr = "udp://"+join.sourceAddr+":" + std::to_string(join.port); auto tmpAddr = "udp://"+join.sourceAddr+":" + std::to_string(join.port);
@@ -201,7 +207,8 @@ namespace WHISPER {
if (localclients.size() > 0) { if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++) for (auto it = localclients.begin(); it != localclients.end();it++)
{ {
if (it->get()->getPort() == Leave.port || (it->get()->getID() == Leave.deviceId_ && it->get()->getParentid() == Leave.deviceId_)) { if (it->get()->getPort() == Leave.port || (it->get()->getID() == Leave.deviceId_ && it->get()->getParentid() == Leave.deviceId_))
{
it->get()->getClientSocket()->close(); it->get()->getClientSocket()->close();
it = localclients.erase(it); it = localclients.erase(it);
LOG_S(INFO)<<"client left"; LOG_S(INFO)<<"client left";
@@ -226,11 +233,13 @@ namespace WHISPER {
{ {
WHISPER::Pong pong(message); WHISPER::Pong pong(message);
if (localclients.size() > 0) { if (localclients.size() > 0)
{
for (auto it = localclients.begin(); it != localclients.end();it++) for (auto it = localclients.begin(); it != localclients.end();it++)
{ {
if (pong.deviceId_ == it->get()->getID() && pong.parentId_ == it->get()->getParentid()) { if (pong.deviceId_ == it->get()->getID() && pong.parentId_ == it->get()->getParentid())
{
it->get()->setLastResponse(std::time(nullptr)); it->get()->setLastResponse(std::time(nullptr));
} }
} }
@@ -248,11 +257,13 @@ namespace WHISPER {
void InternalUDPService::sendToLocalClients(std::string msg,std::string topic) void InternalUDPService::sendToLocalClients(std::string msg,std::string topic)
{ {
if (localclients.size() > 0 && isGateway() == true) { if (localclients.size() > 0 && isGateway() == true)
{
auto tmpMsg = WHISPER::Message(msg); auto tmpMsg = WHISPER::Message(msg);
for (auto it = localclients.begin(); it != localclients.end();it++) for (auto it = localclients.begin(); it != localclients.end();it++)
{ {
if (tmpMsg.deviceId_ != it->get()->getID()) { if (tmpMsg.deviceId_ != it->get()->getID())
{
zmq::message_t tmp(msg.begin(),msg.end()); zmq::message_t tmp(msg.begin(),msg.end());
tmp.set_group(topic.c_str()); tmp.set_group(topic.c_str());
it->get()->getClientSocket()->send(tmp,zmq::send_flags::none); it->get()->getClientSocket()->send(tmp,zmq::send_flags::none);
@@ -286,11 +297,13 @@ namespace WHISPER {
} }
void InternalUDPService::stopChecker() void InternalUDPService::stopChecker()
{ {
while (clientCheckerisRunning == true ) { while (clientCheckerisRunning == true )
{
stopClientChecker = true; stopClientChecker = true;
// std::this_thread::sleep_for(std::chrono::milliseconds(100)); // std::this_thread::sleep_for(std::chrono::milliseconds(100));
LOG_S(INFO)<<"waiting for checker thread"; LOG_S(INFO)<<"waiting for checker thread";
if (clientsCheckerThread.joinable() == true ) { if (clientsCheckerThread.joinable() == true )
{
clientsCheckerThread.join(); clientsCheckerThread.join();
} }
} }
@@ -302,7 +315,8 @@ namespace WHISPER {
while(stopClientChecker == false) while(stopClientChecker == false)
{ {
if (localclients.size() > 0) { if (localclients.size() > 0)
{
WHISPER::Ping ping(getParentID(),getOwnID(),getOwnDeviceType(),port_); WHISPER::Ping ping(getParentID(),getOwnID(),getOwnDeviceType(),port_);
sendToLocalClients(ping.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); sendToLocalClients(ping.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);

View File

@@ -0,0 +1,72 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
/**
* @file
* @copyright 2022 MPLv2
*/
/**
* @defgroup tests
*/
#include "WHISPER/InternalUDPSender.hpp"
#include "WHISPER/Messages/stringData.hpp"
#include <string>
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
#include "WHISPER/InternalUDPListener.hpp"
#include "WHISPER/Messages/Message.hpp"
#include "zmq.hpp"
#include "google/protobuf/message.h"
#include <memory>
/**
* @brief brief test description
* @ingroup group
*/
SCENARIO("A test scenario","[keywords]")
{
GIVEN("Preliminaries")
{
auto *listener = new WHISPER::InternalUDPListener(8000);
auto sender = std::make_shared<WHISPER::InternalUDPSender>("127.0.0.1",8000);
WHEN("doing something")
{
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
listener->connect(receiver) ;
listener->subscribe(WHISPER::DATA);
// listener->start();
// listener->stop();
auto stringMsg = WHISPER::StringData(1,1,WHISPER::SourceType::SENSOR,"hello world");
sender->send(stringMsg,WHISPER::DATA);
LOG_S(INFO)<<"wating";
std::this_thread::sleep_for(std::chrono::milliseconds(200));
LOG_S(INFO)<<"closing";
THEN("expecting something to happen")
{
REQUIRE(receiver->size() > 0);
WHISPER::Message msg;
receiver->get(msg);
LOG_S(INFO)<< msg.msgType_;
WHISPER::StringData data(msg.serialize());
REQUIRE(data.data_ == "hello world");
delete listener;
REQUIRE(receiver.unique() == true );
} // THEN
} // WHEN
} // GIVEN
} // SCENARIO

View File

@@ -36,7 +36,7 @@ SCENARIO("A test scenario","[keywords]")
{ {
WHISPER::InternalUDPService service(0,1,WHISPER::SourceType::SIMCOMTROLER,8000,"127.0.0.255","127.0.0.1"); WHISPER::InternalUDPService service(0,1,WHISPER::SourceType::SIMCOMTROLER,8000,"127.0.0.255","127.0.0.1");
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>(); auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
service.connect(receiver) ; service.connect(receiver) ;
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); // service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
int i = 0; int i = 0;