diff --git a/CMakeLists.txt b/CMakeLists.txt index 4133fdf..7c1ec89 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,6 +46,8 @@ add_library(whisper-com STATIC include/WHISPER/InternalUDPService.hpp src/WHISPER/InternalUDPService.cpp + include/WHISPER/localClients.hpp + src/WHISPER/localClients.cpp include/WHISPER/threadSafeQueue.hpp src/WHISPER/threadSafeQueue.cpp diff --git a/include/WHISPER/InternalUDPService.hpp b/include/WHISPER/InternalUDPService.hpp index d707488..8d02b27 100644 --- a/include/WHISPER/InternalUDPService.hpp +++ b/include/WHISPER/InternalUDPService.hpp @@ -2,10 +2,12 @@ #include "WHISPER/whisper.hpp" +#include #include #include #include #include +#include #define ZMQ_BUILD_DRAFT_API 1 #include @@ -14,14 +16,14 @@ namespace WHISPER { - struct localClient{ - std::uint32_t port; - std::string addr; - std::uint32_t parentid; - std::uint32_t id; - zmq::socket_t clientSocket; - std::time_t lastResponse; - }; + // struct localClient{ + // std::uint32_t port; + // std::string addr; + // std::uint32_t parentid; + // std::uint32_t id; + // zmq::socket_t clientSocket; + // std::time_t lastResponse; + // }; @@ -50,9 +52,13 @@ namespace WHISPER { std::shared_ptr loopbackSocket = nullptr; /// vector of local udp clients std::list> localclients; - + std::atomic stopClientChecker = false; + std::atomic clientCheckerisRunning = false; + + std::thread clientsCheckerThread; + /** * @brief checks if the given port is already is use @@ -69,7 +75,11 @@ namespace WHISPER { void derivedUnsubscribe(std::string topic) override; void sendToLocalClients(std::string msg,std::string topic); - + + + void startChecker(); + void stopChecker(); + void checkClients(); diff --git a/include/WHISPER/Messages/Protos/message.proto b/include/WHISPER/Messages/Protos/message.proto index 820e29c..955c718 100644 --- a/include/WHISPER/Messages/Protos/message.proto +++ b/include/WHISPER/Messages/Protos/message.proto @@ -12,6 +12,7 @@ message Message { uint32 sourceType = 3; uint32 sourceID = 4; uint32 parentID = 5; + repeated google.protobuf.Any payload = 6; diff --git a/include/WHISPER/localClients.hpp b/include/WHISPER/localClients.hpp new file mode 100644 index 0000000..001d2e5 --- /dev/null +++ b/include/WHISPER/localClients.hpp @@ -0,0 +1,45 @@ +#pragma once + + +#include +#include +#include +#include +#include + + +namespace WHISPER { + + class localClient + { + public: + localClient(std::uint32_t port,std::string addr,std::uint32_t parentid,std::uint32_t id, std::shared_ptr clientSocket); + ~localClient(); + + + + std::uint32_t getPort(); + std::string getAddr(); + std::uint32_t getParentid(); + std::uint32_t getID(); + + void addClientSocket(std::shared_ptr socket); + std::shared_ptr getClientSocket(); + + std::time_t getLastResponse(); + void setLastResponse(std::time_t time); + + private: + + mutable std::mutex mx; + + const std::uint32_t port_; + const std::string addr_; + const std::uint32_t parentid_; + const std::uint32_t id_; + std::shared_ptr clientSocket_; + std::time_t lastResponse_; + + }; + +} \ No newline at end of file diff --git a/include/WHISPER/whisper.hpp b/include/WHISPER/whisper.hpp index d3adbc5..3429109 100644 --- a/include/WHISPER/whisper.hpp +++ b/include/WHISPER/whisper.hpp @@ -46,7 +46,7 @@ namespace WHISPER std::atomic connected; /// attribute identifying this service as a gateway and all packets should be forwarded - bool gateway = false; + std::atomic gateway = false; /// variable for holding the receive thread identifier std::thread receiveThread; diff --git a/src/WHISPER/InternalUDPService.cpp b/src/WHISPER/InternalUDPService.cpp index 151391d..557e568 100644 --- a/src/WHISPER/InternalUDPService.cpp +++ b/src/WHISPER/InternalUDPService.cpp @@ -7,6 +7,8 @@ #include "WHISPER/Messages/Ping.hpp" #include "WHISPER/Messages/Pong.hpp" #include "WHISPER/whisper.hpp" +#include +#include #include #include #include @@ -37,7 +39,7 @@ namespace WHISPER { InternalUDPService::~InternalUDPService() { - if (this->isConnected()== true) { + if (this->isConnected() == true) { disconnect(); } @@ -97,13 +99,15 @@ namespace WHISPER { } } - + sender.close(); if (loopbackSocket.unique()) loopbackSocket.reset(); receiver->close(); if (receiver.unique()) receiver.reset(); + setConnected(false); + } @@ -160,25 +164,30 @@ namespace WHISPER { if (localclients.size() > 0) { for (auto it = localclients.begin(); it != localclients.end();it++) { - if (it->get()->port == join.port || it->get()->id == join.deviceId_) { + if (it->get()->getPort() == join.port || it->get()->getID() == join.deviceId_) { clientAllreadyIn = true; } } } if (clientAllreadyIn == false) { // localClient client; - auto client = std::make_shared(); - client->port = join.port; - client->id = join.deviceId_; - client->parentid = join.parentId_; - client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port); - client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio); + auto tmpAddr = "udp://"+join.sourceAddr+":" + std::to_string(join.port); + auto tmpsocket = std::make_shared(ctx,zmq::socket_type::radio); + + auto client = std::make_shared(join.port,tmpAddr,join.parentId_,join.deviceId_, tmpsocket); + // client->port = join.port; + // client->id = join.deviceId_; + // client->parentid = join.parentId_; + // client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port); + // client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio); + client->setLastResponse(std::time(nullptr)); LOG_S(INFO)<< "udp://"+join.sourceAddr+":" + std::to_string(join.port); - client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port)); + client->getClientSocket()->connect("udp://"+join.sourceAddr+":" + std::to_string(join.port)); localclients.emplace_back(client); + startChecker(); setGateway(true); LOG_S(INFO)<< "new client joined"; @@ -187,14 +196,13 @@ namespace WHISPER { }else if(msgType == WHISPER::LEAVE) { - LOG_S(INFO)<<"client left"; WHISPER::Leave Leave(message); if (localclients.size() > 0) { for (auto it = localclients.begin(); it != localclients.end();it++) { - if (it->get()->port == Leave.port || (it->get()->id == Leave.deviceId_ && it->get()->parentid == Leave.deviceId_)) { - it->get()->clientSocket.close(); + if (it->get()->getPort() == Leave.port || (it->get()->getID() == Leave.deviceId_ && it->get()->getParentid() == Leave.deviceId_)) { + it->get()->getClientSocket()->close(); it = localclients.erase(it); LOG_S(INFO)<<"client left"; } @@ -202,12 +210,13 @@ namespace WHISPER { } if (localclients.size() == 0) { + stopChecker(); setGateway(false); } }else if(msgType == WHISPER::PING) { - if (receivedMessage.deviceId_ != getOwnID() && receivedMessage.parentId_ != getParentID()) + if (receivedMessage.deviceId_ != getOwnID() ) { WHISPER::Pong pong(getParentID(),getOwnID(),getOwnDeviceType(),ownReceivingPort_); this->publish(pong.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); @@ -218,10 +227,11 @@ namespace WHISPER { WHISPER::Pong pong(message); if (localclients.size() > 0) { + for (auto it = localclients.begin(); it != localclients.end();it++) - { - if (pong.deviceId_ == it->get()->id && pong.parentId_ == it->get()->parentid) { - it->get()->lastResponse = std::time(nullptr); + { + if (pong.deviceId_ == it->get()->getID() && pong.parentId_ == it->get()->getParentid()) { + it->get()->setLastResponse(std::time(nullptr)); } } } @@ -242,10 +252,10 @@ namespace WHISPER { auto tmpMsg = WHISPER::Message(msg); for (auto it = localclients.begin(); it != localclients.end();it++) { - if (tmpMsg.deviceId_ != it->get()->id) { + if (tmpMsg.deviceId_ != it->get()->getID()) { zmq::message_t tmp(msg.begin(),msg.end()); tmp.set_group(topic.c_str()); - it->get()->clientSocket.send(tmp,zmq::send_flags::none); + it->get()->getClientSocket()->send(tmp,zmq::send_flags::none); } @@ -268,6 +278,50 @@ namespace WHISPER { } + void InternalUDPService::startChecker() + { + + clientsCheckerThread = std::thread(&InternalUDPService::checkClients,this); + stopClientChecker = false; + } + void InternalUDPService::stopChecker() + { + while (clientCheckerisRunning == true ) { + stopClientChecker = true; + // std::this_thread::sleep_for(std::chrono::milliseconds(100)); + LOG_S(INFO)<<"waiting for checker thread"; + if (clientsCheckerThread.joinable() == true ) { + clientsCheckerThread.join(); + } + } + } + void InternalUDPService::checkClients() + { + clientCheckerisRunning = true; + + while(stopClientChecker == false) + { + + if (localclients.size() > 0) { + WHISPER::Ping ping(getParentID(),getOwnID(),getOwnDeviceType(),port_); + sendToLocalClients(ping.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); + + for (auto it = localclients.begin(); it != localclients.end();it++) + { + if ((std::time(nullptr) - it->get()->getLastResponse()) >= 8 ) { + it = localclients.erase(it); + LOG_S(ERROR)<<"connection to local Client lost"; + } + } + + } + + std::this_thread::sleep_for(std::chrono::milliseconds(4000)); + } + + clientCheckerisRunning = false; + } + diff --git a/src/WHISPER/Messages/Pong.cpp b/src/WHISPER/Messages/Pong.cpp index f9429d1..78b8847 100644 --- a/src/WHISPER/Messages/Pong.cpp +++ b/src/WHISPER/Messages/Pong.cpp @@ -36,7 +36,7 @@ namespace WHISPER { WHISPER::Pong::Pong(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::uint32_t port): - Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::PING,src),port_(port) + Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::PONG,src),port_(port) { // message_ = messages::pong::Pong(); if (message_.IsInitialized()) { diff --git a/src/WHISPER/localClients.cpp b/src/WHISPER/localClients.cpp new file mode 100644 index 0000000..0b5e3c1 --- /dev/null +++ b/src/WHISPER/localClients.cpp @@ -0,0 +1,66 @@ + #include + + namespace WHISPER { + + + localClient::localClient(std::uint32_t port,std::string addr,std::uint32_t parentid,std::uint32_t id, std::shared_ptr clientSocket) + :port_(port),addr_(addr),parentid_(parentid),id_(id),clientSocket_(clientSocket),mx() + { + lastResponse_ = std::time(nullptr); + } + + +localClient::~localClient() +{ + clientSocket_->close(); + clientSocket_.reset(); +} + + + +std::uint32_t localClient::getPort() +{ + std::lock_guard lock(mx); + return port_; +} +std::string localClient::getAddr() +{ + std::lock_guard lock(mx); + return addr_; +} +std::uint32_t localClient::getParentid() +{ + std::lock_guard lock(mx); + return parentid_; +} +std::uint32_t localClient::getID() +{ + std::lock_guard lock(mx); + return id_; +} +std::time_t localClient::getLastResponse() +{ + std::lock_guard lock(mx); + return lastResponse_; +} +void localClient::setLastResponse(std::time_t time) +{ + std::lock_guard lock(mx); + lastResponse_ = time; +} +std::shared_ptr localClient::getClientSocket() +{ + std::lock_guard lock(mx); + return clientSocket_; +} + +void localClient::addClientSocket(std::shared_ptr socket) +{ + std::lock_guard lock(mx); + clientSocket_ = socket; +} + + + + + } \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 43c0ce3..5b8f7d3 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -44,7 +44,7 @@ int main() // WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178"); auto receiver = std::make_shared>(); - WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"127.0.0.255","127.0.0.1"); + WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178"); service.connect(receiver); service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]); @@ -58,7 +58,7 @@ int main() // WHISPER::RawTrack RawTrack(ID,WHISPER::MsgTopics::TRACK,WHISPER::SourceType::SHIP,0001); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + // std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // service.publish(RawTrack.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); diff --git a/src/mainRecv.cpp b/src/mainRecv.cpp index 86ecb9f..652fb43 100644 --- a/src/mainRecv.cpp +++ b/src/mainRecv.cpp @@ -48,7 +48,7 @@ int main() auto receiver = std::make_shared>(); - WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"127.0.0.255","127.0.0.1"); + WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178"); service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]); service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);