Squashed 'libs/CommService/' content from commit 7ccc0fc

git-subtree-dir: libs/CommService
git-subtree-split: 7ccc0fce88bbc5969df060058cf0fb57abe3bcf9
This commit is contained in:
Henry Winkel
2022-09-15 09:53:53 +02:00
commit cc67e4840f
799 changed files with 179487 additions and 0 deletions

View File

@@ -0,0 +1,405 @@
#include "CommService/Message.hpp"
#include "CommService/PayLoads/Join.hpp"
#include "CommService/PayLoads/Leave.hpp"
#include <CommService/CommService.hpp>
#include <cstring>
#include <loguru.hpp>
#include <memory>
namespace CommService {
void CommService::connect(){
struct sockaddr_in local; // structure for holding local port and ip information
socklen_t localLen;
int err=0;
// initially we are a gateway
setGateway(true);
// fetch us an inet udp socket
inetSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (inetSocket == -1) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not create UDP socket"));
}
int optval = 1;
err=setsockopt(inetSocket, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval));
if(err)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not set SO_BOADCAST"));
}
// create local port and ip information
memset((char *) &local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(broadcastPort);
// listening to all addresses is simpler at the moment
// TODO: think about making it configurable
local.sin_addr.s_addr = htonl(INADDR_ANY);
// if we are the first application using this port on this host
// our localPort is the broadcastPort
localPort=broadcastPort;
// try binding socket to the boradcast port, if it fails, there is already an application
// listening to messages and we have to choose a different port and register as a proxy client
err = bind(inetSocket, (const struct sockaddr *) &local, sizeof(local));
if (err == -1 && errno == EADDRINUSE)
{
// no we are not a gateway
setGateway(false);
// set port to auto on binding
local.sin_port = 0;
err = bind(inetSocket, (const struct sockaddr *) &local, sizeof(local));
if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket "));
}
// if we could bind to a port we have to get the port value from the socket
memset((char *) &local, 0, sizeof(local));
localLen=sizeof(local);
err=getsockname(inetSocket, (struct sockaddr *) &local, &localLen);
if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr errno=") + std::to_string(errno));
}
//therefore i check for port 0 and try again
localPort=ntohs(local.sin_port);
if (localPort==0)
{
err=getsockname(inetSocket, (struct sockaddr *) &local, &localLen);
if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr "));
}
localPort=ntohs(local.sin_port);
}
}
else if(err == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket"));
}
receiveThread = std::thread(&CommService::receiveThreadFunc,this);
Message m;
m.topic = messageTopic::JOIN;
m.sourceID = deviceID_;
// wait until service is really connected
while(!connected)
{
}
LOG_S(INFO)<< "connected";
// publish the hotplug join message
publish(m);
}
void CommService::disconnect(){
close(inetSocket);
connected = false;
}
void CommService::publish(Message m){
LOG_S(INFO)<< "publish msg";
struct sockaddr_in remote;
// if a HotPlugJoin message is published we add our network specific payload
if (m.topic == JOIN)
{
PayLoads::Join join;
memset(join.networkLayerAddress,0,16);
memcpy(join.networkLayerAddress,srcIp.c_str(),srcIp.size());
join.transportLayerAddress=localPort;
LOG_S(INFO)<<"network layer address " <<join.networkLayerAddress;
m.setData(join);
// if a HotPlugLeave message is published we add our network specific payload
} else if (m.topic == LEAVE)
{
PayLoads::Leave leave;
memset(leave.networkLayerAddress,0,16);
memcpy(leave.networkLayerAddress,srcIp.c_str(),srcIp.size());
leave.transportLayerAddress=localPort;
m.setData(leave);
}
// create buffer to transmit
std::vector<unsigned char> data = m.toByteVector();
// setup destionation address and port
memset((char *) &remote, 0, sizeof(remote));
remote.sin_family = AF_INET;
remote.sin_port = htons(broadcastPort);
if (inet_aton(dstIp.c_str(), &remote.sin_addr) == 0) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("wrong ip address"));
}
// send message to network
if (sendto(inetSocket, data.data(), data.size(), 0, (const struct sockaddr *)&remote, sizeof(remote)) == -1) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("error transmitting message"));
}
// if we are not on a loopback interface we have to transmit it to ourself, so every application
// on our host can receive it
if (srcIp != "127.0.0.1")
{
if (inet_aton(srcIp.c_str(), &remote.sin_addr) == 0) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("wrong ip address"));
}
if (sendto(inetSocket, data.data(), data.size(), 0, (const struct sockaddr *)&remote, sizeof(remote)) == -1) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("error transmitting message"));
}
}
// if we are a proxy to other applications on the host
if (localPort==broadcastPort)
{
// we have to transmit the message to all our local clients
std::vector<LocalUDPClients>::iterator it;
memset((char *) &remote, 0, sizeof(remote));
remote.sin_family = AF_INET;
if (inet_aton(srcIp.c_str(), &remote.sin_addr) == 0) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("wrong ip address"));
}
mutexClients.lock();
for (it=clients.begin(); it != clients.end(); ++it)
{
if (m.sourceID != (*it).id)
{
remote.sin_port = htons((*it).port);
// send message to network
if (sendto(inetSocket, data.data(), data.size(), 0, (const struct sockaddr *)&remote, sizeof(remote)) == -1) {
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("error transmitting message"));
}
}
}
mutexClients.unlock();
}
}
void CommService::receiveThreadFunc(){
try {
asyncReceive();
} catch (const std::exception& e) {
LOG_S(INFO)<< "WHAT: "<<e.what();
}
connected=true;
LOG_S(INFO)<< "receiving msgs";
while(!stopReceiveThread)
{
try {
asyncReceive();
} catch (const std::exception& e) {
LOG_S(INFO)<< "WHAT: "<<e.what();
}
}
receiving = false;
LOG_S(INFO)<<"receive thread stoped";
}
void CommService::asyncReceive(){
// we use a buffer for a maximum sized UDP packet
unsigned char buf[1500];
struct sockaddr_in remote;
int lenRemote = sizeof(remote);
int size=0;
fd_set rfds;
struct timeval tv;
int retval=0;
// we use select to check if data is available
FD_ZERO(&rfds);
FD_SET(inetSocket, &rfds);
tv.tv_sec = 1;
tv.tv_usec = 0;
retval = select(inetSocket+1, &rfds, NULL, NULL, &tv);
if (retval == -1)
{
throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("error calling select"));
}
else if (retval) {
if ( (size=recvfrom(inetSocket, buf, 1500, 0, (struct sockaddr *) &remote, (socklen_t*)&lenRemote)) > 0) {
std::vector<unsigned char> data;
data.resize(data.size() + size);
memcpy(&data[data.size() - size], buf, size);
CommService::processMsg(&data);
}
}
}
void CommService::processMsg(std::vector<unsigned char> *m){
struct sockaddr_in remote;
Message msg(*m);
LOG_S(INFO)<< "msg topic =" << msg.topic;
if (msg.topic == messageTopic::LEAVE) {
LOG_S(INFO)<< "msg topic =" << msg.sourceID;
LOG_S(INFO)<< "own device =" << deviceID_;
}
// if we reveive a HotPlugJoin message and we are a proxy to other applications on the same host
if (msg.topic == messageTopic::JOIN && localPort==broadcastPort && msg.sourceID != deviceID_)
{
// check if we are running on the same networkLayerAddress
PayLoads::Join join(msg.getRawData());
LOG_S(INFO)<<"JOIN Message received from port"<<join.transportLayerAddress;
std::string rIp((const char *)join.networkLayerAddress);
if (srcIp == rIp)
{
LocalUDPClients c;
c.id=msg.sourceID;
c.port=join.transportLayerAddress;
mutexClients.lock();
clients.push_back(c);
mutexClients.unlock();
}
// if we reveive a HotPlugLeave message and we are a proxy to other applications on the same host
} else if (msg.topic == messageTopic::LEAVE && msg.sourceID != deviceID_)
{
PayLoads::Leave leave(msg.getRawData());
LOG_S(INFO)<<"LEAVE Message received from port"<<leave.transportLayerAddress;
// remove port from clients vector
std::vector<LocalUDPClients>::iterator it;
mutexClients.lock();
for (it=clients.begin(); it != clients.end(); ++it)
{
if((*it).port == leave.transportLayerAddress)
{
clients.erase(it);
break;
}
}
mutexClients.unlock();
}else{
// if (msg.topic != messageTopic::JOIN & msg.topic != messageTopic::LEAVE) {
// ReceivingMessageQueue->push(msg);
// }
}
// we have to transmit the message to all our local clients if we are a proxy
if (localPort==broadcastPort)
{
std::vector<LocalUDPClients>::iterator it;
memset((char *) &remote, 0, sizeof(remote));
remote.sin_family = AF_INET;
if (inet_aton(srcIp.c_str(), &remote.sin_addr) == 0) {
throw std::runtime_error("wrong ip address");
}
// create buffer to transmit
std::vector<unsigned char> data = msg.toByteVector();
mutexClients.lock();
for (it=clients.begin(); it != clients.end(); ++it)
{
if ((*it).id != msg.sourceID)
{
remote.sin_port = htons((*it).port);
// send message to network
if (sendto(inetSocket, data.data(), data.size(), 0, (const struct sockaddr *)&remote, sizeof(remote)) == -1) {
throw std::runtime_error("error transmitting message");
}
}
}
mutexClients.unlock();
}
LOG_S(INFO)<< "client size"<<clients.size();
}
std::shared_ptr<ThreadsafeQueue<Message>> CommService::getReceivedMessageQueue(){
return ReceivingMessageQueue;
}
void CommService::setGateway(bool b){
gateway = b;
}
void CommService::start(){
}
void CommService::stop(){
stopReceiveThread = true;
while (receiving == false) {
}
LOG_S(INFO)<<"receive thread stoped";
Message m;
m.topic = messageTopic::LEAVE;
LOG_S(INFO) << "own device = "<<deviceID_;
m.sourceID = deviceID_;
publish(m);
disconnect();
while (connected != false) {
}
}
CommService::~CommService(){
if (connected == true) {
disconnect();
}
}
};

