From 84305eb7faa1a4de845613986e9b44f16227b98a Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Sat, 12 Nov 2022 14:23:12 +0100 Subject: [PATCH] first version of InternalUDPService --- .gitignore | 4 +- CMakeLists.txt | 18 +- include/WHISPER/InternalUDPService.hpp | 47 ++ include/WHISPER/Messages/Message.hpp | 7 +- include/WHISPER/Messages/Protos/join.pb.cc | 353 ++++++++++++++ include/WHISPER/Messages/Protos/join.pb.h | 329 +++++++++++++ include/WHISPER/Messages/Protos/message.pb.cc | 432 ++++++++++++++++++ include/WHISPER/Messages/Protos/message.pb.h | 419 +++++++++++++++++ include/WHISPER/threadSafeQueue.hpp | 100 ++-- include/WHISPER/whisper.hpp | 23 +- src/WHISPER/InternalUDPService.cpp | 167 +++++++ src/WHISPER/Messages/Join.cpp | 3 +- src/WHISPER/Messages/Message.cpp | 2 +- src/WHISPER/threadSafeQueue.cpp | 102 ++--- src/WHISPER/whisper.cpp | 29 +- src/main.cpp | 25 +- 16 files changed, 1924 insertions(+), 136 deletions(-) create mode 100644 include/WHISPER/InternalUDPService.hpp create mode 100644 include/WHISPER/Messages/Protos/join.pb.cc create mode 100644 include/WHISPER/Messages/Protos/join.pb.h create mode 100644 include/WHISPER/Messages/Protos/message.pb.cc create mode 100644 include/WHISPER/Messages/Protos/message.pb.h create mode 100644 src/WHISPER/InternalUDPService.cpp diff --git a/.gitignore b/.gitignore index dc949bd..e9eaa67 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ build .cache -/include/WHISPER/Messages/Protos/*.pb.cc -/include/WHISPER/Messages/Protos/*.pb.h \ No newline at end of file +# /include/WHISPER/Messages/Protos/*.pb.cc +# /include/WHISPER/Messages/Protos/*.pb.h \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e816b9..369362d 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,6 +24,8 @@ IF(NOT TARGET cppzmq) add_subdirectory(libs/cppzmq EXCLUDE_FROM_ALL) ENDIF() + + set(protobuf_BUILD_TESTS OFF CACHE INTERNAL "") set(protobuf_BUILD_EXAMPLES OFF CACHE INTERNAL "") add_subdirectory(libs/protobuf EXCLUDE_FROM_ALL) @@ -41,6 +43,10 @@ add_library(whisper-com STATIC include/WHISPER/whisper.hpp src/WHISPER/whisper.cpp + include/WHISPER/InternalUDPService.hpp + src/WHISPER/InternalUDPService.cpp + + include/WHISPER/threadSafeQueue.hpp src/WHISPER/threadSafeQueue.cpp @@ -52,7 +58,15 @@ add_library(whisper-com STATIC src/WHISPER/Messages/Join.cpp - ../include/WHISPER/Messages/Protos/message.pb.cc + + + + include/WHISPER/Messages/Protos/message.pb.h + include/WHISPER/Messages/Protos/message.pb.cc + + include/WHISPER/Messages/Protos/join.pb.h + include/WHISPER/Messages/Protos/join.pb.cc + ) @@ -64,6 +78,7 @@ target_link_libraries(whisper-com libprotobuf ) +add_dependencies(whisper-com protoc) target_include_directories(whisper-com PUBLIC $ @@ -73,7 +88,6 @@ target_include_directories(whisper-com PUBLIC add_executable(main src/main.cpp - ../include/WHISPER/Messages/Protos/join.pb.cc ) diff --git a/include/WHISPER/InternalUDPService.hpp b/include/WHISPER/InternalUDPService.hpp new file mode 100644 index 0000000..bed1e21 --- /dev/null +++ b/include/WHISPER/InternalUDPService.hpp @@ -0,0 +1,47 @@ +#pragma once + + +#include "WHISPER/whisper.hpp" +#include +#include + +#define ZMQ_BUILD_DRAFT_API 1 +#include +#include + +namespace WHISPER { + + class InternalUDPService : public whispercomm { + + + private: + std::string address_; + std::uint16_t port_; + std::uint16_t ownReceivingPort_; + + zmq::context_t ctx; + zmq::socket_t sender; + std::shared_ptr receiver; + + std::uint16_t checkPort(std::uint16_t port); + + void derivedConnect() override; + void derivedDisconnect() override; + void derivedPublish(std::string msg,std::string topic) override; + void derivedReceive() override; + void derivedSubscribe(std::string topic) override; + void derivedUnsubscribe(std::string topic) override; + + + public: + InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string address); + + + + + }; + + + + +} \ No newline at end of file diff --git a/include/WHISPER/Messages/Message.hpp b/include/WHISPER/Messages/Message.hpp index f1e7569..d1ea29b 100644 --- a/include/WHISPER/Messages/Message.hpp +++ b/include/WHISPER/Messages/Message.hpp @@ -66,14 +66,15 @@ namespace WHISPER { std::int32_t sourceType_; std::int32_t deviceId_; - void addPayLoad(std::shared_ptr any); - void addPayLoad(std::string any); + std::string serialize(); protected: - + void addPayLoad(std::shared_ptr any); + // void addPayLoad(std::string any); + messages::header::Message msg; }; diff --git a/include/WHISPER/Messages/Protos/join.pb.cc b/include/WHISPER/Messages/Protos/join.pb.cc new file mode 100644 index 0000000..9edd0a0 --- /dev/null +++ b/include/WHISPER/Messages/Protos/join.pb.cc @@ -0,0 +1,353 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: join.proto + +#include "join.pb.h" + +#include +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/extension_set.h" +#include "google/protobuf/wire_format_lite.h" +#include "google/protobuf/descriptor.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/reflection_ops.h" +#include "google/protobuf/wire_format.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" +PROTOBUF_PRAGMA_INIT_SEG +namespace _pb = ::PROTOBUF_NAMESPACE_ID; +namespace _pbi = ::PROTOBUF_NAMESPACE_ID::internal; +namespace messages { +namespace join { +PROTOBUF_CONSTEXPR Join::Join( + ::_pbi::ConstantInitialized): _impl_{ + /*decltype(_impl_.srcaddress_)*/{&::_pbi::fixed_address_empty_string, ::_pbi::ConstantInitialized{}} + , /*decltype(_impl_.port_)*/0u + , /*decltype(_impl_._cached_size_)*/{}} {} +struct JoinDefaultTypeInternal { + PROTOBUF_CONSTEXPR JoinDefaultTypeInternal() : _instance(::_pbi::ConstantInitialized{}) {} + ~JoinDefaultTypeInternal() {} + union { + Join _instance; + }; +}; + +PROTOBUF_ATTRIBUTE_NO_DESTROY PROTOBUF_CONSTINIT + PROTOBUF_ATTRIBUTE_INIT_PRIORITY1 JoinDefaultTypeInternal _Join_default_instance_; +} // namespace join +} // namespace messages +static ::_pb::Metadata file_level_metadata_join_2eproto[1]; +static constexpr const ::_pb::EnumDescriptor** + file_level_enum_descriptors_join_2eproto = nullptr; +static constexpr const ::_pb::ServiceDescriptor** + file_level_service_descriptors_join_2eproto = nullptr; +const uint32_t TableStruct_join_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE( + protodesc_cold) = { + ~0u, // no _has_bits_ + PROTOBUF_FIELD_OFFSET(::messages::join::Join, _internal_metadata_), + ~0u, // no _extensions_ + ~0u, // no _oneof_case_ + ~0u, // no _weak_field_map_ + ~0u, // no _inlined_string_donated_ + ~0u, // no _split_ + ~0u, // no sizeof(Split) + PROTOBUF_FIELD_OFFSET(::messages::join::Join, _impl_.port_), + PROTOBUF_FIELD_OFFSET(::messages::join::Join, _impl_.srcaddress_), +}; + +static const ::_pbi::MigrationSchema + schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + { 0, -1, -1, sizeof(::messages::join::Join)}, +}; + +static const ::_pb::Message* const file_default_instances[] = { + &::messages::join::_Join_default_instance_._instance, +}; +const char descriptor_table_protodef_join_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + "\n\njoin.proto\022\rmessages.join\"(\n\004Join\022\014\n\004p" + "ort\030\001 \001(\r\022\022\n\nsrcAddress\030\002 \001(\tb\006proto3" +}; +static ::absl::once_flag descriptor_table_join_2eproto_once; +const ::_pbi::DescriptorTable descriptor_table_join_2eproto = { + false, + false, + 77, + descriptor_table_protodef_join_2eproto, + "join.proto", + &descriptor_table_join_2eproto_once, + nullptr, + 0, + 1, + schemas, + file_default_instances, + TableStruct_join_2eproto::offsets, + file_level_metadata_join_2eproto, + file_level_enum_descriptors_join_2eproto, + file_level_service_descriptors_join_2eproto, +}; + +// This function exists to be marked as weak. +// It can significantly speed up compilation by breaking up LLVM's SCC +// in the .pb.cc translation units. Large translation units see a +// reduction of more than 35% of walltime for optimized builds. Without +// the weak attribute all the messages in the file, including all the +// vtables and everything they use become part of the same SCC through +// a cycle like: +// GetMetadata -> descriptor table -> default instances -> +// vtables -> GetMetadata +// By adding a weak function here we break the connection from the +// individual vtables back into the descriptor table. +PROTOBUF_ATTRIBUTE_WEAK const ::_pbi::DescriptorTable* descriptor_table_join_2eproto_getter() { + return &descriptor_table_join_2eproto; +} +// Force running AddDescriptors() at dynamic initialization time. +PROTOBUF_ATTRIBUTE_INIT_PRIORITY2 +static ::_pbi::AddDescriptorsRunner dynamic_init_dummy_join_2eproto(&descriptor_table_join_2eproto); +namespace messages { +namespace join { +// =================================================================== + +class Join::_Internal { + public: +}; + +Join::Join(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned) + : ::PROTOBUF_NAMESPACE_ID::Message(arena, is_message_owned) { + SharedCtor(arena, is_message_owned); + // @@protoc_insertion_point(arena_constructor:messages.join.Join) +} +Join::Join(const Join& from) + : ::PROTOBUF_NAMESPACE_ID::Message() { + Join* const _this = this; (void)_this; + new (&_impl_) Impl_{ + decltype(_impl_.srcaddress_){} + , decltype(_impl_.port_){} + , /*decltype(_impl_._cached_size_)*/{}}; + + _internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); + _impl_.srcaddress_.InitDefault(); + #ifdef PROTOBUF_FORCE_COPY_DEFAULT_STRING + _impl_.srcaddress_.Set("", GetArenaForAllocation()); + #endif // PROTOBUF_FORCE_COPY_DEFAULT_STRING + if (!from._internal_srcaddress().empty()) { + _this->_impl_.srcaddress_.Set(from._internal_srcaddress(), + _this->GetArenaForAllocation()); + } + _this->_impl_.port_ = from._impl_.port_; + // @@protoc_insertion_point(copy_constructor:messages.join.Join) +} + +inline void Join::SharedCtor( + ::_pb::Arena* arena, bool is_message_owned) { + (void)arena; + (void)is_message_owned; + new (&_impl_) Impl_{ + decltype(_impl_.srcaddress_){} + , decltype(_impl_.port_){0u} + , /*decltype(_impl_._cached_size_)*/{} + }; + _impl_.srcaddress_.InitDefault(); + #ifdef PROTOBUF_FORCE_COPY_DEFAULT_STRING + _impl_.srcaddress_.Set("", GetArenaForAllocation()); + #endif // PROTOBUF_FORCE_COPY_DEFAULT_STRING +} + +Join::~Join() { + // @@protoc_insertion_point(destructor:messages.join.Join) + if (auto *arena = _internal_metadata_.DeleteReturnArena<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>()) { + (void)arena; + return; + } + SharedDtor(); +} + +inline void Join::SharedDtor() { + GOOGLE_DCHECK(GetArenaForAllocation() == nullptr); + _impl_.srcaddress_.Destroy(); +} + +void Join::SetCachedSize(int size) const { + _impl_._cached_size_.Set(size); +} + +void Join::Clear() { +// @@protoc_insertion_point(message_clear_start:messages.join.Join) + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + _impl_.srcaddress_.ClearToEmpty(); + _impl_.port_ = 0u; + _internal_metadata_.Clear<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(); +} + +const char* Join::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx) { + +#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure + while (!ctx->Done(&ptr)) { + uint32_t tag; + ptr = ::_pbi::ReadTag(ptr, &tag); + switch (tag >> 3) { + // uint32 port = 1; + case 1: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 8)) { + _impl_.port_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + // string srcAddress = 2; + case 2: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 18)) { + auto str = _internal_mutable_srcaddress(); + ptr = ::_pbi::InlineGreedyStringParser(str, ptr, ctx); + CHK_(ptr); + CHK_(::_pbi::VerifyUTF8(str, "messages.join.Join.srcAddress")); + } else + goto handle_unusual; + continue; + default: + goto handle_unusual; + } // switch + handle_unusual: + if ((tag == 0) || ((tag & 7) == 4)) { + CHK_(ptr); + ctx->SetLastTag(tag); + goto message_done; + } + ptr = UnknownFieldParse( + tag, + _internal_metadata_.mutable_unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(), + ptr, ctx); + CHK_(ptr != nullptr); + } // while +message_done: + return ptr; +failure: + ptr = nullptr; + goto message_done; +#undef CHK_ +} + +uint8_t* Join::_InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const { + + // @@protoc_insertion_point(serialize_to_array_start:messages.join.Join) + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + // uint32 port = 1; + if (this->_internal_port() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteUInt32ToArray(1, this->_internal_port(), target); + } + + // string srcAddress = 2; + if (!this->_internal_srcaddress().empty()) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String( + this->_internal_srcaddress().data(), static_cast(this->_internal_srcaddress().length()), + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE, + "messages.join.Join.srcAddress"); + target = stream->WriteStringMaybeAliased( + 2, this->_internal_srcaddress(), target); + } + + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { + target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( + _internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOBUF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream); + } + // @@protoc_insertion_point(serialize_to_array_end:messages.join.Join) + return target; +} + +size_t Join::ByteSizeLong() const { + +// @@protoc_insertion_point(message_byte_size_start:messages.join.Join) + size_t total_size = 0; + + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + // string srcAddress = 2; + if (!this->_internal_srcaddress().empty()) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( + this->_internal_srcaddress()); + } + + // uint32 port = 1; + if (this->_internal_port() != 0) { + total_size += ::_pbi::WireFormatLite::UInt32SizePlusOne(this->_internal_port()); + } + + return MaybeComputeUnknownFieldsSize(total_size, &_impl_._cached_size_); +} + +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData Join::_class_data_ = { + ::PROTOBUF_NAMESPACE_ID::Message::CopyWithSourceCheck, + Join::MergeImpl +}; +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*Join::GetClassData() const { return &_class_data_; } + + +void Join::MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg) { + auto* const _this = static_cast(&to_msg); + auto& from = static_cast(from_msg); + + // @@protoc_insertion_point(class_specific_merge_from_start:messages.join.Join) + GOOGLE_DCHECK_NE(&from, _this); + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + if (!from._internal_srcaddress().empty()) { + _this->_internal_set_srcaddress(from._internal_srcaddress()); + } + if (from._internal_port() != 0) { + _this->_internal_set_port(from._internal_port()); + } + _this->_internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); +} + +void Join::CopyFrom(const Join& from) { +// @@protoc_insertion_point(class_specific_copy_from_start:messages.join.Join) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool Join::IsInitialized() const { + return true; +} + +void Join::InternalSwap(Join* other) { + using std::swap; + auto* lhs_arena = GetArenaForAllocation(); + auto* rhs_arena = other->GetArenaForAllocation(); + _internal_metadata_.InternalSwap(&other->_internal_metadata_); + ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr::InternalSwap( + &_impl_.srcaddress_, lhs_arena, + &other->_impl_.srcaddress_, rhs_arena + ); + swap(_impl_.port_, other->_impl_.port_); +} + +::PROTOBUF_NAMESPACE_ID::Metadata Join::GetMetadata() const { + + return ::_pbi::AssignDescriptors( + &descriptor_table_join_2eproto_getter, &descriptor_table_join_2eproto_once, + file_level_metadata_join_2eproto[0]); +} +// @@protoc_insertion_point(namespace_scope) +} // namespace join +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template<> PROTOBUF_NOINLINE ::messages::join::Join* +Arena::CreateMaybeMessage< ::messages::join::Join >(Arena* arena) { + return Arena::CreateMessageInternal< ::messages::join::Join >(arena); +} +PROTOBUF_NAMESPACE_CLOSE +// @@protoc_insertion_point(global_scope) +#include "google/protobuf/port_undef.inc" diff --git a/include/WHISPER/Messages/Protos/join.pb.h b/include/WHISPER/Messages/Protos/join.pb.h new file mode 100644 index 0000000..415c872 --- /dev/null +++ b/include/WHISPER/Messages/Protos/join.pb.h @@ -0,0 +1,329 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: join.proto + +#ifndef GOOGLE_PROTOBUF_INCLUDED_join_2eproto_2epb_2eh +#define GOOGLE_PROTOBUF_INCLUDED_join_2eproto_2epb_2eh + +#include +#include +#include + +#include "google/protobuf/port_def.inc" +#if PROTOBUF_VERSION < 3021000 +#error "This file was generated by a newer version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please update" +#error "your headers." +#endif // PROTOBUF_VERSION + +#if 3021008 < PROTOBUF_MIN_PROTOC_VERSION +#error "This file was generated by an older version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please" +#error "regenerate this file with a newer version of protoc." +#endif // PROTOBUF_MIN_PROTOC_VERSION +#include "google/protobuf/port_undef.inc" +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/arena.h" +#include "google/protobuf/arenastring.h" +#include "google/protobuf/generated_message_util.h" +#include "google/protobuf/metadata_lite.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/message.h" +#include "google/protobuf/repeated_field.h" // IWYU pragma: export +#include "google/protobuf/extension_set.h" // IWYU pragma: export +#include "google/protobuf/unknown_field_set.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" + +#define PROTOBUF_INTERNAL_EXPORT_join_2eproto + +PROTOBUF_NAMESPACE_OPEN +namespace internal { +class AnyMetadata; +} // namespace internal +PROTOBUF_NAMESPACE_CLOSE + +// Internal implementation detail -- do not use these members. +struct TableStruct_join_2eproto { + static const uint32_t offsets[]; +}; +extern const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable + descriptor_table_join_2eproto; +namespace messages { +namespace join { +class Join; +struct JoinDefaultTypeInternal; +extern JoinDefaultTypeInternal _Join_default_instance_; +} // namespace join +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template <> +::messages::join::Join* Arena::CreateMaybeMessage<::messages::join::Join>(Arena*); +PROTOBUF_NAMESPACE_CLOSE + +namespace messages { +namespace join { + +// =================================================================== + + +// ------------------------------------------------------------------- + +class Join final : + public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:messages.join.Join) */ { + public: + inline Join() : Join(nullptr) {} + ~Join() override; + explicit PROTOBUF_CONSTEXPR Join(::PROTOBUF_NAMESPACE_ID::internal::ConstantInitialized); + + Join(const Join& from); + Join(Join&& from) noexcept + : Join() { + *this = ::std::move(from); + } + + inline Join& operator=(const Join& from) { + CopyFrom(from); + return *this; + } + inline Join& operator=(Join&& from) noexcept { + if (this == &from) return *this; + if (GetOwningArena() == from.GetOwningArena() + #ifdef PROTOBUF_FORCE_COPY_IN_MOVE + && GetOwningArena() != nullptr + #endif // !PROTOBUF_FORCE_COPY_IN_MOVE + ) { + InternalSwap(&from); + } else { + CopyFrom(from); + } + return *this; + } + + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { + return GetDescriptor(); + } + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { + return default_instance().GetMetadata().descriptor; + } + static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { + return default_instance().GetMetadata().reflection; + } + static const Join& default_instance() { + return *internal_default_instance(); + } + static inline const Join* internal_default_instance() { + return reinterpret_cast( + &_Join_default_instance_); + } + static constexpr int kIndexInFileMessages = + 0; + + friend void swap(Join& a, Join& b) { + a.Swap(&b); + } + inline void Swap(Join* other) { + if (other == this) return; + #ifdef PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() != nullptr && + GetOwningArena() == other->GetOwningArena()) { + #else // PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() == other->GetOwningArena()) { + #endif // !PROTOBUF_FORCE_COPY_IN_SWAP + InternalSwap(other); + } else { + ::PROTOBUF_NAMESPACE_ID::internal::GenericSwap(this, other); + } + } + void UnsafeArenaSwap(Join* other) { + if (other == this) return; + GOOGLE_DCHECK(GetOwningArena() == other->GetOwningArena()); + InternalSwap(other); + } + + // implements Message ---------------------------------------------- + + Join* New(::PROTOBUF_NAMESPACE_ID::Arena* arena = nullptr) const final { + return CreateMaybeMessage(arena); + } + using ::PROTOBUF_NAMESPACE_ID::Message::CopyFrom; + void CopyFrom(const Join& from); + using ::PROTOBUF_NAMESPACE_ID::Message::MergeFrom; + void MergeFrom( const Join& from) { + Join::MergeImpl(*this, from); + } + private: + static void MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg); + public: + PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; + bool IsInitialized() const final; + + size_t ByteSizeLong() const final; + const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; + uint8_t* _InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const final; + int GetCachedSize() const final { return _impl_._cached_size_.Get(); } + + private: + void SharedCtor(::PROTOBUF_NAMESPACE_ID::Arena* arena, bool is_message_owned); + void SharedDtor(); + void SetCachedSize(int size) const final; + void InternalSwap(Join* other); + + private: + friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; + static ::absl::string_view FullMessageName() { + return "messages.join.Join"; + } + protected: + explicit Join(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned = false); + public: + + static const ClassData _class_data_; + const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*GetClassData() const final; + + ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + enum : int { + kSrcAddressFieldNumber = 2, + kPortFieldNumber = 1, + }; + // string srcAddress = 2; + void clear_srcaddress(); + const std::string& srcaddress() const; + template + void set_srcaddress(ArgT0&& arg0, ArgT... args); + std::string* mutable_srcaddress(); + PROTOBUF_NODISCARD std::string* release_srcaddress(); + void set_allocated_srcaddress(std::string* srcaddress); + private: + const std::string& _internal_srcaddress() const; + inline PROTOBUF_ALWAYS_INLINE void _internal_set_srcaddress(const std::string& value); + std::string* _internal_mutable_srcaddress(); + public: + + // uint32 port = 1; + void clear_port(); + uint32_t port() const; + void set_port(uint32_t value); + private: + uint32_t _internal_port() const; + void _internal_set_port(uint32_t value); + public: + + // @@protoc_insertion_point(class_scope:messages.join.Join) + private: + class _Internal; + + template friend class ::PROTOBUF_NAMESPACE_ID::Arena::InternalHelper; + typedef void InternalArenaConstructable_; + typedef void DestructorSkippable_; + struct Impl_ { + ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr srcaddress_; + uint32_t port_; + mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; + }; + union { Impl_ _impl_; }; + friend struct ::TableStruct_join_2eproto; +}; +// =================================================================== + + + + +// =================================================================== + + +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wstrict-aliasing" +#endif // __GNUC__ +// ------------------------------------------------------------------- + +// Join + +// uint32 port = 1; +inline void Join::clear_port() { + _impl_.port_ = 0u; +} +inline uint32_t Join::_internal_port() const { + return _impl_.port_; +} +inline uint32_t Join::port() const { + // @@protoc_insertion_point(field_get:messages.join.Join.port) + return _internal_port(); +} +inline void Join::_internal_set_port(uint32_t value) { + + _impl_.port_ = value; +} +inline void Join::set_port(uint32_t value) { + _internal_set_port(value); + // @@protoc_insertion_point(field_set:messages.join.Join.port) +} + +// string srcAddress = 2; +inline void Join::clear_srcaddress() { + _impl_.srcaddress_.ClearToEmpty(); +} +inline const std::string& Join::srcaddress() const { + // @@protoc_insertion_point(field_get:messages.join.Join.srcAddress) + return _internal_srcaddress(); +} +template +inline PROTOBUF_ALWAYS_INLINE +void Join::set_srcaddress(ArgT0&& arg0, ArgT... args) { + + _impl_.srcaddress_.Set(static_cast(arg0), args..., GetArenaForAllocation()); + // @@protoc_insertion_point(field_set:messages.join.Join.srcAddress) +} +inline std::string* Join::mutable_srcaddress() { + std::string* _s = _internal_mutable_srcaddress(); + // @@protoc_insertion_point(field_mutable:messages.join.Join.srcAddress) + return _s; +} +inline const std::string& Join::_internal_srcaddress() const { + return _impl_.srcaddress_.Get(); +} +inline void Join::_internal_set_srcaddress(const std::string& value) { + + _impl_.srcaddress_.Set(value, GetArenaForAllocation()); +} +inline std::string* Join::_internal_mutable_srcaddress() { + + return _impl_.srcaddress_.Mutable(GetArenaForAllocation()); +} +inline std::string* Join::release_srcaddress() { + // @@protoc_insertion_point(field_release:messages.join.Join.srcAddress) + return _impl_.srcaddress_.Release(); +} +inline void Join::set_allocated_srcaddress(std::string* srcaddress) { + _impl_.srcaddress_.SetAllocated(srcaddress, GetArenaForAllocation()); +#ifdef PROTOBUF_FORCE_COPY_DEFAULT_STRING + if (_impl_.srcaddress_.IsDefault()) { + _impl_.srcaddress_.Set("", GetArenaForAllocation()); + } +#endif // PROTOBUF_FORCE_COPY_DEFAULT_STRING + // @@protoc_insertion_point(field_set_allocated:messages.join.Join.srcAddress) +} + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif // __GNUC__ + +// @@protoc_insertion_point(namespace_scope) +} // namespace join +} // namespace messages + + +// @@protoc_insertion_point(global_scope) + +#include "google/protobuf/port_undef.inc" + +#endif // GOOGLE_PROTOBUF_INCLUDED_join_2eproto_2epb_2eh diff --git a/include/WHISPER/Messages/Protos/message.pb.cc b/include/WHISPER/Messages/Protos/message.pb.cc new file mode 100644 index 0000000..7f6a958 --- /dev/null +++ b/include/WHISPER/Messages/Protos/message.pb.cc @@ -0,0 +1,432 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: message.proto + +#include "message.pb.h" + +#include +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/extension_set.h" +#include "google/protobuf/wire_format_lite.h" +#include "google/protobuf/descriptor.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/reflection_ops.h" +#include "google/protobuf/wire_format.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" +PROTOBUF_PRAGMA_INIT_SEG +namespace _pb = ::PROTOBUF_NAMESPACE_ID; +namespace _pbi = ::PROTOBUF_NAMESPACE_ID::internal; +namespace messages { +namespace header { +PROTOBUF_CONSTEXPR Message::Message( + ::_pbi::ConstantInitialized): _impl_{ + /*decltype(_impl_.payload_)*/{} + , /*decltype(_impl_.topic_)*/0 + , /*decltype(_impl_.msgtype_)*/0 + , /*decltype(_impl_.sourcetype_)*/0 + , /*decltype(_impl_.sourceid_)*/0 + , /*decltype(_impl_._cached_size_)*/{}} {} +struct MessageDefaultTypeInternal { + PROTOBUF_CONSTEXPR MessageDefaultTypeInternal() : _instance(::_pbi::ConstantInitialized{}) {} + ~MessageDefaultTypeInternal() {} + union { + Message _instance; + }; +}; + +PROTOBUF_ATTRIBUTE_NO_DESTROY PROTOBUF_CONSTINIT + PROTOBUF_ATTRIBUTE_INIT_PRIORITY1 MessageDefaultTypeInternal _Message_default_instance_; +} // namespace header +} // namespace messages +static ::_pb::Metadata file_level_metadata_message_2eproto[1]; +static constexpr const ::_pb::EnumDescriptor** + file_level_enum_descriptors_message_2eproto = nullptr; +static constexpr const ::_pb::ServiceDescriptor** + file_level_service_descriptors_message_2eproto = nullptr; +const uint32_t TableStruct_message_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE( + protodesc_cold) = { + ~0u, // no _has_bits_ + PROTOBUF_FIELD_OFFSET(::messages::header::Message, _internal_metadata_), + ~0u, // no _extensions_ + ~0u, // no _oneof_case_ + ~0u, // no _weak_field_map_ + ~0u, // no _inlined_string_donated_ + ~0u, // no _split_ + ~0u, // no sizeof(Split) + PROTOBUF_FIELD_OFFSET(::messages::header::Message, _impl_.topic_), + PROTOBUF_FIELD_OFFSET(::messages::header::Message, _impl_.msgtype_), + PROTOBUF_FIELD_OFFSET(::messages::header::Message, _impl_.sourcetype_), + PROTOBUF_FIELD_OFFSET(::messages::header::Message, _impl_.sourceid_), + PROTOBUF_FIELD_OFFSET(::messages::header::Message, _impl_.payload_), +}; + +static const ::_pbi::MigrationSchema + schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + { 0, -1, -1, sizeof(::messages::header::Message)}, +}; + +static const ::_pb::Message* const file_default_instances[] = { + &::messages::header::_Message_default_instance_._instance, +}; +const char descriptor_table_protodef_message_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { + "\n\rmessage.proto\022\017messages.header\032\031google" + "/protobuf/any.proto\"v\n\007Message\022\r\n\005topic\030" + "\001 \001(\005\022\017\n\007msgType\030\002 \001(\005\022\022\n\nsourceType\030\003 \001" + "(\005\022\020\n\010sourceID\030\004 \001(\005\022%\n\007payload\030\005 \003(\0132\024." + "google.protobuf.Anyb\006proto3" +}; +static const ::_pbi::DescriptorTable* const descriptor_table_message_2eproto_deps[1] = + { + &::descriptor_table_google_2fprotobuf_2fany_2eproto, +}; +static ::absl::once_flag descriptor_table_message_2eproto_once; +const ::_pbi::DescriptorTable descriptor_table_message_2eproto = { + false, + false, + 187, + descriptor_table_protodef_message_2eproto, + "message.proto", + &descriptor_table_message_2eproto_once, + descriptor_table_message_2eproto_deps, + 1, + 1, + schemas, + file_default_instances, + TableStruct_message_2eproto::offsets, + file_level_metadata_message_2eproto, + file_level_enum_descriptors_message_2eproto, + file_level_service_descriptors_message_2eproto, +}; + +// This function exists to be marked as weak. +// It can significantly speed up compilation by breaking up LLVM's SCC +// in the .pb.cc translation units. Large translation units see a +// reduction of more than 35% of walltime for optimized builds. Without +// the weak attribute all the messages in the file, including all the +// vtables and everything they use become part of the same SCC through +// a cycle like: +// GetMetadata -> descriptor table -> default instances -> +// vtables -> GetMetadata +// By adding a weak function here we break the connection from the +// individual vtables back into the descriptor table. +PROTOBUF_ATTRIBUTE_WEAK const ::_pbi::DescriptorTable* descriptor_table_message_2eproto_getter() { + return &descriptor_table_message_2eproto; +} +// Force running AddDescriptors() at dynamic initialization time. +PROTOBUF_ATTRIBUTE_INIT_PRIORITY2 +static ::_pbi::AddDescriptorsRunner dynamic_init_dummy_message_2eproto(&descriptor_table_message_2eproto); +namespace messages { +namespace header { +// =================================================================== + +class Message::_Internal { + public: +}; + +void Message::clear_payload() { + _impl_.payload_.Clear(); +} +Message::Message(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned) + : ::PROTOBUF_NAMESPACE_ID::Message(arena, is_message_owned) { + SharedCtor(arena, is_message_owned); + // @@protoc_insertion_point(arena_constructor:messages.header.Message) +} +Message::Message(const Message& from) + : ::PROTOBUF_NAMESPACE_ID::Message() { + Message* const _this = this; (void)_this; + new (&_impl_) Impl_{ + decltype(_impl_.payload_){from._impl_.payload_} + , decltype(_impl_.topic_){} + , decltype(_impl_.msgtype_){} + , decltype(_impl_.sourcetype_){} + , decltype(_impl_.sourceid_){} + , /*decltype(_impl_._cached_size_)*/{}}; + + _internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); + ::memcpy(&_impl_.topic_, &from._impl_.topic_, + static_cast(reinterpret_cast(&_impl_.sourceid_) - + reinterpret_cast(&_impl_.topic_)) + sizeof(_impl_.sourceid_)); + // @@protoc_insertion_point(copy_constructor:messages.header.Message) +} + +inline void Message::SharedCtor( + ::_pb::Arena* arena, bool is_message_owned) { + (void)arena; + (void)is_message_owned; + new (&_impl_) Impl_{ + decltype(_impl_.payload_){arena} + , decltype(_impl_.topic_){0} + , decltype(_impl_.msgtype_){0} + , decltype(_impl_.sourcetype_){0} + , decltype(_impl_.sourceid_){0} + , /*decltype(_impl_._cached_size_)*/{} + }; +} + +Message::~Message() { + // @@protoc_insertion_point(destructor:messages.header.Message) + if (auto *arena = _internal_metadata_.DeleteReturnArena<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>()) { + (void)arena; + return; + } + SharedDtor(); +} + +inline void Message::SharedDtor() { + GOOGLE_DCHECK(GetArenaForAllocation() == nullptr); + _impl_.payload_.~RepeatedPtrField(); +} + +void Message::SetCachedSize(int size) const { + _impl_._cached_size_.Set(size); +} + +void Message::Clear() { +// @@protoc_insertion_point(message_clear_start:messages.header.Message) + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + _impl_.payload_.Clear(); + ::memset(&_impl_.topic_, 0, static_cast( + reinterpret_cast(&_impl_.sourceid_) - + reinterpret_cast(&_impl_.topic_)) + sizeof(_impl_.sourceid_)); + _internal_metadata_.Clear<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(); +} + +const char* Message::_InternalParse(const char* ptr, ::_pbi::ParseContext* ctx) { + +#define CHK_(x) if (PROTOBUF_PREDICT_FALSE(!(x))) goto failure + while (!ctx->Done(&ptr)) { + uint32_t tag; + ptr = ::_pbi::ReadTag(ptr, &tag); + switch (tag >> 3) { + // int32 topic = 1; + case 1: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 8)) { + _impl_.topic_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + // int32 msgType = 2; + case 2: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 16)) { + _impl_.msgtype_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + // int32 sourceType = 3; + case 3: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 24)) { + _impl_.sourcetype_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + // int32 sourceID = 4; + case 4: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 32)) { + _impl_.sourceid_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint32(&ptr); + CHK_(ptr); + } else + goto handle_unusual; + continue; + // repeated .google.protobuf.Any payload = 5; + case 5: + if (PROTOBUF_PREDICT_TRUE(static_cast(tag) == 42)) { + ptr -= 1; + do { + ptr += 1; + ptr = ctx->ParseMessage(_internal_add_payload(), ptr); + CHK_(ptr); + if (!ctx->DataAvailable(ptr)) break; + } while (::PROTOBUF_NAMESPACE_ID::internal::ExpectTag<42>(ptr)); + } else + goto handle_unusual; + continue; + default: + goto handle_unusual; + } // switch + handle_unusual: + if ((tag == 0) || ((tag & 7) == 4)) { + CHK_(ptr); + ctx->SetLastTag(tag); + goto message_done; + } + ptr = UnknownFieldParse( + tag, + _internal_metadata_.mutable_unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(), + ptr, ctx); + CHK_(ptr != nullptr); + } // while +message_done: + return ptr; +failure: + ptr = nullptr; + goto message_done; +#undef CHK_ +} + +uint8_t* Message::_InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const { + + // @@protoc_insertion_point(serialize_to_array_start:messages.header.Message) + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + // int32 topic = 1; + if (this->_internal_topic() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteInt32ToArray(1, this->_internal_topic(), target); + } + + // int32 msgType = 2; + if (this->_internal_msgtype() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteInt32ToArray(2, this->_internal_msgtype(), target); + } + + // int32 sourceType = 3; + if (this->_internal_sourcetype() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteInt32ToArray(3, this->_internal_sourcetype(), target); + } + + // int32 sourceID = 4; + if (this->_internal_sourceid() != 0) { + target = stream->EnsureSpace(target); + target = ::_pbi::WireFormatLite::WriteInt32ToArray(4, this->_internal_sourceid(), target); + } + + // repeated .google.protobuf.Any payload = 5; + for (unsigned i = 0, + n = static_cast(this->_internal_payload_size()); i < n; i++) { + const auto& repfield = this->_internal_payload(i); + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: + InternalWriteMessage(5, repfield, repfield.GetCachedSize(), target, stream); + } + + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { + target = ::_pbi::WireFormat::InternalSerializeUnknownFieldsToArray( + _internal_metadata_.unknown_fields<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(::PROTOBUF_NAMESPACE_ID::UnknownFieldSet::default_instance), target, stream); + } + // @@protoc_insertion_point(serialize_to_array_end:messages.header.Message) + return target; +} + +size_t Message::ByteSizeLong() const { + +// @@protoc_insertion_point(message_byte_size_start:messages.header.Message) + size_t total_size = 0; + + uint32_t cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + // repeated .google.protobuf.Any payload = 5; + total_size += 1UL * this->_internal_payload_size(); + for (const auto& msg : this->_impl_.payload_) { + total_size += + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(msg); + } + + // int32 topic = 1; + if (this->_internal_topic() != 0) { + total_size += ::_pbi::WireFormatLite::Int32SizePlusOne(this->_internal_topic()); + } + + // int32 msgType = 2; + if (this->_internal_msgtype() != 0) { + total_size += ::_pbi::WireFormatLite::Int32SizePlusOne(this->_internal_msgtype()); + } + + // int32 sourceType = 3; + if (this->_internal_sourcetype() != 0) { + total_size += ::_pbi::WireFormatLite::Int32SizePlusOne(this->_internal_sourcetype()); + } + + // int32 sourceID = 4; + if (this->_internal_sourceid() != 0) { + total_size += ::_pbi::WireFormatLite::Int32SizePlusOne(this->_internal_sourceid()); + } + + return MaybeComputeUnknownFieldsSize(total_size, &_impl_._cached_size_); +} + +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData Message::_class_data_ = { + ::PROTOBUF_NAMESPACE_ID::Message::CopyWithSourceCheck, + Message::MergeImpl +}; +const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*Message::GetClassData() const { return &_class_data_; } + + +void Message::MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg) { + auto* const _this = static_cast(&to_msg); + auto& from = static_cast(from_msg); + + // @@protoc_insertion_point(class_specific_merge_from_start:messages.header.Message) + GOOGLE_DCHECK_NE(&from, _this); + uint32_t cached_has_bits = 0; + (void) cached_has_bits; + + _this->_impl_.payload_.MergeFrom(from._impl_.payload_); + if (from._internal_topic() != 0) { + _this->_internal_set_topic(from._internal_topic()); + } + if (from._internal_msgtype() != 0) { + _this->_internal_set_msgtype(from._internal_msgtype()); + } + if (from._internal_sourcetype() != 0) { + _this->_internal_set_sourcetype(from._internal_sourcetype()); + } + if (from._internal_sourceid() != 0) { + _this->_internal_set_sourceid(from._internal_sourceid()); + } + _this->_internal_metadata_.MergeFrom<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(from._internal_metadata_); +} + +void Message::CopyFrom(const Message& from) { +// @@protoc_insertion_point(class_specific_copy_from_start:messages.header.Message) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool Message::IsInitialized() const { + return true; +} + +void Message::InternalSwap(Message* other) { + using std::swap; + _internal_metadata_.InternalSwap(&other->_internal_metadata_); + _impl_.payload_.InternalSwap(&other->_impl_.payload_); + ::PROTOBUF_NAMESPACE_ID::internal::memswap< + PROTOBUF_FIELD_OFFSET(Message, _impl_.sourceid_) + + sizeof(Message::_impl_.sourceid_) + - PROTOBUF_FIELD_OFFSET(Message, _impl_.topic_)>( + reinterpret_cast(&_impl_.topic_), + reinterpret_cast(&other->_impl_.topic_)); +} + +::PROTOBUF_NAMESPACE_ID::Metadata Message::GetMetadata() const { + + return ::_pbi::AssignDescriptors( + &descriptor_table_message_2eproto_getter, &descriptor_table_message_2eproto_once, + file_level_metadata_message_2eproto[0]); +} +// @@protoc_insertion_point(namespace_scope) +} // namespace header +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template<> PROTOBUF_NOINLINE ::messages::header::Message* +Arena::CreateMaybeMessage< ::messages::header::Message >(Arena* arena) { + return Arena::CreateMessageInternal< ::messages::header::Message >(arena); +} +PROTOBUF_NAMESPACE_CLOSE +// @@protoc_insertion_point(global_scope) +#include "google/protobuf/port_undef.inc" diff --git a/include/WHISPER/Messages/Protos/message.pb.h b/include/WHISPER/Messages/Protos/message.pb.h new file mode 100644 index 0000000..cfbb761 --- /dev/null +++ b/include/WHISPER/Messages/Protos/message.pb.h @@ -0,0 +1,419 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: message.proto + +#ifndef GOOGLE_PROTOBUF_INCLUDED_message_2eproto_2epb_2eh +#define GOOGLE_PROTOBUF_INCLUDED_message_2eproto_2epb_2eh + +#include +#include +#include + +#include "google/protobuf/port_def.inc" +#if PROTOBUF_VERSION < 3021000 +#error "This file was generated by a newer version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please update" +#error "your headers." +#endif // PROTOBUF_VERSION + +#if 3021008 < PROTOBUF_MIN_PROTOC_VERSION +#error "This file was generated by an older version of protoc which is" +#error "incompatible with your Protocol Buffer headers. Please" +#error "regenerate this file with a newer version of protoc." +#endif // PROTOBUF_MIN_PROTOC_VERSION +#include "google/protobuf/port_undef.inc" +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/arena.h" +#include "google/protobuf/arenastring.h" +#include "google/protobuf/generated_message_util.h" +#include "google/protobuf/metadata_lite.h" +#include "google/protobuf/generated_message_reflection.h" +#include "google/protobuf/message.h" +#include "google/protobuf/repeated_field.h" // IWYU pragma: export +#include "google/protobuf/extension_set.h" // IWYU pragma: export +#include "google/protobuf/unknown_field_set.h" +#include "google/protobuf/any.pb.h" +// @@protoc_insertion_point(includes) + +// Must be included last. +#include "google/protobuf/port_def.inc" + +#define PROTOBUF_INTERNAL_EXPORT_message_2eproto + +PROTOBUF_NAMESPACE_OPEN +namespace internal { +class AnyMetadata; +} // namespace internal +PROTOBUF_NAMESPACE_CLOSE + +// Internal implementation detail -- do not use these members. +struct TableStruct_message_2eproto { + static const uint32_t offsets[]; +}; +extern const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable + descriptor_table_message_2eproto; +namespace messages { +namespace header { +class Message; +struct MessageDefaultTypeInternal; +extern MessageDefaultTypeInternal _Message_default_instance_; +} // namespace header +} // namespace messages +PROTOBUF_NAMESPACE_OPEN +template <> +::messages::header::Message* Arena::CreateMaybeMessage<::messages::header::Message>(Arena*); +PROTOBUF_NAMESPACE_CLOSE + +namespace messages { +namespace header { + +// =================================================================== + + +// ------------------------------------------------------------------- + +class Message final : + public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:messages.header.Message) */ { + public: + inline Message() : Message(nullptr) {} + ~Message() override; + explicit PROTOBUF_CONSTEXPR Message(::PROTOBUF_NAMESPACE_ID::internal::ConstantInitialized); + + Message(const Message& from); + Message(Message&& from) noexcept + : Message() { + *this = ::std::move(from); + } + + inline Message& operator=(const Message& from) { + CopyFrom(from); + return *this; + } + inline Message& operator=(Message&& from) noexcept { + if (this == &from) return *this; + if (GetOwningArena() == from.GetOwningArena() + #ifdef PROTOBUF_FORCE_COPY_IN_MOVE + && GetOwningArena() != nullptr + #endif // !PROTOBUF_FORCE_COPY_IN_MOVE + ) { + InternalSwap(&from); + } else { + CopyFrom(from); + } + return *this; + } + + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() { + return GetDescriptor(); + } + static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() { + return default_instance().GetMetadata().descriptor; + } + static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() { + return default_instance().GetMetadata().reflection; + } + static const Message& default_instance() { + return *internal_default_instance(); + } + static inline const Message* internal_default_instance() { + return reinterpret_cast( + &_Message_default_instance_); + } + static constexpr int kIndexInFileMessages = + 0; + + friend void swap(Message& a, Message& b) { + a.Swap(&b); + } + inline void Swap(Message* other) { + if (other == this) return; + #ifdef PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() != nullptr && + GetOwningArena() == other->GetOwningArena()) { + #else // PROTOBUF_FORCE_COPY_IN_SWAP + if (GetOwningArena() == other->GetOwningArena()) { + #endif // !PROTOBUF_FORCE_COPY_IN_SWAP + InternalSwap(other); + } else { + ::PROTOBUF_NAMESPACE_ID::internal::GenericSwap(this, other); + } + } + void UnsafeArenaSwap(Message* other) { + if (other == this) return; + GOOGLE_DCHECK(GetOwningArena() == other->GetOwningArena()); + InternalSwap(other); + } + + // implements Message ---------------------------------------------- + + Message* New(::PROTOBUF_NAMESPACE_ID::Arena* arena = nullptr) const final { + return CreateMaybeMessage(arena); + } + using ::PROTOBUF_NAMESPACE_ID::Message::CopyFrom; + void CopyFrom(const Message& from); + using ::PROTOBUF_NAMESPACE_ID::Message::MergeFrom; + void MergeFrom( const Message& from) { + Message::MergeImpl(*this, from); + } + private: + static void MergeImpl(::PROTOBUF_NAMESPACE_ID::Message& to_msg, const ::PROTOBUF_NAMESPACE_ID::Message& from_msg); + public: + PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final; + bool IsInitialized() const final; + + size_t ByteSizeLong() const final; + const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final; + uint8_t* _InternalSerialize( + uint8_t* target, ::PROTOBUF_NAMESPACE_ID::io::EpsCopyOutputStream* stream) const final; + int GetCachedSize() const final { return _impl_._cached_size_.Get(); } + + private: + void SharedCtor(::PROTOBUF_NAMESPACE_ID::Arena* arena, bool is_message_owned); + void SharedDtor(); + void SetCachedSize(int size) const final; + void InternalSwap(Message* other); + + private: + friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata; + static ::absl::string_view FullMessageName() { + return "messages.header.Message"; + } + protected: + explicit Message(::PROTOBUF_NAMESPACE_ID::Arena* arena, + bool is_message_owned = false); + public: + + static const ClassData _class_data_; + const ::PROTOBUF_NAMESPACE_ID::Message::ClassData*GetClassData() const final; + + ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + enum : int { + kPayloadFieldNumber = 5, + kTopicFieldNumber = 1, + kMsgTypeFieldNumber = 2, + kSourceTypeFieldNumber = 3, + kSourceIDFieldNumber = 4, + }; + // repeated .google.protobuf.Any payload = 5; + int payload_size() const; + private: + int _internal_payload_size() const; + public: + void clear_payload(); + ::PROTOBUF_NAMESPACE_ID::Any* mutable_payload(int index); + ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any >* + mutable_payload(); + private: + const ::PROTOBUF_NAMESPACE_ID::Any& _internal_payload(int index) const; + ::PROTOBUF_NAMESPACE_ID::Any* _internal_add_payload(); + public: + const ::PROTOBUF_NAMESPACE_ID::Any& payload(int index) const; + ::PROTOBUF_NAMESPACE_ID::Any* add_payload(); + const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any >& + payload() const; + + // int32 topic = 1; + void clear_topic(); + int32_t topic() const; + void set_topic(int32_t value); + private: + int32_t _internal_topic() const; + void _internal_set_topic(int32_t value); + public: + + // int32 msgType = 2; + void clear_msgtype(); + int32_t msgtype() const; + void set_msgtype(int32_t value); + private: + int32_t _internal_msgtype() const; + void _internal_set_msgtype(int32_t value); + public: + + // int32 sourceType = 3; + void clear_sourcetype(); + int32_t sourcetype() const; + void set_sourcetype(int32_t value); + private: + int32_t _internal_sourcetype() const; + void _internal_set_sourcetype(int32_t value); + public: + + // int32 sourceID = 4; + void clear_sourceid(); + int32_t sourceid() const; + void set_sourceid(int32_t value); + private: + int32_t _internal_sourceid() const; + void _internal_set_sourceid(int32_t value); + public: + + // @@protoc_insertion_point(class_scope:messages.header.Message) + private: + class _Internal; + + template friend class ::PROTOBUF_NAMESPACE_ID::Arena::InternalHelper; + typedef void InternalArenaConstructable_; + typedef void DestructorSkippable_; + struct Impl_ { + ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any > payload_; + int32_t topic_; + int32_t msgtype_; + int32_t sourcetype_; + int32_t sourceid_; + mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; + }; + union { Impl_ _impl_; }; + friend struct ::TableStruct_message_2eproto; +}; +// =================================================================== + + + + +// =================================================================== + + +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wstrict-aliasing" +#endif // __GNUC__ +// ------------------------------------------------------------------- + +// Message + +// int32 topic = 1; +inline void Message::clear_topic() { + _impl_.topic_ = 0; +} +inline int32_t Message::_internal_topic() const { + return _impl_.topic_; +} +inline int32_t Message::topic() const { + // @@protoc_insertion_point(field_get:messages.header.Message.topic) + return _internal_topic(); +} +inline void Message::_internal_set_topic(int32_t value) { + + _impl_.topic_ = value; +} +inline void Message::set_topic(int32_t value) { + _internal_set_topic(value); + // @@protoc_insertion_point(field_set:messages.header.Message.topic) +} + +// int32 msgType = 2; +inline void Message::clear_msgtype() { + _impl_.msgtype_ = 0; +} +inline int32_t Message::_internal_msgtype() const { + return _impl_.msgtype_; +} +inline int32_t Message::msgtype() const { + // @@protoc_insertion_point(field_get:messages.header.Message.msgType) + return _internal_msgtype(); +} +inline void Message::_internal_set_msgtype(int32_t value) { + + _impl_.msgtype_ = value; +} +inline void Message::set_msgtype(int32_t value) { + _internal_set_msgtype(value); + // @@protoc_insertion_point(field_set:messages.header.Message.msgType) +} + +// int32 sourceType = 3; +inline void Message::clear_sourcetype() { + _impl_.sourcetype_ = 0; +} +inline int32_t Message::_internal_sourcetype() const { + return _impl_.sourcetype_; +} +inline int32_t Message::sourcetype() const { + // @@protoc_insertion_point(field_get:messages.header.Message.sourceType) + return _internal_sourcetype(); +} +inline void Message::_internal_set_sourcetype(int32_t value) { + + _impl_.sourcetype_ = value; +} +inline void Message::set_sourcetype(int32_t value) { + _internal_set_sourcetype(value); + // @@protoc_insertion_point(field_set:messages.header.Message.sourceType) +} + +// int32 sourceID = 4; +inline void Message::clear_sourceid() { + _impl_.sourceid_ = 0; +} +inline int32_t Message::_internal_sourceid() const { + return _impl_.sourceid_; +} +inline int32_t Message::sourceid() const { + // @@protoc_insertion_point(field_get:messages.header.Message.sourceID) + return _internal_sourceid(); +} +inline void Message::_internal_set_sourceid(int32_t value) { + + _impl_.sourceid_ = value; +} +inline void Message::set_sourceid(int32_t value) { + _internal_set_sourceid(value); + // @@protoc_insertion_point(field_set:messages.header.Message.sourceID) +} + +// repeated .google.protobuf.Any payload = 5; +inline int Message::_internal_payload_size() const { + return _impl_.payload_.size(); +} +inline int Message::payload_size() const { + return _internal_payload_size(); +} +inline ::PROTOBUF_NAMESPACE_ID::Any* Message::mutable_payload(int index) { + // @@protoc_insertion_point(field_mutable:messages.header.Message.payload) + return _impl_.payload_.Mutable(index); +} +inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any >* +Message::mutable_payload() { + // @@protoc_insertion_point(field_mutable_list:messages.header.Message.payload) + return &_impl_.payload_; +} +inline const ::PROTOBUF_NAMESPACE_ID::Any& Message::_internal_payload(int index) const { + return _impl_.payload_.Get(index); +} +inline const ::PROTOBUF_NAMESPACE_ID::Any& Message::payload(int index) const { + // @@protoc_insertion_point(field_get:messages.header.Message.payload) + return _internal_payload(index); +} +inline ::PROTOBUF_NAMESPACE_ID::Any* Message::_internal_add_payload() { + return _impl_.payload_.Add(); +} +inline ::PROTOBUF_NAMESPACE_ID::Any* Message::add_payload() { + ::PROTOBUF_NAMESPACE_ID::Any* _add = _internal_add_payload(); + // @@protoc_insertion_point(field_add:messages.header.Message.payload) + return _add; +} +inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::PROTOBUF_NAMESPACE_ID::Any >& +Message::payload() const { + // @@protoc_insertion_point(field_list:messages.header.Message.payload) + return _impl_.payload_; +} + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif // __GNUC__ + +// @@protoc_insertion_point(namespace_scope) +} // namespace header +} // namespace messages + + +// @@protoc_insertion_point(global_scope) + +#include "google/protobuf/port_undef.inc" + +#endif // GOOGLE_PROTOBUF_INCLUDED_message_2eproto_2epb_2eh diff --git a/include/WHISPER/threadSafeQueue.hpp b/include/WHISPER/threadSafeQueue.hpp index b5249dc..f370efb 100644 --- a/include/WHISPER/threadSafeQueue.hpp +++ b/include/WHISPER/threadSafeQueue.hpp @@ -1,69 +1,53 @@ -#pragma once -/* This Source Code Form is subject to the terms of the Mozilla Public -* License, v. 2.0. If a copy of the MPL was not distributed with this -* file, You can obtain one at https://mozilla.org/MPL/2.0/. -*/ +#ifndef SAFE_QUEUE +#define SAFE_QUEUE -/** -* @file -* @copyright MPLv2 -*/ +#include #include #include -#include -#include -#include -namespace WHISPER{ - /** - * @class BasicMessageQueue - * - * This class encapsulates shared storage which is a queue and protect it - * with a mutex. In Addition there is a condition variable to notify other - * threads if there is new data in the queue. - */ - template +namespace WHISPER { + + // A threadsafe-queue. + template class threadSafeQueue { - private: - std::queue> q; - std::mutex mx; - std::condition_variable condVar; public: - /** - * @brief default constructor for class BasicMessageQueue - */ - threadSafeQueue(); + threadSafeQueue() : q(), m(), c() {} - /** - * @brief appends given message to queue - * - * The storage is protected by a mutex. Makes a notify_one call to inform - * in waiting thread that there is a new message in the queue. - * - * @param msg - incoming message - */ - void addElement( std::unique_ptr msg ); + ~threadSafeQueue() {} - /** - * @brief gets the fron message from the queue - * - * This method gets the front message in the queue and deletes it from the - * queue. The storage is protected by a mutex. If the queue is empty the - * function throws an exception. - * - * @result msg - returns the front message from the queue - * @throws length_error - if queue is empty this method throws length_error exception - */ - std::unique_ptr getElement(); + // Add an element to the queue. + void addElement(T t) + { + std::lock_guard lock(m); + q.push(t); + c.notify_one(); + } - /** - * @brief method size - * - * @result size - current size of the message queue - */ - unsigned int size(); + // Get the front element. + // If the queue is empty, wait till a element is avaiable. + T get(void) + { + std::unique_lock lock(m); + while (q.empty()) + { + // release lock as long as the wait and reaquire it afterwards. + c.wait(lock); + } + T val = q.front(); + q.pop(); + return val; + } + unsigned int size() + { + std::unique_lock lock(m); + return q.size(); + } - }; //BasicMessageQueue - -}; // namespace BC \ No newline at end of file + private: + std::queue q; + mutable std::mutex m; + std::condition_variable c; + }; +} +#endif \ No newline at end of file diff --git a/include/WHISPER/whisper.hpp b/include/WHISPER/whisper.hpp index 7048f3a..c94c607 100644 --- a/include/WHISPER/whisper.hpp +++ b/include/WHISPER/whisper.hpp @@ -54,7 +54,7 @@ namespace WHISPER /// variable indicating if the receiveThread should be stopped std::atomic stopReceiveThread; - std::shared_ptr> receiveQueue = nullptr; + std::shared_ptr> receiveQueue = nullptr; @@ -63,24 +63,25 @@ namespace WHISPER public: whispercomm(std::uint32_t id, SourceType owndevicetype):ownID_(id),ownDeviceType_(owndevicetype) - { - receiveQueue = std::make_shared>(); - }; - void connect(std::shared_ptr> receiver); - void publish(std::string msg); + {}; + void connect(std::shared_ptr> receiver); + void publish(std::string msg,std::string topic); void disconnect(); void subscribe(std::string topic); void unsubscribe(std::string topic); protected: - void addMsgToReceiverQueue(std::string); + void addMsgToReceiverQueue(WHISPER::Message); + void setGateway(bool); + virtual void derivedConnect() = 0; + virtual void derivedDisconnect() = 0; + virtual void derivedPublish(std::string msg,std::string topic) = 0; + virtual void derivedReceive() = 0; - virtual void derivedConnect() = 0; - virtual void derivedDisconnect() = 0; - virtual void derivedPublish(std::string msg) = 0; - virtual void derivedReceive() = 0; + virtual void derivedSubscribe(std::string topic) = 0; + virtual void derivedUnsubscribe(std::string topic) = 0; diff --git a/src/WHISPER/InternalUDPService.cpp b/src/WHISPER/InternalUDPService.cpp new file mode 100644 index 0000000..07ec69e --- /dev/null +++ b/src/WHISPER/InternalUDPService.cpp @@ -0,0 +1,167 @@ + + +#include "WHISPER/InternalUDPService.hpp" +#include "WHISPER/Messages/Message.hpp" +#include "WHISPER/whisper.hpp" +#include "zmq.hpp" +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace WHISPER { + + + InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string address): + whispercomm(id, owndevicetype),port_(port),address_(address) + { + ctx = zmq::context_t(2); + sender = zmq::socket_t(ctx,zmq::socket_type::radio); + receiver = std::make_shared(ctx,zmq::socket_type::dish); + // receiver = zmq::socket_t(ctx,zmq::socket_type::dish); + LOG_S(INFO)<< "sockets are created"; + } + + void InternalUDPService::derivedConnect() + { + ownReceivingPort_ = checkPort(port_); + LOG_S(INFO)<< "could bind to port:" << ownReceivingPort_; + + + if(ownReceivingPort_ == port_){ + std::string portAsString = std::to_string(ownReceivingPort_); + receiver->bind("udp://*:"+portAsString); + }else if (ownReceivingPort_ == 0) { + throw std::invalid_argument( " receiver cant bind to port " ); + }else { + std::string portAsString = std::to_string(ownReceivingPort_); + receiver->bind("udp://*:"+portAsString); + } + + receiver->join("management"); + + + std::string sendingPort = std::to_string(port_); + LOG_S(INFO)<< sendingPort; + + sender.connect("udp://"+address_+":"+sendingPort); + } + + + void InternalUDPService::derivedDisconnect() + { + sender.close(); + receiver->close(); + } + void InternalUDPService::derivedPublish(std::string msg,std::string topic) + { + zmq::message_t tmpmsg(msg.size()); + memcpy (tmpmsg.data (), msg.data(), msg.size()); + const char *tmp = topic.c_str(); + tmpmsg.set_group(topic.c_str()); + sender.send(tmpmsg,zmq::send_flags::none); + + } + void InternalUDPService::derivedReceive() + { + + LOG_S(INFO)<<"zmq receiving funk"; + zmq::message_t msg; + + + receiver->recv(msg,zmq::recv_flags::none); + // LOG_S(INFO)< #include @@ -16,6 +15,7 @@ namespace WHISPER { sourceType_ = msg.sourcetype(); msgType_ = msg.msgtype(); joinMessage = messages::join::Join(); + if ( msg.payload_size()) { if (msg.payload().begin()->Is()) { msg.payload().begin()->UnpackTo(&joinMessage); @@ -44,6 +44,7 @@ namespace WHISPER { auto test = std::make_shared(); test->PackFrom(joinMessage); addPayLoad(test); + } diff --git a/src/WHISPER/Messages/Message.cpp b/src/WHISPER/Messages/Message.cpp index f6716db..77166f1 100644 --- a/src/WHISPER/Messages/Message.cpp +++ b/src/WHISPER/Messages/Message.cpp @@ -1,6 +1,5 @@ -#include "WHISPER/Messages/Protos/join.pb.h" #include #include @@ -47,6 +46,7 @@ namespace WHISPER { void Message::addPayLoad(std::shared_ptr payload){ payload_ = payload; + LOG_S(INFO)<< "pack any size in message class "<ByteSizeLong(); msg.add_payload()->CopyFrom(*payload_); diff --git a/src/WHISPER/threadSafeQueue.cpp b/src/WHISPER/threadSafeQueue.cpp index 4aac344..ae440c3 100644 --- a/src/WHISPER/threadSafeQueue.cpp +++ b/src/WHISPER/threadSafeQueue.cpp @@ -1,60 +1,60 @@ -/* -* This Source Code Form is subject to the terms of the Mozilla Public -* License, v. 2.0. If a copy of the MPL was not distributed with this -* file, You can obtain one at https://mozilla.org/MPL/2.0/. -*/ +// /* +// * This Source Code Form is subject to the terms of the Mozilla Public +// * License, v. 2.0. If a copy of the MPL was not distributed with this +// * file, You can obtain one at https://mozilla.org/MPL/2.0/. +// */ -/** -* @file BasicMessageQueue.cpp -* @brief class which encapsulates queue -* @author Christina Sander -* @date 25.06.2020 -* @copyright MPLv2 -*/ -#include -#include -#include +// /** +// * @file BasicMessageQueue.cpp +// * @brief class which encapsulates queue +// * @author Christina Sander +// * @date 25.06.2020 +// * @copyright MPLv2 +// */ +// #include +// #include +// #include -template -WHISPER::threadSafeQueue threadSafeQueue(){ +// template +// WHISPER::threadSafeQueue threadSafeQueue(){ -} +// } -// appends the given message to the message queue -template -void WHISPER::threadSafeQueue< T>::addElement(std::unique_ptr elem) -{ - std::unique_lock lk(mx); - q.push( std::move(elem) ); - lk.unlock(); - condVar.notify_one(); -} +// // appends the given message to the message queue +// template +// void WHISPER::threadSafeQueue< T>::addElement(std::unique_ptr elem) +// { +// std::unique_lock lk(mx); +// q.push( std::move(elem) ); +// lk.unlock(); +// condVar.notify_one(); +// } -// gets a message from the queue -template -std::unique_ptr WHISPER::threadSafeQueue::getElement() -{ - std::unique_lock lk(mx); - if( 0 == q.size() ) - { - lk.unlock(); - throw std::length_error("Empty Queue\n"); - } - std::unique_ptr elem = std::move( q.front() ); - q.pop(); - lk.unlock(); - return std::move(elem); -} +// // gets a message from the queue +// template +// std::unique_ptr WHISPER::threadSafeQueue::getElement() +// { +// std::unique_lock lk(mx); +// if( 0 == q.size() ) +// { +// lk.unlock(); +// throw std::length_error("Empty Queue\n"); +// } +// std::unique_ptr elem = std::move( q.front() ); +// q.pop(); +// lk.unlock(); +// return std::move(elem); +// } -//returns the size of the message queue -template -unsigned int WHISPER::threadSafeQueue::size() -{ - std::unique_lock lk(mx); - unsigned int size = q.size(); - lk.unlock(); - return size; -} \ No newline at end of file +// //returns the size of the message queue +// template +// unsigned int WHISPER::threadSafeQueue::size() +// { +// std::unique_lock lk(mx); +// unsigned int size = q.size(); +// lk.unlock(); +// return size; +// } \ No newline at end of file diff --git a/src/WHISPER/whisper.cpp b/src/WHISPER/whisper.cpp index b04bd6e..8594dc2 100644 --- a/src/WHISPER/whisper.cpp +++ b/src/WHISPER/whisper.cpp @@ -10,6 +10,8 @@ */ #include +#include +#include /** * @brief namespace for all whisper-com related components */ @@ -18,27 +20,30 @@ namespace WHISPER - void whispercomm::connect(std::shared_ptr> receiver) + void whispercomm::connect(std::shared_ptr> receiver) { this->receiveQueue = receiver; this->derivedConnect(); + receiveThread = std::thread(&WHISPER::whispercomm::receive,this); } void whispercomm::disconnect() { - + derivedDisconnect(); } - void whispercomm::publish(std::string msg){ - this->derivedPublish(msg); + void whispercomm::publish(std::string msg,std::string topic){ + this->derivedPublish(msg,topic); } void whispercomm::receive(){ - this->derivedReceive(); + connected = true; + derivedReceive(); + while(!stopReceiveThread) { derivedReceive(); @@ -50,6 +55,7 @@ namespace WHISPER void whispercomm::subscribe(std::string topic) { this->subscribedTopics.push_back(topic); + derivedSubscribe(topic); } void whispercomm::unsubscribe(std::string topic) @@ -61,12 +67,23 @@ namespace WHISPER it = subscribedTopics.erase(it); } } + derivedUnsubscribe(topic); } - void whispercomm::addMsgToReceiverQueue(std::string) + void whispercomm::addMsgToReceiverQueue(WHISPER::Message msg) { + if (this->receiveQueue != nullptr) + { + + this->receiveQueue->addElement(msg); + } + } + + void whispercomm::setGateway(bool val) + { + gateway = val; } // Add datatypes here diff --git a/src/main.cpp b/src/main.cpp index bdad6d8..e24dc33 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,9 +1,27 @@ +#include "WHISPER/InternalUDPService.hpp" #include "WHISPER/Messages/Message.hpp" #include #include #include #include +#include + + +/// variable for stopping the application +bool running = true; + +/** + * @brief killhandler to set running to false on CTRL-C + * + * @param s - the signal to manage + */ +void killHandlerPing(int s) { + + if (s == SIGINT) { + running = false; + } +} int main() @@ -30,7 +48,12 @@ int main() LOG_S(INFO)<< "join message data afer reception "<< receivedJoin.port; break; } + auto receiver = std::make_shared>(); + WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"127.0.0.1"); + service.connect(receiver); - + while (running) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } return 0; } \ No newline at end of file