diff --git a/CMakeLists.txt b/CMakeLists.txt index 369362d..82af62d 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,6 +98,18 @@ target_include_directories(whisper-com PUBLIC loguru ) + add_executable(mainRcv + + src/mainRecv.cpp + + + ) + target_link_libraries(mainRcv + loguru + whisper-com + + + ) # # Everything TEST related diff --git a/include/WHISPER/InternalUDPService.hpp b/include/WHISPER/InternalUDPService.hpp index bed1e21..fd94105 100644 --- a/include/WHISPER/InternalUDPService.hpp +++ b/include/WHISPER/InternalUDPService.hpp @@ -3,7 +3,9 @@ #include "WHISPER/whisper.hpp" #include +#include #include +#include #define ZMQ_BUILD_DRAFT_API 1 #include @@ -11,17 +13,31 @@ namespace WHISPER { + + struct localClient{ + short port; + zmq::socket_t clientSocket; + }; + + + + + class InternalUDPService : public whispercomm { private: - std::string address_; + std::string destinationAdress_; + std::string myAdress_; std::uint16_t port_; std::uint16_t ownReceivingPort_; zmq::context_t ctx; zmq::socket_t sender; - std::shared_ptr receiver; + std::shared_ptr receiver = nullptr; + std::shared_ptr loopbackSocket = nullptr; + + std::vector> localclients; std::uint16_t checkPort(std::uint16_t port); @@ -32,9 +48,12 @@ namespace WHISPER { void derivedSubscribe(std::string topic) override; void derivedUnsubscribe(std::string topic) override; + + + public: - InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string address); + InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress, std::string myAdress); diff --git a/include/WHISPER/Messages/Message.hpp b/include/WHISPER/Messages/Message.hpp index d1ea29b..d9b0b59 100644 --- a/include/WHISPER/Messages/Message.hpp +++ b/include/WHISPER/Messages/Message.hpp @@ -21,6 +21,10 @@ namespace WHISPER { JOIN=1, /// participant is leaving LEAVE, + /// PING + PING, + /// PONG + PONG, /// owntrack informaton OWN_TRACK, /// raw track message diff --git a/include/WHISPER/whisper.hpp b/include/WHISPER/whisper.hpp index c94c607..d9e24db 100644 --- a/include/WHISPER/whisper.hpp +++ b/include/WHISPER/whisper.hpp @@ -70,6 +70,8 @@ namespace WHISPER void subscribe(std::string topic); void unsubscribe(std::string topic); + std::uint32_t getOwnID(); + protected: void addMsgToReceiverQueue(WHISPER::Message); diff --git a/src/WHISPER/InternalUDPService.cpp b/src/WHISPER/InternalUDPService.cpp index 07ec69e..b7c903a 100644 --- a/src/WHISPER/InternalUDPService.cpp +++ b/src/WHISPER/InternalUDPService.cpp @@ -1,6 +1,7 @@ #include "WHISPER/InternalUDPService.hpp" +#include "WHISPER/Messages/Join.hpp" #include "WHISPER/Messages/Message.hpp" #include "WHISPER/whisper.hpp" #include "zmq.hpp" @@ -17,8 +18,8 @@ namespace WHISPER { - InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string address): - whispercomm(id, owndevicetype),port_(port),address_(address) + InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress,std::string myAdress): + whispercomm(id, owndevicetype),port_(port),destinationAdress_(destinationAdress),myAdress_(myAdress) { ctx = zmq::context_t(2); sender = zmq::socket_t(ctx,zmq::socket_type::radio); @@ -30,7 +31,7 @@ namespace WHISPER { void InternalUDPService::derivedConnect() { ownReceivingPort_ = checkPort(port_); - LOG_S(INFO)<< "could bind to port:" << ownReceivingPort_; + std::string sendingPort = std::to_string(port_); if(ownReceivingPort_ == port_){ @@ -39,17 +40,24 @@ namespace WHISPER { }else if (ownReceivingPort_ == 0) { throw std::invalid_argument( " receiver cant bind to port " ); }else { + LOG_S(INFO)<< "new port to bind: " << ownReceivingPort_; + + loopbackSocket = std::make_shared(ctx,zmq::socket_type::radio); + loopbackSocket->connect("udp://127.0.0.255:"+sendingPort); + std::string portAsString = std::to_string(ownReceivingPort_); + receiver->bind("udp://*:"+portAsString); } receiver->join("management"); - std::string sendingPort = std::to_string(port_); LOG_S(INFO)<< sendingPort; - sender.connect("udp://"+address_+":"+sendingPort); + sender.connect("udp://"+destinationAdress_+":"+sendingPort); + // sender.set(zmq::sockopt::multicast_loop ,1); + } @@ -60,36 +68,81 @@ namespace WHISPER { } void InternalUDPService::derivedPublish(std::string msg,std::string topic) { - zmq::message_t tmpmsg(msg.size()); - memcpy (tmpmsg.data (), msg.data(), msg.size()); - const char *tmp = topic.c_str(); + zmq::message_t tmpmsg(msg.begin(),msg.end()); + tmpmsg.set_group(topic.c_str()); sender.send(tmpmsg,zmq::send_flags::none); + if (loopbackSocket != nullptr) { + LOG_S(INFO)<<"loop back send"; + zmq::message_t tmpmsg(msg.begin(),msg.end()); + tmpmsg.set_group(topic.c_str()); + loopbackSocket->send(tmpmsg,zmq::send_flags::none); + } } void InternalUDPService::derivedReceive() { - LOG_S(INFO)<<"zmq receiving funk"; - zmq::message_t msg; - - + zmq::message_t msg; receiver->recv(msg,zmq::recv_flags::none); - // LOG_S(INFO)<myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") { + + bool clientAllreadyIn = false; + if (localclients.size() > 0) { + for (std::vector>::iterator it = localclients.begin(); it != localclients.end();it++) + { + if (it->get()->port == join.port) { + clientAllreadyIn = true; + } + } + } + if (clientAllreadyIn == false) { + auto client = std::make_shared(); + client->port = join.port; + client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio); + client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port)); + localclients.emplace_back(client); + setGateway(true); + + } + } + + }else if(msgType == WHISPER::LEAVE) + { + + }else if(msgType == WHISPER::PING) + { + + }else if(msgType == WHISPER::PONG) + { + + }else + { + addMsgToReceiverQueue(WHISPER::Message(message)); + } + } + + } void InternalUDPService::derivedSubscribe(std::string topic) { - // receiver.join(topic.c_str()); + receiver->join(topic.c_str()); } void InternalUDPService::derivedUnsubscribe(std::string topic) { - // receiver.leave(topic.c_str()); + receiver->leave(topic.c_str()); } std::uint16_t InternalUDPService::checkPort(std::uint16_t port) @@ -156,7 +209,8 @@ namespace WHISPER { close(sockfd); - + + LOG_S(WARNING)<< "new assignet port is" << localPort; return localPort; } diff --git a/src/WHISPER/Messages/Message.cpp b/src/WHISPER/Messages/Message.cpp index 77166f1..70b32a3 100644 --- a/src/WHISPER/Messages/Message.cpp +++ b/src/WHISPER/Messages/Message.cpp @@ -12,6 +12,8 @@ namespace WHISPER { msg = messages::header::Message(); try { msg.ParseFromString(stringMessage); + + deviceId_ = msg.sourceid(); topic_ = msg.topic(); sourceType_ = msg.sourcetype(); msgType_ = msg.msgtype(); @@ -60,7 +62,6 @@ namespace WHISPER { std::string Message::serialize(){ std::string serializedMessage; - LOG_S(INFO)<> receiver) diff --git a/src/main.cpp b/src/main.cpp index e24dc33..878c5de 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,5 +1,6 @@ #include "WHISPER/InternalUDPService.hpp" #include "WHISPER/Messages/Message.hpp" +#include "zmq.hpp" #include #include @@ -26,33 +27,66 @@ void killHandlerPing(int s) { int main() { - GOOGLE_PROTOBUF_VERIFY_VERSION; + // GOOGLE_PROTOBUF_VERIFY_VERSION; - WHISPER::Join join(1,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"127.0.0.1"); - std::string msg = join.serialize(); - LOG_S(INFO)<<" serialized Message is "<>(); - WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"127.0.0.1"); + WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.0.19"); service.connect(receiver); + service.publish(join.serialize(), "management"); + service.subscribe("data"); + + + // zmq::context_t ctx(2); + // zmq::socket_t sock(ctx,zmq::socket_type::radio); + // sock.connect("udp://127.0.0.255:8000"); + + + // std::string string = "hello world form 2"; + // zmq::message_t msg(string.begin(),string.end()); + // // memcpy (msg.data (), string.data(), string.size()); + // LOG_S(INFO)<<"message contains "<size()) { + LOG_S(INFO)<<"received messages " << size; + size = receiver->size(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); } return 0; diff --git a/src/mainRecv.cpp b/src/mainRecv.cpp new file mode 100644 index 0000000..28c19f3 --- /dev/null +++ b/src/mainRecv.cpp @@ -0,0 +1,61 @@ +#include "zmq.hpp" +#include +#include + +#include +#include + +#include +#include + + +/// variable for stopping the application +bool running = true; + +/** + * @brief killhandler to set running to false on CTRL-C + * + * @param s - the signal to manage + */ +void killHandlerPing(int s) { + + if (s == SIGINT) { + running = false; + } +} + + +int main() +{ + + + + // zmq::context_t ctx(2); + // zmq::socket_t sock(ctx,zmq::socket_type::dish); + // sock.bind("udp://*:8000"); + // zmq::message_t temp; + // sock.join("data"); + + WHISPER::Join join(2,1,WHISPER::MsgType::JOIN,WHISPER::SourceType::SHIP,8000,"192.168.0.19"); + + auto receiver = std::make_shared>(); + WHISPER::InternalUDPService service(1,WHISPER::SHIP,8000,"192.168.0.255","192.168.0.255"); + service.connect(receiver); + service.publish(join.serialize(), "management"); + + // zmq::context_t ctx(2); + // zmq::socket_t sock(ctx,zmq::socket_type::radio); + // sock.connect("udp://127.0.0.1:8000"); + // std::string string = "hello world form 2"; + + + while (running) { + // LOG_S(INFO)<<"received messages " << receiver->size(); + service.publish(join.serialize(), "management"); + // zmq::message_t msg(string.begin(),string.end()); + // msg.set_group("management"); + // sock.send(msg,zmq::send_flags::none); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } +return 0; +} \ No newline at end of file