ADD: added new basic messages and added the parentID as an part of the identifier

This commit is contained in:
Henry Winkel
2023-01-19 16:03:58 +01:00
parent 49bdb28535
commit 73b15d235a
32 changed files with 2140 additions and 140 deletions

View File

@@ -4,6 +4,8 @@
#include "WHISPER/Messages/Join.hpp"
#include "WHISPER/Messages/Leave.hpp"
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Ping.hpp"
#include "WHISPER/Messages/Pong.hpp"
#include "WHISPER/whisper.hpp"
#include <vector>
#include <zmq.hpp>
@@ -23,8 +25,8 @@
namespace WHISPER {
InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress,std::string myAdress):
whispercomm(id, owndevicetype),port_(port),destinationAdress_(destinationAdress),myAdress_(myAdress)
InternalUDPService::InternalUDPService(std::uint32_t parentid,std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress,std::string myAdress):
whispercomm(parentid,id, owndevicetype),port_(port),destinationAdress_(destinationAdress),myAdress_(myAdress)
{
ctx = zmq::context_t(2);
sender = zmq::socket_t(ctx,zmq::socket_type::radio);
@@ -76,7 +78,7 @@ namespace WHISPER {
LOG_S(INFO)<<"own ID: "<< getOwnID();
WHISPER::Join join(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
WHISPER::Join join(getParentID(),getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
this->publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
@@ -85,7 +87,7 @@ namespace WHISPER {
void InternalUDPService::derivedDisconnect()
{
WHISPER::Leave Leave(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
WHISPER::Leave Leave(getParentID(),getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
this->publish(Leave.serialize(),WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)Leave.topic_]);
unsubscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
@@ -168,6 +170,7 @@ namespace WHISPER {
auto client = std::make_shared<localClient>();
client->port = join.port;
client->id = join.deviceId_;
client->parentid = join.parentId_;
client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port);
client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio);
@@ -190,7 +193,7 @@ namespace WHISPER {
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (it->get()->port == Leave.port || it->get()->id == Leave.deviceId_) {
if (it->get()->port == Leave.port || (it->get()->id == Leave.deviceId_ && it->get()->parentid == Leave.deviceId_)) {
it->get()->clientSocket.close();
it = localclients.erase(it);
LOG_S(INFO)<<"client left";
@@ -204,9 +207,24 @@ namespace WHISPER {
}else if(msgType == WHISPER::PING)
{
if (receivedMessage.deviceId_ != getOwnID() && receivedMessage.parentId_ != getParentID())
{
WHISPER::Pong pong(getParentID(),getOwnID(),getOwnDeviceType(),ownReceivingPort_);
this->publish(pong.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
}
}else if(msgType == WHISPER::PONG)
{
WHISPER::Pong pong(message);
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (pong.deviceId_ == it->get()->id && pong.parentId_ == it->get()->parentid) {
it->get()->lastResponse = std::time(nullptr);
}
}
}
}else
{
@@ -232,6 +250,7 @@ namespace WHISPER {
}
}

View File

@@ -16,6 +16,7 @@ namespace WHISPER {
msgType_ = msg.msgtype();
joinMessage = messages::join::Join();
deviceId_ = msg.sourceid();
parentId_ = msg.parentid();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::join::Join>()) {
@@ -26,7 +27,6 @@ namespace WHISPER {
sourceAddr = joinMessage.srcaddress();
} catch (const std::exception& e) {
LOG_S(ERROR)<<e.what();
}
@@ -34,8 +34,8 @@ namespace WHISPER {
}
WHISPER::Join::Join(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr):
Message(deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::JOIN,src),port(port),sourceAddr(addr)
WHISPER::Join::Join(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr):
Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::JOIN,src),port(port),sourceAddr(addr)
{
joinMessage = messages::join::Join();
joinMessage.set_port(port);

View File

@@ -14,6 +14,7 @@ namespace WHISPER {
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
parentId_ = msg.parentid();
leaveMessage = messages::leave::Leave();
@@ -34,8 +35,8 @@ namespace WHISPER {
}
Leave::Leave(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr):
Message(deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::LEAVE,src),port(port),sourceAddr(addr)
Leave::Leave(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr):
Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::LEAVE,src),port(port),sourceAddr(addr)
{
leaveMessage = messages::leave::Leave();

View File

@@ -17,6 +17,7 @@ namespace WHISPER {
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
parentId_ = msg.parentid();
} catch (const std::exception& e) {
@@ -27,7 +28,8 @@ namespace WHISPER {
}
Message::Message(std::int32_t deviceId, MsgTopics topic, MsgType Type,SourceType src):topic_(topic),sourceType_(src),msgType_(Type){
Message::Message(std::uint32_t parentId,std::uint32_t deviceId, MsgTopics topic, MsgType Type,SourceType src):
parentId_(parentId),deviceId_(deviceId),topic_(topic),sourceType_(src),msgType_(Type){
msg = messages::header::Message();
if(msg.IsInitialized())
@@ -36,12 +38,9 @@ namespace WHISPER {
msg.set_topic(topic);
msg.set_sourcetype(sourceType_);
msg.set_msgtype(msgType_);
msg.set_parentid(parentId);
}
deviceId_ = deviceId;
topic_ = topic;
sourceType_ = src;
msgType_ = Type;
}

View File

@@ -0,0 +1,54 @@
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Protos/ping.pb.h"
#include <WHISPER/Messages/Ping.hpp>
#include <memory>
namespace WHISPER {
Ping::Ping(std::string receivedMessage){
msg = messages::header::Message();
try {
msg.ParseFromString(receivedMessage);
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
pingMessage = messages::ping::Ping();
deviceId_ = msg.sourceid();
parentId_ = msg.parentid();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::ping::Ping>()) {
msg.payload().begin()->UnpackTo(&pingMessage);
}
}
port_ = pingMessage.port();
} catch (const std::exception& e) {
LOG_S(ERROR)<<e.what();
}
}
WHISPER::Ping::Ping(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::uint32_t port):
Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::PING,src),port_(port)
{
pingMessage = messages::ping::Ping();
pingMessage.set_port(port_);
auto payloadMessage = std::make_shared<google::protobuf::Any>();
payloadMessage->PackFrom(pingMessage);
addPayLoad(payloadMessage);
}
}

View File

@@ -0,0 +1,56 @@
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Protos/pong.pb.h"
#include <WHISPER/Messages/Pong.hpp>
#include <memory>
namespace WHISPER {
Pong::Pong(std::string receivedMessage){
msg = messages::header::Message();
try {
msg.ParseFromString(receivedMessage);
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
message_ = messages::pong::Pong();
deviceId_ = msg.sourceid();
parentId_ = msg.parentid();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::pong::Pong>()) {
msg.payload().begin()->UnpackTo(&message_);
}
}
port_ = message_.port();
} catch (const std::exception& e) {
LOG_S(ERROR)<<e.what();
}
}
WHISPER::Pong::Pong(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::uint32_t port):
Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::PING,src),port_(port)
{
// message_ = messages::pong::Pong();
if (message_.IsInitialized()) {
message_.set_port(port_);
}
auto payloadMessage = std::make_shared<google::protobuf::Any>();
payloadMessage->PackFrom(message_);
addPayLoad(payloadMessage);
}
}

View File

@@ -0,0 +1,54 @@
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Protos/stringData.pb.h"
#include <WHISPER/Messages/stringData.hpp>
#include <memory>
namespace WHISPER {
StringData::StringData(std::string receivedMessage){
msg = messages::header::Message();
try {
msg.ParseFromString(receivedMessage);
topic_ = msg.topic();
sourceType_ = msg.sourcetype();
msgType_ = msg.msgtype();
message_ = messages::stringData::StringData();
deviceId_ = msg.sourceid();
parentId_ = msg.parentid();
if ( msg.payload_size()) {
if (msg.payload().begin()->Is<messages::stringData::StringData>()) {
msg.payload().begin()->UnpackTo(&message_);
}
}
data_ = message_.data();
} catch (const std::exception& e) {
LOG_S(ERROR)<<e.what();
}
}
WHISPER::StringData::StringData(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::string data):
Message(parentID,deviceID,WHISPER::MsgTopics::DATA,WHISPER::STRINGDATA,src),data_(data)
{
message_ = messages::stringData::StringData();
message_.set_data(data);
auto payloadMessage = std::make_shared<google::protobuf::Any>();
payloadMessage->PackFrom(message_);
addPayLoad(payloadMessage);
}
}

View File

@@ -23,7 +23,11 @@ namespace WHISPER
{
return ownID_;
}
std::uint32_t whispercomm::getParentID()
{
return parentID_;
}
void whispercomm::connect(std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiver)
{

View File

@@ -1,5 +1,6 @@
#include "WHISPER/InternalUDPService.hpp"
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/stringData.hpp"
#include <iostream>
#include <loguru.hpp>
@@ -43,14 +44,14 @@ int main()
// WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.1.178");
WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"127.0.0.255","127.0.0.1");
service.connect(receiver);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
// service.publish(join.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
WHISPER::StringData data(0,1,WHISPER::SHIP,"hello world");
service.publish(data.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
@@ -61,18 +62,12 @@ int main()
// service.publish(RawTrack.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
// while (running) {
// // zmq::message_t msg(string.begin(),string.end());
// // msg.set_group("data");
// // sock.send(msg,zmq::send_flags::none);
while (running) {
service.publish(data.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
// // if (size != receiver->size()) {
// // LOG_S(INFO)<<"received messages " << size;
// // size = receiver->size();
// // }
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
// }
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
service.disconnect();

View File

@@ -1,4 +1,5 @@
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/stringData.hpp"
#include "zmq.hpp"
#include <iostream>
#include <loguru.hpp>
@@ -47,7 +48,7 @@ int main()
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
WHISPER::InternalUDPService service(2,WHISPER::SHIP,8000,"192.168.0.255","192.168.1.178");
WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"127.0.0.255","127.0.0.1");
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
@@ -67,14 +68,15 @@ int main()
while (running) {
if (msgcount != receiver->size()) {
LOG_S(INFO)<<"received messages " << receiver->size();
auto received = receiver.get()->get();
auto topic = received.topic_;
LOG_S(INFO)<<"message type is: "<<received.msgType_;
if (received.msgType_ == WHISPER::RAW_TRACK) {
WHISPER::Message msg;
auto received = receiver.get()->get(msg);
if (received == true) {
if (msg.msgType_ == WHISPER::MsgType::STRINGDATA) {
WHISPER::StringData data(msg.serialize());
LOG_S(INFO)<<data.data_;
}
}
msgcount = receiver->size();
// service.publish(received.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)topic]);