View File

@@ -0,0 +1,98 @@
#include "CommService/Convert.hpp"
#include <CommService/Message.hpp>
#include <stdexcept>
#include <sstream> // std::stringstream
#include <loguru.hpp>
namespace CommService {
Message::Message(){}
/*
* copy constructor
*
*/
Message::Message(const Message &obj) : sourceID(obj.sourceID),
srcMajorType(obj.srcMajorType),
srcMinorType(obj.srcMinorType),
topic(obj.topic)
{
std::vector<unsigned char> v=obj.getRawData();
std::copy(v.begin(), v.end(), std::back_inserter(data));
}
Message::Message(std::vector<unsigned char> v){
if (v.size()<20)
{
std::stringstream error;
error << __PRETTY_FUNCTION__ << ": vector v too short for a message";
throw std::invalid_argument(error.str() );
}
/*
* parse message
*/
sourceID=Convert::fromByteArrayToSimple<std::uint16_t>(v);
v.erase(v.begin(), v.begin()+sizeof(sourceID));
srcMajorType=Convert::fromByteArrayToSimple<deviceMajorType>(v);
v.erase(v.begin(), v.begin()+sizeof(srcMajorType));
srcMinorType=Convert::fromByteArrayToString(v);
v.erase(v.begin(), v.begin()+sizeof(uint32_t)+srcMinorType.length());
topic=Convert::fromByteArrayToSimple<messageTopic>(v);
v.erase(v.begin(), v.begin()+sizeof(topic));
std::copy(v.begin(), v.end(), back_inserter(data));
}
std::vector<unsigned char> Message::toByteVector() const {
std::vector<unsigned char> bytes;
std::vector<unsigned char> convert;
if(data.empty())
{
std::stringstream error;
error << __PRETTY_FUNCTION__ << ": attribute data not set yet";
throw std::invalid_argument(error.str() );
}
convert=Convert::fromSimpleToByteArray(sourceID);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
convert=Convert::fromSimpleToByteArray(srcMajorType);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
convert=Convert::fromStringToByteArray(srcMinorType);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
convert=Convert::fromSimpleToByteArray(topic);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
bytes.insert(std::end(bytes), std::begin(data), std::end(data));
// std::uint32_t crc = CRC::Calculate(bytes.data(), bytes.size(), CRC::CRC_32());
// convert=BC::DataTypes::Convert::fromSimpleToByteArray(crc);
// bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
return bytes;
}
};

