diff --git a/CMakeLists.txt b/CMakeLists.txt index 82af62d..d81dd66 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,16 +56,23 @@ add_library(whisper-com STATIC include/WHISPER/Messages/Join.hpp src/WHISPER/Messages/Join.cpp + + include/WHISPER/Messages/Leave.hpp + src/WHISPER/Messages/Leave.cpp + + include/WHISPER/Messages/Track.hpp + src/WHISPER/Messages/Track.cpp - include/WHISPER/Messages/Protos/message.pb.h include/WHISPER/Messages/Protos/message.pb.cc - include/WHISPER/Messages/Protos/join.pb.h - include/WHISPER/Messages/Protos/join.pb.cc + include/WHISPER/Messages/Protos/join.pb.cc + + include/WHISPER/Messages/Protos/leave.pb.cc + include/WHISPER/Messages/Protos/raw_track.pb.cc ) diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c220ad5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +# # Get the base Ubuntu image from Docker Hub +# FROM debian:latest + +# # Update apps on the base image +# RUN apt-get -y update && apt-get install -y + +# # Install the Clang compiler +# RUN apt-get -y install clang + +# # Copy the current folder which contains C++ source code to the Docker image under /usr/src +# COPY . /usr/src/dockertest1 + +# # Specify the working directory +# WORKDIR /usr/src/dockertest1 + +# # Use Clang to compile the Test.cpp source file +# RUN clang++ -o Test Test.cpp + +# # Run the output program from the previous step +# CMD ["./Test"] \ No newline at end of file diff --git a/include/WHISPER/InternalUDPService.hpp b/include/WHISPER/InternalUDPService.hpp index fd94105..8292699 100644 --- a/include/WHISPER/InternalUDPService.hpp +++ b/include/WHISPER/InternalUDPService.hpp @@ -16,8 +16,11 @@ namespace WHISPER { struct localClient{ short port; + std::string addr; + std::uint32_t id; zmq::socket_t clientSocket; - }; + + }; @@ -27,18 +30,35 @@ namespace WHISPER { private: + /// ip address of the destination std::string destinationAdress_; + /// my own ipadress std::string myAdress_; + /// sending port std::uint16_t port_; + /// port wich i bind to std::uint16_t ownReceivingPort_; + ///zeromq io contex zmq::context_t ctx; + /// socket for sending zmq::socket_t sender; + ///shared pointer to receiving socket std::shared_ptr receiver = nullptr; + /// shared pointer to loopback sending socket std::shared_ptr loopbackSocket = nullptr; + /// vector of local udp clients + std::list> localclients; + - std::vector> localclients; + std::atomic stopReceiving = false; + + /** + * @brief checks if the given port is already is use + * @param std:uint16 port + * @param returns the original port or a new free port + */ std::uint16_t checkPort(std::uint16_t port); void derivedConnect() override; @@ -48,10 +68,12 @@ namespace WHISPER { void derivedSubscribe(std::string topic) override; void derivedUnsubscribe(std::string topic) override; + void sendToLocalClients(std::string msg,std::string topic); + public: InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress, std::string myAdress); diff --git a/include/WHISPER/Messages/Join.hpp b/include/WHISPER/Messages/Join.hpp index 2900a4c..669568d 100644 --- a/include/WHISPER/Messages/Join.hpp +++ b/include/WHISPER/Messages/Join.hpp @@ -18,7 +18,7 @@ namespace WHISPER { Join(std::string receivedMessage); - Join(std::uint32_t deviceID,std::uint32_t topic, MsgType Type, SourceType src,std::uint32_t port, std::string addr); + Join(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr); }; } \ No newline at end of file diff --git a/include/WHISPER/Messages/Leave.hpp b/include/WHISPER/Messages/Leave.hpp new file mode 100644 index 0000000..ac993f8 --- /dev/null +++ b/include/WHISPER/Messages/Leave.hpp @@ -0,0 +1,24 @@ +#pragma once + + +#include +#include +#include +#include + +namespace WHISPER { + + class Leave : public Message + { + private: + messages::leave::Leave leaveMessage; + public: + std::uint32_t port; + std::string sourceAddr; + + Leave(std::string receivedMessage); + + Leave(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr); + + }; +} \ No newline at end of file diff --git a/include/WHISPER/Messages/Message.hpp b/include/WHISPER/Messages/Message.hpp index d9b0b59..4edb725 100644 --- a/include/WHISPER/Messages/Message.hpp +++ b/include/WHISPER/Messages/Message.hpp @@ -6,6 +6,7 @@ #include "google/protobuf/any.pb.h" #include "google/protobuf/message.h" #include +#include #include #include #include @@ -13,7 +14,7 @@ namespace WHISPER { - enum MsgType : int32_t + enum MsgType : int32_t { /// message to shutdown all participants SHUTDOWN = 0, @@ -33,6 +34,24 @@ namespace WHISPER { SIMPLE }; // enum class EventType + + enum MsgTopics : uint32_t + { + MANAGEMENT, + DATA, + COMMANDS, + TRACK + }; + + static std::mapMsgTopicsMap = + { + {MsgTopics::MANAGEMENT, "Management"}, + {MsgTopics::DATA, "Data"}, + {MsgTopics::COMMANDS, "Commands"}, + {MsgTopics::TRACK, "Track"}, + + }; + enum SourceType : int32_t { SIMCOMTROLER, @@ -49,36 +68,56 @@ namespace WHISPER { }; // enum class EventType - - - + /** + * @brief the message class is the basic sending an receiving class + * it is part of every message an every received string is first deserilized as a Message + * + */ class Message{ private: - std::string payloadString_; + /// the serialized payload + std::string payloadString_; + /// shared pointer to the payload protobuf message std::shared_ptr payload_; public: Message()=default; - Message(std::int32_t deviceId, std::uint32_t topic, MsgType Type,SourceType src); + Message(std::int32_t deviceId, MsgTopics topic, MsgType Type,SourceType src); Message(std::string msg); + /** + * @brief returns the payload string + * @return std::string + */ std::string getPayloadString(); + ///topic of the message for pub sub std::uint32_t topic_; + /// WHISPER::MsgType ot the payload std::int32_t msgType_; + /// WHISPER::SourceType of the sender std::int32_t sourceType_; + /// id of the sender std::int32_t deviceId_; - - + /** + * @brief returns the serialized message + * @return std::string + * + */ std::string serialize(); protected: + /** + * @brief adds the payload with type of shared_prt of protbuf::any + * @param std::shared_ptr + */ void addPayLoad(std::shared_ptr any); // void addPayLoad(std::string any); + /// protobuf message; our header message messages::header::Message msg; }; diff --git a/include/WHISPER/Messages/Protos/leave.pb.cc b/include/WHISPER/Messages/Protos/leave.pb.cc new file mode 100644 index 0000000..d99a3dd --- /dev/null +++ b/include/WHISPER/Messages/Protos/leave.pb.cc @@ -0,0 +1,353 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: leave.proto + +#include "leave.pb.h" + +#include +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/extension_set.h" +#include "google/protobuf/wire_format_lite.h" +#include "google/protobuf/descriptor.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/reflection_ops.h" +#include "google/protobuf/wire_format.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" +PROTOBUF_PRAGMA_INIT_SEG +namespace _pb = ::PROTOBUF_NAMESPACE_ID; +namespace _pbi = ::PROTOBUF_NAMESPACE_ID::internal; +namespace messages { +namespace leave { +PROTOBUF_CONSTEXPR Leave::Leave( + ::_pbi::ConstantInitialized): _impl_{ + /*decltype(_impl_.srcaddress_)*/{&::_pbi::fixed_address_empty_string, ::_pbi::ConstantInitialized{}} + , /*decltype(_impl_.port_)*/0u + , /*decltype(_impl_._cached_size_)*/{}} {} +struct LeaveDefaultTypeInternal { + PROTOBUF_CONSTEXPR LeaveDefaultTypeInternal() : _instance(::_pbi::ConstantInitialized{}) {} + ~LeaveDefaultTypeInternal() {} + union { + Leave _instance; + }; +}; + +PROTOBUF_ATTRIBUTE_NO_DESTROY PROTOBUF_CONSTINIT + PROTOBUF_ATTRIBUTE_INIT_PRIORITY1 LeaveDefaultTypeInternal _Leave_default_instance_; +} // namespace leave +} // namespace messages +static ::_pb::Metadata file_level_metadata_leave_2eproto[1]; +static constexpr const ::_pb::EnumDescriptor** + file_level_enum_descriptors_leave_2eproto = nullptr; +static constexpr const ::_pb::ServiceDescriptor** + file_level_service_descriptors_leave_2eproto = nullptr; +const uint32_t TableStruct_leave_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE( + protodesc_cold) = { + ~0u, // no _has_bits_ + PROTOBUF_FIELD_OFFSET(::messages::leave::Leave, _internal_metadata_), + ~0u, // no _extensions_ + ~0u, // no _oneof_case_ + ~0u, // no _weak_field_map_ + ~0u, // no _inlined_string_donated_ + ~0u, // no _split_ + ~0u, // no sizeof(Split) + PROTOBUF_FIELD_OFFSET(::messages::leave::Leave, _impl_.port_), + PROTOBUF_FIELD_OFFSET(::messages::leave::Leave, _impl_.srcaddress_), +}; + +static const ::_pbi::MigrationSchema + schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + { 0, -1, -1, sizeof(::messages::leave::Leave)}, +}; + +static const ::_pb::Message* const file_default_instances[] = { + &::messages::leave::_Leave_default_instance_._instance, +}; +const char descriptor_table_protodef_leave_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + "\n\013leave.proto\022\016messages.leave\")\n\005Leave\022\014" + "\n\004port\030\001 \001(\r\022\022\n\nsrcAddress\030\002 \001(\tb\006proto3" +}; +static ::absl::once_flag descriptor_table_leave_2eproto_once; +const ::_pbi::DescriptorTable descriptor_table_leave_2eproto = { + false, + false, + 80, + descriptor_table_protodef_leave_2eproto, + "leave.proto", + &descriptor_table_leave_2eproto_once, + nullptr, + 0, + 1, + schemas, + file_default_instances, + TableStruct_leave_2eproto::offsets, + file_level_metadata_leave_2eproto, + file_level_enum_descriptors_leave_2eproto, + file_level_service_descriptors_leave_2eproto, +}; + +// This function exists to be marked as weak. +// It can significantly speed up compilation by breaking up LLVM's SCC +// in the .pb.cc translation units. Large translation units see a +// reduction of more than 35% of walltime for optimized builds. Without +// the weak attribute all the messages in the file, including all the +// vtables and everything they use become part of the same SCC through +// a cycle like: +// GetMetadata -> descriptor table -> default instances -> +// vtables -> GetMetadata +// By adding a weak function here we break the connection from the +// individual vtables back into the descriptor table. +PROTOBUF_ATTRIBUTE_WEAK const ::_pbi::DescriptorTable* descriptor_table_leave_2eproto_getter() { + return &descriptor_table_leave_2eproto; +} +// Force running AddDescriptors() at dynamic initialization time. +PROTOBUF_ATTRIBUTE_INIT_PRIORITY2 +static ::_pbi::AddDescriptorsRunner dynamic_init_dummy_leave_2eproto(&descriptor_table_leave_2eproto); +namespace messages { +namespace leave { +// =================================================================== + +class Leave::_Internal { + public: +}; + +Leave::Leave(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned) + : ::PROTOBUF_NAMESPACE_ID::Message(arena, is_message_owned) { + SharedCtor(arena, is_message_owned); + // @@protoc_insertion_point(arena_constructor:messages.leave.Leave) +} +Leave::Leave(const Leave& from) + : ::PROTOBUF_NAMESPACE_ID::Message() { + Leave* const _this = this; (void)_this; + new (&_impl_) Impl_{ + decltype(_impl_.srcaddress_){} + , decltype(_impl_.port_){} + , /*decltype(_impl_._cached_size_)*/{}}; + + _internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); + _impl_.srcaddress_.InitDefault(); + #ifdef PROTOBUF_FORCE_COPY_DEFAULT_STRING + _impl_.srcaddress_.Set("", GetArenaForAllocation()); + #endif // PROTOBUF_FORCE_COPY_DEFAULT_STRING + if (!from._internal_srcaddress().empty()) { + _this->_impl_.srcaddress_.Set(from._internal_srcaddress(), + _this->GetArenaForAllocation()); + } + _this->_impl_.port_ = from._impl_.port_; + // @@protoc_insertion_point(copy_constructor:messages.leave.Leave) +} + +inline void Leave::SharedCtor( + ::_pb::Arena* arena, bool is_message_owned) { + (void)arena; + (void)is_message_owned; + new (&_impl_) Impl_{ + decltype(_impl_.srcaddress_){} + , decltype(_impl_.port_){0u} + , /*decltype(_impl_._cached_size_)*/{} + }; + _impl_.srcaddress_.InitDefault(); + #ifdef PROTOBUF_FORCE_COPY_DEFAULT_STRING + _impl_.srcaddress_.Set("", GetArenaForAllocation()); + #endif // PROTOBUF_FORCE_COPY_DEFAULT_STRING +} + +Leave::~Leave() { + // @@protoc_insertion_point(destructor:messages.leave.Leave) + if (auto *arena = _internal_metadata_.DeleteReturnArena<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>()) { + (void)arena; + return; + } + SharedDtor(); +} + +inline void Leave::SharedDtor() { + GOOGLE_DCHECK(GetArenaForAllocation() == nullptr); + _impl_.srcaddress_.Destroy(); +} + +void Leave::SetCachedSize(int size) const { + _impl_._cached_size_.Set(size); +} + +void Leave::Clear() { +// @@protoc_insertion_point(message_clear_start:messages.leave.Leave) + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + _impl_.srcaddress_.ClearToEmpty(); + _impl_.port_ = 0u; + _internal_metadata_.Clear<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(); +} + +const char* Leave::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx) { + +#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure + while (!ctx->Done(&ptr)) { + uint32_t tag; + ptr = ::_pbi::ReadTag(ptr, &tag); + switch (tag >> 3) { + // uint32 port = 1; + case 1: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 8)) { + _impl_.port_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + // string srcAddress = 2; + case 2: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 18)) { + auto str = _internal_mutable_srcaddress(); + ptr = ::_pbi::InlineGreedyStringParser(str, ptr, ctx); + CHK_(ptr); + CHK_(::_pbi::VerifyUTF8(str, "messages.leave.Leave.srcAddress")); + } else + goto handle_unusual; + continue; + default: + goto handle_unusual; + } // switch + handle_unusual: + if ((tag == 0) || ((tag & 7) == 4)) { + CHK_(ptr); + ctx->SetLastTag(tag); + goto message_done; + } + ptr = UnknownFieldParse( + tag, + _internal_metadata_.mutable_unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(), + ptr, ctx); + CHK_(ptr != nullptr); + } // while +message_done: + return ptr; +failure: + ptr = nullptr; + goto message_done; +#undef CHK_ +} + +uint8_t* Leave::_InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const { + + // @@protoc_insertion_point(serialize_to_array_start:messages.leave.Leave) + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + // uint32 port = 1; + if (this->_internal_port() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteUInt32ToArray(1, this->_internal_port(), target); + } + + // string srcAddress = 2; + if (!this->_internal_srcaddress().empty()) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String( + this->_internal_srcaddress().data(), static_cast(this->_internal_srcaddress().length()), + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE, + "messages.leave.Leave.srcAddress"); + target = stream->WriteStringMaybeAliased( + 2, this->_internal_srcaddress(), target); + } + + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { + target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( + _internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOBUF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream); + } + // @@protoc_insertion_point(serialize_to_array_end:messages.leave.Leave) + return target; +} + +size_t Leave::ByteSizeLong() const { + +// @@protoc_insertion_point(message_byte_size_start:messages.leave.Leave) + size_t total_size = 0; + + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + // string srcAddress = 2; + if (!this->_internal_srcaddress().empty()) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( + this->_internal_srcaddress()); + } + + // uint32 port = 1; + if (this->_internal_port() != 0) { + total_size += ::_pbi::WireFormatLite::UInt32SizePlusOne(this->_internal_port()); + } + + return MaybeComputeUnknownFieldsSize(total_size, &_impl_._cached_size_); +} + +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData Leave::_class_data_ = { + ::PROTOBUF_NAMESPACE_ID::Message::CopyWithSourceCheck, + Leave::MergeImpl +}; +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*Leave::GetClassData() const { return &_class_data_; } + + +void Leave::MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg) { + auto* const _this = static_cast(&to_msg); + auto& from = static_cast(from_msg); + + // @@protoc_insertion_point(class_specific_merge_from_start:messages.leave.Leave) + GOOGLE_DCHECK_NE(&from, _this); + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + if (!from._internal_srcaddress().empty()) { + _this->_internal_set_srcaddress(from._internal_srcaddress()); + } + if (from._internal_port() != 0) { + _this->_internal_set_port(from._internal_port()); + } + _this->_internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); +} + +void Leave::CopyFrom(const Leave& from) { +// @@protoc_insertion_point(class_specific_copy_from_start:messages.leave.Leave) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool Leave::IsInitialized() const { + return true; +} + +void Leave::InternalSwap(Leave* other) { + using std::swap; + auto* lhs_arena = GetArenaForAllocation(); + auto* rhs_arena = other->GetArenaForAllocation(); + _internal_metadata_.InternalSwap(&other->_internal_metadata_); + ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr::InternalSwap( + &_impl_.srcaddress_, lhs_arena, + &other->_impl_.srcaddress_, rhs_arena + ); + swap(_impl_.port_, other->_impl_.port_); +} + +::PROTOBUF_NAMESPACE_ID::Metadata Leave::GetMetadata() const { + + return ::_pbi::AssignDescriptors( + &descriptor_table_leave_2eproto_getter, &descriptor_table_leave_2eproto_once, + file_level_metadata_leave_2eproto[0]); +} +// @@protoc_insertion_point(namespace_scope) +} // namespace leave +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template<> PROTOBUF_NOINLINE ::messages::leave::Leave* +Arena::CreateMaybeMessage< ::messages::leave::Leave >(Arena* arena) { + return Arena::CreateMessageInternal< ::messages::leave::Leave >(arena); +} +PROTOBUF_NAMESPACE_CLOSE +// @@protoc_insertion_point(global_scope) +#include "google/protobuf/port_undef.inc" diff --git a/include/WHISPER/Messages/Protos/leave.pb.h b/include/WHISPER/Messages/Protos/leave.pb.h new file mode 100644 index 0000000..922ba49 --- /dev/null +++ b/include/WHISPER/Messages/Protos/leave.pb.h @@ -0,0 +1,329 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: leave.proto + +#ifndef GOOGLE_PROTOBUF_INCLUDED_leave_2eproto_2epb_2eh +#define GOOGLE_PROTOBUF_INCLUDED_leave_2eproto_2epb_2eh + +#include +#include +#include + +#include "google/protobuf/port_def.inc" +#if PROTOBUF_VERSION < 3021000 +#error "This file was generated by a newer version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please update" +#error "your headers." +#endif // PROTOBUF_VERSION + +#if 3021008 < PROTOBUF_MIN_PROTOC_VERSION +#error "This file was generated by an older version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please" +#error "regenerate this file with a newer version of protoc." +#endif // PROTOBUF_MIN_PROTOC_VERSION +#include "google/protobuf/port_undef.inc" +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/arena.h" +#include "google/protobuf/arenastring.h" +#include "google/protobuf/generated_message_util.h" +#include "google/protobuf/metadata_lite.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/message.h" +#include "google/protobuf/repeated_field.h" // IWYU pragma: export +#include "google/protobuf/extension_set.h" // IWYU pragma: export +#include "google/protobuf/unknown_field_set.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" + +#define PROTOBUF_INTERNAL_EXPORT_leave_2eproto + +PROTOBUF_NAMESPACE_OPEN +namespace internal { +class AnyMetadata; +} // namespace internal +PROTOBUF_NAMESPACE_CLOSE + +// Internal implementation detail -- do not use these members. +struct TableStruct_leave_2eproto { + static const uint32_t offsets[]; +}; +extern const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable + descriptor_table_leave_2eproto; +namespace messages { +namespace leave { +class Leave; +struct LeaveDefaultTypeInternal; +extern LeaveDefaultTypeInternal _Leave_default_instance_; +} // namespace leave +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template <> +::messages::leave::Leave* Arena::CreateMaybeMessage<::messages::leave::Leave>(Arena*); +PROTOBUF_NAMESPACE_CLOSE + +namespace messages { +namespace leave { + +// =================================================================== + + +// ------------------------------------------------------------------- + +class Leave final : + public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:messages.leave.Leave) */ { + public: + inline Leave() : Leave(nullptr) {} + ~Leave() override; + explicit PROTOBUF_CONSTEXPR Leave(::PROTOBUF_NAMESPACE_ID::internal::ConstantInitialized); + + Leave(const Leave& from); + Leave(Leave&& from) noexcept + : Leave() { + *this = ::std::move(from); + } + + inline Leave& operator=(const Leave& from) { + CopyFrom(from); + return *this; + } + inline Leave& operator=(Leave&& from) noexcept { + if (this == &from) return *this; + if (GetOwningArena() == from.GetOwningArena() + #ifdef PROTOBUF_FORCE_COPY_IN_MOVE + && GetOwningArena() != nullptr + #endif // !PROTOBUF_FORCE_COPY_IN_MOVE + ) { + InternalSwap(&from); + } else { + CopyFrom(from); + } + return *this; + } + + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { + return GetDescriptor(); + } + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { + return default_instance().GetMetadata().descriptor; + } + static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { + return default_instance().GetMetadata().reflection; + } + static const Leave& default_instance() { + return *internal_default_instance(); + } + static inline const Leave* internal_default_instance() { + return reinterpret_cast( + &_Leave_default_instance_); + } + static constexpr int kIndexInFileMessages = + 0; + + friend void swap(Leave& a, Leave& b) { + a.Swap(&b); + } + inline void Swap(Leave* other) { + if (other == this) return; + #ifdef PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() != nullptr && + GetOwningArena() == other->GetOwningArena()) { + #else // PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() == other->GetOwningArena()) { + #endif // !PROTOBUF_FORCE_COPY_IN_SWAP + InternalSwap(other); + } else { + ::PROTOBUF_NAMESPACE_ID::internal::GenericSwap(this, other); + } + } + void UnsafeArenaSwap(Leave* other) { + if (other == this) return; + GOOGLE_DCHECK(GetOwningArena() == other->GetOwningArena()); + InternalSwap(other); + } + + // implements Message ---------------------------------------------- + + Leave* New(::PROTOBUF_NAMESPACE_ID::Arena* arena = nullptr) const final { + return CreateMaybeMessage(arena); + } + using ::PROTOBUF_NAMESPACE_ID::Message::CopyFrom; + void CopyFrom(const Leave& from); + using ::PROTOBUF_NAMESPACE_ID::Message::MergeFrom; + void MergeFrom( const Leave& from) { + Leave::MergeImpl(*this, from); + } + private: + static void MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg); + public: + PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; + bool IsInitialized() const final; + + size_t ByteSizeLong() const final; + const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; + uint8_t* _InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const final; + int GetCachedSize() const final { return _impl_._cached_size_.Get(); } + + private: + void SharedCtor(::PROTOBUF_NAMESPACE_ID::Arena* arena, bool is_message_owned); + void SharedDtor(); + void SetCachedSize(int size) const final; + void InternalSwap(Leave* other); + + private: + friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; + static ::absl::string_view FullMessageName() { + return "messages.leave.Leave"; + } + protected: + explicit Leave(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned = false); + public: + + static const ClassData _class_data_; + const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*GetClassData() const final; + + ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + enum : int { + kSrcAddressFieldNumber = 2, + kPortFieldNumber = 1, + }; + // string srcAddress = 2; + void clear_srcaddress(); + const std::string& srcaddress() const; + template + void set_srcaddress(ArgT0&& arg0, ArgT... args); + std::string* mutable_srcaddress(); + PROTOBUF_NODISCARD std::string* release_srcaddress(); + void set_allocated_srcaddress(std::string* srcaddress); + private: + const std::string& _internal_srcaddress() const; + inline PROTOBUF_ALWAYS_INLINE void _internal_set_srcaddress(const std::string& value); + std::string* _internal_mutable_srcaddress(); + public: + + // uint32 port = 1; + void clear_port(); + uint32_t port() const; + void set_port(uint32_t value); + private: + uint32_t _internal_port() const; + void _internal_set_port(uint32_t value); + public: + + // @@protoc_insertion_point(class_scope:messages.leave.Leave) + private: + class _Internal; + + template friend class ::PROTOBUF_NAMESPACE_ID::Arena::InternalHelper; + typedef void InternalArenaConstructable_; + typedef void DestructorSkippable_; + struct Impl_ { + ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr srcaddress_; + uint32_t port_; + mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; + }; + union { Impl_ _impl_; }; + friend struct ::TableStruct_leave_2eproto; +}; +// =================================================================== + + + + +// =================================================================== + + +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wstrict-aliasing" +#endif // __GNUC__ +// ------------------------------------------------------------------- + +// Leave + +// uint32 port = 1; +inline void Leave::clear_port() { + _impl_.port_ = 0u; +} +inline uint32_t Leave::_internal_port() const { + return _impl_.port_; +} +inline uint32_t Leave::port() const { + // @@protoc_insertion_point(field_get:messages.leave.Leave.port) + return _internal_port(); +} +inline void Leave::_internal_set_port(uint32_t value) { + + _impl_.port_ = value; +} +inline void Leave::set_port(uint32_t value) { + _internal_set_port(value); + // @@protoc_insertion_point(field_set:messages.leave.Leave.port) +} + +// string srcAddress = 2; +inline void Leave::clear_srcaddress() { + _impl_.srcaddress_.ClearToEmpty(); +} +inline const std::string& Leave::srcaddress() const { + // @@protoc_insertion_point(field_get:messages.leave.Leave.srcAddress) + return _internal_srcaddress(); +} +template +inline PROTOBUF_ALWAYS_INLINE +void Leave::set_srcaddress(ArgT0&& arg0, ArgT... args) { + + _impl_.srcaddress_.Set(static_cast(arg0), args..., GetArenaForAllocation()); + // @@protoc_insertion_point(field_set:messages.leave.Leave.srcAddress) +} +inline std::string* Leave::mutable_srcaddress() { + std::string* _s = _internal_mutable_srcaddress(); + // @@protoc_insertion_point(field_mutable:messages.leave.Leave.srcAddress) + return _s; +} +inline const std::string& Leave::_internal_srcaddress() const { + return _impl_.srcaddress_.Get(); +} +inline void Leave::_internal_set_srcaddress(const std::string& value) { + + _impl_.srcaddress_.Set(value, GetArenaForAllocation()); +} +inline std::string* Leave::_internal_mutable_srcaddress() { + + return _impl_.srcaddress_.Mutable(GetArenaForAllocation()); +} +inline std::string* Leave::release_srcaddress() { + // @@protoc_insertion_point(field_release:messages.leave.Leave.srcAddress) + return _impl_.srcaddress_.Release(); +} +inline void Leave::set_allocated_srcaddress(std::string* srcaddress) { + _impl_.srcaddress_.SetAllocated(srcaddress, GetArenaForAllocation()); +#ifdef PROTOBUF_FORCE_COPY_DEFAULT_STRING + if (_impl_.srcaddress_.IsDefault()) { + _impl_.srcaddress_.Set("", GetArenaForAllocation()); + } +#endif // PROTOBUF_FORCE_COPY_DEFAULT_STRING + // @@protoc_insertion_point(field_set_allocated:messages.leave.Leave.srcAddress) +} + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif // __GNUC__ + +// @@protoc_insertion_point(namespace_scope) +} // namespace leave +} // namespace messages + + +// @@protoc_insertion_point(global_scope) + +#include "google/protobuf/port_undef.inc" + +#endif // GOOGLE_PROTOBUF_INCLUDED_leave_2eproto_2epb_2eh diff --git a/include/WHISPER/Messages/Protos/leave.proto b/include/WHISPER/Messages/Protos/leave.proto new file mode 100644 index 0000000..f8be89f --- /dev/null +++ b/include/WHISPER/Messages/Protos/leave.proto @@ -0,0 +1,15 @@ + +// [START declaration] +syntax = "proto3"; +package messages.leave; + +// import "google/protobuf/timestamp.proto"; +// [END declaration] + + +// [START messages] +message Leave { + uint32 port = 1; + string srcAddress = 2; +} + diff --git a/include/WHISPER/Messages/Protos/message.pb.cc b/include/WHISPER/Messages/Protos/message.pb.cc index 7f6a958..619d442 100644 --- a/include/WHISPER/Messages/Protos/message.pb.cc +++ b/include/WHISPER/Messages/Protos/message.pb.cc @@ -23,7 +23,7 @@ namespace header { PROTOBUF_CONSTEXPR Message::Message( ::_pbi::ConstantInitialized): _impl_{ /*decltype(_impl_.payload_)*/{} - , /*decltype(_impl_.topic_)*/0 + , /*decltype(_impl_.topic_)*/0u , /*decltype(_impl_.msgtype_)*/0 , /*decltype(_impl_.sourcetype_)*/0 , /*decltype(_impl_.sourceid_)*/0 @@ -73,7 +73,7 @@ static const ::_pb::Message* const file_default_instances[] = { const char descriptor_table_protodef_message_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { "\n\rmessage.proto\022\017messages.header\032\031google" "/protobuf/any.proto\"v\n\007Message\022\r\n\005topic\030" - "\001 \001(\005\022\017\n\007msgType\030\002 \001(\005\022\022\n\nsourceType\030\003 \001" + "\001 \001(\r\022\017\n\007msgType\030\002 \001(\005\022\022\n\nsourceType\030\003 \001" "(\005\022\020\n\010sourceID\030\004 \001(\005\022%\n\007payload\030\005 \003(\0132\024." "google.protobuf.Anyb\006proto3" }; @@ -158,7 +158,7 @@ inline void Message::SharedCtor( (void)is_message_owned; new (&_impl_) Impl_{ decltype(_impl_.payload_){arena} - , decltype(_impl_.topic_){0} + , decltype(_impl_.topic_){0u} , decltype(_impl_.msgtype_){0} , decltype(_impl_.sourcetype_){0} , decltype(_impl_.sourceid_){0} @@ -204,7 +204,7 @@ const char* Message::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx) uint32_t tag; ptr = ::_pbi::ReadTag(ptr, &tag); switch (tag >> 3) { - // int32 topic = 1; + // uint32 topic = 1; case 1: if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 8)) { _impl_.topic_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); @@ -279,10 +279,10 @@ uint8_t* Message::_InternalSerialize( uint32_t cached_has_bits = 0; (void) cached_has_bits; - // int32 topic = 1; + // uint32 topic = 1; if (this->_internal_topic() != 0) { target = stream->EnsureSpace(target); - target = ::_pbi::WireFormatLite::WriteInt32ToArray(1, this->_internal_topic(), target); + target = ::_pbi::WireFormatLite::WriteUInt32ToArray(1, this->_internal_topic(), target); } // int32 msgType = 2; @@ -335,9 +335,9 @@ size_t Message::ByteSizeLong() const { ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(msg); } - // int32 topic = 1; + // uint32 topic = 1; if (this->_internal_topic() != 0) { - total_size += ::_pbi::WireFormatLite::Int32SizePlusOne(this->_internal_topic()); + total_size += ::_pbi::WireFormatLite::UInt32SizePlusOne(this->_internal_topic()); } // int32 msgType = 2; diff --git a/include/WHISPER/Messages/Protos/message.pb.h b/include/WHISPER/Messages/Protos/message.pb.h index cfbb761..4f442d9 100644 --- a/include/WHISPER/Messages/Protos/message.pb.h +++ b/include/WHISPER/Messages/Protos/message.pb.h @@ -216,13 +216,13 @@ class Message final : const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any >& payload() const; - // int32 topic = 1; + // uint32 topic = 1; void clear_topic(); - int32_t topic() const; - void set_topic(int32_t value); + uint32_t topic() const; + void set_topic(uint32_t value); private: - int32_t _internal_topic() const; - void _internal_set_topic(int32_t value); + uint32_t _internal_topic() const; + void _internal_set_topic(uint32_t value); public: // int32 msgType = 2; @@ -261,7 +261,7 @@ class Message final : typedef void DestructorSkippable_; struct Impl_ { ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any > payload_; - int32_t topic_; + uint32_t topic_; int32_t msgtype_; int32_t sourcetype_; int32_t sourceid_; @@ -286,22 +286,22 @@ class Message final : // Message -// int32 topic = 1; +// uint32 topic = 1; inline void Message::clear_topic() { - _impl_.topic_ = 0; + _impl_.topic_ = 0u; } -inline int32_t Message::_internal_topic() const { +inline uint32_t Message::_internal_topic() const { return _impl_.topic_; } -inline int32_t Message::topic() const { +inline uint32_t Message::topic() const { // @@protoc_insertion_point(field_get:messages.header.Message.topic) return _internal_topic(); } -inline void Message::_internal_set_topic(int32_t value) { +inline void Message::_internal_set_topic(uint32_t value) { _impl_.topic_ = value; } -inline void Message::set_topic(int32_t value) { +inline void Message::set_topic(uint32_t value) { _internal_set_topic(value); // @@protoc_insertion_point(field_set:messages.header.Message.topic) } diff --git a/include/WHISPER/Messages/Protos/message.proto b/include/WHISPER/Messages/Protos/message.proto index 4e27233..2838dbf 100644 --- a/include/WHISPER/Messages/Protos/message.proto +++ b/include/WHISPER/Messages/Protos/message.proto @@ -7,7 +7,7 @@ package messages.header; // [START messages] message Message { - int32 topic = 1; + uint32 topic = 1; int32 msgType = 2; int32 sourceType = 3; int32 sourceID = 4; diff --git a/include/WHISPER/Messages/Protos/raw_track.pb.cc b/include/WHISPER/Messages/Protos/raw_track.pb.cc new file mode 100644 index 0000000..b521ccb --- /dev/null +++ b/include/WHISPER/Messages/Protos/raw_track.pb.cc @@ -0,0 +1,294 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: raw_track.proto + +#include "raw_track.pb.h" + +#include +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/extension_set.h" +#include "google/protobuf/wire_format_lite.h" +#include "google/protobuf/descriptor.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/reflection_ops.h" +#include "google/protobuf/wire_format.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" +PROTOBUF_PRAGMA_INIT_SEG +namespace _pb = ::PROTOBUF_NAMESPACE_ID; +namespace _pbi = ::PROTOBUF_NAMESPACE_ID::internal; +namespace messages { +namespace raw_track { +PROTOBUF_CONSTEXPR RawTrack::RawTrack( + ::_pbi::ConstantInitialized): _impl_{ + /*decltype(_impl_.trackno_)*/0u + , /*decltype(_impl_._cached_size_)*/{}} {} +struct RawTrackDefaultTypeInternal { + PROTOBUF_CONSTEXPR RawTrackDefaultTypeInternal() : _instance(::_pbi::ConstantInitialized{}) {} + ~RawTrackDefaultTypeInternal() {} + union { + RawTrack _instance; + }; +}; + +PROTOBUF_ATTRIBUTE_NO_DESTROY PROTOBUF_CONSTINIT + PROTOBUF_ATTRIBUTE_INIT_PRIORITY1 RawTrackDefaultTypeInternal _RawTrack_default_instance_; +} // namespace raw_track +} // namespace messages +static ::_pb::Metadata file_level_metadata_raw_5ftrack_2eproto[1]; +static constexpr const ::_pb::EnumDescriptor** + file_level_enum_descriptors_raw_5ftrack_2eproto = nullptr; +static constexpr const ::_pb::ServiceDescriptor** + file_level_service_descriptors_raw_5ftrack_2eproto = nullptr; +const uint32_t TableStruct_raw_5ftrack_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE( + protodesc_cold) = { + ~0u, // no _has_bits_ + PROTOBUF_FIELD_OFFSET(::messages::raw_track::RawTrack, _internal_metadata_), + ~0u, // no _extensions_ + ~0u, // no _oneof_case_ + ~0u, // no _weak_field_map_ + ~0u, // no _inlined_string_donated_ + ~0u, // no _split_ + ~0u, // no sizeof(Split) + PROTOBUF_FIELD_OFFSET(::messages::raw_track::RawTrack, _impl_.trackno_), +}; + +static const ::_pbi::MigrationSchema + schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + { 0, -1, -1, sizeof(::messages::raw_track::RawTrack)}, +}; + +static const ::_pb::Message* const file_default_instances[] = { + &::messages::raw_track::_RawTrack_default_instance_._instance, +}; +const char descriptor_table_protodef_raw_5ftrack_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + "\n\017raw_track.proto\022\022messages.raw_track\"\033\n" + "\010RawTrack\022\017\n\007trackNo\030\001 \001(\rb\006proto3" +}; +static ::absl::once_flag descriptor_table_raw_5ftrack_2eproto_once; +const ::_pbi::DescriptorTable descriptor_table_raw_5ftrack_2eproto = { + false, + false, + 74, + descriptor_table_protodef_raw_5ftrack_2eproto, + "raw_track.proto", + &descriptor_table_raw_5ftrack_2eproto_once, + nullptr, + 0, + 1, + schemas, + file_default_instances, + TableStruct_raw_5ftrack_2eproto::offsets, + file_level_metadata_raw_5ftrack_2eproto, + file_level_enum_descriptors_raw_5ftrack_2eproto, + file_level_service_descriptors_raw_5ftrack_2eproto, +}; + +// This function exists to be marked as weak. +// It can significantly speed up compilation by breaking up LLVM's SCC +// in the .pb.cc translation units. Large translation units see a +// reduction of more than 35% of walltime for optimized builds. Without +// the weak attribute all the messages in the file, including all the +// vtables and everything they use become part of the same SCC through +// a cycle like: +// GetMetadata -> descriptor table -> default instances -> +// vtables -> GetMetadata +// By adding a weak function here we break the connection from the +// individual vtables back into the descriptor table. +PROTOBUF_ATTRIBUTE_WEAK const ::_pbi::DescriptorTable* descriptor_table_raw_5ftrack_2eproto_getter() { + return &descriptor_table_raw_5ftrack_2eproto; +} +// Force running AddDescriptors() at dynamic initialization time. +PROTOBUF_ATTRIBUTE_INIT_PRIORITY2 +static ::_pbi::AddDescriptorsRunner dynamic_init_dummy_raw_5ftrack_2eproto(&descriptor_table_raw_5ftrack_2eproto); +namespace messages { +namespace raw_track { +// =================================================================== + +class RawTrack::_Internal { + public: +}; + +RawTrack::RawTrack(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned) + : ::PROTOBUF_NAMESPACE_ID::Message(arena, is_message_owned) { + SharedCtor(arena, is_message_owned); + // @@protoc_insertion_point(arena_constructor:messages.raw_track.RawTrack) +} +RawTrack::RawTrack(const RawTrack& from) + : ::PROTOBUF_NAMESPACE_ID::Message(), _impl_(from._impl_) { + _internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>( + from._internal_metadata_); + // @@protoc_insertion_point(copy_constructor:messages.raw_track.RawTrack) +} + +inline void RawTrack::SharedCtor( + ::_pb::Arena* arena, bool is_message_owned) { + (void)arena; + (void)is_message_owned; + new (&_impl_) Impl_{ + decltype(_impl_.trackno_){0u} + , /*decltype(_impl_._cached_size_)*/{} + }; +} + +RawTrack::~RawTrack() { + // @@protoc_insertion_point(destructor:messages.raw_track.RawTrack) + if (auto *arena = _internal_metadata_.DeleteReturnArena<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>()) { + (void)arena; + return; + } + SharedDtor(); +} + +inline void RawTrack::SharedDtor() { + GOOGLE_DCHECK(GetArenaForAllocation() == nullptr); +} + +void RawTrack::SetCachedSize(int size) const { + _impl_._cached_size_.Set(size); +} + +void RawTrack::Clear() { +// @@protoc_insertion_point(message_clear_start:messages.raw_track.RawTrack) + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + _impl_.trackno_ = 0u; + _internal_metadata_.Clear<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(); +} + +const char* RawTrack::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx) { + +#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure + while (!ctx->Done(&ptr)) { + uint32_t tag; + ptr = ::_pbi::ReadTag(ptr, &tag); + switch (tag >> 3) { + // uint32 trackNo = 1; + case 1: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 8)) { + _impl_.trackno_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + default: + goto handle_unusual; + } // switch + handle_unusual: + if ((tag == 0) || ((tag & 7) == 4)) { + CHK_(ptr); + ctx->SetLastTag(tag); + goto message_done; + } + ptr = UnknownFieldParse( + tag, + _internal_metadata_.mutable_unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(), + ptr, ctx); + CHK_(ptr != nullptr); + } // while +message_done: + return ptr; +failure: + ptr = nullptr; + goto message_done; +#undef CHK_ +} + +uint8_t* RawTrack::_InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const { + + // @@protoc_insertion_point(serialize_to_array_start:messages.raw_track.RawTrack) + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + // uint32 trackNo = 1; + if (this->_internal_trackno() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteUInt32ToArray(1, this->_internal_trackno(), target); + } + + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { + target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( + _internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOBUF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream); + } + // @@protoc_insertion_point(serialize_to_array_end:messages.raw_track.RawTrack) + return target; +} + +size_t RawTrack::ByteSizeLong() const { + +// @@protoc_insertion_point(message_byte_size_start:messages.raw_track.RawTrack) + size_t total_size = 0; + + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + // uint32 trackNo = 1; + if (this->_internal_trackno() != 0) { + total_size += ::_pbi::WireFormatLite::UInt32SizePlusOne(this->_internal_trackno()); + } + + return MaybeComputeUnknownFieldsSize(total_size, &_impl_._cached_size_); +} + +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData RawTrack::_class_data_ = { + ::PROTOBUF_NAMESPACE_ID::Message::CopyWithSourceCheck, + RawTrack::MergeImpl +}; +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*RawTrack::GetClassData() const { return &_class_data_; } + + +void RawTrack::MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg) { + auto* const _this = static_cast(&to_msg); + auto& from = static_cast(from_msg); + + // @@protoc_insertion_point(class_specific_merge_from_start:messages.raw_track.RawTrack) + GOOGLE_DCHECK_NE(&from, _this); + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + if (from._internal_trackno() != 0) { + _this->_internal_set_trackno(from._internal_trackno()); + } + _this->_internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); +} + +void RawTrack::CopyFrom(const RawTrack& from) { +// @@protoc_insertion_point(class_specific_copy_from_start:messages.raw_track.RawTrack) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool RawTrack::IsInitialized() const { + return true; +} + +void RawTrack::InternalSwap(RawTrack* other) { + using std::swap; + _internal_metadata_.InternalSwap(&other->_internal_metadata_); + swap(_impl_.trackno_, other->_impl_.trackno_); +} + +::PROTOBUF_NAMESPACE_ID::Metadata RawTrack::GetMetadata() const { + + return ::_pbi::AssignDescriptors( + &descriptor_table_raw_5ftrack_2eproto_getter, &descriptor_table_raw_5ftrack_2eproto_once, + file_level_metadata_raw_5ftrack_2eproto[0]); +} +// @@protoc_insertion_point(namespace_scope) +} // namespace raw_track +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template<> PROTOBUF_NOINLINE ::messages::raw_track::RawTrack* +Arena::CreateMaybeMessage< ::messages::raw_track::RawTrack >(Arena* arena) { + return Arena::CreateMessageInternal< ::messages::raw_track::RawTrack >(arena); +} +PROTOBUF_NAMESPACE_CLOSE +// @@protoc_insertion_point(global_scope) +#include "google/protobuf/port_undef.inc" diff --git a/include/WHISPER/Messages/Protos/raw_track.pb.h b/include/WHISPER/Messages/Protos/raw_track.pb.h new file mode 100644 index 0000000..832eb76 --- /dev/null +++ b/include/WHISPER/Messages/Protos/raw_track.pb.h @@ -0,0 +1,268 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: raw_track.proto + +#ifndef GOOGLE_PROTOBUF_INCLUDED_raw_5ftrack_2eproto_2epb_2eh +#define GOOGLE_PROTOBUF_INCLUDED_raw_5ftrack_2eproto_2epb_2eh + +#include +#include +#include + +#include "google/protobuf/port_def.inc" +#if PROTOBUF_VERSION < 3021000 +#error "This file was generated by a newer version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please update" +#error "your headers." +#endif // PROTOBUF_VERSION + +#if 3021008 < PROTOBUF_MIN_PROTOC_VERSION +#error "This file was generated by an older version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please" +#error "regenerate this file with a newer version of protoc." +#endif // PROTOBUF_MIN_PROTOC_VERSION +#include "google/protobuf/port_undef.inc" +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/arena.h" +#include "google/protobuf/arenastring.h" +#include "google/protobuf/generated_message_util.h" +#include "google/protobuf/metadata_lite.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/message.h" +#include "google/protobuf/repeated_field.h" // IWYU pragma: export +#include "google/protobuf/extension_set.h" // IWYU pragma: export +#include "google/protobuf/unknown_field_set.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" + +#define PROTOBUF_INTERNAL_EXPORT_raw_5ftrack_2eproto + +PROTOBUF_NAMESPACE_OPEN +namespace internal { +class AnyMetadata; +} // namespace internal +PROTOBUF_NAMESPACE_CLOSE + +// Internal implementation detail -- do not use these members. +struct TableStruct_raw_5ftrack_2eproto { + static const uint32_t offsets[]; +}; +extern const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable + descriptor_table_raw_5ftrack_2eproto; +namespace messages { +namespace raw_track { +class RawTrack; +struct RawTrackDefaultTypeInternal; +extern RawTrackDefaultTypeInternal _RawTrack_default_instance_; +} // namespace raw_track +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template <> +::messages::raw_track::RawTrack* Arena::CreateMaybeMessage<::messages::raw_track::RawTrack>(Arena*); +PROTOBUF_NAMESPACE_CLOSE + +namespace messages { +namespace raw_track { + +// =================================================================== + + +// ------------------------------------------------------------------- + +class RawTrack final : + public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:messages.raw_track.RawTrack) */ { + public: + inline RawTrack() : RawTrack(nullptr) {} + ~RawTrack() override; + explicit PROTOBUF_CONSTEXPR RawTrack(::PROTOBUF_NAMESPACE_ID::internal::ConstantInitialized); + + RawTrack(const RawTrack& from); + RawTrack(RawTrack&& from) noexcept + : RawTrack() { + *this = ::std::move(from); + } + + inline RawTrack& operator=(const RawTrack& from) { + CopyFrom(from); + return *this; + } + inline RawTrack& operator=(RawTrack&& from) noexcept { + if (this == &from) return *this; + if (GetOwningArena() == from.GetOwningArena() + #ifdef PROTOBUF_FORCE_COPY_IN_MOVE + && GetOwningArena() != nullptr + #endif // !PROTOBUF_FORCE_COPY_IN_MOVE + ) { + InternalSwap(&from); + } else { + CopyFrom(from); + } + return *this; + } + + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { + return GetDescriptor(); + } + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { + return default_instance().GetMetadata().descriptor; + } + static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { + return default_instance().GetMetadata().reflection; + } + static const RawTrack& default_instance() { + return *internal_default_instance(); + } + static inline const RawTrack* internal_default_instance() { + return reinterpret_cast( + &_RawTrack_default_instance_); + } + static constexpr int kIndexInFileMessages = + 0; + + friend void swap(RawTrack& a, RawTrack& b) { + a.Swap(&b); + } + inline void Swap(RawTrack* other) { + if (other == this) return; + #ifdef PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() != nullptr && + GetOwningArena() == other->GetOwningArena()) { + #else // PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() == other->GetOwningArena()) { + #endif // !PROTOBUF_FORCE_COPY_IN_SWAP + InternalSwap(other); + } else { + ::PROTOBUF_NAMESPACE_ID::internal::GenericSwap(this, other); + } + } + void UnsafeArenaSwap(RawTrack* other) { + if (other == this) return; + GOOGLE_DCHECK(GetOwningArena() == other->GetOwningArena()); + InternalSwap(other); + } + + // implements Message ---------------------------------------------- + + RawTrack* New(::PROTOBUF_NAMESPACE_ID::Arena* arena = nullptr) const final { + return CreateMaybeMessage(arena); + } + using ::PROTOBUF_NAMESPACE_ID::Message::CopyFrom; + void CopyFrom(const RawTrack& from); + using ::PROTOBUF_NAMESPACE_ID::Message::MergeFrom; + void MergeFrom( const RawTrack& from) { + RawTrack::MergeImpl(*this, from); + } + private: + static void MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg); + public: + PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; + bool IsInitialized() const final; + + size_t ByteSizeLong() const final; + const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; + uint8_t* _InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const final; + int GetCachedSize() const final { return _impl_._cached_size_.Get(); } + + private: + void SharedCtor(::PROTOBUF_NAMESPACE_ID::Arena* arena, bool is_message_owned); + void SharedDtor(); + void SetCachedSize(int size) const final; + void InternalSwap(RawTrack* other); + + private: + friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; + static ::absl::string_view FullMessageName() { + return "messages.raw_track.RawTrack"; + } + protected: + explicit RawTrack(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned = false); + public: + + static const ClassData _class_data_; + const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*GetClassData() const final; + + ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + enum : int { + kTrackNoFieldNumber = 1, + }; + // uint32 trackNo = 1; + void clear_trackno(); + uint32_t trackno() const; + void set_trackno(uint32_t value); + private: + uint32_t _internal_trackno() const; + void _internal_set_trackno(uint32_t value); + public: + + // @@protoc_insertion_point(class_scope:messages.raw_track.RawTrack) + private: + class _Internal; + + template friend class ::PROTOBUF_NAMESPACE_ID::Arena::InternalHelper; + typedef void InternalArenaConstructable_; + typedef void DestructorSkippable_; + struct Impl_ { + uint32_t trackno_; + mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; + }; + union { Impl_ _impl_; }; + friend struct ::TableStruct_raw_5ftrack_2eproto; +}; +// =================================================================== + + + + +// =================================================================== + + +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wstrict-aliasing" +#endif // __GNUC__ +// ------------------------------------------------------------------- + +// RawTrack + +// uint32 trackNo = 1; +inline void RawTrack::clear_trackno() { + _impl_.trackno_ = 0u; +} +inline uint32_t RawTrack::_internal_trackno() const { + return _impl_.trackno_; +} +inline uint32_t RawTrack::trackno() const { + // @@protoc_insertion_point(field_get:messages.raw_track.RawTrack.trackNo) + return _internal_trackno(); +} +inline void RawTrack::_internal_set_trackno(uint32_t value) { + + _impl_.trackno_ = value; +} +inline void RawTrack::set_trackno(uint32_t value) { + _internal_set_trackno(value); + // @@protoc_insertion_point(field_set:messages.raw_track.RawTrack.trackNo) +} + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif // __GNUC__ + +// @@protoc_insertion_point(namespace_scope) +} // namespace raw_track +} // namespace messages + + +// @@protoc_insertion_point(global_scope) + +#include "google/protobuf/port_undef.inc" + +#endif // GOOGLE_PROTOBUF_INCLUDED_raw_5ftrack_2eproto_2epb_2eh diff --git a/include/WHISPER/Messages/Protos/raw_track.proto b/include/WHISPER/Messages/Protos/raw_track.proto new file mode 100644 index 0000000..4a2e7e0 --- /dev/null +++ b/include/WHISPER/Messages/Protos/raw_track.proto @@ -0,0 +1,15 @@ + +// [START declaration] +syntax = "proto3"; +package messages.raw_track; + +// import "google/protobuf/timestamp.proto"; +// [END declaration] + + +// [START messages] +message RawTrack { + uint32 trackNo = 1; + +} + diff --git a/include/WHISPER/Messages/Track.hpp b/include/WHISPER/Messages/Track.hpp new file mode 100644 index 0000000..048de18 --- /dev/null +++ b/include/WHISPER/Messages/Track.hpp @@ -0,0 +1,25 @@ +#pragma once + + +#include "google/protobuf/message.h" +#include +#include +#include +#include + +namespace WHISPER { + + class RawTrack : public Message + { + private: + + messages::raw_track::RawTrack trackMessage; + public: + std::uint32_t trackNo; + + RawTrack(std::string receivedMessage); + + RawTrack(std::uint32_t deviceID,WHISPER::MsgTopics topic, SourceType src,std::uint32_t trackNo); + + }; +} \ No newline at end of file diff --git a/include/WHISPER/whisper.hpp b/include/WHISPER/whisper.hpp index d9e24db..7180e95 100644 --- a/include/WHISPER/whisper.hpp +++ b/include/WHISPER/whisper.hpp @@ -52,7 +52,7 @@ namespace WHISPER std::thread receiveThread; /// variable indicating if the receiveThread should be stopped - std::atomic stopReceiveThread; + std::atomic stopReceiveThread = false; std::shared_ptr> receiveQueue = nullptr; @@ -71,11 +71,17 @@ namespace WHISPER void unsubscribe(std::string topic); std::uint32_t getOwnID(); + SourceType getOwnDeviceType(); protected: + + void addMsgToReceiverQueue(WHISPER::Message); void setGateway(bool); + bool isGateway(); + + virtual void derivedConnect() = 0; virtual void derivedDisconnect() = 0; diff --git a/src/WHISPER/InternalUDPService.cpp b/src/WHISPER/InternalUDPService.cpp index b7c903a..71a72c9 100644 --- a/src/WHISPER/InternalUDPService.cpp +++ b/src/WHISPER/InternalUDPService.cpp @@ -2,17 +2,21 @@ #include "WHISPER/InternalUDPService.hpp" #include "WHISPER/Messages/Join.hpp" +#include "WHISPER/Messages/Leave.hpp" #include "WHISPER/Messages/Message.hpp" #include "WHISPER/whisper.hpp" #include "zmq.hpp" #include +#include #include #include #include +#include #include #include #include #include +#include namespace WHISPER { @@ -37,43 +41,62 @@ namespace WHISPER { if(ownReceivingPort_ == port_){ std::string portAsString = std::to_string(ownReceivingPort_); receiver->bind("udp://*:"+portAsString); + }else if (ownReceivingPort_ == 0) { throw std::invalid_argument( " receiver cant bind to port " ); }else { - LOG_S(INFO)<< "new port to bind: " << ownReceivingPort_; loopbackSocket = std::make_shared(ctx,zmq::socket_type::radio); loopbackSocket->connect("udp://127.0.0.255:"+sendingPort); - std::string portAsString = std::to_string(ownReceivingPort_); - receiver->bind("udp://*:"+portAsString); + + + LOG_S(WARNING)<<"local receiving port is: " << ownReceivingPort_; } - - receiver->join("management"); + ///subscribe to the basic message domain + ///zmq upd sockets only work with pub sub + subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); - LOG_S(INFO)<< sendingPort; + ///used to set a custom time out to the socket + receiver->set(zmq::sockopt::rcvtimeo,100); sender.connect("udp://"+destinationAdress_+":"+sendingPort); // sender.set(zmq::sockopt::multicast_loop ,1); + LOG_S(INFO)<<"own ID: "<< getOwnID(); + WHISPER::Join join(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_); + this->publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); + + } void InternalUDPService::derivedDisconnect() - { + { + WHISPER::Leave Leave(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_); + this->publish(Leave.serialize(),WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)Leave.topic_]); + sender.close(); + loopbackSocket = nullptr; receiver->close(); } + + void InternalUDPService::derivedPublish(std::string msg,std::string topic) { zmq::message_t tmpmsg(msg.begin(),msg.end()); - tmpmsg.set_group(topic.c_str()); + + sender.send(tmpmsg,zmq::send_flags::none); + zmq::message_t localmsg(msg.begin(),msg.end()); + tmpmsg.set_group(topic.c_str()); + + sendToLocalClients(msg, topic); + if (loopbackSocket != nullptr) { - LOG_S(INFO)<<"loop back send"; zmq::message_t tmpmsg(msg.begin(),msg.end()); tmpmsg.set_group(topic.c_str()); loopbackSocket->send(tmpmsg,zmq::send_flags::none); @@ -82,69 +105,132 @@ namespace WHISPER { } void InternalUDPService::derivedReceive() { - + zmq::recv_result_t res; zmq::message_t msg; - receiver->recv(msg,zmq::recv_flags::none); - std::string message = msg.to_string(); - - WHISPER::Message receivedMessage(message); + ///res is the size of the received message or error; cant figure out what value is set when ther is an error + res = receiver->recv(msg,zmq::recv_flags::none); - if (receivedMessage.deviceId_ != getOwnID()) { - int msgType = receivedMessage.msgType_; - if (msgType == WHISPER::JOIN) { + + if (res.has_value() && res.value() >= 0) + { - WHISPER::Join join(message); - if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") { - - bool clientAllreadyIn = false; - if (localclients.size() > 0) { - for (std::vector>::iterator it = localclients.begin(); it != localclients.end();it++) - { - if (it->get()->port == join.port) { - clientAllreadyIn = true; + std::string message = msg.to_string(); + WHISPER::Message receivedMessage(message); + + + sendToLocalClients(receivedMessage.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)receivedMessage.topic_]); + if (receivedMessage.deviceId_ != getOwnID()) { + + + int msgType = receivedMessage.msgType_; + if (msgType == WHISPER::JOIN) { + + WHISPER::Join join(message); + // LOG_S(INFO)<<"joined id "<myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") { + LOG_S(INFO)<<"go for it"; + bool clientAllreadyIn = false; + if (localclients.size() > 0) { + for (auto it = localclients.begin(); it != localclients.end();it++) + { + if (it->get()->port == join.port) { + clientAllreadyIn = true; + } } } - } - if (clientAllreadyIn == false) { - auto client = std::make_shared(); - client->port = join.port; - client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio); - client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port)); - localclients.emplace_back(client); - setGateway(true); + if (clientAllreadyIn == false) { + // localClient client; + auto client = std::make_shared(); + client->port = join.port; + client->id = join.deviceId_; + client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port); + client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio); + + LOG_S(INFO)<< "udp://"+join.sourceAddr+":" + std::to_string(join.port); + + client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port)); + localclients.emplace_back(client); + setGateway(true); + LOG_S(INFO)<< "new client joined"; + + } } + + }else if(msgType == WHISPER::LEAVE) + { + LOG_S(INFO)<<"client left"; + + WHISPER::Leave Leave(message); + if (localclients.size() > 0) { + for (auto it = localclients.begin(); it != localclients.end();it++) + { + if (it->get()->port == Leave.port) { + it->get()->clientSocket.close(); + it = localclients.erase(it); + LOG_S(INFO)<<"client left"; + } + } + } + + if (localclients.size() == 0) { + setGateway(false); + } + + }else if(msgType == WHISPER::PING) + { + + }else if(msgType == WHISPER::PONG) + { + + }else + { + addMsgToReceiverQueue(WHISPER::Message(message)); } + } + }else if(res.has_value() && res.value()== -1 ){ + + } + } - }else if(msgType == WHISPER::LEAVE) + void InternalUDPService::sendToLocalClients(std::string msg,std::string topic) + { + if (localclients.size() > 0 && isGateway() == true) { + auto tmpMsg = WHISPER::Message(msg); + for (auto it = localclients.begin(); it != localclients.end();it++) { - - }else if(msgType == WHISPER::PING) - { - - }else if(msgType == WHISPER::PONG) - { - - }else - { - addMsgToReceiverQueue(WHISPER::Message(message)); + if (tmpMsg.deviceId_ != it->get()->id) { + zmq::message_t tmp(msg.begin(),msg.end()); + tmp.set_group(topic.c_str()); + it->get()->clientSocket.send(tmp,zmq::send_flags::none); + } + + } } - } + } + void InternalUDPService::derivedSubscribe(std::string topic) { - receiver->join(topic.c_str()); + receiver->join(topic.c_str()); } + void InternalUDPService::derivedUnsubscribe(std::string topic) { receiver->leave(topic.c_str()); } + + + + std::uint16_t InternalUDPService::checkPort(std::uint16_t port) { std::uint16_t localPort = port; @@ -162,14 +248,9 @@ namespace WHISPER { local.sin_addr.s_addr = INADDR_ANY; local.sin_port = htons(port); int err = bind(sockfd, (const struct sockaddr *)&local, sizeof(local)); - - if (err == -1 && errno == EADDRINUSE) { - // no we are not a gateway - setGateway(false); - // set port to auto on binding local.sin_port = 0; err = bind(sockfd, (const struct sockaddr *) &local, sizeof(local)); @@ -210,8 +291,6 @@ namespace WHISPER { close(sockfd); - LOG_S(WARNING)<< "new assignet port is" << localPort; - return localPort; } diff --git a/src/WHISPER/Messages/Join.cpp b/src/WHISPER/Messages/Join.cpp index 25c5803..123fe57 100644 --- a/src/WHISPER/Messages/Join.cpp +++ b/src/WHISPER/Messages/Join.cpp @@ -15,6 +15,7 @@ namespace WHISPER { sourceType_ = msg.sourcetype(); msgType_ = msg.msgtype(); joinMessage = messages::join::Join(); + deviceId_ = msg.sourceid(); if ( msg.payload_size()) { if (msg.payload().begin()->Is()) { @@ -33,17 +34,16 @@ namespace WHISPER { } - WHISPER::Join::Join(std::uint32_t deviceID,std::uint32_t topic, MsgType Type, SourceType src,std::uint32_t port, std::string addr): - Message(deviceID,topic,Type,src),port(port),sourceAddr(addr) + WHISPER::Join::Join(std::uint32_t deviceID, SourceType src,std::uint32_t port, std::string addr): + Message(deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::JOIN,src),port(port),sourceAddr(addr) { joinMessage = messages::join::Join(); joinMessage.set_port(port); joinMessage.set_srcaddress(sourceAddr); - LOG_S(INFO)<< "join message befor packing:" << joinMessage.ByteSizeLong(); - auto test = std::make_shared(); - test->PackFrom(joinMessage); - addPayLoad(test); + auto payloadMessage = std::make_shared(); + payloadMessage->PackFrom(joinMessage); + addPayLoad(payloadMessage); } diff --git a/src/WHISPER/Messages/Leave.cpp b/src/WHISPER/Messages/Leave.cpp new file mode 100644 index 0000000..3fd2fe2 --- /dev/null +++ b/src/WHISPER/Messages/Leave.cpp @@ -0,0 +1,55 @@ +#include "WHISPER/Messages/Message.hpp" +#include +#include + + + + +namespace WHISPER { + + Leave::Leave(std::string receivedMessage){ + msg = messages::header::Message(); + try { + msg.ParseFromString(receivedMessage); + topic_ = msg.topic(); + sourceType_ = msg.sourcetype(); + msgType_ = msg.msgtype(); + + leaveMessage = messages::leave::Leave(); + + if ( msg.payload_size()) { + if (msg.payload().begin()->Is()) { + msg.payload().begin()->UnpackTo(&leaveMessage); + } + } + port = leaveMessage.port(); + sourceAddr = leaveMessage.srcaddress(); + + + + } catch (const std::exception& e) { + LOG_S(ERROR)<(); + payloadMessage->PackFrom(leaveMessage); + addPayLoad(payloadMessage); + + + } + + + + +} \ No newline at end of file diff --git a/src/WHISPER/Messages/Message.cpp b/src/WHISPER/Messages/Message.cpp index 70b32a3..e1e2c18 100644 --- a/src/WHISPER/Messages/Message.cpp +++ b/src/WHISPER/Messages/Message.cpp @@ -27,7 +27,7 @@ namespace WHISPER { } - Message::Message(std::int32_t deviceId,std::uint32_t topic, MsgType Type,SourceType src):topic_(topic),sourceType_(src),msgType_(Type){ + Message::Message(std::int32_t deviceId, MsgTopics topic, MsgType Type,SourceType src):topic_(topic),sourceType_(src),msgType_(Type){ msg = messages::header::Message(); if(msg.IsInitialized()) @@ -37,6 +37,11 @@ namespace WHISPER { msg.set_sourcetype(sourceType_); msg.set_msgtype(msgType_); } + deviceId_ = deviceId; + topic_ = topic; + sourceType_ = src; + msgType_ = Type; + } @@ -49,11 +54,7 @@ namespace WHISPER { void Message::addPayLoad(std::shared_ptr payload){ payload_ = payload; - LOG_S(INFO)<< "pack any size in message class "<ByteSizeLong(); - msg.add_payload()->CopyFrom(*payload_); - - LOG_S(INFO)<< "pack any size in message class "<ByteSizeLong(); } @@ -65,7 +66,6 @@ namespace WHISPER { if (msg.IsInitialized()) { serializedMessage = msg.SerializeAsString(); } - return serializedMessage; } diff --git a/src/WHISPER/Messages/Track.cpp b/src/WHISPER/Messages/Track.cpp new file mode 100644 index 0000000..6aac3d9 --- /dev/null +++ b/src/WHISPER/Messages/Track.cpp @@ -0,0 +1,56 @@ +#include "WHISPER/Messages/Message.hpp" +#include "WHISPER/Messages/Protos/raw_track.pb.h" +#include +#include + + + + +namespace WHISPER { + + RawTrack::RawTrack(std::string receivedMessage){ + msg = messages::header::Message(); + try { + msg.ParseFromString(receivedMessage); + topic_ = msg.topic(); + sourceType_ = msg.sourcetype(); + msgType_ = msg.msgtype(); + + trackMessage = messages::raw_track::RawTrack(); + + if ( msg.payload_size()) { + if (msg.payload().begin()->Is()) { + msg.payload().begin()->UnpackTo(&trackMessage); + } + } + + + trackNo = trackMessage.trackno(); + + + + } catch (const std::exception& e) { + LOG_S(ERROR)<(); + payloadMessage->PackFrom(trackMessage); + addPayLoad(payloadMessage); + + + } + + + + +} \ No newline at end of file diff --git a/src/WHISPER/whisper.cpp b/src/WHISPER/whisper.cpp index 5f1d790..35454f6 100644 --- a/src/WHISPER/whisper.cpp +++ b/src/WHISPER/whisper.cpp @@ -35,13 +35,17 @@ namespace WHISPER void whispercomm::disconnect() { + stopReceiveThread = true; + if(receiveThread.joinable()) receiveThread.join(); derivedDisconnect(); + } void whispercomm::publish(std::string msg,std::string topic){ this->derivedPublish(msg,topic); } + void whispercomm::receive(){ connected = true; @@ -49,17 +53,20 @@ namespace WHISPER derivedReceive(); while(!stopReceiveThread) - { - derivedReceive(); - } + { + derivedReceive(); + } } void whispercomm::subscribe(std::string topic) - { + { + this->subscribedTopics.push_back(topic); derivedSubscribe(topic); + + } void whispercomm::unsubscribe(std::string topic) @@ -89,6 +96,14 @@ namespace WHISPER { gateway = val; } + bool whispercomm::isGateway(){ + return gateway; + } + SourceType whispercomm::getOwnDeviceType() + { + return ownDeviceType_; + } + // Add datatypes here } // namespace WHISPER \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 878c5de..5862bf0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,5 +1,6 @@ #include "WHISPER/InternalUDPService.hpp" #include "WHISPER/Messages/Message.hpp" +#include "WHISPER/Messages/Track.hpp" #include "zmq.hpp" #include #include @@ -24,70 +25,57 @@ void killHandlerPing(int s) { } } - +#define ID 1 int main() { - // GOOGLE_PROTOBUF_VERIFY_VERSION; + + + + // setup signal handler + struct sigaction sigIntHandler; + sigIntHandler.sa_handler = killHandlerPing; + sigemptyset(&sigIntHandler.sa_mask); + sigIntHandler.sa_flags = 0; + sigaction(SIGINT, &sigIntHandler, NULL); + GOOGLE_PROTOBUF_VERIFY_VERSION; - WHISPER::Join join(1,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"192.168.0.19"); - // std::string msg = join.serialize(); - // LOG_S(INFO)<<" serialized Message is "<>(); - WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.0.19"); + WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.1.178"); service.connect(receiver); - service.publish(join.serialize(), "management"); - service.subscribe("data"); + + service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]); + service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); - // zmq::context_t ctx(2); - // zmq::socket_t sock(ctx,zmq::socket_type::radio); - // sock.connect("udp://127.0.0.255:8000"); - - - // std::string string = "hello world form 2"; - // zmq::message_t msg(string.begin(),string.end()); - // // memcpy (msg.data (), string.data(), string.size()); - // LOG_S(INFO)<<"message contains "<size()) { LOG_S(INFO)<<"received messages " << size; size = receiver->size(); } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } + + + service.disconnect(); return 0; } \ No newline at end of file diff --git a/src/mainRecv.cpp b/src/mainRecv.cpp index 28c19f3..4754ee9 100644 --- a/src/mainRecv.cpp +++ b/src/mainRecv.cpp @@ -1,3 +1,4 @@ +#include "WHISPER/Messages/Message.hpp" #include "zmq.hpp" #include #include @@ -7,6 +8,8 @@ #include #include +#include + /// variable for stopping the application @@ -24,11 +27,16 @@ void killHandlerPing(int s) { } } - +#define ID 2 int main() { - +// setup signal handler + struct sigaction sigIntHandler; + sigIntHandler.sa_handler = killHandlerPing; + sigemptyset(&sigIntHandler.sa_mask); + sigIntHandler.sa_flags = 0; + sigaction(SIGINT, &sigIntHandler, NULL); // zmq::context_t ctx(2); // zmq::socket_t sock(ctx,zmq::socket_type::dish); @@ -36,26 +44,49 @@ int main() // zmq::message_t temp; // sock.join("data"); - WHISPER::Join join(2,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"192.168.0.19"); + auto receiver = std::make_shared>(); - WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.0.255"); + WHISPER::InternalUDPService service(2,WHISPER::SHIP,8000,"192.168.0.255","192.168.1.178"); + + service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); + service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]); + + // WHISPER::Join join(2,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178"); + // service.publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); + service.connect(receiver); - service.publish(join.serialize(), "management"); // zmq::context_t ctx(2); // zmq::socket_t sock(ctx,zmq::socket_type::radio); // sock.connect("udp://127.0.0.1:8000"); // std::string string = "hello world form 2"; + int msgcount = 0; while (running) { - // LOG_S(INFO)<<"received messages " << receiver->size(); - service.publish(join.serialize(), "management"); - // zmq::message_t msg(string.begin(),string.end()); - // msg.set_group("management"); - // sock.send(msg,zmq::send_flags::none); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + if (msgcount != receiver->size()) { + LOG_S(INFO)<<"received messages " << receiver->size(); + + auto received = receiver.get()->get(); + auto topic = received.topic_; + LOG_S(INFO)<<"message type is: "<size(); + + // service.publish(received.serialize(), WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)topic]); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } + // WHISPER::Leave leave(2,WHISPER::SourceType::SHIP,8000,"192.168.1.178"); + // service.publish(leave.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); + + // LOG_S(INFO)<<"message send"; + + service.disconnect(); return 0; } \ No newline at end of file