FIX: fixed some issues with subscribung and unsubscribing from topics

This commit is contained in:
Henry Winkel
2023-01-18 11:21:57 +01:00
parent f7d57ccbb8
commit 49bdb28535
7 changed files with 83 additions and 40 deletions

View File

@@ -89,18 +89,18 @@ target_include_directories(whisper-com PUBLIC
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
src) src)
# add_executable(main add_executable(main
# src/main.cpp src/main.cpp
# ) )
# add_dependencies(main protoc) add_dependencies(main protoc)
# target_link_libraries(main target_link_libraries(main
# whisper-com whisper-com
# loguru loguru
# ) )
# add_executable(mainRcv # add_executable(mainRcv

View File

@@ -51,7 +51,6 @@ namespace WHISPER {
std::list<std::shared_ptr<localClient>> localclients; std::list<std::shared_ptr<localClient>> localclients;
std::atomic<bool> stopReceiving = false;
/** /**
@@ -76,7 +75,7 @@ namespace WHISPER {
public: public:
InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress, std::string myAdress); InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress, std::string myAdress);
~InternalUDPService();

View File

@@ -39,8 +39,6 @@ namespace WHISPER
std::uint32_t ownID_; std::uint32_t ownID_;
/// device Type /// device Type
SourceType ownDeviceType_; SourceType ownDeviceType_;
/// all topics this service subscribed
std::vector<std::string> subscribedTopics;
/// show if the service is connected or not /// show if the service is connected or not
std::atomic<bool> connected; std::atomic<bool> connected;
@@ -56,6 +54,7 @@ namespace WHISPER
std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiveQueue = nullptr; std::shared_ptr<threadSafeQueue<WHISPER::Message>> receiveQueue = nullptr;
std::atomic<bool> Connected = false;
void receive(); void receive();
@@ -75,12 +74,16 @@ namespace WHISPER
protected: protected:
/// all topics this service subscribed
std::vector<std::string> subscribedTopics;
void addMsgToReceiverQueue(WHISPER::Message); void addMsgToReceiverQueue(WHISPER::Message);
void setGateway(bool); void setGateway(bool);
bool isGateway(); bool isGateway();
void setConnected(bool val);
bool isConnected();
virtual void derivedConnect() = 0; virtual void derivedConnect() = 0;

View File

@@ -5,6 +5,7 @@
#include "WHISPER/Messages/Leave.hpp" #include "WHISPER/Messages/Leave.hpp"
#include "WHISPER/Messages/Message.hpp" #include "WHISPER/Messages/Message.hpp"
#include "WHISPER/whisper.hpp" #include "WHISPER/whisper.hpp"
#include <vector>
#include <zmq.hpp> #include <zmq.hpp>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
@@ -32,6 +33,14 @@ namespace WHISPER {
LOG_S(INFO)<< "sockets are created"; LOG_S(INFO)<< "sockets are created";
} }
InternalUDPService::~InternalUDPService()
{
if (this->isConnected()== true) {
disconnect();
}
}
void InternalUDPService::derivedConnect() void InternalUDPService::derivedConnect()
{ {
ownReceivingPort_ = checkPort(port_); ownReceivingPort_ = checkPort(port_);
@@ -66,6 +75,7 @@ namespace WHISPER {
// sender.set(zmq::sockopt::multicast_loop ,1); // sender.set(zmq::sockopt::multicast_loop ,1);
LOG_S(INFO)<<"own ID: "<< getOwnID(); LOG_S(INFO)<<"own ID: "<< getOwnID();
WHISPER::Join join(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_); WHISPER::Join join(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
this->publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]); this->publish(join.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
@@ -77,10 +87,21 @@ namespace WHISPER {
{ {
WHISPER::Leave Leave(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_); WHISPER::Leave Leave(getOwnID(),getOwnDeviceType(),ownReceivingPort_,myAdress_);
this->publish(Leave.serialize(),WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)Leave.topic_]); this->publish(Leave.serialize(),WHISPER::MsgTopicsMap[(WHISPER::MsgTopics)Leave.topic_]);
unsubscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
if (subscribedTopics.size() > 0) {
for ( std::vector<std::string>::iterator it = subscribedTopics.begin(); it != subscribedTopics.end(); it++) {
unsubscribe(*it);
}
}
sender.close(); sender.close();
loopbackSocket = nullptr; if (loopbackSocket.unique()) loopbackSocket.reset();
receiver->close(); receiver->close();
if (receiver.unique()) receiver.reset();
} }

View File

@@ -11,6 +11,7 @@
#include <WHISPER/whisper.hpp> #include <WHISPER/whisper.hpp>
#include <cstddef> #include <cstddef>
#include <cstring>
#include <iterator> #include <iterator>
/** /**
* @brief namespace for all whisper-com related components * @brief namespace for all whisper-com related components
@@ -30,7 +31,7 @@ namespace WHISPER
this->derivedConnect(); this->derivedConnect();
receiveThread = std::thread(&WHISPER::whispercomm::receive,this); receiveThread = std::thread(&WHISPER::whispercomm::receive,this);
this->setConnected(true);
} }
void whispercomm::disconnect() void whispercomm::disconnect()
@@ -38,7 +39,7 @@ namespace WHISPER
stopReceiveThread = true; stopReceiveThread = true;
if(receiveThread.joinable()) receiveThread.join(); if(receiveThread.joinable()) receiveThread.join();
derivedDisconnect(); derivedDisconnect();
this->setConnected(false);
} }
void whispercomm::publish(std::string msg,std::string topic){ void whispercomm::publish(std::string msg,std::string topic){
@@ -71,6 +72,7 @@ namespace WHISPER
void whispercomm::unsubscribe(std::string topic) void whispercomm::unsubscribe(std::string topic)
{ {
for (std::vector<std::string>::iterator it = subscribedTopics.begin(); it != subscribedTopics.end();it++) for (std::vector<std::string>::iterator it = subscribedTopics.begin(); it != subscribedTopics.end();it++)
{ {
if (*it == topic) if (*it == topic)
@@ -105,5 +107,13 @@ namespace WHISPER
} }
void whispercomm::setConnected(bool val){
this->Connected = val;
}
bool whispercomm::isConnected()
{
return this->Connected;
}
// Add datatypes here // Add datatypes here
} // namespace WHISPER } // namespace WHISPER

View File

@@ -1,7 +1,6 @@
#include "WHISPER/InternalUDPService.hpp" #include "WHISPER/InternalUDPService.hpp"
#include "WHISPER/Messages/Message.hpp" #include "WHISPER/Messages/Message.hpp"
#include "WHISPER/Messages/Track.hpp"
#include "zmq.hpp"
#include <iostream> #include <iostream>
#include <loguru.hpp> #include <loguru.hpp>
@@ -55,25 +54,25 @@ int main()
WHISPER::RawTrack RawTrack(ID,WHISPER::MsgTopics::TRACK,WHISPER::SourceType::SHIP,0001); // WHISPER::RawTrack RawTrack(ID,WHISPER::MsgTopics::TRACK,WHISPER::SourceType::SHIP,0001);
int size = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
service.publish(RawTrack.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
while (running) {
// zmq::message_t msg(string.begin(),string.end());
// msg.set_group("data");
// sock.send(msg,zmq::send_flags::none);
if (size != receiver->size()) {
LOG_S(INFO)<<"received messages " << size;
size = receiver->size();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
// service.publish(RawTrack.serialize(),WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
// while (running) {
// // zmq::message_t msg(string.begin(),string.end());
// // msg.set_group("data");
// // sock.send(msg,zmq::send_flags::none);
// // if (size != receiver->size()) {
// // LOG_S(INFO)<<"received messages " << size;
// // size = receiver->size();
// // }
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
// }
service.disconnect(); service.disconnect();

View File

@@ -34,14 +34,25 @@ SCENARIO("A test scenario","[keywords]")
{ {
GIVEN("Preliminaries") GIVEN("Preliminaries")
{ {
int i = 40; WHISPER::InternalUDPService service(1,WHISPER::SourceType::SIMCOMTROLER,8000,"127.0.0.255","127.0.0.1");
auto receiver = std::make_shared<WHISPER::threadSafeQueue<WHISPER::Message>>();
service.connect(receiver) ;
// service.subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
int i = 0;
WHEN("doing something") WHEN("doing something")
{ {
i = i + 2;
LOG_S(INFO)<<"wating";
// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
service.disconnect();
LOG_S(INFO)<<"closing";
THEN("expecting something to happen") THEN("expecting something to happen")
{ {
REQUIRE(i == 42 ); // service.unsubscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::TRACK]);
REQUIRE(receiver.unique() == true );
} // THEN } // THEN
} // WHEN } // WHEN
} // GIVEN } // GIVEN