View File

@@ -0,0 +1,56 @@
#include <CommService/Convert.hpp>
#include <CommService/Message.hpp>
#include <CommService/PayLoads/Join.hpp>
#include <CommService/transmittable.hpp>
#include <cstring>
#include <vector>
#include <sstream>
#include <loguru.hpp>
namespace CommService {
namespace PayLoads {
/**
* @brief Constructor for generationg a HotPlugJoin object from a received std::vector<unsiged char>
*
* @param v - the std::vector<unsigned char> representing a message object
*/
Join::Join(std::vector<unsigned char> v)
{
/*
* parse the vector
*/
std::memcpy(networkLayerAddress,v.data(),16);
v.erase(v.begin(), v.begin()+sizeof(networkLayerAddress));
transportLayerAddress=Convert::fromByteArrayToSimple<std::uint32_t>(v);
v.erase(v.begin(), v.begin()+sizeof(transportLayerAddress));
}
/**
* @brief converts the whole Payload information into a byte vector
*
* @param payload - returns the byte vector by call by reference
*/
std::vector<unsigned char> Join::toByteVector() const
{
std::vector<unsigned char> bytes;
std::vector<unsigned char> convert;
bytes.resize(16);
std::memcpy(bytes.data(),networkLayerAddress,16);
convert=Convert::fromSimpleToByteArray(transportLayerAddress);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
return bytes;
}
};
};

