FIX: fixed some bugs

ADD: added raw_track message frame
This commit is contained in:
Henry Winkel
2022-11-15 15:55:24 +01:00
parent cf1800ffba
commit 8fcf4244b0
25 changed files with 1799 additions and 158 deletions

View File

@@ -2,17 +2,21 @@
#include "WHISPER/InternalUDPService.hpp"
#include "WHISPER/Messages/Join.hpp"
#include "WHISPER/Messages/Leave.hpp"
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/whisper.hpp"
#include "zmq.hpp"
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <type_traits>
#include <unistd.h>
#include <utility>
#include <sys/socket.h>
#include <netinet/in.h>
#include <type_traits>
namespace WHISPER {
@@ -37,43 +41,62 @@ namespace WHISPER {
if(ownReceivingPort_ == port_){
std::string portAsString = std::to_string(ownReceivingPort_);
receiver->bind("udp://*:"+portAsString);
}else if (ownReceivingPort_ == 0) {
throw std::invalid_argument( " receiver cant bind to port " );
}else {
LOG_S(INFO)<< "new port to bind: " << ownReceivingPort_;
loopbackSocket = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::radio);
loopbackSocket->connect("udp://127.0.0.255:"+sendingPort);
std::string portAsString = std::to_string(ownReceivingPort_);
receiver->bind("udp://*:"+portAsString);
LOG_S(WARNING)<<"local receiving port is: " << ownReceivingPort_;
}
receiver->join("management");
///subscribe to the basic message domain
///zmq upd sockets only work with pub sub
subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
LOG_S(INFO)<< sendingPort;
///used to set a custom time out to the socket
receiver->set(zmq::sockopt::rcvtimeo,100);
sender.connect("udp://"+destinationAdress_+":"+sendingPort);
// sender.set(zmq::sockopt::multicast_loop ,1);
LOG_S(INFO)<<"own ID: "<< getOwnID();
WHISPER::Join join(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
this->publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
}
void InternalUDPService::derivedDisconnect()
{
{
WHISPER::Leave Leave(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
this->publish(Leave.serialize(),WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)Leave.topic_]);
sender.close();
loopbackSocket = nullptr;
receiver->close();
}
void InternalUDPService::derivedPublish(std::string msg,std::string topic)
{
zmq::message_t tmpmsg(msg.begin(),msg.end());
tmpmsg.set_group(topic.c_str());
sender.send(tmpmsg,zmq::send_flags::none);
zmq::message_t localmsg(msg.begin(),msg.end());
tmpmsg.set_group(topic.c_str());
sendToLocalClients(msg, topic);
if (loopbackSocket != nullptr) {
LOG_S(INFO)<<"loop back send";
zmq::message_t tmpmsg(msg.begin(),msg.end());
tmpmsg.set_group(topic.c_str());
loopbackSocket->send(tmpmsg,zmq::send_flags::none);
@@ -82,69 +105,132 @@ namespace WHISPER {
}
void InternalUDPService::derivedReceive()
{
zmq::recv_result_t res;
zmq::message_t msg;
receiver->recv(msg,zmq::recv_flags::none);
std::string message = msg.to_string();
WHISPER::Message receivedMessage(message);
///res is the size of the received message or error; cant figure out what value is set when ther is an error
res = receiver->recv(msg,zmq::recv_flags::none);
if (receivedMessage.deviceId_ != getOwnID()) {
int msgType = receivedMessage.msgType_;
if (msgType == WHISPER::JOIN) {
if (res.has_value() && res.value() >= 0)
{
WHISPER::Join join(message);
if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") {
bool clientAllreadyIn = false;
if (localclients.size() > 0) {
for (std::vector<std::shared_ptr<localClient>>::iterator it = localclients.begin(); it != localclients.end();it++)
{
if (it->get()->port == join.port) {
clientAllreadyIn = true;
std::string message = msg.to_string();
WHISPER::Message receivedMessage(message);
sendToLocalClients(receivedMessage.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)receivedMessage.topic_]);
if (receivedMessage.deviceId_ != getOwnID()) {
int msgType = receivedMessage.msgType_;
if (msgType == WHISPER::JOIN) {
WHISPER::Join join(message);
// LOG_S(INFO)<<"joined id "<<join.deviceId_<< " joined addr: "<<join.sourceAddr << " joined port: " << join.port;
// LOG_S(INFO)<<"my address "<<myAdress_<< " joined addr: "<<join.sourceAddr ;
if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") {
LOG_S(INFO)<<"go for it";
bool clientAllreadyIn = false;
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (it->get()->port == join.port) {
clientAllreadyIn = true;
}
}
}
}
if (clientAllreadyIn == false) {
auto client = std::make_shared<localClient>();
client->port = join.port;
client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio);
client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port));
localclients.emplace_back(client);
setGateway(true);
if (clientAllreadyIn == false) {
// localClient client;
auto client = std::make_shared<localClient>();
client->port = join.port;
client->id = join.deviceId_;
client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port);
client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio);
LOG_S(INFO)<< "udp://"+join.sourceAddr+":" + std::to_string(join.port);
client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port));
localclients.emplace_back(client);
setGateway(true);
LOG_S(INFO)<< "new client joined";
}
}
}else if(msgType == WHISPER::LEAVE)
{
LOG_S(INFO)<<"client left";
WHISPER::Leave Leave(message);
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (it->get()->port == Leave.port) {
it->get()->clientSocket.close();
it = localclients.erase(it);
LOG_S(INFO)<<"client left";
}
}
}
if (localclients.size() == 0) {
setGateway(false);
}
}else if(msgType == WHISPER::PING)
{
}else if(msgType == WHISPER::PONG)
{
}else
{
addMsgToReceiverQueue(WHISPER::Message(message));
}
}
}else if(res.has_value() && res.value()== -1 ){
}
}
}else if(msgType == WHISPER::LEAVE)
void InternalUDPService::sendToLocalClients(std::string msg,std::string topic)
{
if (localclients.size() > 0 && isGateway() == true) {
auto tmpMsg = WHISPER::Message(msg);
for (auto it = localclients.begin(); it != localclients.end();it++)
{
}else if(msgType == WHISPER::PING)
{
}else if(msgType == WHISPER::PONG)
{
}else
{
addMsgToReceiverQueue(WHISPER::Message(message));
if (tmpMsg.deviceId_ != it->get()->id) {
zmq::message_t tmp(msg.begin(),msg.end());
tmp.set_group(topic.c_str());
it->get()->clientSocket.send(tmp,zmq::send_flags::none);
}
}
}
}
}
void InternalUDPService::derivedSubscribe(std::string topic)
{
receiver->join(topic.c_str());
receiver->join(topic.c_str());
}
void InternalUDPService::derivedUnsubscribe(std::string topic)
{
receiver->leave(topic.c_str());
}
std::uint16_t InternalUDPService::checkPort(std::uint16_t port)
{
std::uint16_t localPort = port;
@@ -162,14 +248,9 @@ namespace WHISPER {
local.sin_addr.s_addr = INADDR_ANY;
local.sin_port = htons(port);
int err = bind(sockfd, (const struct sockaddr *)&local, sizeof(local));
if (err == -1 && errno == EADDRINUSE)
{
// no we are not a gateway
setGateway(false);
// set port to auto on binding
local.sin_port = 0;
err = bind(sockfd, (const struct sockaddr *) &local, sizeof(local));
@@ -210,8 +291,6 @@ namespace WHISPER {
close(sockfd);
LOG_S(WARNING)<< "new assignet port is" << localPort;
return localPort;
}

View File

@@ -15,6 +15,7 @@ namespace WHISPER {
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
joinMessage = messages::join::Join();
deviceId_ = msg.sourceid();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::join::Join>()) {
@@ -33,17 +34,16 @@ namespace WHISPER {
}
WHISPER::Join::Join(std::uint32_t deviceID,std::uint32_t topic, MsgType Type, SourceType src,std::uint32_t port, std::string addr):
Message(deviceID,topic,Type,src),port(port),sourceAddr(addr)
WHISPER::Join::Join(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr):
Message(deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::JOIN,src),port(port),sourceAddr(addr)
{
joinMessage = messages::join::Join();
joinMessage.set_port(port);
joinMessage.set_srcaddress(sourceAddr);
LOG_S(INFO)<< "join message befor packing:" << joinMessage.ByteSizeLong();
auto test = std::make_shared<google::protobuf::Any>();
test->PackFrom(joinMessage);
addPayLoad(test);
auto payloadMessage = std::make_shared<google::protobuf::Any>();
payloadMessage->PackFrom(joinMessage);
addPayLoad(payloadMessage);
}

View File

@@ -0,0 +1,55 @@
#include "WHISPER/Messages/Message.hpp"
#include <WHISPER/Messages/Leave.hpp>
#include <memory>
namespace WHISPER {
Leave::Leave(std::string receivedMessage){
msg = messages::header::Message();
try {
msg.ParseFromString(receivedMessage);
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
leaveMessage = messages::leave::Leave();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::leave::Leave>()) {
msg.payload().begin()->UnpackTo(&leaveMessage);
}
}
port = leaveMessage.port();
sourceAddr = leaveMessage.srcaddress();
} catch (const std::exception& e) {
LOG_S(ERROR)<<e.what();
}
}
Leave::Leave(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr):
Message(deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::LEAVE,src),port(port),sourceAddr(addr)
{
leaveMessage = messages::leave::Leave();
leaveMessage.set_port(port);
leaveMessage.set_srcaddress(sourceAddr);
auto payloadMessage = std::make_shared<google::protobuf::Any>();
payloadMessage->PackFrom(leaveMessage);
addPayLoad(payloadMessage);
}
}

View File

@@ -27,7 +27,7 @@ namespace WHISPER {
}
Message::Message(std::int32_t deviceId,std::uint32_t topic, MsgType Type,SourceType src):topic_(topic),sourceType_(src),msgType_(Type){
Message::Message(std::int32_t deviceId, MsgTopics topic, MsgType Type,SourceType src):topic_(topic),sourceType_(src),msgType_(Type){
msg = messages::header::Message();
if(msg.IsInitialized())
@@ -37,6 +37,11 @@ namespace WHISPER {
msg.set_sourcetype(sourceType_);
msg.set_msgtype(msgType_);
}
deviceId_ = deviceId;
topic_ = topic;
sourceType_ = src;
msgType_ = Type;
}
@@ -49,11 +54,7 @@ namespace WHISPER {
void Message::addPayLoad(std::shared_ptr<google::protobuf::Any> payload){
payload_ = payload;
LOG_S(INFO)<< "pack any size in message class "<<payload_->ByteSizeLong();
msg.add_payload()->CopyFrom(*payload_);
LOG_S(INFO)<< "pack any size in message class "<<payload_->ByteSizeLong();
}
@@ -65,7 +66,6 @@ namespace WHISPER {
if (msg.IsInitialized()) {
serializedMessage = msg.SerializeAsString();
}
return serializedMessage;
}

View File

@@ -0,0 +1,56 @@
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Protos/raw_track.pb.h"
#include <WHISPER/Messages/Track.hpp>
#include <memory>
namespace WHISPER {
RawTrack::RawTrack(std::string receivedMessage){
msg = messages::header::Message();
try {
msg.ParseFromString(receivedMessage);
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
trackMessage = messages::raw_track::RawTrack();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::raw_track::RawTrack>()) {
msg.payload().begin()->UnpackTo(&trackMessage);
}
}
trackNo = trackMessage.trackno();
} catch (const std::exception& e) {
LOG_S(ERROR)<<e.what();
}
}
RawTrack::RawTrack(std::uint32_t deviceID,WHISPER::MsgTopics topic, SourceType src,std::uint32_t trackNo):
Message(deviceID,WHISPER::MsgTopics::TRACK,WHISPER::RAW_TRACK,src),trackNo(trackNo)
{
trackMessage = messages::raw_track::RawTrack();
trackMessage.set_trackno(trackNo);
auto payloadMessage = std::make_shared<google::protobuf::Any>();
payloadMessage->PackFrom(trackMessage);
addPayLoad(payloadMessage);
}
}

View File

@@ -35,13 +35,17 @@ namespace WHISPER
void whispercomm::disconnect()
{
stopReceiveThread = true;
if(receiveThread.joinable()) receiveThread.join();
derivedDisconnect();
}
void whispercomm::publish(std::string msg,std::string topic){
this->derivedPublish(msg,topic);
}
void whispercomm::receive(){
connected = true;
@@ -49,17 +53,20 @@ namespace WHISPER
derivedReceive();
while(!stopReceiveThread)
{
derivedReceive();
}
{
derivedReceive();
}
}
void whispercomm::subscribe(std::string topic)
{
{
this->subscribedTopics.push_back(topic);
derivedSubscribe(topic);
}
void whispercomm::unsubscribe(std::string topic)
@@ -89,6 +96,14 @@ namespace WHISPER
{
gateway = val;
}
bool whispercomm::isGateway(){
return gateway;
}
SourceType whispercomm::getOwnDeviceType()
{
return ownDeviceType_;
}
// Add datatypes here
} // namespace WHISPER

View File

@@ -1,5 +1,6 @@
#include "WHISPER/InternalUDPService.hpp"
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Track.hpp"
#include "zmq.hpp"
#include <iostream>
#include <loguru.hpp>
@@ -24,70 +25,57 @@ void killHandlerPing(int s) {
}
}
#define ID 1
int main()
{
// GOOGLE_PROTOBUF_VERIFY_VERSION;
// setup signal handler
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = killHandlerPing;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
GOOGLE_PROTOBUF_VERIFY_VERSION;
WHISPER::Join join(1,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"192.168.0.19");
// std::string msg = join.serialize();
// LOG_S(INFO)<<" serialized Message is "<<msg.size();
// LOG_S(INFO)<<msg;
// messages::header::Message proto;
// proto.ParseFromString(msg);
// WHISPER::Message receivedMessage(msg);
// LOG_S(INFO)<<receivedMessage.msgType_;
// switch (receivedMessage.msgType_) {
// case WHISPER::MsgType::JOIN:
// WHISPER::Join receivedJoin(msg);
// LOG_S(INFO)<< "join message data afer reception "<< receivedJoin.port;
// break;
// }
// WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.0.19");
WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.1.178");
service.connect(receiver);
service.publish(join.serialize(), "management");
service.subscribe("data");
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
// zmq::context_t ctx(2);
// zmq::socket_t sock(ctx,zmq::socket_type::radio);
// sock.connect("udp://127.0.0.255:8000");
// std::string string = "hello world form 2";
// zmq::message_t msg(string.begin(),string.end());
// // memcpy (msg.data (), string.data(), string.size());
// LOG_S(INFO)<<"message contains "<<msg.str();
// zmq::context_t ctx(2);
// zmq::socket_t sock(ctx,zmq::socket_type::dish);
// sock.bind("udp://*:8000");
// zmq::message_t temp;
// sock.join("data");
// service.publish(join.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
WHISPER::RawTrack RawTrack(ID,WHISPER::MsgTopics::TRACK,WHISPER::SourceType::SHIP,0001);
int size = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
service.publish(RawTrack.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
while (running) {
// zmq::message_t msg(string.begin(),string.end());
// msg.set_group("data");
// sock.send(msg,zmq::send_flags::none);
// service.publish(join.serialize(), "management");
if (size != receiver->size()) {
LOG_S(INFO)<<"received messages " << size;
size = receiver->size();
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
service.disconnect();
return 0;
}

View File

@@ -1,3 +1,4 @@
#include "WHISPER/Messages/Message.hpp"
#include "zmq.hpp"
#include <iostream>
#include <loguru.hpp>
@@ -7,6 +8,8 @@
#include <WHISPER/InternalUDPService.hpp>
#include <WHISPER/Messages/Join.hpp>
#include <WHISPER/Messages/Leave.hpp>
/// variable for stopping the application
@@ -24,11 +27,16 @@ void killHandlerPing(int s) {
}
}
#define ID 2
int main()
{
// setup signal handler
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = killHandlerPing;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
// zmq::context_t ctx(2);
// zmq::socket_t sock(ctx,zmq::socket_type::dish);
@@ -36,26 +44,49 @@ int main()
// zmq::message_t temp;
// sock.join("data");
WHISPER::Join join(2,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"192.168.0.19");
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.0.255");
WHISPER::InternalUDPService service(2,WHISPER::SHIP,8000,"192.168.0.255","192.168.1.178");
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
// WHISPER::Join join(2,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
// service.publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
service.connect(receiver);
service.publish(join.serialize(), "management");
// zmq::context_t ctx(2);
// zmq::socket_t sock(ctx,zmq::socket_type::radio);
// sock.connect("udp://127.0.0.1:8000");
// std::string string = "hello world form 2";
int msgcount = 0;
while (running) {
// LOG_S(INFO)<<"received messages " << receiver->size();
service.publish(join.serialize(), "management");
// zmq::message_t msg(string.begin(),string.end());
// msg.set_group("management");
// sock.send(msg,zmq::send_flags::none);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
if (msgcount != receiver->size()) {
LOG_S(INFO)<<"received messages " << receiver->size();
auto received = receiver.get()->get();
auto topic = received.topic_;
LOG_S(INFO)<<"message type is: "<<received.msgType_;
if (received.msgType_ == WHISPER::RAW_TRACK) {
}
msgcount = receiver->size();
// service.publish(received.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)topic]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// WHISPER::Leave leave(2,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
// service.publish(leave.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
// LOG_S(INFO)<<"message send";
service.disconnect();
return 0;
}