#include "WHISPER/InternalUDPService.hpp"
#include "WHISPER/Messages/Join.hpp"
#include "WHISPER/Messages/Leave.hpp"
#include "WHISPER/Messages/Message.hpp"
#include "WHISPER/whisper.hpp"

// NOTE(review): the names of the angle-bracket includes, the template
// arguments of std::make_shared, and the beginning of the address check in the
// JOIN branch were lost in the copy this file was restored from. They have been
// reconstructed from the identifiers used below -- confirm against the build.
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <loguru.hpp>
#include <zmq.hpp>

namespace {

/// Builds a ZeroMQ message that carries `payload` and is tagged with `group`.
/// A fresh message is needed for every send because sending consumes it.
zmq::message_t makeGroupMessage(const std::string& payload, const std::string& group)
{
    zmq::message_t message(payload.begin(), payload.end());
    message.set_group(group.c_str());
    return message;
}

/// Owns a POSIX file descriptor and closes it when leaving scope, so that the
/// descriptor is released on every exit path, including exceptions.
class ScopedFd {
public:
    explicit ScopedFd(int fd) : fd_(fd) {}
    ~ScopedFd()
    {
        if (fd_ >= 0) {
            ::close(fd_);
        }
    }
    ScopedFd(const ScopedFd&) = delete;
    ScopedFd& operator=(const ScopedFd&) = delete;

    int get() const { return fd_; }

private:
    int fd_;
};

} // namespace

namespace WHISPER {

/// Creates the ZeroMQ context and the radio (sending) / dish (receiving)
/// sockets. Nothing is bound or connected here; that happens in
/// derivedConnect().
///
/// @param id                 own device id, forwarded to whispercomm
/// @param owndevicetype      own device type, forwarded to whispercomm
/// @param port               port used for sending and, if free, for receiving
/// @param destinationAdress  address the sender socket connects to
/// @param myAdress           own address, announced in Join/Leave messages
InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype, std::uint16_t port,
                                       std::string destinationAdress, std::string myAdress)
    : whispercomm(id, owndevicetype),
      port_(port),
      destinationAdress_(std::move(destinationAdress)),
      myAdress_(std::move(myAdress))
{
    ctx = zmq::context_t(2);
    sender = zmq::socket_t(ctx, zmq::socket_type::radio);
    receiver = std::make_shared<zmq::socket_t>(ctx, zmq::socket_type::dish);

    LOG_S(INFO) << "sockets are created";
}

/// Binds the receiver, connects the sender and announces this node with a
/// Join message on the MANAGEMENT topic.
///
/// If port_ is already in use, checkPort() hands out another free port. In
/// that case a loopback radio socket is additionally connected to
/// 127.0.0.255 on port_.
///
/// @throws std::invalid_argument if no receiving port could be determined
/// @throws std::runtime_error    propagated from checkPort()
void InternalUDPService::derivedConnect()
{
    ownReceivingPort_ = checkPort(port_);
    const std::string sendingPort = std::to_string(port_);
    const bool usingFallbackPort = (ownReceivingPort_ != port_);

    if (usingFallbackPort) {
        if (ownReceivingPort_ == 0) {
            throw std::invalid_argument(" receiver cant bind to port ");
        }
        loopbackSocket = std::make_shared<zmq::socket_t>(ctx, zmq::socket_type::radio);
        loopbackSocket->connect("udp://127.0.0.255:" + sendingPort);
    }

    receiver->bind("udp://*:" + std::to_string(ownReceivingPort_));
    if (usingFallbackPort) {
        LOG_S(WARNING) << "local receiving port is: " << ownReceivingPort_;
    }

    /// subscribe to the basic message domain
    /// (original note: zmq udp sockets only work with pub sub)
    subscribe(WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);

    /// custom receive timeout (ZMQ_RCVTIMEO) so derivedReceive() does not block forever
    receiver->set(zmq::sockopt::rcvtimeo, 100);

    sender.connect("udp://" + destinationAdress_ + ":" + sendingPort);

    LOG_S(INFO) << "own ID: " << getOwnID();

    WHISPER::Join join(getOwnID(), getOwnDeviceType(), ownReceivingPort_, myAdress_);
    this->publish(join.serialize(), WHISPER::MsgTopicsMap[WHISPER::MsgTopics::MANAGEMENT]);
}

