#include "WHISPER/InternalUDPService.hpp" #include "WHISPER/Messages/Join.hpp" #include "WHISPER/Messages/Message.hpp" #include "WHISPER/whisper.hpp" #include "zmq.hpp" #include #include #include #include #include #include #include #include namespace WHISPER { InternalUDPService::InternalUDPService(std::uint32_t id, SourceType owndevicetype,std::uint16_t port, std::string destinationAdress,std::string myAdress): whispercomm(id, owndevicetype),port_(port),destinationAdress_(destinationAdress),myAdress_(myAdress) { ctx = zmq::context_t(2); sender = zmq::socket_t(ctx,zmq::socket_type::radio); receiver = std::make_shared(ctx,zmq::socket_type::dish); // receiver = zmq::socket_t(ctx,zmq::socket_type::dish); LOG_S(INFO)<< "sockets are created"; } void InternalUDPService::derivedConnect() { ownReceivingPort_ = checkPort(port_); std::string sendingPort = std::to_string(port_); if(ownReceivingPort_ == port_){ std::string portAsString = std::to_string(ownReceivingPort_); receiver->bind("udp://*:"+portAsString); }else if (ownReceivingPort_ == 0) { throw std::invalid_argument( " receiver cant bind to port " ); }else { LOG_S(INFO)<< "new port to bind: " << ownReceivingPort_; loopbackSocket = std::make_shared(ctx,zmq::socket_type::radio); loopbackSocket->connect("udp://127.0.0.255:"+sendingPort); std::string portAsString = std::to_string(ownReceivingPort_); receiver->bind("udp://*:"+portAsString); } receiver->join("management"); LOG_S(INFO)<< sendingPort; sender.connect("udp://"+destinationAdress_+":"+sendingPort); // sender.set(zmq::sockopt::multicast_loop ,1); } void InternalUDPService::derivedDisconnect() { sender.close(); receiver->close(); } void InternalUDPService::derivedPublish(std::string msg,std::string topic) { zmq::message_t tmpmsg(msg.begin(),msg.end()); tmpmsg.set_group(topic.c_str()); sender.send(tmpmsg,zmq::send_flags::none); if (loopbackSocket != nullptr) { LOG_S(INFO)<<"loop back send"; zmq::message_t tmpmsg(msg.begin(),msg.end()); tmpmsg.set_group(topic.c_str()); loopbackSocket->send(tmpmsg,zmq::send_flags::none); } } void InternalUDPService::derivedReceive() { zmq::message_t msg; receiver->recv(msg,zmq::recv_flags::none); std::string message = msg.to_string(); WHISPER::Message receivedMessage(message); if (receivedMessage.deviceId_ != getOwnID()) { int msgType = receivedMessage.msgType_; if (msgType == WHISPER::JOIN) { WHISPER::Join join(message); if (this->myAdress_ == join.sourceAddr || join.sourceAddr == "127.0.0.1") { bool clientAllreadyIn = false; if (localclients.size() > 0) { for (std::vector>::iterator it = localclients.begin(); it != localclients.end();it++) { if (it->get()->port == join.port) { clientAllreadyIn = true; } } } if (clientAllreadyIn == false) { auto client = std::make_shared(); client->port = join.port; client->clientSocket = zmq::socket_t(ctx,zmq::socket_type::radio); client->clientSocket.connect("udp://"+join.sourceAddr+":" + std::to_string(join.port)); localclients.emplace_back(client); setGateway(true); } } }else if(msgType == WHISPER::LEAVE) { }else if(msgType == WHISPER::PING) { }else if(msgType == WHISPER::PONG) { }else { addMsgToReceiverQueue(WHISPER::Message(message)); } } } void InternalUDPService::derivedSubscribe(std::string topic) { receiver->join(topic.c_str()); } void InternalUDPService::derivedUnsubscribe(std::string topic) { receiver->leave(topic.c_str()); } std::uint16_t InternalUDPService::checkPort(std::uint16_t port) { std::uint16_t localPort = port; struct sockaddr_in local; socklen_t localLen; int sockfd; // Creating socket file descriptor if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { perror("socket creation failed"); exit(EXIT_FAILURE); } local.sin_family = AF_INET; // IPv4 local.sin_addr.s_addr = INADDR_ANY; local.sin_port = htons(port); int err = bind(sockfd, (const struct sockaddr *)&local, sizeof(local)); if (err == -1 && errno == EADDRINUSE) { // no we are not a gateway setGateway(false); // set port to auto on binding local.sin_port = 0; err = bind(sockfd, (const struct sockaddr *) &local, sizeof(local)); if(err == -1) { throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket ")); } // if we could bind to a port we have to get the port value from the socket memset((char *) &local, 0, sizeof(local)); localLen=sizeof(local); err=getsockname(sockfd, (struct sockaddr *) &local, &localLen); if(err == -1) { throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr errno=") + std::to_string(errno)); } //sometime getsock return port 0, i do not know why yet (dmeyer) //therefore i check for port 0 and try again localPort=ntohs(local.sin_port); if (localPort==0) { err=getsockname(sockfd, (struct sockaddr *) &local, &localLen); if(err == -1) { throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not get port nr ")); } localPort=ntohs(local.sin_port); } } else if(err == -1) { throw std::runtime_error(std::string(__PRETTY_FUNCTION__) + std::string("could not bind port to socket")); } close(sockfd); LOG_S(WARNING)<< "new assignet port is" << localPort; return localPort; } }