ADD: added clocalClientsClass and ping pong check

This commit is contained in:
Henry Winkel
2023-01-19 17:29:35 +01:00
parent 73b15d235a
commit 3474cbd5cb
10 changed files with 212 additions and 34 deletions

View File

@@ -46,6 +46,8 @@ add_library(whisper-com STATIC
include/WHISPER/InternalUDPService.hpp
src/WHISPER/InternalUDPService.cpp
include/WHISPER/localClients.hpp
src/WHISPER/localClients.cpp
include/WHISPER/threadSafeQueue.hpp
src/WHISPER/threadSafeQueue.cpp

View File

@@ -2,10 +2,12 @@
#include "WHISPER/whisper.hpp"
#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <vector>
#include <WHISPER/localClients.hpp>
#define ZMQ_BUILD_DRAFT_API 1
#include <zmq.hpp>
@@ -14,14 +16,14 @@
namespace WHISPER {
struct localClient{
std::uint32_t port;
std::string addr;
std::uint32_t parentid;
std::uint32_t id;
zmq::socket_t clientSocket;
std::time_t lastResponse;
};
// struct localClient{
// std::uint32_t port;
// std::string addr;
// std::uint32_t parentid;
// std::uint32_t id;
// zmq::socket_t clientSocket;
// std::time_t lastResponse;
// };
@@ -52,6 +54,10 @@ namespace WHISPER {
std::list<std::shared_ptr<localClient>> localclients;
std::atomic<bool> stopClientChecker = false;
std::atomic<bool> clientCheckerisRunning = false;
std::thread clientsCheckerThread;
/**
@@ -71,6 +77,10 @@ namespace WHISPER {
void sendToLocalClients(std::string msg,std::string topic);
void startChecker();
void stopChecker();
void checkClients();

View File

@@ -13,6 +13,7 @@ message Message {
uint32 sourceID = 4;
uint32 parentID = 5;
repeated google.protobuf.Any payload = 6;
}

View File

@@ -0,0 +1,45 @@
#pragma once
#include <memory>
#include <mutex>
#include <zmq.hpp>
#include <cstdint>
#include <string>
namespace WHISPER {
class localClient
{
public:
localClient(std::uint32_t port,std::string addr,std::uint32_t parentid,std::uint32_t id, std::shared_ptr<zmq::socket_t> clientSocket);
~localClient();
std::uint32_t getPort();
std::string getAddr();
std::uint32_t getParentid();
std::uint32_t getID();
void addClientSocket(std::shared_ptr<zmq::socket_t> socket);
std::shared_ptr<zmq::socket_t> getClientSocket();
std::time_t getLastResponse();
void setLastResponse(std::time_t time);
private:
mutable std::mutex mx;
const std::uint32_t port_;
const std::string addr_;
const std::uint32_t parentid_;
const std::uint32_t id_;
std::shared_ptr<zmq::socket_t> clientSocket_;
std::time_t lastResponse_;
};
}

View File

@@ -46,7 +46,7 @@ namespace WHISPER
std::atomic<bool> connected;
/// attribute identifying this service as a gateway and all packets should be forwarded
bool gateway = false;
std::atomic<bool> gateway = false;
/// variable for holding the receive thread identifier
std::thread receiveThread;

View File

@@ -7,6 +7,8 @@
#include "WHISPER/Messages/Ping.hpp"
#include "WHISPER/Messages/Pong.hpp"
#include "WHISPER/whisper.hpp"
#include <sys/types.h>
#include <thread>
#include <vector>
#include <zmq.hpp>
#include <chrono>
@@ -37,7 +39,7 @@ namespace WHISPER {
InternalUDPService::~InternalUDPService()
{
if (this->isConnected()== true) {
if (this->isConnected() == true) {
disconnect();
}
@@ -104,6 +106,8 @@ namespace WHISPER {
receiver->close();
if (receiver.unique()) receiver.reset();
setConnected(false);
}
@@ -160,25 +164,30 @@ namespace WHISPER {
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (it->get()->port == join.port || it->get()->id == join.deviceId_) {
if (it->get()->getPort() == join.port || it->get()->getID() == join.deviceId_) {
clientAllreadyIn = true;
}
}
}
if (clientAllreadyIn == false) {
// localClient client;
auto client = std::make_shared<localClient>();
client->port = join.port;
client->id = join.deviceId_;
client->parentid = join.parentId_;
client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port);
client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio);
auto tmpAddr = "udp://"+join.sourceAddr+":" + std::to_string(join.port);
auto tmpsocket = std::make_shared<zmq::socket_t>(ctx,zmq::socket_type::radio);
auto client = std::make_shared<localClient>(join.port,tmpAddr,join.parentId_,join.deviceId_, tmpsocket);
// client->port = join.port;
// client->id = join.deviceId_;
// client->parentid = join.parentId_;
// client->addr = "udp://"+join.sourceAddr+":" + std::to_string(join.port);
// client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio);
client->setLastResponse(std::time(nullptr));
LOG_S(INFO)<< "udp://"+join.sourceAddr+":" + std::to_string(join.port);
client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port));
client->getClientSocket()->connect("udp://"+join.sourceAddr+":" + std::to_string(join.port));
localclients.emplace_back(client);
startChecker();
setGateway(true);
LOG_S(INFO)<< "new client joined";
@@ -187,14 +196,13 @@ namespace WHISPER {
}else if(msgType == WHISPER::LEAVE)
{
LOG_S(INFO)<<"client left";
WHISPER::Leave Leave(message);
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (it->get()->port == Leave.port || (it->get()->id == Leave.deviceId_ && it->get()->parentid == Leave.deviceId_)) {
it->get()->clientSocket.close();
if (it->get()->getPort() == Leave.port || (it->get()->getID() == Leave.deviceId_ && it->get()->getParentid() == Leave.deviceId_)) {
it->get()->getClientSocket()->close();
it = localclients.erase(it);
LOG_S(INFO)<<"client left";
}
@@ -202,12 +210,13 @@ namespace WHISPER {
}
if (localclients.size() == 0) {
stopChecker();
setGateway(false);
}
}else if(msgType == WHISPER::PING)
{
if (receivedMessage.deviceId_ != getOwnID() && receivedMessage.parentId_ != getParentID())
if (receivedMessage.deviceId_ != getOwnID() )
{
WHISPER::Pong pong(getParentID(),getOwnID(),getOwnDeviceType(),ownReceivingPort_);
this->publish(pong.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
@@ -218,10 +227,11 @@ namespace WHISPER {
WHISPER::Pong pong(message);
if (localclients.size() > 0) {
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (pong.deviceId_ == it->get()->id && pong.parentId_ == it->get()->parentid) {
it->get()->lastResponse = std::time(nullptr);
if (pong.deviceId_ == it->get()->getID() && pong.parentId_ == it->get()->getParentid()) {
it->get()->setLastResponse(std::time(nullptr));
}
}
}
@@ -242,10 +252,10 @@ namespace WHISPER {
auto tmpMsg = WHISPER::Message(msg);
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if (tmpMsg.deviceId_ != it->get()->id) {
if (tmpMsg.deviceId_ != it->get()->getID()) {
zmq::message_t tmp(msg.begin(),msg.end());
tmp.set_group(topic.c_str());
it->get()->clientSocket.send(tmp,zmq::send_flags::none);
it->get()->getClientSocket()->send(tmp,zmq::send_flags::none);
}
@@ -268,6 +278,50 @@ namespace WHISPER {
}
void InternalUDPService::startChecker()
{
clientsCheckerThread = std::thread(&InternalUDPService::checkClients,this);
stopClientChecker = false;
}
void InternalUDPService::stopChecker()
{
while (clientCheckerisRunning == true ) {
stopClientChecker = true;
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
LOG_S(INFO)<<"waiting for checker thread";
if (clientsCheckerThread.joinable() == true ) {
clientsCheckerThread.join();
}
}
}
void InternalUDPService::checkClients()
{
clientCheckerisRunning = true;
while(stopClientChecker == false)
{
if (localclients.size() > 0) {
WHISPER::Ping ping(getParentID(),getOwnID(),getOwnDeviceType(),port_);
sendToLocalClients(ping.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
for (auto it = localclients.begin(); it != localclients.end();it++)
{
if ((std::time(nullptr) - it->get()->getLastResponse()) >= 8 ) {
it = localclients.erase(it);
LOG_S(ERROR)<<"connection to local Client lost";
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(4000));
}
clientCheckerisRunning = false;
}

View File

@@ -36,7 +36,7 @@ namespace WHISPER {
WHISPER::Pong::Pong(std::uint32_t parentID,std::uint32_t deviceID, SourceType src,std::uint32_t port):
Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::PING,src),port_(port)
Message(parentID,deviceID,WHISPER::MsgTopics::MANAGEMENT,WHISPER::PONG,src),port_(port)
{
// message_ = messages::pong::Pong();
if (message_.IsInitialized()) {

View File

@@ -0,0 +1,66 @@
#include <WHISPER/localClients.hpp>
namespace WHISPER {
localClient::localClient(std::uint32_t port,std::string addr,std::uint32_t parentid,std::uint32_t id, std::shared_ptr<zmq::socket_t> clientSocket)
:port_(port),addr_(addr),parentid_(parentid),id_(id),clientSocket_(clientSocket),mx()
{
lastResponse_ = std::time(nullptr);
}
localClient::~localClient()
{
clientSocket_->close();
clientSocket_.reset();
}
std::uint32_t localClient::getPort()
{
std::lock_guard<std::mutex> lock(mx);
return port_;
}
std::string localClient::getAddr()
{
std::lock_guard<std::mutex> lock(mx);
return addr_;
}
std::uint32_t localClient::getParentid()
{
std::lock_guard<std::mutex> lock(mx);
return parentid_;
}
std::uint32_t localClient::getID()
{
std::lock_guard<std::mutex> lock(mx);
return id_;
}
std::time_t localClient::getLastResponse()
{
std::lock_guard<std::mutex> lock(mx);
return lastResponse_;
}
void localClient::setLastResponse(std::time_t time)
{
std::lock_guard<std::mutex> lock(mx);
lastResponse_ = time;
}
std::shared_ptr<zmq::socket_t> localClient::getClientSocket()
{
std::lock_guard<std::mutex> lock(mx);
return clientSocket_;
}
void localClient::addClientSocket(std::shared_ptr<zmq::socket_t> socket)
{
std::lock_guard<std::mutex> lock(mx);
clientSocket_ = socket;
}
}

View File

@@ -44,7 +44,7 @@ int main()
// WHISPER::Join join(1,1,WHISPER::SourceType::SHIP,8000,"192.168.1.178");
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"127.0.0.255","127.0.0.1");
WHISPER::InternalUDPService service(0,1,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178");
service.connect(receiver);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);
@@ -58,7 +58,7 @@ int main()
// WHISPER::RawTrack RawTrack(ID,WHISPER::MsgTopics::TRACK,WHISPER::SourceType::SHIP,0001);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// service.publish(RawTrack.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);

View File

@@ -48,7 +48,7 @@ int main()
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"127.0.0.255","127.0.0.1");
WHISPER::InternalUDPService service(0,2,WHISPER::SHIP,8000,"192.168.1.255","192.168.1.178");
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::DATA]);