first version of InternalUDPService
This commit is contained in:
167
src/WHISPER/InternalUDPService.cpp
Normal file
167
src/WHISPER/InternalUDPService.cpp
Normal file
@@ -0,0 +1,167 @@
|
||||
|
||||
|
||||
#include "WHISPER/InternalUDPService.hpp"
|
||||
#include "WHISPER/Messages/Message.hpp"
|
||||
#include "WHISPER/whisper.hpp"
|
||||
#include "zmq.hpp"
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <unistd.h>
|
||||
#include <utility>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
|
||||
namespace WHISPER {
|
||||
|
||||
|
||||
InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string address):
|
||||
whispercomm(id, owndevicetype),port_(port),address_(address)
|
||||
{
|
||||
ctx = zmq::context_t(2);
|
||||
sender = zmq::socket_t(ctx,zmq::socket_type::radio);
|
||||
receiver = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::dish);
|
||||
// receiver = zmq::socket_t(ctx,zmq::socket_type::dish);
|
||||
LOG_S(INFO)<< "sockets are created";
|
||||
}
|
||||
|
||||
void InternalUDPService::derivedConnect()
|
||||
{
|
||||
ownReceivingPort_ = checkPort(port_);
|
||||
LOG_S(INFO)<< "could bind to port:" << ownReceivingPort_;
|
||||
|
||||
|
||||
if(ownReceivingPort_ == port_){
|
||||
std::string portAsString = std::to_string(ownReceivingPort_);
|
||||
receiver->bind("udp://*:"+portAsString);
|
||||
}else if (ownReceivingPort_ == 0) {
|
||||
throw std::invalid_argument( " receiver cant bind to port " );
|
||||
}else {
|
||||
std::string portAsString = std::to_string(ownReceivingPort_);
|
||||
receiver->bind("udp://*:"+portAsString);
|
||||
}
|
||||
|
||||
receiver->join("management");
|
||||
|
||||
|
||||
std::string sendingPort = std::to_string(port_);
|
||||
LOG_S(INFO)<< sendingPort;
|
||||
|
||||
sender.connect("udp://"+address_+":"+sendingPort);
|
||||
}
|
||||
|
||||
|
||||
void InternalUDPService::derivedDisconnect()
|
||||
{
|
||||
sender.close();
|
||||
receiver->close();
|
||||
}
|
||||
void InternalUDPService::derivedPublish(std::string msg,std::string topic)
|
||||
{
|
||||
zmq::message_t tmpmsg(msg.size());
|
||||
memcpy (tmpmsg.data (), msg.data(), msg.size());
|
||||
const char *tmp = topic.c_str();
|
||||
tmpmsg.set_group(topic.c_str());
|
||||
sender.send(tmpmsg,zmq::send_flags::none);
|
||||
|
||||
}
|
||||
void InternalUDPService::derivedReceive()
|
||||
{
|
||||
|
||||
LOG_S(INFO)<<"zmq receiving funk";
|
||||
zmq::message_t msg;
|
||||
|
||||
|
||||
receiver->recv(msg,zmq::recv_flags::none);
|
||||
// LOG_S(INFO)<<result.value();
|
||||
std::string data;
|
||||
memcpy (data.data (), msg.data(), msg.size());
|
||||
addMsgToReceiverQueue(WHISPER::Message(data));
|
||||
|
||||
|
||||
}
|
||||
|
||||
void InternalUDPService::derivedSubscribe(std::string topic)
|
||||
{
|
||||
// receiver.join(topic.c_str());
|
||||
}
|
||||
void InternalUDPService::derivedUnsubscribe(std::string topic)
|
||||
{
|
||||
// receiver.leave(topic.c_str());
|
||||
}
|
||||
|
||||
std::uint16_t InternalUDPService::checkPort(std::uint16_t port)
|
||||
{
|
||||
std::uint16_t localPort = port;
|
||||
|
||||
struct sockaddr_in local;
|
||||
socklen_t localLen;
|
||||
int sockfd;
|
||||
// Creating socket file descriptor
|
||||
if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) {
|
||||
perror("socket creation failed");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
local.sin_family = AF_INET; // IPv4
|
||||
local.sin_addr.s_addr = INADDR_ANY;
|
||||
local.sin_port = htons(port);
|
||||
int err = bind(sockfd, (const struct sockaddr *)&local, sizeof(local));
|
||||
|
||||
|
||||
|
||||
if (err == -1 && errno == EADDRINUSE)
|
||||
{
|
||||
// no we are not a gateway
|
||||
setGateway(false);
|
||||
|
||||
// set port to auto on binding
|
||||
local.sin_port = 0;
|
||||
err = bind(sockfd, (const struct sockaddr *) &local, sizeof(local));
|
||||
|
||||
|
||||
if(err == -1)
|
||||
{
|
||||
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket "));
|
||||
}
|
||||
|
||||
// if we could bind to a port we have to get the port value from the socket
|
||||
memset((char *) &local, 0, sizeof(local));
|
||||
localLen=sizeof(local);
|
||||
err=getsockname(sockfd, (struct sockaddr *) &local, &localLen);
|
||||
if(err == -1)
|
||||
{
|
||||
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr errno=") + std::to_string(errno));
|
||||
}
|
||||
|
||||
//sometime getsock return port 0, i do not know why yet (dmeyer)
|
||||
//therefore i check for port 0 and try again
|
||||
localPort=ntohs(local.sin_port);
|
||||
if (localPort==0)
|
||||
{
|
||||
err=getsockname(sockfd, (struct sockaddr *) &local, &localLen);
|
||||
if(err == -1)
|
||||
{
|
||||
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr "));
|
||||
}
|
||||
localPort=ntohs(local.sin_port);
|
||||
}
|
||||
}
|
||||
else if(err == -1)
|
||||
{
|
||||
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket"));
|
||||
}
|
||||
|
||||
|
||||
close(sockfd);
|
||||
|
||||
|
||||
return localPort;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -1,5 +1,4 @@
|
||||
#include "WHISPER/Messages/Message.hpp"
|
||||
#include "WHISPER/Messages/Protos/join.pb.h"
|
||||
#include <WHISPER/Messages/Join.hpp>
|
||||
#include <memory>
|
||||
|
||||
@@ -16,6 +15,7 @@ namespace WHISPER {
|
||||
sourceType_ = msg.sourcetype();
|
||||
msgType_ = msg.msgtype();
|
||||
joinMessage = messages::join::Join();
|
||||
|
||||
if ( msg.payload_size()) {
|
||||
if (msg.payload().begin()->Is<messages::join::Join>()) {
|
||||
msg.payload().begin()->UnpackTo(&joinMessage);
|
||||
@@ -44,6 +44,7 @@ namespace WHISPER {
|
||||
auto test = std::make_shared<google::protobuf::Any>();
|
||||
test->PackFrom(joinMessage);
|
||||
addPayLoad(test);
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
|
||||
|
||||
#include "WHISPER/Messages/Protos/join.pb.h"
|
||||
#include <WHISPER/Messages/Message.hpp>
|
||||
#include <memory>
|
||||
|
||||
@@ -47,6 +46,7 @@ namespace WHISPER {
|
||||
|
||||
void Message::addPayLoad(std::shared_ptr<google::protobuf::Any> payload){
|
||||
payload_ = payload;
|
||||
|
||||
LOG_S(INFO)<< "pack any size in message class "<<payload_->ByteSizeLong();
|
||||
|
||||
msg.add_payload()->CopyFrom(*payload_);
|
||||
|
||||
@@ -1,60 +1,60 @@
|
||||
/*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
// /*
|
||||
// * This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// * License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// * file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
// */
|
||||
|
||||
/**
|
||||
* @file BasicMessageQueue.cpp
|
||||
* @brief class which encapsulates queue
|
||||
* @author Christina Sander <christina.sander@hsu-hh.de>
|
||||
* @date 25.06.2020
|
||||
* @copyright MPLv2
|
||||
*/
|
||||
#include <WHISPER/threadSafeQueue.hpp>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
// /**
|
||||
// * @file BasicMessageQueue.cpp
|
||||
// * @brief class which encapsulates queue
|
||||
// * @author Christina Sander <christina.sander@hsu-hh.de>
|
||||
// * @date 25.06.2020
|
||||
// * @copyright MPLv2
|
||||
// */
|
||||
// #include <WHISPER/threadSafeQueue.hpp>
|
||||
// #include <iostream>
|
||||
// #include <string>
|
||||
|
||||
template<class T>
|
||||
WHISPER::threadSafeQueue<T> threadSafeQueue(){
|
||||
// template<class T>
|
||||
// WHISPER::threadSafeQueue<T> threadSafeQueue(){
|
||||
|
||||
}
|
||||
// }
|
||||
|
||||
|
||||
|
||||
// appends the given message to the message queue
|
||||
template<class T>
|
||||
void WHISPER::threadSafeQueue< T>::addElement(std::unique_ptr<T> elem)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mx);
|
||||
q.push( std::move(elem) );
|
||||
lk.unlock();
|
||||
condVar.notify_one();
|
||||
}
|
||||
// // appends the given message to the message queue
|
||||
// template<class T>
|
||||
// void WHISPER::threadSafeQueue< T>::addElement(std::unique_ptr<T> elem)
|
||||
// {
|
||||
// std::unique_lock<std::mutex> lk(mx);
|
||||
// q.push( std::move(elem) );
|
||||
// lk.unlock();
|
||||
// condVar.notify_one();
|
||||
// }
|
||||
|
||||
// gets a message from the queue
|
||||
template<class T>
|
||||
std::unique_ptr<T> WHISPER::threadSafeQueue<T>::getElement()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mx);
|
||||
if( 0 == q.size() )
|
||||
{
|
||||
lk.unlock();
|
||||
throw std::length_error("Empty Queue\n");
|
||||
}
|
||||
std::unique_ptr<std::string> elem = std::move( q.front() );
|
||||
q.pop();
|
||||
lk.unlock();
|
||||
return std::move(elem);
|
||||
}
|
||||
// // gets a message from the queue
|
||||
// template<class T>
|
||||
// std::unique_ptr<T> WHISPER::threadSafeQueue<T>::getElement()
|
||||
// {
|
||||
// std::unique_lock<std::mutex> lk(mx);
|
||||
// if( 0 == q.size() )
|
||||
// {
|
||||
// lk.unlock();
|
||||
// throw std::length_error("Empty Queue\n");
|
||||
// }
|
||||
// std::unique_ptr<std::string> elem = std::move( q.front() );
|
||||
// q.pop();
|
||||
// lk.unlock();
|
||||
// return std::move(elem);
|
||||
// }
|
||||
|
||||
|
||||
//returns the size of the message queue
|
||||
template<class T>
|
||||
unsigned int WHISPER::threadSafeQueue<T>::size()
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mx);
|
||||
unsigned int size = q.size();
|
||||
lk.unlock();
|
||||
return size;
|
||||
}
|
||||
// //returns the size of the message queue
|
||||
// template<class T>
|
||||
// unsigned int WHISPER::threadSafeQueue<T>::size()
|
||||
// {
|
||||
// std::unique_lock<std::mutex> lk(mx);
|
||||
// unsigned int size = q.size();
|
||||
// lk.unlock();
|
||||
// return size;
|
||||
// }
|
||||
@@ -10,6 +10,8 @@
|
||||
*/
|
||||
|
||||
#include <WHISPER/whisper.hpp>
|
||||
#include <cstddef>
|
||||
#include <iterator>
|
||||
/**
|
||||
* @brief namespace for all whisper-com related components
|
||||
*/
|
||||
@@ -18,27 +20,30 @@ namespace WHISPER
|
||||
|
||||
|
||||
|
||||
void whispercomm::connect(std::shared_ptr<threadSafeQueue<std::string>> receiver)
|
||||
void whispercomm::connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver)
|
||||
{
|
||||
this->receiveQueue = receiver;
|
||||
this->derivedConnect();
|
||||
|
||||
receiveThread = std::thread(&WHISPER::whispercomm::receive,this);
|
||||
|
||||
}
|
||||
|
||||
void whispercomm::disconnect()
|
||||
{
|
||||
|
||||
derivedDisconnect();
|
||||
}
|
||||
|
||||
void whispercomm::publish(std::string msg){
|
||||
this->derivedPublish(msg);
|
||||
void whispercomm::publish(std::string msg,std::string topic){
|
||||
this->derivedPublish(msg,topic);
|
||||
}
|
||||
|
||||
void whispercomm::receive(){
|
||||
this->derivedReceive();
|
||||
|
||||
connected = true;
|
||||
|
||||
derivedReceive();
|
||||
|
||||
while(!stopReceiveThread)
|
||||
{
|
||||
derivedReceive();
|
||||
@@ -50,6 +55,7 @@ namespace WHISPER
|
||||
void whispercomm::subscribe(std::string topic)
|
||||
{
|
||||
this->subscribedTopics.push_back(topic);
|
||||
derivedSubscribe(topic);
|
||||
}
|
||||
|
||||
void whispercomm::unsubscribe(std::string topic)
|
||||
@@ -61,12 +67,23 @@ namespace WHISPER
|
||||
it = subscribedTopics.erase(it);
|
||||
}
|
||||
}
|
||||
derivedUnsubscribe(topic);
|
||||
|
||||
}
|
||||
|
||||
void whispercomm::addMsgToReceiverQueue(std::string)
|
||||
void whispercomm::addMsgToReceiverQueue(WHISPER::Message msg)
|
||||
{
|
||||
if (this->receiveQueue != nullptr)
|
||||
{
|
||||
|
||||
this->receiveQueue->addElement(msg);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void whispercomm::setGateway(bool val)
|
||||
{
|
||||
gateway = val;
|
||||
}
|
||||
|
||||
// Add datatypes here
|
||||
|
||||
Reference in New Issue
Block a user