View File

@@ -0,0 +1,66 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
/**
* @file
* @brief implementation file for the HotPlugLeave payload
* @author Dominik Meyer <dmeyer@hsu-hh.de>
* @date 2019-02-05
* @copyright 2019 no yet defined
*/
#include <CommService/Convert.hpp>
#include <CommService/Message.hpp>
#include <CommService/PayLoads/Leave.hpp>
#include <CommService/transmittable.hpp>
#include <cstring>
#include <vector>
#include <sstream>
namespace CommService {
namespace PayLoads {
/**
* @brief Constructor for generationg a HotPlugLeave object from a received std::vector<unsiged char>
*
* @param v - the std::vector<unsigned char> representing a message object
*/
Leave::Leave(std::vector<unsigned char> v)
{
/*
* parse the vector
*/
std::memcpy(networkLayerAddress,v.data(),16);
v.erase(v.begin(), v.begin()+16);
transportLayerAddress=Convert::fromByteArrayToSimple<uint32_t>(v);
v.erase(v.begin(), v.begin()+sizeof(transportLayerAddress));
}
/**
* @brief converts the whole Payload information into a byte vector
*
* @param payload - returns the byte vector by call by reference
*/
std::vector<unsigned char> Leave::toByteVector() const
{
std::vector<unsigned char> bytes;
std::vector<unsigned char> convert;
bytes.resize(16);
std::memcpy(bytes.data(),networkLayerAddress,16);
convert=Convert::fromSimpleToByteArray(transportLayerAddress);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
return bytes;
}
}
}

View File