/// Publishes a Leave message on the topic stored in that message and then
/// closes the sender, drops the loopback socket and closes the receiver.
void InternalUDPService::derivedDisconnect()
{
    WHISPER::Leave leaveMsg(getOwnID(), getOwnDeviceType(), ownReceivingPort_, myAdress_);
    this->publish(leaveMsg.serialize(),
                  WHISPER::MsgTopicsMap[static_cast<WHISPER::MsgTopics>(leaveMsg.topic_)]);

    sender.close();
    loopbackSocket = nullptr;
    receiver->close();
}

/// Sends `msg` under group `topic` through the sender socket, forwards it to
/// the registered local clients and, if present, through the loopback socket.
///
/// @param msg    serialized message
/// @param topic  group name the message is tagged with
void InternalUDPService::derivedPublish(std::string msg, std::string topic)
{
    zmq::message_t outgoing = makeGroupMessage(msg, topic);
    sender.send(outgoing, zmq::send_flags::none);

    sendToLocalClients(msg, topic);

    if (loopbackSocket != nullptr) {
        zmq::message_t loopbackMsg = makeGroupMessage(msg, topic);
        loopbackSocket->send(loopbackMsg, zmq::send_flags::none);
    }
}

/// Receives at most one message from the receiver socket and processes it.
///
/// Every received message is forwarded to the local clients. Messages from
/// other devices are then handled by type: JOIN registers a local client
/// (only when the sender address equals myAdress_ or "127.0.0.1" and its port
/// is not registered yet), LEAVE removes the clients with the matching port,
/// PING/PONG are ignored, and everything else is put on the receiver queue.
void InternalUDPService::derivedReceive()
{
    zmq::message_t msg;
    const zmq::recv_result_t res = receiver->recv(msg, zmq::recv_flags::none);
    // An empty result means nothing was received (e.g. the receive timeout
    // expired). The result holds an unsigned size, so there is no negative
    // error value to test for.
    if (!res.has_value()) {
        return;
    }

    const std::string message = msg.to_string();
    WHISPER::Message receivedMessage(message);
    sendToLocalClients(receivedMessage.serialize(),
                       WHISPER::MsgTopicsMap[static_cast<WHISPER::MsgTopics>(receivedMessage.topic_)]);

    // Own messages are only relayed to local clients, never processed.
    if (receivedMessage.deviceId_ == getOwnID()) {
        return;
    }

    const int msgType = receivedMessage.msgType_;
    if (msgType == WHISPER::JOIN) {
        WHISPER::Join join(message);
        if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") {
            LOG_S(INFO) << "go for it";

            const bool clientAlreadyIn =
                std::any_of(localclients.begin(), localclients.end(),
                            [&join](const auto& known) { return known->port == join.port; });

            if (!clientAlreadyIn) {
                const std::string endpoint = "udp://" + join.sourceAddr + ":" + std::to_string(join.port);

                auto client = std::make_shared<localClient>();
                client->port = join.port;
                client->id = join.deviceId_;
                client->addr = endpoint;
                client->clientSocket = zmq::socket_t(ctx, zmq::socket_type::radio);
                LOG_S(INFO) << endpoint;
                client->clientSocket.connect(endpoint);

                localclients.emplace_back(client);
                setGateway(true);
                LOG_S(INFO) << "new client joined";
            }
        }
    } else if (msgType == WHISPER::LEAVE) {
        LOG_S(INFO) << "client left";
        WHISPER::Leave leaveMsg(message);

        // erase() already returns the iterator to the next element, so the
        // iterator is only advanced manually when nothing was erased.
        for (auto it = localclients.begin(); it != localclients.end();) {
            if ((*it)->port == leaveMsg.port) {
                (*it)->clientSocket.close();
                it = localclients.erase(it);
                LOG_S(INFO) << "client left";
            } else {
                ++it;
            }
        }

        if (localclients.empty()) {
            setGateway(false);
        }
    } else if (msgType == WHISPER::PING) {
        // intentionally ignored
    } else if (msgType == WHISPER::PONG) {
        // intentionally ignored
    } else {
        addMsgToReceiverQueue(WHISPER::Message(message));
    }
}

