From 3006f79883f73cf717713d909813c5ad68e7d7db Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Fri, 11 Nov 2022 14:36:57 +0100 Subject: [PATCH] ADD: Message container and Join Message --- CMakeLists.txt | 15 +++- include/WHISPER/Messages/Join.hpp | 24 +++++ include/WHISPER/Messages/Message.hpp | 88 +++++++++++++++++++ include/WHISPER/Messages/Protos/join.proto | 7 +- include/WHISPER/Messages/Protos/message.proto | 19 ++++ include/WHISPER/threadSafeQueue.hpp | 69 +++++++++++++++ include/WHISPER/whisper.hpp | 65 +++++++++++++- src/WHISPER/Messages/Join.cpp | 53 +++++++++++ src/WHISPER/Messages/Message.cpp | 72 +++++++++++++++ src/WHISPER/threadSafeQueue.cpp | 60 +++++++++++++ src/WHISPER/whisper.cpp | 54 ++++++++++++ src/main.cpp | 26 +++++- src/protos/join.proto | 16 ---- 13 files changed, 545 insertions(+), 23 deletions(-) create mode 100644 include/WHISPER/Messages/Join.hpp create mode 100644 include/WHISPER/Messages/Message.hpp create mode 100644 include/WHISPER/Messages/Protos/message.proto create mode 100644 include/WHISPER/threadSafeQueue.hpp create mode 100644 src/WHISPER/Messages/Join.cpp create mode 100644 src/WHISPER/Messages/Message.cpp create mode 100644 src/WHISPER/threadSafeQueue.cpp delete mode 100644 src/protos/join.proto diff --git a/CMakeLists.txt b/CMakeLists.txt index af75ad0..0e816b9 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,7 +40,20 @@ endif() add_library(whisper-com STATIC include/WHISPER/whisper.hpp src/WHISPER/whisper.cpp - + + include/WHISPER/threadSafeQueue.hpp + src/WHISPER/threadSafeQueue.cpp + + + include/WHISPER/Messages/Message.hpp + src/WHISPER/Messages/Message.cpp + + include/WHISPER/Messages/Join.hpp + src/WHISPER/Messages/Join.cpp + + + ../include/WHISPER/Messages/Protos/message.pb.cc + ) diff --git a/include/WHISPER/Messages/Join.hpp b/include/WHISPER/Messages/Join.hpp new file mode 100644 index 0000000..2900a4c --- /dev/null +++ b/include/WHISPER/Messages/Join.hpp @@ -0,0 +1,24 @@ +#pragma once + + +#include +#include +#include +#include + +namespace WHISPER { + + class Join : public Message + { + private: + messages::join::Join joinMessage; + public: + std::uint32_t port; + std::string sourceAddr; + + Join(std::string receivedMessage); + + Join(std::uint32_t deviceID,std::uint32_t topic, MsgType Type, SourceType src,std::uint32_t port, std::string addr); + + }; +} \ No newline at end of file diff --git a/include/WHISPER/Messages/Message.hpp b/include/WHISPER/Messages/Message.hpp new file mode 100644 index 0000000..f1e7569 --- /dev/null +++ b/include/WHISPER/Messages/Message.hpp @@ -0,0 +1,88 @@ +#pragma once + + + + +#include "google/protobuf/any.pb.h" +#include "google/protobuf/message.h" +#include +#include +#include +#include + + +namespace WHISPER { + + enum MsgType : int32_t + { + /// message to shutdown all participants + SHUTDOWN = 0, + /// on participant is joined + JOIN=1, + /// participant is leaving + LEAVE, + /// owntrack informaton + OWN_TRACK, + /// raw track message + RAW_TRACK, + /// simple data + SIMPLE + }; // enum class EventType + + enum SourceType : int32_t + { + SIMCOMTROLER, + /// + SHIP, + /// + SENSOR, + /// + EFEKTOR, + /// + GATEWAY, + /// + + + }; // enum class EventType + + + + + class Message{ + private: + std::string payloadString_; + std::shared_ptr payload_; + + public: + Message()=default; + + Message(std::int32_t deviceId, std::uint32_t topic, MsgType Type,SourceType src); + Message(std::string msg); + + std::string getPayloadString(); + + std::uint32_t topic_; + std::int32_t msgType_; + std::int32_t sourceType_; + std::int32_t deviceId_; + + void addPayLoad(std::shared_ptr any); + void addPayLoad(std::string any); + + std::string serialize(); + + + protected: + + messages::header::Message msg; + + }; + + + + + + + + +} diff --git a/include/WHISPER/Messages/Protos/join.proto b/include/WHISPER/Messages/Protos/join.proto index 4f5881d..9cdf207 100644 --- a/include/WHISPER/Messages/Protos/join.proto +++ b/include/WHISPER/Messages/Protos/join.proto @@ -3,14 +3,13 @@ syntax = "proto3"; package messages.join; -import "google/protobuf/timestamp.proto"; +// import "google/protobuf/timestamp.proto"; // [END declaration] // [START messages] message Join { - int32 id = 1; - int32 port = 2; - string address = 3; + uint32 port = 1; + string srcAddress = 2; } diff --git a/include/WHISPER/Messages/Protos/message.proto b/include/WHISPER/Messages/Protos/message.proto new file mode 100644 index 0000000..4e27233 --- /dev/null +++ b/include/WHISPER/Messages/Protos/message.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + + +import "google/protobuf/any.proto"; + +package messages.header; + +// [START messages] +message Message { + int32 topic = 1; + int32 msgType = 2; + int32 sourceType = 3; + int32 sourceID = 4; + + repeated google.protobuf.Any payload = 5; + +} + + diff --git a/include/WHISPER/threadSafeQueue.hpp b/include/WHISPER/threadSafeQueue.hpp new file mode 100644 index 0000000..b5249dc --- /dev/null +++ b/include/WHISPER/threadSafeQueue.hpp @@ -0,0 +1,69 @@ +#pragma once +/* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at https://mozilla.org/MPL/2.0/. +*/ + +/** +* @file +* @copyright MPLv2 +*/ +#include +#include +#include +#include +#include + +namespace WHISPER{ + /** + * @class BasicMessageQueue + * + * This class encapsulates shared storage which is a queue and protect it + * with a mutex. In Addition there is a condition variable to notify other + * threads if there is new data in the queue. + */ + template + class threadSafeQueue + { + private: + std::queue> q; + std::mutex mx; + std::condition_variable condVar; + public: + /** + * @brief default constructor for class BasicMessageQueue + */ + threadSafeQueue(); + + /** + * @brief appends given message to queue + * + * The storage is protected by a mutex. Makes a notify_one call to inform + * in waiting thread that there is a new message in the queue. + * + * @param msg - incoming message + */ + void addElement( std::unique_ptr msg ); + + /** + * @brief gets the fron message from the queue + * + * This method gets the front message in the queue and deletes it from the + * queue. The storage is protected by a mutex. If the queue is empty the + * function throws an exception. + * + * @result msg - returns the front message from the queue + * @throws length_error - if queue is empty this method throws length_error exception + */ + std::unique_ptr getElement(); + + /** + * @brief method size + * + * @result size - current size of the message queue + */ + unsigned int size(); + + }; //BasicMessageQueue + +}; // namespace BC \ No newline at end of file diff --git a/include/WHISPER/whisper.hpp b/include/WHISPER/whisper.hpp index c5c69f1..7048f3a 100644 --- a/include/WHISPER/whisper.hpp +++ b/include/WHISPER/whisper.hpp @@ -10,8 +10,21 @@ * @copyright 2022 MPLv2 */ +#include +#include +#include #define ZMQ_BUILD_DRAFT_API 1 #include +#include +#include +#include +#include + +#include + + +#include +#include /** * @brief namespace for all whisper-com related components @@ -20,7 +33,57 @@ namespace WHISPER { // Add datatypes here class whispercomm{ - + + private: + /// device ID + std::uint32_t ownID_; + /// device Type + SourceType ownDeviceType_; + /// all topics this service subscribed + std::vector subscribedTopics; + + /// show if the service is connected or not + std::atomic connected; + + /// attribute identifying this service as a gateway and all packets should be forwarded + bool gateway = false; + + /// variable for holding the receive thread identifier + std::thread receiveThread; + + /// variable indicating if the receiveThread should be stopped + std::atomic stopReceiveThread; + + std::shared_ptr> receiveQueue = nullptr; + + + + void receive(); + + + public: + whispercomm(std::uint32_t id, SourceType owndevicetype):ownID_(id),ownDeviceType_(owndevicetype) + { + receiveQueue = std::make_shared>(); + }; + void connect(std::shared_ptr> receiver); + void publish(std::string msg); + void disconnect(); + void subscribe(std::string topic); + void unsubscribe(std::string topic); + + protected: + + void addMsgToReceiverQueue(std::string); + + + virtual void derivedConnect() = 0; + virtual void derivedDisconnect() = 0; + virtual void derivedPublish(std::string msg) = 0; + virtual void derivedReceive() = 0; + + + diff --git a/src/WHISPER/Messages/Join.cpp b/src/WHISPER/Messages/Join.cpp new file mode 100644 index 0000000..1778b51 --- /dev/null +++ b/src/WHISPER/Messages/Join.cpp @@ -0,0 +1,53 @@ +#include "WHISPER/Messages/Message.hpp" +#include "WHISPER/Messages/Protos/join.pb.h" +#include +#include + + + + +namespace WHISPER { + + Join::Join(std::string receivedMessage){ + msg = messages::header::Message(); + try { + msg.ParseFromString(receivedMessage); + topic_ = msg.topic(); + sourceType_ = msg.sourcetype(); + msgType_ = msg.msgtype(); + joinMessage = messages::join::Join(); + if ( msg.payload_size()) { + if (msg.payload().begin()->Is()) { + msg.payload().begin()->UnpackTo(&joinMessage); + } + } + port = joinMessage.port(); + sourceAddr = joinMessage.srcaddress(); + + + + } catch (const std::exception& e) { + LOG_S(ERROR)<(); + test->PackFrom(joinMessage); + addPayLoad(test); + + } + + + + +} \ No newline at end of file diff --git a/src/WHISPER/Messages/Message.cpp b/src/WHISPER/Messages/Message.cpp new file mode 100644 index 0000000..f6716db --- /dev/null +++ b/src/WHISPER/Messages/Message.cpp @@ -0,0 +1,72 @@ + + +#include "WHISPER/Messages/Protos/join.pb.h" +#include +#include + + + + +namespace WHISPER { + Message::Message(std::string stringMessage) + { + msg = messages::header::Message(); + try { + msg.ParseFromString(stringMessage); + topic_ = msg.topic(); + sourceType_ = msg.sourcetype(); + msgType_ = msg.msgtype(); + + + } catch (const std::exception& e) { + LOG_S(ERROR)< payload){ + payload_ = payload; + LOG_S(INFO)<< "pack any size in message class "<ByteSizeLong(); + + msg.add_payload()->CopyFrom(*payload_); + + LOG_S(INFO)<< "pack any size in message class "<ByteSizeLong(); + + } + + + + std::string Message::serialize(){ + + std::string serializedMessage; + LOG_S(INFO)< +* @date 25.06.2020 +* @copyright MPLv2 +*/ +#include +#include +#include + +template +WHISPER::threadSafeQueue threadSafeQueue(){ + +} + + + +// appends the given message to the message queue +template +void WHISPER::threadSafeQueue< T>::addElement(std::unique_ptr elem) +{ + std::unique_lock lk(mx); + q.push( std::move(elem) ); + lk.unlock(); + condVar.notify_one(); +} + +// gets a message from the queue +template +std::unique_ptr WHISPER::threadSafeQueue::getElement() +{ + std::unique_lock lk(mx); + if( 0 == q.size() ) + { + lk.unlock(); + throw std::length_error("Empty Queue\n"); + } + std::unique_ptr elem = std::move( q.front() ); + q.pop(); + lk.unlock(); + return std::move(elem); +} + + +//returns the size of the message queue +template +unsigned int WHISPER::threadSafeQueue::size() +{ + std::unique_lock lk(mx); + unsigned int size = q.size(); + lk.unlock(); + return size; +} \ No newline at end of file diff --git a/src/WHISPER/whisper.cpp b/src/WHISPER/whisper.cpp index 8d076ed..b04bd6e 100644 --- a/src/WHISPER/whisper.cpp +++ b/src/WHISPER/whisper.cpp @@ -15,5 +15,59 @@ */ namespace WHISPER { + + + + void whispercomm::connect(std::shared_ptr> receiver) + { + this->receiveQueue = receiver; + this->derivedConnect(); + receiveThread = std::thread(&WHISPER::whispercomm::receive,this); + + } + + void whispercomm::disconnect() + { + + } + + void whispercomm::publish(std::string msg){ + this->derivedPublish(msg); + } + + void whispercomm::receive(){ + this->derivedReceive(); + connected = true; + + while(!stopReceiveThread) + { + derivedReceive(); + } + + } + + + void whispercomm::subscribe(std::string topic) + { + this->subscribedTopics.push_back(topic); + } + + void whispercomm::unsubscribe(std::string topic) + { + for (std::vector::iterator it = subscribedTopics.begin(); it != subscribedTopics.end();it++) + { + if (*it == topic) + { + it = subscribedTopics.erase(it); + } + } + + } + + void whispercomm::addMsgToReceiverQueue(std::string) + { + + } + // Add datatypes here } // namespace WHISPER \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index f7fb09d..bdad6d8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,12 +1,36 @@ +#include "WHISPER/Messages/Message.hpp" #include +#include +#include +#include -#include "../include/WHISPER/Messages/Protos/join.pb.h" int main() { GOOGLE_PROTOBUF_VERIFY_VERSION; + + WHISPER::Join join(1,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"127.0.0.1"); + std::string msg = join.serialize(); + LOG_S(INFO)<<" serialized Message is "<