@@ -0,0 +1,80 @@
#include "CommService/Convert.hpp"
#include <CommService/PayLoads/Ping.hpp>
#include <CommService/transmittable.hpp>
#include <cstring>
#include <vector>
#include <sstream>
#include <random>
#include <cstdint>
namespace CommService {
namespace PayLoads {
/**
* @brief Basic Constructor for the Ping object. It initializes the messageID to a random number
*
*/
Ping::Ping() : sequenceNr(0), pingTime(0)
{
// fetch us required random number device
std::random_device rd;
std::mt19937 mt(rd());
// set the distribution range of the numbers
std::uniform_int_distribution<uint64_t> dist(1, UINT64_MAX);
// fetch a random number
messageID=dist(mt);
}
/**
* @brief Constructor for generationg a Ping object from a received std::vector<unsiged char>
*
* @param v - the std::vector<unsigned char> representing a message object
*/
Ping::Ping(std::vector<unsigned char> v)
{
/*
* parse the vector
*/
messageID=Convert::fromByteArrayToSimple<uint64_t>(v);
v.erase(v.begin(), v.begin()+sizeof(messageID));
sequenceNr=Convert::fromByteArrayToSimple<uint64_t>(v);
v.erase(v.begin(), v.begin()+sizeof(sequenceNr));
pingTime=Convert::fromByteArrayToSimple<uint64_t>(v);
v.erase(v.begin(), v.begin()+sizeof(sequenceNr));
}
/**
* @brief converts the whole Payload information into a byte vector
*
* @param payload - returns the byte vector by call by reference
*/
std::vector<unsigned char> Ping::toByteVector() const
{
std::vector<unsigned char> bytes;
std::vector<unsigned char> convert;
convert=Convert::fromSimpleToByteArray(messageID);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
convert=Convert::fromSimpleToByteArray(sequenceNr);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert=Convert::fromSimpleToByteArray(pingTime);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
return bytes;
}
};
};

View File

@@ -0,0 +1,70 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
/**
* @file
* @brief implementation file for the Pong payload
* @author Dominik Meyer <dmeyer@hsu-hh.de>
* @date 2019-02-06
* @copyright 2019 no yet defined
*/
#include "CommService/Convert.hpp"
#include <CommService/PayLoads/Pong.hpp>
#include <CommService/transmittable.hpp>
#include <cstring>
#include <vector>
#include <sstream>
namespace CommService {
namespace PayLoads {
/**
* @brief Constructor for generationg a Pong object from a received std::vector<unsiged char>
*
* @param v - the std::vector<unsigned char> representing a message object
*/
Pong::Pong(std::vector<unsigned char> v)
{
/*
* parse the vector
*/
pingMessageID= Convert::fromByteArrayToSimple<uint64_t>(v);
v.erase(v.begin(), v.begin()+sizeof(pingMessageID));
pingTransmissionTime=Convert::fromByteArrayToSimple<int>(v);
v.erase(v.begin(), v.begin()+sizeof(pingTransmissionTime));
sequenceNr=Convert::fromByteArrayToSimple<uint64_t>(v);
v.erase(v.begin(), v.begin()+sizeof(sequenceNr));
}
/**
* @brief converts the whole Payload information into a byte vector
*
* @param payload - returns the byte vector by call by reference
*/
std::vector<unsigned char> Pong::toByteVector() const
{
std::vector<unsigned char> bytes;
std::vector<unsigned char> convert;
convert=Convert::fromSimpleToByteArray(pingMessageID);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
convert=Convert::fromSimpleToByteArray(pingTransmissionTime);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
convert.clear();
convert=Convert::fromSimpleToByteArray(sequenceNr);
bytes.insert(std::end(bytes), std::begin(convert), std::end(convert));
return bytes;
}
};
};

53
src/main.cpp Normal file
View File

@@ -0,0 +1,53 @@
#include "CommService/Message.hpp"
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>
#include <csignal>
#include <CommService/Convert.hpp>
#include <CommService/CommService.hpp>
/// variable for stopping the application
bool running = true;
/**
* @brief killhandler to set running to false on CTRL-C
*
* @param s - the signal to manage
*/
void killHandlerPing(int s) {
if (s == SIGINT) {
running = false;
}
}
int main(){
// setup signal handler
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = killHandlerPing;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
CommService::CommService UDPService("127.0.0.255","127.0.0.1",1,CommService::WAR_SHIP,"F124",8000);
UDPService.connect();
auto msgQueue = UDPService.getReceivedMessageQueue();
while (running) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
UDPService.stop();
return 0;
}