/// Forwards `msg` under group `topic` to every registered local client except
/// the one whose id equals the message's deviceId_. Does nothing unless this
/// instance is a gateway and has at least one local client.
///
/// @param msg    serialized message
/// @param topic  group name the message is tagged with
void InternalUDPService::sendToLocalClients(std::string msg, std::string topic)
{
    if (localclients.empty() || !isGateway()) {
        return;
    }

    const auto parsed = WHISPER::Message(msg);
    for (const auto& client : localclients) {
        if (parsed.deviceId_ == client->id) {
            continue; // do not echo a message back to its originator
        }
        zmq::message_t outgoing = makeGroupMessage(msg, topic);
        client->clientSocket.send(outgoing, zmq::send_flags::none);
    }
}

/// Joins the receiver socket to the group named `topic`.
void InternalUDPService::derivedSubscribe(std::string topic)
{
    receiver->join(topic.c_str());
}

/// Removes the receiver socket from the group named `topic`.
void InternalUDPService::derivedUnsubscribe(std::string topic)
{
    receiver->leave(topic.c_str());
}

/// Probes whether `port` can be bound on a UDP socket (INADDR_ANY).
///
/// If the port is already in use, the probe socket is bound to port 0 instead
/// and the port chosen by the system is reported. The probe socket is closed
/// before returning, so the returned port is not reserved.
///
/// @param port  port to probe
/// @return `port` if it could be bound, otherwise the system-chosen port
/// @throws std::runtime_error if the socket cannot be created or bound, or if
///         the bound port cannot be queried
std::uint16_t InternalUDPService::checkPort(std::uint16_t port)
{
    const ScopedFd probe(::socket(AF_INET, SOCK_DGRAM, 0));
    if (probe.get() < 0) {
        const int socketErrno = errno;
        throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("socket creation failed errno=") +
                                 std::to_string(socketErrno));
    }

    sockaddr_in local{};
    local.sin_family = AF_INET; // IPv4
    local.sin_addr.s_addr = INADDR_ANY;
    local.sin_port = htons(port);

    if (::bind(probe.get(), reinterpret_cast<const sockaddr*>(&local), sizeof(local)) != -1) {
        return port;
    }
    if (errno != EADDRINUSE) {
        throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket"));
    }

    // requested port is taken: let the system pick one
    local.sin_port = 0;
    if (::bind(probe.get(), reinterpret_cast<const sockaddr*>(&local), sizeof(local)) == -1) {
        throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket "));
    }

    // read the port that was actually assigned back from the socket
    std::memset(&local, 0, sizeof(local));
    socklen_t localLen = sizeof(local);
    if (::getsockname(probe.get(), reinterpret_cast<sockaddr*>(&local), &localLen) == -1) {
        const int queryErrno = errno;
        throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr errno=") +
                                 std::to_string(queryErrno));
    }

    // sometimes getsockname returns port 0, reason unknown yet (dmeyer),
    // therefore check for port 0 and try once more
    std::uint16_t localPort = ntohs(local.sin_port);
    if (localPort == 0) {
        localLen = sizeof(local);
        if (::getsockname(probe.get(), reinterpret_cast<sockaddr*>(&local), &localLen) == -1) {
            throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr "));
        }
        localPort = ntohs(local.sin_port);
    }

    return localPort;
}

} // namespace WHISPER