Merge commit '31cbcd39be6aef3ed43121da5b797d8ec9b0fd31' as 'libs/cppzmq'

This commit is contained in:
Henry Winkel
2022-10-22 14:34:43 +02:00
34 changed files with 7679 additions and 0 deletions

View File

@@ -0,0 +1,53 @@
find_package(Threads)
find_package(Catch2 QUIET)
if (NOT Catch2_FOUND)
include(FetchContent)
FetchContent_Declare(
Catch2
GIT_REPOSITORY https://github.com/catchorg/Catch2.git
GIT_TAG v2.13.9)
FetchContent_MakeAvailable(Catch2)
list(APPEND CMAKE_MODULE_PATH ${catch2_SOURCE_DIR}/contrib)
endif()
add_executable(
unit_tests
buffer.cpp
message.cpp
context.cpp
socket.cpp
socket_ref.cpp
poller.cpp
active_poller.cpp
multipart.cpp
recv_multipart.cpp
send_multipart.cpp
codec_multipart.cpp
monitor.cpp
utilities.cpp
)
target_include_directories(unit_tests PUBLIC ${CATCH_MODULE_PATH})
target_link_libraries(
unit_tests
PRIVATE Catch2::Catch2
PRIVATE cppzmq
PRIVATE ${CMAKE_THREAD_LIBS_INIT}
)
OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF)
if (COVERAGE)
target_compile_options(unit_tests PRIVATE --coverage)
target_link_options(unit_tests PRIVATE --coverage)
message(STATUS "Coverage enabled")
endif()
include(CTest)
include(Catch)
catch_discover_tests(unit_tests)

View File

@@ -0,0 +1,446 @@
#include <zmq_addon.hpp>
#include "testutil.hpp"
#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_BUILD_DRAFT_API)
#include <array>
#include <memory>
TEST_CASE("create destroy", "[active_poller]")
{
zmq::active_poller_t active_poller;
CHECK(active_poller.empty());
}
static_assert(!std::is_copy_constructible<zmq::active_poller_t>::value,
"active_poller_t should not be copy-constructible");
static_assert(!std::is_copy_assignable<zmq::active_poller_t>::value,
"active_poller_t should not be copy-assignable");
static const zmq::active_poller_t::handler_type no_op_handler =
[](zmq::event_flags) {};
TEST_CASE("move construct empty", "[active_poller]")
{
zmq::active_poller_t a;
CHECK(a.empty());
zmq::active_poller_t b = std::move(a);
CHECK(b.empty());
CHECK(0u == a.size());
CHECK(0u == b.size());
}
TEST_CASE("move assign empty", "[active_poller]")
{
zmq::active_poller_t a;
CHECK(a.empty());
zmq::active_poller_t b;
CHECK(b.empty());
b = std::move(a);
CHECK(0u == a.size());
CHECK(0u == b.size());
CHECK(a.empty());
CHECK(b.empty());
}
TEST_CASE("move construct non empty", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t a;
a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {});
CHECK_FALSE(a.empty());
CHECK(1u == a.size());
zmq::active_poller_t b = std::move(a);
CHECK(a.empty());
CHECK(0u == a.size());
CHECK_FALSE(b.empty());
CHECK(1u == b.size());
}
TEST_CASE("move assign non empty", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t a;
a.add(socket, zmq::event_flags::pollin, no_op_handler);
CHECK_FALSE(a.empty());
CHECK(1u == a.size());
zmq::active_poller_t b;
b = std::move(a);
CHECK(a.empty());
CHECK(0u == a.size());
CHECK_FALSE(b.empty());
CHECK(1u == b.size());
}
TEST_CASE("add handler", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
CHECK_NOTHROW(
active_poller.add(socket, zmq::event_flags::pollin, no_op_handler));
}
TEST_CASE("add null handler fails", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
zmq::active_poller_t::handler_type handler;
CHECK_THROWS_AS(active_poller.add(socket, zmq::event_flags::pollin, handler),
std::invalid_argument);
}
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
// this behaviour was added by https://github.com/zeromq/libzmq/pull/3100
TEST_CASE("add handler invalid events type", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
short invalid_events_type = 2 << 10;
CHECK_THROWS_AS(
active_poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type),
no_op_handler),
zmq::error_t);
CHECK(active_poller.empty());
CHECK(0u == active_poller.size());
}
#endif
TEST_CASE("add handler twice throws", "[active_poller]")
{
common_server_client_setup s;
CHECK(s.client.send(zmq::message_t{}, zmq::send_flags::none));
zmq::active_poller_t active_poller;
bool message_received = false;
active_poller.add(
s.server, zmq::event_flags::pollin,
[&message_received](zmq::event_flags) { message_received = true; });
CHECK_THROWS_ZMQ_ERROR(
EINVAL, active_poller.add(s.server, zmq::event_flags::pollin, no_op_handler));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(message_received); // handler unmodified
}
TEST_CASE("wait with no handlers throws", "[active_poller]")
{
zmq::active_poller_t active_poller;
CHECK_THROWS_ZMQ_ERROR(EFAULT,
active_poller.wait(std::chrono::milliseconds{10}));
}
TEST_CASE("remove unregistered throws", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
CHECK_THROWS_ZMQ_ERROR(EINVAL, active_poller.remove(socket));
}
TEST_CASE("remove registered empty", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
active_poller.add(socket, zmq::event_flags::pollin, no_op_handler);
CHECK_NOTHROW(active_poller.remove(socket));
}
TEST_CASE("remove registered non empty", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
active_poller.add(socket, zmq::event_flags::pollin, no_op_handler);
CHECK_NOTHROW(active_poller.remove(socket));
}
namespace
{
struct server_client_setup : common_server_client_setup
{
zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) {
events = e;
};
zmq::event_flags events = zmq::event_flags::none;
};
const std::string hi_str = "Hi";
}
TEST_CASE("poll basic", "[active_poller]")
{
server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::active_poller_t active_poller;
bool message_received = false;
zmq::active_poller_t::handler_type handler =
[&message_received](zmq::event_flags events) {
CHECK(zmq::event_flags::none != (events & zmq::event_flags::pollin));
message_received = true;
};
CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(message_received);
}
/// \todo this contains multiple test cases that should be split up
TEST_CASE("client server", "[active_poller]")
{
const std::string send_msg = hi_str;
// Setup server and client
server_client_setup s;
// Setup active_poller
zmq::active_poller_t active_poller;
zmq::event_flags events;
zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) {
if (zmq::event_flags::none != (e & zmq::event_flags::pollin)) {
zmq::message_t zmq_msg;
CHECK_NOTHROW(s.server.recv(zmq_msg)); // get message
std::string recv_msg(zmq_msg.data<char>(), zmq_msg.size());
CHECK(send_msg == recv_msg);
} else if (zmq::event_flags::none != (e & ~zmq::event_flags::pollout)) {
INFO("Unexpected event type " << static_cast<short>(events));
REQUIRE(false);
}
events = e;
};
CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
// client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{send_msg}, zmq::send_flags::none));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(events == zmq::event_flags::pollin);
// Re-add server socket with pollout flag
CHECK_NOTHROW(active_poller.remove(s.server));
CHECK_NOTHROW(active_poller.add(
s.server, zmq::event_flags::pollin | zmq::event_flags::pollout, handler));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(events == zmq::event_flags::pollout);
}
TEST_CASE("add invalid socket throws", "[active_poller]")
{
zmq::context_t context;
zmq::active_poller_t active_poller;
zmq::socket_t a{context, zmq::socket_type::router};
zmq::socket_t b{std::move(a)};
CHECK_THROWS_AS(active_poller.add(a, zmq::event_flags::pollin, no_op_handler),
zmq::error_t);
}
TEST_CASE("remove invalid socket throws", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller;
CHECK_NOTHROW(
active_poller.add(socket, zmq::event_flags::pollin, no_op_handler));
CHECK(1u == active_poller.size());
std::vector<zmq::socket_t> sockets;
sockets.emplace_back(std::move(socket));
CHECK_THROWS_AS(active_poller.remove(socket), zmq::error_t);
CHECK(1u == active_poller.size());
}
TEST_CASE("wait on added empty handler", "[active_poller]")
{
server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::active_poller_t active_poller;
CHECK_NOTHROW(
active_poller.add(s.server, zmq::event_flags::pollin, no_op_handler));
CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{-1}));
}
TEST_CASE("modify empty throws", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::push};
zmq::active_poller_t active_poller;
CHECK_THROWS_AS(active_poller.modify(socket, zmq::event_flags::pollin),
zmq::error_t);
}
TEST_CASE("modify invalid socket throws", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{std::move(a)};
zmq::active_poller_t active_poller;
CHECK_THROWS_AS(active_poller.modify(a, zmq::event_flags::pollin),
zmq::error_t);
}
TEST_CASE("modify not added throws", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{context, zmq::socket_type::push};
zmq::active_poller_t active_poller;
CHECK_NOTHROW(active_poller.add(a, zmq::event_flags::pollin, no_op_handler));
CHECK_THROWS_AS(active_poller.modify(b, zmq::event_flags::pollin),
zmq::error_t);
}
TEST_CASE("modify simple", "[active_poller]")
{
zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push};
zmq::active_poller_t active_poller;
CHECK_NOTHROW(active_poller.add(a, zmq::event_flags::pollin, no_op_handler));
CHECK_NOTHROW(
active_poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout));
}
TEST_CASE("poll client server", "[active_poller]")
{
// Setup server and client
server_client_setup s;
// Setup active_poller
zmq::active_poller_t active_poller;
CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, s.handler));
// client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
// wait for message and verify events
CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{500}));
CHECK(s.events == zmq::event_flags::pollin);
// Modify server socket with pollout flag
CHECK_NOTHROW(active_poller.modify(s.server, zmq::event_flags::pollin
| zmq::event_flags::pollout));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
CHECK(s.events == (zmq::event_flags::pollin | zmq::event_flags::pollout));
}
TEST_CASE("wait one return", "[active_poller]")
{
// Setup server and client
server_client_setup s;
int count = 0;
// Setup active_poller
zmq::active_poller_t active_poller;
CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin,
[&count](zmq::event_flags) { ++count; }));
// client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
// wait for message and verify events
CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
CHECK(1u == count);
}
TEST_CASE("wait on move constructed active_poller", "[active_poller]")
{
server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::active_poller_t a;
CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, no_op_handler));
zmq::active_poller_t b{std::move(a)};
CHECK(1u == b.size());
CHECK(0u == a.size());
CHECK_THROWS_ZMQ_ERROR(EFAULT, a.wait(std::chrono::milliseconds{10}));
CHECK(b.wait(std::chrono::milliseconds{-1}));
}
TEST_CASE("wait on move assigned active_poller", "[active_poller]")
{
server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::active_poller_t a;
CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, no_op_handler));
zmq::active_poller_t b;
b = {std::move(a)};
CHECK(1u == b.size());
CHECK(0u == a.size());
CHECK_THROWS_ZMQ_ERROR(EFAULT, a.wait(std::chrono::milliseconds{10}));
CHECK(b.wait(std::chrono::milliseconds{-1}));
}
TEST_CASE("received on move constructed active_poller", "[active_poller]")
{
// Setup server and client
server_client_setup s;
int count = 0;
// Setup active_poller a
zmq::active_poller_t a;
CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin,
[&count](zmq::event_flags) { ++count; }));
// client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
// wait for message and verify it is received
CHECK(1 == a.wait(std::chrono::milliseconds{500}));
CHECK(1u == count);
// Move construct active_poller b
zmq::active_poller_t b{std::move(a)};
// client sends message again
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
// wait for message and verify it is received
CHECK(1 == b.wait(std::chrono::milliseconds{500}));
CHECK(2u == count);
}
TEST_CASE("remove from handler", "[active_poller]")
{
constexpr size_t ITER_NO = 10;
// Setup servers and clients
std::vector<server_client_setup> setup_list;
for (size_t i = 0; i < ITER_NO; ++i)
setup_list.emplace_back(server_client_setup{});
// Setup active_poller
zmq::active_poller_t active_poller;
int count = 0;
for (size_t i = 0; i < ITER_NO; ++i) {
CHECK_NOTHROW(active_poller.add(
setup_list[i].server, zmq::event_flags::pollin,
[&, i](zmq::event_flags events) {
CHECK(events == zmq::event_flags::pollin);
active_poller.remove(setup_list[ITER_NO - i - 1].server);
CHECK((ITER_NO - i - 1) == active_poller.size());
}));
++count;
}
CHECK(ITER_NO == active_poller.size());
// Clients send messages
for (auto &s : setup_list) {
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
}
// Wait for all servers to receive a message
for (auto &s : setup_list) {
zmq::pollitem_t items[] = {{s.server, 0, ZMQ_POLLIN, 0}};
zmq::poll(&items[0], 1);
}
// Fire all handlers in one wait
CHECK(ITER_NO == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(ITER_NO == count);
}
#endif

View File

@@ -0,0 +1,306 @@
#include <catch2/catch.hpp>
#include <zmq.hpp>
#ifdef ZMQ_CPP17
static_assert(std::is_nothrow_swappable_v<zmq::const_buffer>);
static_assert(std::is_nothrow_swappable_v<zmq::mutable_buffer>);
static_assert(std::is_trivially_copyable_v<zmq::const_buffer>);
static_assert(std::is_trivially_copyable_v<zmq::mutable_buffer>);
#endif
#ifdef ZMQ_CPP11
using BT = int16_t;
TEST_CASE("buffer default ctor", "[buffer]")
{
constexpr zmq::mutable_buffer mb;
constexpr zmq::const_buffer cb;
CHECK(mb.size() == 0);
CHECK(mb.data() == nullptr);
CHECK(cb.size() == 0);
CHECK(cb.data() == nullptr);
}
TEST_CASE("buffer data ctor", "[buffer]")
{
std::vector<BT> v(10);
zmq::const_buffer cb(v.data(), v.size() * sizeof(BT));
CHECK(cb.size() == v.size() * sizeof(BT));
CHECK(cb.data() == v.data());
zmq::mutable_buffer mb(v.data(), v.size() * sizeof(BT));
CHECK(mb.size() == v.size() * sizeof(BT));
CHECK(mb.data() == v.data());
zmq::const_buffer from_mut = mb;
CHECK(mb.size() == from_mut.size());
CHECK(mb.data() == from_mut.data());
const auto cmb = mb;
static_assert(std::is_same<decltype(cmb.data()), void *>::value, "");
constexpr const void *cp = nullptr;
constexpr void *p = nullptr;
constexpr zmq::const_buffer cecb = zmq::buffer(p, 0);
constexpr zmq::mutable_buffer cemb = zmq::buffer(p, 0);
CHECK(cecb.data() == nullptr);
CHECK(cemb.data() == nullptr);
}
TEST_CASE("const_buffer operator+", "[buffer]")
{
std::vector<BT> v(10);
zmq::const_buffer cb(v.data(), v.size() * sizeof(BT));
const size_t shift = 4;
auto shifted = cb + shift;
CHECK(shifted.size() == v.size() * sizeof(BT) - shift);
CHECK(shifted.data() == v.data() + shift / sizeof(BT));
auto shifted2 = shift + cb;
CHECK(shifted.size() == shifted2.size());
CHECK(shifted.data() == shifted2.data());
auto cbinp = cb;
cbinp += shift;
CHECK(shifted.size() == cbinp.size());
CHECK(shifted.data() == cbinp.data());
}
TEST_CASE("mutable_buffer operator+", "[buffer]")
{
std::vector<BT> v(10);
zmq::mutable_buffer mb(v.data(), v.size() * sizeof(BT));
const size_t shift = 4;
auto shifted = mb + shift;
CHECK(shifted.size() == v.size() * sizeof(BT) - shift);
CHECK(shifted.data() == v.data() + shift / sizeof(BT));
auto shifted2 = shift + mb;
CHECK(shifted.size() == shifted2.size());
CHECK(shifted.data() == shifted2.data());
auto mbinp = mb;
mbinp += shift;
CHECK(shifted.size() == mbinp.size());
CHECK(shifted.data() == mbinp.data());
}
TEST_CASE("mutable_buffer creation basic", "[buffer]")
{
std::vector<BT> v(10);
zmq::mutable_buffer mb(v.data(), v.size() * sizeof(BT));
zmq::mutable_buffer mb2 = zmq::buffer(v.data(), v.size() * sizeof(BT));
CHECK(mb.data() == mb2.data());
CHECK(mb.size() == mb2.size());
zmq::mutable_buffer mb3 = zmq::buffer(mb);
CHECK(mb.data() == mb3.data());
CHECK(mb.size() == mb3.size());
zmq::mutable_buffer mb4 = zmq::buffer(mb, 10 * v.size() * sizeof(BT));
CHECK(mb.data() == mb4.data());
CHECK(mb.size() == mb4.size());
zmq::mutable_buffer mb5 = zmq::buffer(mb, 4);
CHECK(mb.data() == mb5.data());
CHECK(4 == mb5.size());
}
TEST_CASE("const_buffer creation basic", "[buffer]")
{
const std::vector<BT> v(10);
zmq::const_buffer cb(v.data(), v.size() * sizeof(BT));
zmq::const_buffer cb2 = zmq::buffer(v.data(), v.size() * sizeof(BT));
CHECK(cb.data() == cb2.data());
CHECK(cb.size() == cb2.size());
zmq::const_buffer cb3 = zmq::buffer(cb);
CHECK(cb.data() == cb3.data());
CHECK(cb.size() == cb3.size());
zmq::const_buffer cb4 = zmq::buffer(cb, 10 * v.size() * sizeof(BT));
CHECK(cb.data() == cb4.data());
CHECK(cb.size() == cb4.size());
zmq::const_buffer cb5 = zmq::buffer(cb, 4);
CHECK(cb.data() == cb5.data());
CHECK(4 == cb5.size());
}
TEST_CASE("mutable_buffer creation C array", "[buffer]")
{
BT d[10] = {};
zmq::mutable_buffer b = zmq::buffer(d);
CHECK(b.size() == 10 * sizeof(BT));
CHECK(b.data() == static_cast<BT*>(d));
zmq::const_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == static_cast<BT*>(d));
}
TEST_CASE("const_buffer creation C array", "[buffer]")
{
const BT d[10] = {};
zmq::const_buffer b = zmq::buffer(d);
CHECK(b.size() == 10 * sizeof(BT));
CHECK(b.data() == static_cast<const BT*>(d));
zmq::const_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == static_cast<const BT*>(d));
}
TEST_CASE("mutable_buffer creation array", "[buffer]")
{
std::array<BT, 10> d = {};
zmq::mutable_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(BT));
CHECK(b.data() == d.data());
zmq::mutable_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
}
TEST_CASE("const_buffer creation array", "[buffer]")
{
const std::array<BT, 10> d = {};
zmq::const_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(BT));
CHECK(b.data() == d.data());
zmq::const_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
}
TEST_CASE("const_buffer creation array 2", "[buffer]")
{
std::array<const BT, 10> d = {{}};
zmq::const_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(BT));
CHECK(b.data() == d.data());
zmq::const_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
}
TEST_CASE("mutable_buffer creation vector", "[buffer]")
{
std::vector<BT> d(10);
zmq::mutable_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(BT));
CHECK(b.data() == d.data());
zmq::mutable_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
d.clear();
b = zmq::buffer(d);
CHECK(b.size() == 0);
CHECK(b.data() == nullptr);
}
TEST_CASE("const_buffer creation vector", "[buffer]")
{
std::vector<BT> d(10);
zmq::const_buffer b = zmq::buffer(static_cast<const std::vector<BT> &>(d));
CHECK(b.size() == d.size() * sizeof(BT));
CHECK(b.data() == d.data());
zmq::const_buffer b2 = zmq::buffer(static_cast<const std::vector<BT> &>(d), 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
d.clear();
b = zmq::buffer(static_cast<const std::vector<BT> &>(d));
CHECK(b.size() == 0);
CHECK(b.data() == nullptr);
}
TEST_CASE("const_buffer creation string", "[buffer]")
{
const std::wstring d(10, L'a');
zmq::const_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(wchar_t));
CHECK(b.data() == d.data());
zmq::const_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
}
TEST_CASE("mutable_buffer creation string", "[buffer]")
{
std::wstring d(10, L'a');
zmq::mutable_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(wchar_t));
CHECK(b.data() == d.data());
zmq::mutable_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
}
#if CPPZMQ_HAS_STRING_VIEW
TEST_CASE("const_buffer creation string_view", "[buffer]")
{
std::wstring dstr(10, L'a');
std::wstring_view d = dstr;
zmq::const_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(wchar_t));
CHECK(b.data() == d.data());
zmq::const_buffer b2 = zmq::buffer(d, 4);
CHECK(b2.size() == 4);
CHECK(b2.data() == d.data());
}
#endif
TEST_CASE("const_buffer creation with str_buffer", "[buffer]")
{
const wchar_t wd[10] = {};
zmq::const_buffer b = zmq::str_buffer(wd);
CHECK(b.size() == 9 * sizeof(wchar_t));
CHECK(b.data() == static_cast<const wchar_t*>(wd));
zmq::const_buffer b2_null = zmq::buffer("hello");
constexpr zmq::const_buffer b2 = zmq::str_buffer("hello");
CHECK(b2_null.size() == 6);
CHECK(b2.size() == 5);
CHECK(std::string(static_cast<const char*>(b2.data()), b2.size()) == "hello");
}
TEST_CASE("const_buffer creation with zbuf string literal char", "[buffer]")
{
using namespace zmq::literals;
constexpr zmq::const_buffer b = "hello"_zbuf;
CHECK(b.size() == 5);
CHECK(std::memcmp(b.data(), "hello", b.size()) == 0);
}
TEST_CASE("const_buffer creation with zbuf string literal wchar_t", "[buffer]")
{
using namespace zmq::literals;
constexpr zmq::const_buffer b = L"hello"_zbuf;
CHECK(b.size() == 5 * sizeof(wchar_t));
CHECK(std::memcmp(b.data(), L"hello", b.size()) == 0);
}
TEST_CASE("const_buffer creation with zbuf string literal char16_t", "[buffer]")
{
using namespace zmq::literals;
constexpr zmq::const_buffer b = u"hello"_zbuf;
CHECK(b.size() == 5 * sizeof(char16_t));
CHECK(std::memcmp(b.data(), u"hello", b.size()) == 0);
}
TEST_CASE("const_buffer creation with zbuf string literal char32_t", "[buffer]")
{
using namespace zmq::literals;
constexpr zmq::const_buffer b = U"hello"_zbuf;
CHECK(b.size() == 5 * sizeof(char32_t));
CHECK(std::memcmp(b.data(), U"hello", b.size()) == 0);
}
TEST_CASE("buffer of structs", "[buffer]")
{
struct some_pod
{
int64_t val;
char arr[8];
};
struct some_non_pod
{
int64_t val;
char arr[8];
std::vector<int> s; // not trivially copyable
};
static_assert(zmq::detail::is_pod_like<some_pod>::value, "");
static_assert(!zmq::detail::is_pod_like<some_non_pod>::value, "");
std::array<some_pod, 1> d;
zmq::mutable_buffer b = zmq::buffer(d);
CHECK(b.size() == d.size() * sizeof(some_pod));
CHECK(b.data() == d.data());
}
#endif

View File

@@ -0,0 +1,210 @@
#include <catch2/catch.hpp>
#include <zmq_addon.hpp>
#ifdef ZMQ_CPP11
TEST_CASE("multipart codec empty", "[codec_multipart]")
{
using namespace zmq;
multipart_t mmsg;
message_t msg = mmsg.encode();
CHECK(msg.size() == 0);
multipart_t mmsg2;
mmsg2.decode_append(msg);
CHECK(mmsg2.size() == 0);
}
TEST_CASE("multipart codec small", "[codec_multipart]")
{
using namespace zmq;
multipart_t mmsg;
mmsg.addstr("Hello World");
message_t msg = mmsg.encode();
CHECK(msg.size() == 1 + 11); // small size packing
mmsg.addstr("Second frame");
msg = mmsg.encode();
CHECK(msg.size() == 1 + 11 + 1 + 12);
multipart_t mmsg2;
mmsg2.decode_append(msg);
CHECK(mmsg2.size() == 2);
std::string part0 = mmsg2[0].to_string();
CHECK(part0 == "Hello World");
CHECK(mmsg2[1].to_string() == "Second frame");
}
TEST_CASE("multipart codec big", "[codec_multipart]")
{
using namespace zmq;
message_t big(495); // large size packing
big.data<char>()[0] = 'X';
multipart_t mmsg;
mmsg.pushmem(big.data(), big.size());
message_t msg = mmsg.encode();
CHECK(msg.size() == 5 + 495);
CHECK(msg.data<unsigned char>()[0] == std::numeric_limits<uint8_t>::max());
CHECK(msg.data<unsigned char>()[5] == 'X');
CHECK(mmsg.size() == 1);
mmsg.decode_append(msg);
CHECK(mmsg.size() == 2);
CHECK(mmsg[0].data<char>()[0] == 'X');
}
TEST_CASE("multipart codec decode bad data overflow", "[codec_multipart]")
{
using namespace zmq;
char bad_data[3] = {5, 'h', 'i'};
message_t wrong_size(bad_data, 3);
CHECK(wrong_size.size() == 3);
CHECK(wrong_size.data<char>()[0] == 5);
CHECK_THROWS_AS(
multipart_t::decode(wrong_size),
std::out_of_range);
}
TEST_CASE("multipart codec decode bad data extra data", "[codec_multipart]")
{
using namespace zmq;
char bad_data[3] = {1, 'h', 'i'};
message_t wrong_size(bad_data, 3);
CHECK(wrong_size.size() == 3);
CHECK(wrong_size.data<char>()[0] == 1);
CHECK_THROWS_AS(
multipart_t::decode(wrong_size),
std::out_of_range);
}
// After exercising it, this test is disabled over concern of running
// on hosts which lack enough free memory to allow the absurdly large
// message part to be allocated.
#if 0
TEST_CASE("multipart codec encode too big", "[codec_multipart]")
{
using namespace zmq;
const size_t too_big_size = 1L + std::numeric_limits<uint32_t>::max();
CHECK(too_big_size > std::numeric_limits<uint32_t>::max());
char* too_big_data = new char[too_big_size];
multipart_t mmsg(too_big_data, too_big_size);
delete [] too_big_data;
CHECK(mmsg.size() == 1);
CHECK(mmsg[0].size() > std::numeric_limits<uint32_t>::max());
CHECK_THROWS_AS(
mmsg.encode(),
std::range_error);
}
#endif
TEST_CASE("multipart codec free function with vector of message_t", "[codec_multipart]")
{
using namespace zmq;
std::vector<message_t> parts;
parts.emplace_back("Hello", 5);
parts.emplace_back("World",5);
auto msg = encode(parts);
CHECK(msg.size() == 1 + 5 + 1 + 5 );
CHECK(msg.data<unsigned char>()[0] == 5);
CHECK(msg.data<unsigned char>()[1] == 'H');
CHECK(msg.data<unsigned char>()[6] == 5);
CHECK(msg.data<unsigned char>()[7] == 'W');
std::vector<message_t> parts2;
decode(msg, std::back_inserter(parts2));
CHECK(parts.size() == 2);
CHECK(parts[0].size() == 5);
CHECK(parts[1].size() == 5);
}
TEST_CASE("multipart codec free function with vector of const_buffer", "[codec_multipart]")
{
using namespace zmq;
std::vector<const_buffer> parts;
parts.emplace_back("Hello", 5);
parts.emplace_back("World",5);
auto msg = encode(parts);
CHECK(msg.size() == 1 + 5 + 1 + 5 );
CHECK(msg.data<unsigned char>()[0] == 5);
CHECK(msg.data<unsigned char>()[1] == 'H');
CHECK(msg.data<unsigned char>()[6] == 5);
CHECK(msg.data<unsigned char>()[7] == 'W');
std::vector<message_t> parts2;
decode(msg, std::back_inserter(parts2));
CHECK(parts.size() == 2);
CHECK(parts[0].size() == 5);
CHECK(parts[1].size() == 5);
}
TEST_CASE("multipart codec free function with vector of mutable_buffer", "[codec_multipart]")
{
using namespace zmq;
std::vector<mutable_buffer> parts;
char hello[6] = "Hello";
parts.emplace_back(hello, 5);
char world[6] = "World";
parts.emplace_back(world,5);
auto msg = encode(parts);
CHECK(msg.size() == 1 + 5 + 1 + 5 );
CHECK(msg.data<unsigned char>()[0] == 5);
CHECK(msg.data<unsigned char>()[1] == 'H');
CHECK(msg.data<unsigned char>()[6] == 5);
CHECK(msg.data<unsigned char>()[7] == 'W');
std::vector<message_t> parts2;
decode(msg, std::back_inserter(parts2));
CHECK(parts.size() == 2);
CHECK(parts[0].size() == 5);
CHECK(parts[1].size() == 5);
}
TEST_CASE("multipart codec free function with multipart_t", "[codec_multipart]")
{
using namespace zmq;
multipart_t mmsg;
mmsg.addstr("Hello");
mmsg.addstr("World");
auto msg = encode(mmsg);
CHECK(msg.size() == 1 + 5 + 1 + 5);
CHECK(msg.data<unsigned char>()[0] == 5);
CHECK(msg.data<unsigned char>()[1] == 'H');
CHECK(msg.data<unsigned char>()[6] == 5);
CHECK(msg.data<unsigned char>()[7] == 'W');
multipart_t mmsg2;
decode(msg, std::back_inserter(mmsg2));
CHECK(mmsg2.size() == 2);
CHECK(mmsg2[0].size() == 5);
CHECK(mmsg2[1].size() == 5);
}
TEST_CASE("multipart codec static method decode to multipart_t", "[codec_multipart]")
{
using namespace zmq;
multipart_t mmsg;
mmsg.addstr("Hello");
mmsg.addstr("World");
auto msg = encode(mmsg);
auto mmsg2 = multipart_t::decode(msg);
CHECK(mmsg2.size() == 2);
CHECK(mmsg2[0].size() == 5);
CHECK(mmsg2[1].size() == 5);
}
#endif

View File

@@ -0,0 +1,84 @@
#include <catch2/catch.hpp>
#include <zmq.hpp>
#if (__cplusplus >= 201703L)
static_assert(std::is_nothrow_swappable<zmq::context_t>::value,
"context_t should be nothrow swappable");
#endif
TEST_CASE("context construct default and destroy", "[context]")
{
zmq::context_t context;
}
TEST_CASE("context create, close and destroy", "[context]")
{
zmq::context_t context;
context.close();
CHECK(NULL == context.handle());
}
TEST_CASE("context shutdown", "[context]")
{
zmq::context_t context;
context.shutdown();
CHECK(NULL != context.handle());
context.close();
CHECK(NULL == context.handle());
}
TEST_CASE("context shutdown again", "[context]")
{
zmq::context_t context;
context.shutdown();
context.shutdown();
CHECK(NULL != context.handle());
context.close();
CHECK(NULL == context.handle());
}
#ifdef ZMQ_CPP11
TEST_CASE("context swap", "[context]")
{
zmq::context_t context1;
zmq::context_t context2;
using std::swap;
swap(context1, context2);
}
TEST_CASE("context - use socket after shutdown", "[context]")
{
zmq::context_t context;
zmq::socket_t sock(context, zmq::socket_type::rep);
context.shutdown();
try
{
sock.connect("inproc://test");
zmq::message_t msg;
(void)sock.recv(msg, zmq::recv_flags::dontwait);
REQUIRE(false);
}
catch (const zmq::error_t& e)
{
REQUIRE(e.num() == ETERM);
}
}
TEST_CASE("context set/get options", "[context]")
{
zmq::context_t context;
#if defined(ZMQ_BLOCKY) && defined(ZMQ_IO_THREADS)
context.set(zmq::ctxopt::blocky, false);
context.set(zmq::ctxopt::io_threads, 5);
CHECK(context.get(zmq::ctxopt::io_threads) == 5);
#endif
CHECK_THROWS_AS(
context.set(static_cast<zmq::ctxopt>(-42), 5),
zmq::error_t);
CHECK_THROWS_AS(
context.get(static_cast<zmq::ctxopt>(-42)),
zmq::error_t);
}
#endif

View File

@@ -0,0 +1,246 @@
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
#include <zmq.hpp>
#if defined(ZMQ_CPP11)
static_assert(!std::is_copy_constructible<zmq::message_t>::value,
"message_t should not be copy-constructible");
static_assert(!std::is_copy_assignable<zmq::message_t>::value,
"message_t should not be copy-assignable");
#endif
#if (__cplusplus >= 201703L)
static_assert(std::is_nothrow_swappable<zmq::message_t>::value,
"message_t should be nothrow swappable");
#endif
TEST_CASE("message default constructed", "[message]")
{
const zmq::message_t message;
CHECK(0u == message.size());
CHECK(message.empty());
}
#ifdef ZMQ_CPP11
TEST_CASE("message swap", "[message]")
{
const std::string data = "foo";
zmq::message_t message1;
zmq::message_t message2(data.data(), data.size());
using std::swap;
swap(message1, message2);
CHECK(message1.size() == data.size());
CHECK(message2.size() == 0);
swap(message1, message2);
CHECK(message1.size() == 0);
CHECK(message2.size() == data.size());
}
#endif
namespace
{
const char *const data = "Hi";
}
TEST_CASE("message constructor with iterators", "[message]")
{
const std::string hi(data);
const zmq::message_t hi_msg(hi.begin(), hi.end());
CHECK(2u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 2));
}
TEST_CASE("message constructor with size", "[message]")
{
const zmq::message_t msg(5);
CHECK(msg.size() == 5);
}
TEST_CASE("message constructor with buffer and size", "[message]")
{
const std::string hi(data);
const zmq::message_t hi_msg(hi.data(), hi.size());
CHECK(2u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 2));
}
TEST_CASE("message constructor with char array", "[message]")
{
const zmq::message_t hi_msg(data, strlen(data));
CHECK(2u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 2));
}
#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL)
TEST_CASE("message constructor with container - deprecated", "[message]")
{
zmq::message_t hi_msg("Hi"); // deprecated
REQUIRE(3u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 3));
}
TEST_CASE("message constructor with container of trivial data", "[message]")
{
int buf[3] = {1, 2, 3};
zmq::message_t msg(buf);
REQUIRE(sizeof(buf) == msg.size());
CHECK(0 == memcmp(buf, msg.data(), msg.size()));
}
TEST_CASE("message constructor with strings", "[message]")
{
SECTION("string")
{
const std::string hi(data);
zmq::message_t hi_msg(hi);
CHECK(2u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 2));
}
#if CPPZMQ_HAS_STRING_VIEW
SECTION("string_view")
{
const std::string_view hi(data);
zmq::message_t hi_msg(hi);
CHECK(2u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 2));
}
#endif
}
#endif
#ifdef ZMQ_HAS_RVALUE_REFS
TEST_CASE("message move constructor", "[message]")
{
zmq::message_t hi_msg(zmq::message_t(data, strlen(data)));
}
TEST_CASE("message assign move empty before", "[message]")
{
zmq::message_t hi_msg;
hi_msg = zmq::message_t(data, strlen(data));
CHECK(2u == hi_msg.size());
CHECK(0 == memcmp(data, hi_msg.data(), 2));
}
TEST_CASE("message assign move empty after", "[message]")
{
zmq::message_t hi_msg(data, strlen(data));
CHECK(!hi_msg.empty());
hi_msg = zmq::message_t();
CHECK(0u == hi_msg.size());
CHECK(hi_msg.empty());
}
TEST_CASE("message assign move empty before and after", "[message]")
{
zmq::message_t hi_msg;
hi_msg = zmq::message_t();
CHECK(0u == hi_msg.size());
}
#endif
TEST_CASE("message equality self", "[message]")
{
const zmq::message_t hi_msg(data, strlen(data));
CHECK(hi_msg == hi_msg);
}
TEST_CASE("message equality equal", "[message]")
{
const zmq::message_t hi_msg_a(data, strlen(data));
const zmq::message_t hi_msg_b(data, strlen(data));
CHECK(hi_msg_a == hi_msg_b);
}
TEST_CASE("message equality equal empty", "[message]")
{
const zmq::message_t msg_a;
const zmq::message_t msg_b;
CHECK(msg_a == msg_b);
}
TEST_CASE("message equality non equal", "[message]")
{
const zmq::message_t msg_a("Hi", 2);
const zmq::message_t msg_b("Hello", 5);
CHECK(msg_a != msg_b);
}
TEST_CASE("message equality non equal rhs empty", "[message]")
{
const zmq::message_t msg_a("Hi", 2);
const zmq::message_t msg_b;
CHECK(msg_a != msg_b);
}
TEST_CASE("message equality non equal lhs empty", "[message]")
{
const zmq::message_t msg_a;
const zmq::message_t msg_b("Hi", 2);
CHECK(msg_a != msg_b);
}
TEST_CASE("message to string", "[message]")
{
const zmq::message_t a;
const zmq::message_t b("Foo", 3);
CHECK(a.to_string() == "");
CHECK(b.to_string() == "Foo");
#if CPPZMQ_HAS_STRING_VIEW
CHECK(a.to_string_view() == "");
CHECK(b.to_string_view() == "Foo");
#endif
#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL)
const zmq::message_t depr("Foo"); // deprecated
CHECK(depr.to_string() != "Foo");
CHECK(depr.to_string() == std::string("Foo", 4));
#endif
}
#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
TEST_CASE("message routing id persists", "[message]")
{
zmq::message_t msg;
msg.set_routing_id(123);
CHECK(123u == msg.routing_id());
}
TEST_CASE("message group persists", "[message]")
{
zmq::message_t msg;
msg.set_group("mygroup");
CHECK(std::string(msg.group()) == "mygroup");
}
#endif
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 2, 0)
TEST_CASE("message is not shared", "[message]")
{
zmq::message_t msg;
CHECK(msg.get(ZMQ_SHARED) == 0);
}
TEST_CASE("message is shared", "[message]")
{
size_t msg_sz = 1024; // large enough to be a type_lmsg
zmq::message_t msg1(msg_sz);
zmq::message_t msg2;
msg2.copy(msg1);
CHECK(msg1.get(ZMQ_SHARED) == 1);
CHECK(msg2.get(ZMQ_SHARED) == 1);
CHECK(msg1.size() == msg_sz);
CHECK(msg2.size() == msg_sz);
}
TEST_CASE("message move is not shared", "[message]")
{
size_t msg_sz = 1024; // large enough to be a type_lmsg
zmq::message_t msg1(msg_sz);
zmq::message_t msg2;
msg2.move(msg1);
CHECK(msg1.get(ZMQ_SHARED) == 0);
CHECK(msg2.get(ZMQ_SHARED) == 0);
CHECK(msg2.size() == msg_sz);
CHECK(msg1.size() == 0);
}
#endif

View File

@@ -0,0 +1,152 @@
#include "testutil.hpp"
#ifdef ZMQ_CPP11
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
class mock_monitor_t : public zmq::monitor_t
{
public:
void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE
{
++connected;
++total;
}
int total{0};
int connected{0};
};
#endif
TEST_CASE("monitor create destroy", "[monitor]")
{
zmq::monitor_t monitor;
}
#if defined(ZMQ_CPP11)
TEST_CASE("monitor move construct", "[monitor]")
{
zmq::context_t ctx;
zmq::socket_t sock(ctx, ZMQ_DEALER);
SECTION("move ctor empty") {
zmq::monitor_t monitor1;
zmq::monitor_t monitor2 = std::move(monitor1);
}
SECTION("move ctor init") {
zmq::monitor_t monitor1;
monitor1.init(sock, "inproc://monitor-client");
zmq::monitor_t monitor2 = std::move(monitor1);
}
}
TEST_CASE("monitor move assign", "[monitor]")
{
zmq::context_t ctx;
zmq::socket_t sock(ctx, ZMQ_DEALER);
SECTION("move assign empty") {
zmq::monitor_t monitor1;
zmq::monitor_t monitor2;
monitor1 = std::move(monitor2);
}
SECTION("move assign init") {
zmq::monitor_t monitor1;
monitor1.init(sock, "inproc://monitor-client");
zmq::monitor_t monitor2;
monitor2 = std::move(monitor1);
}
SECTION("move assign init both") {
zmq::monitor_t monitor1;
monitor1.init(sock, "inproc://monitor-client");
zmq::monitor_t monitor2;
zmq::socket_t sock2(ctx, ZMQ_DEALER);
monitor2.init(sock2, "inproc://monitor-client2");
monitor2 = std::move(monitor1);
}
}
TEST_CASE("monitor init event count", "[monitor]")
{
common_server_client_setup s{false};
mock_monitor_t monitor;
const int expected_event_count = 1;
monitor.init(s.client, "inproc://foo");
CHECK_FALSE(monitor.check_event(0));
s.init();
while (monitor.check_event(1000) && monitor.total < expected_event_count) {
}
CHECK(monitor.connected == 1);
CHECK(monitor.total == expected_event_count);
}
TEST_CASE("monitor init abort", "[monitor]")
{
class mock_monitor : public mock_monitor_t
{
public:
mock_monitor(std::function<void(void)> handle_connected) :
handle_connected{std::move(handle_connected)}
{
}
void on_event_connected(const zmq_event_t &e, const char *m) ZMQ_OVERRIDE
{
mock_monitor_t::on_event_connected(e, m);
handle_connected();
}
std::function<void(void)> handle_connected;
};
common_server_client_setup s(false);
std::mutex mutex;
std::condition_variable cond_var;
bool done{false};
mock_monitor monitor([&]()
{
std::lock_guard<std::mutex> lock(mutex);
done = true;
cond_var.notify_one();
});
monitor.init(s.client, "inproc://foo");
auto thread = std::thread([&monitor]
{
while (monitor.check_event(-1)) {
}
});
s.init();
{
std::unique_lock<std::mutex> lock(mutex);
CHECK(cond_var.wait_for(lock, std::chrono::seconds(1),
[&done] { return done; }));
}
CHECK(monitor.connected == 1);
monitor.abort();
thread.join();
}
TEST_CASE("monitor from move assigned socket", "[monitor]")
{
zmq::context_t ctx;
zmq::socket_t sock;
sock = std::move([&ctx] {
zmq::socket_t sock(ctx, ZMQ_DEALER);
return sock;
}());
zmq::monitor_t monitor1;
monitor1.init(sock, "inproc://monitor-client");
// On failure, this test might hang indefinitely instead of immediately
// failing
}
#endif

View File

@@ -0,0 +1,212 @@
#include <catch2/catch.hpp>
#include <zmq_addon.hpp>
#ifdef ZMQ_HAS_RVALUE_REFS
#ifdef ZMQ_CPP17
static_assert(std::is_invocable<decltype(&zmq::multipart_t::send),
zmq::multipart_t *,
zmq::socket_ref,
int>::value,
"Can't multipart_t::send with socket_ref");
static_assert(std::is_invocable<decltype(&zmq::multipart_t::recv),
zmq::multipart_t *,
zmq::socket_ref,
int>::value,
"Can't multipart_t::recv with socket_ref");
#endif
static_assert(std::is_constructible<zmq::multipart_t, zmq::socket_ref>::value,
"Can't construct with socket_ref");
/// \todo split this up into separate test cases
///
TEST_CASE("multipart legacy test", "[multipart]")
{
using namespace zmq;
bool ok = true;
(void) ok;
float num = 0;
(void) num;
std::string str = "";
message_t msg;
// Create two PAIR sockets and connect over inproc
context_t context(1);
socket_t output(context, ZMQ_PAIR);
socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
// Test send and receive of single-frame message
multipart_t multipart;
assert(multipart.empty());
multipart.push(message_t("Hello", 5));
assert(multipart.size() == 1);
ok = multipart.send(output);
assert(multipart.empty());
assert(ok);
ok = multipart.recv(input);
assert(multipart.size() == 1);
assert(ok);
msg = multipart.pop();
assert(multipart.empty());
assert(std::string(msg.data<char>(), msg.size()) == "Hello");
// Test send and receive of multi-frame message
multipart.addstr("A");
multipart.addstr("BB");
multipart.addstr("CCC");
assert(multipart.size() == 3);
multipart_t copy = multipart.clone();
assert(copy.size() == 3);
ok = copy.send(output);
assert(copy.empty());
assert(ok);
ok = copy.recv(input);
assert(copy.size() == 3);
assert(ok);
assert(copy.equal(&multipart));
// Test equality operators
assert(copy == multipart);
assert(multipart == copy);
multipart.pop();
assert(copy != multipart);
assert(multipart != copy);
multipart_t emptyMessage1 {};
multipart_t emptyMessage2 {};
assert(emptyMessage1 == emptyMessage2);
assert(emptyMessage2 == emptyMessage1);
multipart.clear();
assert(multipart.empty());
// Test message frame manipulation
multipart.add(message_t("Frame5", 6));
multipart.addstr("Frame6");
multipart.addstr("Frame7");
multipart.addtyp(8.0f);
multipart.addmem("Frame9", 6);
multipart.push(message_t("Frame4", 6));
multipart.pushstr("Frame3");
multipart.pushstr("Frame2");
multipart.pushtyp(1.0f);
multipart.pushmem("Frame0", 6);
assert(multipart.size() == 10);
const message_t &front_msg = multipart.front();
assert(multipart.size() == 10);
assert(std::string(front_msg.data<char>(), front_msg.size()) == "Frame0");
const message_t &back_msg = multipart.back();
assert(multipart.size() == 10);
assert(std::string(back_msg.data<char>(), back_msg.size()) == "Frame9");
msg = multipart.remove();
assert(multipart.size() == 9);
assert(std::string(msg.data<char>(), msg.size()) == "Frame9");
msg = multipart.pop();
assert(multipart.size() == 8);
assert(std::string(msg.data<char>(), msg.size()) == "Frame0");
num = multipart.poptyp<float>();
assert(multipart.size() == 7);
assert(num == 1.0f);
str = multipart.popstr();
assert(multipart.size() == 6);
assert(str == "Frame2");
str = multipart.popstr();
assert(multipart.size() == 5);
assert(str == "Frame3");
str = multipart.popstr();
assert(multipart.size() == 4);
assert(str == "Frame4");
str = multipart.popstr();
assert(multipart.size() == 3);
assert(str == "Frame5");
str = multipart.popstr();
assert(multipart.size() == 2);
assert(str == "Frame6");
str = multipart.popstr();
assert(multipart.size() == 1);
assert(str == "Frame7");
num = multipart.poptyp<float>();
assert(multipart.empty());
assert(num == 8.0f);
// Test message constructors and concatenation
multipart_t head("One", 3);
head.addstr("Two");
assert(head.size() == 2);
multipart_t tail(std::string("One-hundred"));
tail.pushstr("Ninety-nine");
assert(tail.size() == 2);
multipart_t tmp(message_t("Fifty", 5));
assert(tmp.size() == 1);
multipart_t mid = multipart_t::create(49.0f);
mid.append(std::move(tmp));
assert(mid.size() == 2);
assert(tmp.empty());
multipart_t merged(std::move(mid));
merged.prepend(std::move(head));
merged.append(std::move(tail));
assert(merged.size() == 6);
assert(head.empty());
assert(tail.empty());
ok = merged.send(output);
assert(merged.empty());
assert(ok);
multipart_t received(input);
assert(received.size() == 6);
str = received.popstr();
assert(received.size() == 5);
assert(str == "One");
str = received.popstr();
assert(received.size() == 4);
assert(str == "Two");
num = received.poptyp<float>();
assert(received.size() == 3);
assert(num == 49.0f);
str = received.popstr();
assert(received.size() == 2);
assert(str == "Fifty");
str = received.popstr();
assert(received.size() == 1);
assert(str == "Ninety-nine");
str = received.popstr();
assert(received.empty());
assert(str == "One-hundred");
}
#endif

View File

@@ -0,0 +1,356 @@
#include "testutil.hpp"
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
#include <array>
#include <memory>
#ifdef ZMQ_CPP17
static_assert(std::is_nothrow_swappable_v<zmq::poller_t<>>);
#endif
static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<>), "");
static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<zmq::
no_user_data>), "");
static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<int>), "");
static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<>), "");
static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<int>), "");
static_assert(!std::is_copy_constructible<zmq::poller_t<>>::value,
"poller_t should not be copy-constructible");
static_assert(!std::is_copy_assignable<zmq::poller_t<>>::value,
"poller_t should not be copy-assignable");
TEST_CASE("event flags", "[poller]")
{
CHECK((zmq::event_flags::pollin | zmq::event_flags::pollout)
== static_cast<zmq::event_flags>(ZMQ_POLLIN | ZMQ_POLLOUT));
CHECK((zmq::event_flags::pollin & zmq::event_flags::pollout)
== static_cast<zmq::event_flags>(ZMQ_POLLIN & ZMQ_POLLOUT));
CHECK((zmq::event_flags::pollin ^ zmq::event_flags::pollout)
== static_cast<zmq::event_flags>(ZMQ_POLLIN ^ ZMQ_POLLOUT));
CHECK(~zmq::event_flags::pollin == static_cast<zmq::event_flags>(~ZMQ_POLLIN));
}
TEST_CASE("poller create destroy", "[poller]")
{
zmq::poller_t<> a;
#ifdef ZMQ_CPP17 // CTAD
zmq::poller_t b;
zmq::poller_event e;
#endif
}
TEST_CASE("poller move construct empty", "[poller]")
{
zmq::poller_t<> a;
zmq::poller_t<> b = std::move(a);
}
TEST_CASE("poller move assign empty", "[poller]")
{
zmq::poller_t<> a;
zmq::poller_t<> b;
b = std::move(a);
}
TEST_CASE("poller swap", "[poller]")
{
zmq::poller_t<> a;
zmq::poller_t<> b;
using std::swap;
swap(a, b);
}
TEST_CASE("poller move construct non empty", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> a;
a.add(socket, zmq::event_flags::pollin);
zmq::poller_t<> b = std::move(a);
}
TEST_CASE("poller move assign non empty", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> a;
a.add(socket, zmq::event_flags::pollin);
zmq::poller_t<> b;
b = std::move(a);
}
TEST_CASE("poller add nullptr", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<void> poller;
CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, nullptr));
}
TEST_CASE("poller add non nullptr", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<int> poller;
int i;
CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, &i));
}
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
// this behaviour was added by https://github.com/zeromq/libzmq/pull/3100
TEST_CASE("poller add handler invalid events type", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller;
short invalid_events_type = 2 << 10;
CHECK_THROWS_AS(
poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type)),
zmq::error_t);
}
#endif
TEST_CASE("poller add handler twice throws", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller;
poller.add(socket, zmq::event_flags::pollin);
/// \todo the actual error code should be checked
CHECK_THROWS_AS(poller.add(socket, zmq::event_flags::pollin),
zmq::error_t);
}
TEST_CASE("poller wait with no handlers throws", "[poller]")
{
zmq::poller_t<> poller;
std::vector<zmq::poller_event<>> events;
/// \todo the actual error code should be checked
CHECK_THROWS_AS(poller.wait_all(events, std::chrono::milliseconds{10}),
zmq::error_t);
}
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 3)
TEST_CASE("poller add/remove size checks", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller;
CHECK(poller.size() == 0);
poller.add(socket, zmq::event_flags::pollin);
CHECK(poller.size() == 1);
CHECK_NOTHROW(poller.remove(socket));
CHECK(poller.size() == 0);
}
#endif
TEST_CASE("poller remove unregistered throws", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller;
/// \todo the actual error code should be checked
CHECK_THROWS_AS(poller.remove(socket), zmq::error_t);
}
TEST_CASE("poller remove registered empty", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller;
poller.add(socket, zmq::event_flags::pollin);
CHECK_NOTHROW(poller.remove(socket));
}
TEST_CASE("poller remove registered non empty", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<int> poller;
int empty{};
poller.add(socket, zmq::event_flags::pollin, &empty);
CHECK_NOTHROW(poller.remove(socket));
}
const std::string hi_str = "Hi";
TEST_CASE("poller poll basic", "[poller]")
{
common_server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::poller_t<int> poller;
std::vector<zmq::poller_event<int>> events{1};
int i = 0;
CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &i));
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{-1}));
CHECK(s.server == events[0].socket);
CHECK(&i == events[0].user_data);
}
TEST_CASE("poller add invalid socket throws", "[poller]")
{
zmq::context_t context;
zmq::poller_t<> poller;
zmq::socket_t a{context, zmq::socket_type::router};
zmq::socket_t b{std::move(a)};
CHECK_THROWS_AS(poller.add(a, zmq::event_flags::pollin), zmq::error_t);
}
TEST_CASE("poller remove invalid socket throws", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin));
std::vector<zmq::socket_t> sockets;
sockets.emplace_back(std::move(socket));
CHECK_THROWS_AS(poller.remove(socket), zmq::error_t);
CHECK_NOTHROW(poller.remove(sockets[0]));
}
TEST_CASE("poller modify empty throws", "[poller]")
{
zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::push};
zmq::poller_t<> poller;
CHECK_THROWS_AS(poller.modify(socket, zmq::event_flags::pollin),
zmq::error_t);
}
TEST_CASE("poller modify invalid socket throws", "[poller]")
{
zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{std::move(a)};
zmq::poller_t<> poller;
CHECK_THROWS_AS(poller.modify(a, zmq::event_flags::pollin), zmq::error_t);
}
TEST_CASE("poller modify not added throws", "[poller]")
{
zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{context, zmq::socket_type::push};
zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin));
CHECK_THROWS_AS(poller.modify(b, zmq::event_flags::pollin), zmq::error_t);
}
TEST_CASE("poller modify simple", "[poller]")
{
zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push};
zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin));
CHECK_NOTHROW(
poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout));
}
TEST_CASE("poller poll client server", "[poller]")
{
// Setup server and client
common_server_client_setup s;
// Setup poller
zmq::poller_t<zmq::socket_t> poller;
CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &s.server));
// client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
// wait for message and verify events
std::vector<zmq::poller_event<zmq::socket_t>> events(1);
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
CHECK(zmq::event_flags::pollin == events[0].events);
// Modify server socket with pollout flag
CHECK_NOTHROW(
poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout
));
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
CHECK((zmq::event_flags::pollin | zmq::event_flags::pollout) == events[0].events)
;
}
TEST_CASE("poller wait one return", "[poller]")
{
// Setup server and client
common_server_client_setup s;
// Setup poller
zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin));
// client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
// wait for message and verify events
std::vector<zmq::poller_event<>> events(1);
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
}
TEST_CASE("poller wait on move constructed", "[poller]")
{
common_server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::poller_t<> a;
CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin));
zmq::poller_t<> b{std::move(a)};
std::vector<zmq::poller_event<>> events(1);
/// \todo the actual error code should be checked
CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}),
zmq::error_t);
CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1}));
}
TEST_CASE("poller wait on move assigned", "[poller]")
{
common_server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
zmq::poller_t<> a;
CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin));
zmq::poller_t<> b;
b = {std::move(a)};
/// \todo the TEST_CASE error code should be checked
std::vector<zmq::poller_event<>> events(1);
CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}),
zmq::error_t);
CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1}));
}
TEST_CASE("poller remove from handler", "[poller]")
{
constexpr size_t ITER_NO = 10;
// Setup servers and clients
std::vector<common_server_client_setup> setup_list;
for (size_t i = 0; i < ITER_NO; ++i)
setup_list.emplace_back(common_server_client_setup{});
// Setup poller
zmq::poller_t<> poller;
for (size_t i = 0; i < ITER_NO; ++i) {
CHECK_NOTHROW(poller.add(setup_list[i].server, zmq::event_flags::pollin));
}
// Clients send messages
for (auto &s : setup_list) {
CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
}
// Wait for all servers to receive a message
for (auto &s : setup_list) {
zmq::pollitem_t items[] = {{s.server, 0, ZMQ_POLLIN, 0}};
zmq::poll(&items[0], 1);
}
// Fire all handlers in one wait
std::vector<zmq::poller_event<>> events(ITER_NO);
CHECK(ITER_NO == poller.wait_all(events, std::chrono::milliseconds{-1}));
}
#endif

View File

@@ -0,0 +1,139 @@
#include <catch2/catch.hpp>
#include <zmq_addon.hpp>
#ifdef ZMQ_CPP11
TEST_CASE("recv_multipart test", "[recv_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 1 message") {
input.send(zmq::str_buffer("hello"));
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
REQUIRE(ret);
CHECK(*ret == 1);
REQUIRE(msgs.size() == 1);
CHECK(msgs[0].size() == 5);
}
SECTION("send 2 messages") {
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
REQUIRE(ret);
CHECK(*ret == 2);
REQUIRE(msgs.size() == 2);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 6);
}
SECTION("send no messages, dontwait") {
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs),
zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs.size() == 0);
}
SECTION("send 1 partial message, dontwait") {
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs),
zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs.size() == 0);
}
SECTION("recv with invalid socket") {
std::vector<zmq::message_t> msgs;
CHECK_THROWS_AS(
zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)),
zmq::error_t);
}
}
TEST_CASE("recv_multipart_n test", "[recv_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 1 message") {
input.send(zmq::str_buffer("hello"));
std::array<zmq::message_t, 1> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
REQUIRE(ret);
CHECK(*ret == 1);
CHECK(msgs[0].size() == 5);
}
SECTION("send 1 message 2") {
input.send(zmq::str_buffer("hello"));
std::array<zmq::message_t, 2> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
REQUIRE(ret);
CHECK(*ret == 1);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 0);
}
SECTION("send 2 messages, recv 1") {
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::array<zmq::message_t, 1> msgs;
CHECK_THROWS_AS(
zmq::recv_multipart_n(output, msgs.data(), msgs.size()),
std::runtime_error);
}
SECTION("recv 0") {
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::array<zmq::message_t, 1> msgs;
CHECK_THROWS_AS(
zmq::recv_multipart_n(output, msgs.data(), 0),
std::runtime_error);
}
SECTION("send 2 messages") {
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::array<zmq::message_t, 2> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
REQUIRE(ret);
CHECK(*ret == 2);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 6);
}
SECTION("send no messages, dontwait") {
std::array<zmq::message_t, 1> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size(),
zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs[0].size() == 0);
}
SECTION("send 1 partial message, dontwait") {
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
std::array<zmq::message_t, 1> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size(),
zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs[0].size() == 0);
}
SECTION("recv with invalid socket") {
std::array<zmq::message_t, 1> msgs;
CHECK_THROWS_AS(
zmq::recv_multipart_n(zmq::socket_ref(), msgs.data(), msgs.size()),
zmq::error_t);
}
}
#endif

View File

@@ -0,0 +1,121 @@
#include <catch2/catch.hpp>
#include <zmq_addon.hpp>
#ifdef ZMQ_CPP11
#include <forward_list>
TEST_CASE("send_multipart test", "[send_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 0 messages") {
std::vector<zmq::message_t> imsgs;
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 0);
}
SECTION("send 1 message") {
std::array<zmq::message_t, 1> imsgs = {zmq::message_t(3)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 1);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 1);
REQUIRE(omsgs.size() == 1);
CHECK(omsgs[0].size() == 3);
}
SECTION("send 2 messages") {
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, const_buffer") {
std::array<zmq::const_buffer, 2> imsgs = {zmq::str_buffer("foo"),
zmq::str_buffer("bar!")};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, mutable_buffer") {
char buf[4] = {};
std::array<zmq::mutable_buffer, 2> imsgs = {
zmq::buffer(buf, 3), zmq::buffer(buf)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, dontwait") {
zmq::socket_t push(context, ZMQ_PUSH);
push.bind("inproc://multipart.test.push");
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
auto iret = zmq::send_multipart(push, imsgs, zmq::send_flags::dontwait);
REQUIRE_FALSE(iret);
}
SECTION("send, misc. containers") {
std::vector<zmq::message_t> msgs_vec;
msgs_vec.emplace_back(3);
msgs_vec.emplace_back(4);
auto iret = zmq::send_multipart(input, msgs_vec);
REQUIRE(iret);
CHECK(*iret == 2);
std::forward_list<zmq::message_t> msgs_list;
msgs_list.emplace_front(4);
msgs_list.emplace_front(3);
iret = zmq::send_multipart(input, msgs_list);
REQUIRE(iret);
CHECK(*iret == 2);
// init. list
const auto msgs_il = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
iret = zmq::send_multipart(input, msgs_il);
REQUIRE(iret);
CHECK(*iret == 2);
// rvalue
iret = zmq::send_multipart(input,
std::initializer_list<zmq::const_buffer>{
zmq::str_buffer("foo"),
zmq::str_buffer("bar!")});
REQUIRE(iret);
CHECK(*iret == 2);
}
SECTION("send with invalid socket") {
std::vector<zmq::message_t> msgs(1);
CHECK_THROWS_AS(zmq::send_multipart(zmq::socket_ref(), msgs),
zmq::error_t);
}
}
#endif

View File

@@ -0,0 +1,700 @@
#include <catch2/catch.hpp>
#include <zmq.hpp>
#ifdef ZMQ_CPP11
#include <future>
#endif
#if (__cplusplus >= 201703L)
static_assert(std::is_nothrow_swappable<zmq::socket_t>::value,
"socket_t should be nothrow swappable");
#endif
TEST_CASE("socket default ctor", "[socket]")
{
zmq::socket_t socket;
}
TEST_CASE("socket create destroy", "[socket]")
{
zmq::context_t context;
zmq::socket_t socket(context, ZMQ_ROUTER);
}
#ifdef ZMQ_CPP11
TEST_CASE("socket create assign", "[socket]")
{
zmq::context_t context;
zmq::socket_t socket(context, ZMQ_ROUTER);
CHECK(static_cast<bool>(socket));
CHECK(socket.handle() != nullptr);
socket = {};
CHECK(!static_cast<bool>(socket));
CHECK(socket.handle() == nullptr);
}
TEST_CASE("socket create by enum and destroy", "[socket]")
{
zmq::context_t context;
zmq::socket_t socket(context, zmq::socket_type::router);
}
TEST_CASE("socket swap", "[socket]")
{
zmq::context_t context;
zmq::socket_t socket1(context, zmq::socket_type::router);
zmq::socket_t socket2(context, zmq::socket_type::dealer);
using std::swap;
swap(socket1, socket2);
}
#ifdef ZMQ_CPP11
TEST_CASE("socket options", "[socket]")
{
zmq::context_t context;
zmq::socket_t socket(context, zmq::socket_type::router);
#ifdef ZMQ_IMMEDIATE
socket.set(zmq::sockopt::immediate, 0);
socket.set(zmq::sockopt::immediate, false);
CHECK(socket.get(zmq::sockopt::immediate) == false);
// unit out of range
CHECK_THROWS_AS(socket.set(zmq::sockopt::immediate, 80), zmq::error_t);
#endif
#ifdef ZMQ_LINGER
socket.set(zmq::sockopt::linger, 55);
CHECK(socket.get(zmq::sockopt::linger) == 55);
#endif
#ifdef ZMQ_ROUTING_ID
const std::string id = "foobar";
socket.set(zmq::sockopt::routing_id, "foobar");
socket.set(zmq::sockopt::routing_id, zmq::buffer(id));
socket.set(zmq::sockopt::routing_id, id);
#if CPPZMQ_HAS_STRING_VIEW
socket.set(zmq::sockopt::routing_id, std::string_view{id});
#endif
std::string id_ret(10, ' ');
auto size = socket.get(zmq::sockopt::routing_id, zmq::buffer(id_ret));
id_ret.resize(size);
CHECK(id == id_ret);
auto stropt = socket.get(zmq::sockopt::routing_id);
CHECK(id == stropt);
std::string id_ret_small(3, ' ');
// truncated
CHECK_THROWS_AS(socket.get(zmq::sockopt::routing_id, zmq::buffer(id_ret_small)),
zmq::error_t);
#endif
}
template<class T>
void check_array_opt(T opt,
zmq::socket_t &sock,
std::string info,
bool set_only = false)
{
const std::string val = "foobar";
INFO("setting " + info);
sock.set(opt, val);
if (set_only)
return;
INFO("getting " + info);
auto s = sock.get(opt);
CHECK(s == val);
}
template<class T>
void check_array_opt_get(T opt, zmq::socket_t &sock, std::string info)
{
INFO("getting " + info);
(void) sock.get(opt);
}
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 0, 0)
template<class T> void check_bin_z85(T opt, zmq::socket_t &sock, std::string str_val)
{
std::vector<uint8_t> bin_val(32);
const auto dret = zmq_z85_decode(bin_val.data(), str_val.c_str());
CHECK(dret != nullptr);
sock.set(opt, str_val);
sock.set(opt, zmq::buffer(bin_val));
auto sv = sock.get(opt);
CHECK(sv == str_val);
auto bv = sock.get(opt, 32);
REQUIRE(bv.size() == bin_val.size());
CHECK(std::memcmp(bv.data(), bin_val.data(), bin_val.size()) == 0);
}
#endif
TEST_CASE("socket check array options", "[socket]")
{
zmq::context_t context;
zmq::socket_t router(context, zmq::socket_type::router);
zmq::socket_t xpub(context, zmq::socket_type::xpub);
zmq::socket_t sub(context, zmq::socket_type::sub);
#ifdef ZMQ_BINDTODEVICE
// requires setting CAP_NET_RAW
//check_array_opt(zmq::sockopt::bindtodevice, router, "bindtodevice");
#endif
#ifdef ZMQ_CONNECT_ROUTING_ID
check_array_opt(zmq::sockopt::connect_routing_id, router, "connect_routing_id",
true);
#endif
#ifdef ZMQ_LAST_ENDPOINT
check_array_opt_get(zmq::sockopt::last_endpoint, router, "last_endpoint");
#endif
#ifdef ZMQ_METADATA
router.set(zmq::sockopt::metadata, zmq::str_buffer("X-foo:bar"));
#endif
#ifdef ZMQ_PLAIN_PASSWORD
check_array_opt(zmq::sockopt::plain_password, router, "plain_password");
#endif
#ifdef ZMQ_PLAIN_USERNAME
check_array_opt(zmq::sockopt::plain_username, router, "plain_username");
#endif
#ifdef ZMQ_ROUTING_ID
check_array_opt(zmq::sockopt::routing_id, router, "routing_id");
#endif
#ifdef ZMQ_SOCKS_PROXY
check_array_opt(zmq::sockopt::socks_proxy, router, "socks_proxy");
#endif
#ifdef ZMQ_SUBSCRIBE
check_array_opt(zmq::sockopt::subscribe, sub, "subscribe", true);
#endif
#ifdef ZMQ_UNSUBSCRIBE
check_array_opt(zmq::sockopt::unsubscribe, sub, "unsubscribe", true);
#endif
#ifdef ZMQ_XPUB_WELCOME_MSG
check_array_opt(zmq::sockopt::xpub_welcome_msg, xpub, "xpub_welcome_msg", true);
#endif
#ifdef ZMQ_ZAP_DOMAIN
check_array_opt(zmq::sockopt::zap_domain, router, "zap_domain");
#endif
// curve
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 0, 0) && defined(ZMQ_HAS_CAPABILITIES)
if (zmq_has("curve") == 1) {
const std::string spk = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
const std::string ssk = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6";
const std::string cpk = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
const std::string csk = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
zmq::socket_t curve_server(context, zmq::socket_type::router);
curve_server.set(zmq::sockopt::curve_server, true);
CHECK(curve_server.get(zmq::sockopt::curve_server));
check_bin_z85(zmq::sockopt::curve_secretkey, curve_server, ssk);
zmq::socket_t curve_client(context, zmq::socket_type::router);
curve_client.set(zmq::sockopt::curve_server, false);
CHECK_FALSE(curve_client.get(zmq::sockopt::curve_server));
check_bin_z85(zmq::sockopt::curve_serverkey, curve_client, spk);
check_bin_z85(zmq::sockopt::curve_publickey, curve_client, cpk);
check_bin_z85(zmq::sockopt::curve_secretkey, curve_client, csk);
}
#endif
// gssapi
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) && defined(ZMQ_HAS_CAPABILITIES)
if (zmq_has("gssapi") == 1 && false) // TODO enable
{
zmq::socket_t gss_server(context, zmq::socket_type::router);
gss_server.set(zmq::sockopt::gssapi_server, true);
CHECK(gss_server.get(zmq::sockopt::gssapi_server) == 1);
gss_server.set(zmq::sockopt::gssapi_plaintext, false);
CHECK(gss_server.get(zmq::sockopt::gssapi_plaintext) == 0);
check_array_opt(zmq::sockopt::gssapi_principal, gss_server,
"gssapi_principal");
zmq::socket_t gss_client(context, zmq::socket_type::router);
CHECK(gss_client.get(zmq::sockopt::gssapi_server) == 0);
check_array_opt(zmq::sockopt::gssapi_principal, gss_client,
"gssapi_principal");
check_array_opt(zmq::sockopt::gssapi_service_principal, gss_client,
"gssapi_service_principal");
}
#endif
}
template<class T, class Opt>
void check_integral_opt(Opt opt,
zmq::socket_t &sock,
std::string info,
bool set_only = false)
{
const T val = 1;
INFO("setting " + info);
sock.set(opt, val);
if (set_only)
return;
INFO("getting " + info);
auto s = sock.get(opt);
CHECK(s == val);
}
template<class T, class Opt>
void check_integral_opt_get(Opt opt, zmq::socket_t &sock, std::string info)
{
INFO("getting " + info);
(void) sock.get(opt);
}
TEST_CASE("socket check integral options", "[socket]")
{
zmq::context_t context;
zmq::socket_t router(context, zmq::socket_type::router);
zmq::socket_t xpub(context, zmq::socket_type::xpub);
zmq::socket_t req(context, zmq::socket_type::req);
#ifdef ZMQ_STREAM_NOTIFY
zmq::socket_t stream(context, zmq::socket_type::stream);
#endif
#ifdef ZMQ_AFFINITY
check_integral_opt<uint64_t>(zmq::sockopt::affinity, router, "affinity");
#endif
#ifdef ZMQ_BACKLOG
check_integral_opt<int>(zmq::sockopt::backlog, router, "backlog");
#endif
#ifdef ZMQ_CONFLATE
check_integral_opt<int>(zmq::sockopt::conflate, router, "conflate");
#endif
#ifdef ZMQ_CONNECT_TIMEOUT
check_integral_opt<int>(zmq::sockopt::connect_timeout, router,
"connect_timeout");
#endif
#ifdef ZMQ_EVENTS
check_integral_opt_get<int>(zmq::sockopt::events, router, "events");
#endif
#ifdef ZMQ_FD
check_integral_opt_get<zmq::fd_t>(zmq::sockopt::fd, router, "fd");
#endif
#ifdef ZMQ_HANDSHAKE_IVL
check_integral_opt<int>(zmq::sockopt::handshake_ivl, router, "handshake_ivl");
#endif
#ifdef ZMQ_HEARTBEAT_IVL
check_integral_opt<int>(zmq::sockopt::heartbeat_ivl, router, "heartbeat_ivl");
#endif
#ifdef ZMQ_HEARTBEAT_TIMEOUT
check_integral_opt<int>(zmq::sockopt::heartbeat_timeout, router,
"heartbeat_timeout");
#endif
#ifdef ZMQ_HEARTBEAT_TTL
router.set(zmq::sockopt::heartbeat_ttl, 100);
CHECK(router.get(zmq::sockopt::heartbeat_ttl) == 100);
#endif
#ifdef ZMQ_IMMEDIATE
check_integral_opt<int>(zmq::sockopt::immediate, router, "immediate");
#endif
#ifdef ZMQ_INVERT_MATCHING
check_integral_opt<int>(zmq::sockopt::invert_matching, router,
"invert_matching");
#endif
#ifdef ZMQ_IPV6
check_integral_opt<int>(zmq::sockopt::ipv6, router, "ipv6");
#endif
#ifdef ZMQ_LINGER
check_integral_opt<int>(zmq::sockopt::linger, router, "linger");
#endif
#ifdef ZMQ_MAXMSGSIZE
check_integral_opt<int64_t>(zmq::sockopt::maxmsgsize, router, "maxmsgsize");
#endif
#ifdef ZMQ_MECHANISM
check_integral_opt_get<int>(zmq::sockopt::mechanism, router, "mechanism");
#endif
#ifdef ZMQ_MULTICAST_HOPS
check_integral_opt<int>(zmq::sockopt::multicast_hops, router, "multicast_hops");
#endif
#ifdef ZMQ_MULTICAST_LOOP
check_integral_opt<int>(zmq::sockopt::multicast_loop, router, "multicast_loop");
#endif
#ifdef ZMQ_MULTICAST_MAXTPDU
check_integral_opt<int>(zmq::sockopt::multicast_maxtpdu, router,
"multicast_maxtpdu");
#endif
#ifdef ZMQ_PLAIN_SERVER
check_integral_opt<int>(zmq::sockopt::plain_server, router, "plain_server");
#endif
#ifdef ZMQ_USE_FD
check_integral_opt<int>(zmq::sockopt::use_fd, router, "use_fd");
#endif
#ifdef ZMQ_PROBE_ROUTER
check_integral_opt<int>(zmq::sockopt::probe_router, router, "probe_router",
true);
#endif
#ifdef ZMQ_RATE
check_integral_opt<int>(zmq::sockopt::rate, router, "rate");
#endif
#ifdef ZMQ_RCVBUF
check_integral_opt<int>(zmq::sockopt::rcvbuf, router, "rcvbuf");
#endif
#ifdef ZMQ_RCVHWM
check_integral_opt<int>(zmq::sockopt::rcvhwm, router, "rcvhwm");
#endif
#ifdef ZMQ_RCVMORE
check_integral_opt_get<int>(zmq::sockopt::rcvmore, router, "rcvmore");
#endif
#ifdef ZMQ_RCVTIMEO
check_integral_opt<int>(zmq::sockopt::rcvtimeo, router, "rcvtimeo");
#endif
#ifdef ZMQ_RECONNECT_IVL
check_integral_opt<int>(zmq::sockopt::reconnect_ivl, router, "reconnect_ivl");
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
check_integral_opt<int>(zmq::sockopt::reconnect_ivl_max, router,
"reconnect_ivl_max");
#endif
#ifdef ZMQ_RECOVERY_IVL
check_integral_opt<int>(zmq::sockopt::recovery_ivl, router, "recovery_ivl");
#endif
#ifdef ZMQ_REQ_CORRELATE
check_integral_opt<int>(zmq::sockopt::req_correlate, req, "req_correlate", true);
#endif
#ifdef ZMQ_REQ_RELAXED
check_integral_opt<int>(zmq::sockopt::req_relaxed, req, "req_relaxed", true);
#endif
#ifdef ZMQ_ROUTER_HANDOVER
check_integral_opt<int>(zmq::sockopt::router_handover, router, "router_handover",
true);
#endif
#ifdef ZMQ_ROUTER_MANDATORY
check_integral_opt<int>(zmq::sockopt::router_mandatory, router,
"router_mandatory", true);
#endif
#ifdef ZMQ_ROUTER_NOTIFY
check_integral_opt<int>(zmq::sockopt::router_notify, router, "router_notify");
#endif
#ifdef ZMQ_SNDBUF
check_integral_opt<int>(zmq::sockopt::sndbuf, router, "sndbuf");
#endif
#ifdef ZMQ_SNDHWM
check_integral_opt<int>(zmq::sockopt::sndhwm, router, "sndhwm");
#endif
#ifdef ZMQ_SNDTIMEO
check_integral_opt<int>(zmq::sockopt::sndtimeo, router, "sndtimeo");
#endif
#ifdef ZMQ_STREAM_NOTIFY
check_integral_opt<int>(zmq::sockopt::stream_notify, stream, "stream_notify",
true);
#endif
#ifdef ZMQ_TCP_KEEPALIVE
check_integral_opt<int>(zmq::sockopt::tcp_keepalive, router, "tcp_keepalive");
#endif
#ifdef ZMQ_TCP_KEEPALIVE_CNT
check_integral_opt<int>(zmq::sockopt::tcp_keepalive_cnt, router,
"tcp_keepalive_cnt");
#endif
#ifdef ZMQ_TCP_KEEPALIVE_IDLE
check_integral_opt<int>(zmq::sockopt::tcp_keepalive_idle, router,
"tcp_keepalive_idle");
#endif
#ifdef ZMQ_TCP_KEEPALIVE_INTVL
check_integral_opt<int>(zmq::sockopt::tcp_keepalive_intvl, router,
"tcp_keepalive_intvl");
#endif
#ifdef ZMQ_TCP_MAXRT
check_integral_opt<int>(zmq::sockopt::tcp_maxrt, router, "tcp_maxrt");
#endif
#ifdef ZMQ_THREAD_SAFE
check_integral_opt_get<bool>(zmq::sockopt::thread_safe, router, "thread_safe");
#endif
#ifdef ZMQ_TOS
check_integral_opt<int>(zmq::sockopt::tos, router, "tos");
#endif
#ifdef ZMQ_TYPE
check_integral_opt_get<int>(zmq::sockopt::type, router, "type");
#ifdef ZMQ_CPP11
check_integral_opt_get<zmq::socket_type>(zmq::sockopt::socket_type, router, "socket_type");
#endif // ZMQ_CPP11
#endif // ZMQ_TYPE
#ifdef ZMQ_HAVE_VMCI
#ifdef ZMQ_VMCI_BUFFER_SIZE
check_integral_opt<uint64_t>(zmq::sockopt::vmci_buffer_size, router,
"vmci_buffer_size");
#endif
#ifdef ZMQ_VMCI_BUFFER_MIN_SIZE
check_integral_opt<uint64_t>(zmq::sockopt::vmci_buffer_min_size, router,
"vmci_buffer_min_size");
#endif
#ifdef ZMQ_VMCI_BUFFER_MAX_SIZE
check_integral_opt<uint64_t>(zmq::sockopt::vmci_buffer_max_size, router,
"vmci_buffer_max_size");
#endif
#ifdef ZMQ_VMCI_CONNECT_TIMEOUT
check_integral_opt<int>(zmq::sockopt::vmci_connect_timeout, router,
"vmci_connect_timeout");
#endif
#endif
#ifdef ZMQ_XPUB_VERBOSE
check_integral_opt<int>(zmq::sockopt::xpub_verbose, xpub, "xpub_verbose", true);
#endif
#ifdef ZMQ_XPUB_VERBOSER
check_integral_opt<int>(zmq::sockopt::xpub_verboser, xpub, "xpub_verboser",
true);
#endif
#ifdef ZMQ_XPUB_MANUAL
check_integral_opt<int>(zmq::sockopt::xpub_manual, xpub, "xpub_manual", true);
#endif
#ifdef ZMQ_XPUB_NODROP
check_integral_opt<int>(zmq::sockopt::xpub_nodrop, xpub, "xpub_nodrop", true);
#endif
#ifdef ZMQ_ZAP_ENFORCE_DOMAIN
check_integral_opt<int>(zmq::sockopt::zap_enforce_domain, router,
"zap_enforce_domain");
#endif
}
#endif
TEST_CASE("socket flags", "[socket]")
{
CHECK((zmq::recv_flags::dontwait | zmq::recv_flags::none)
== static_cast<zmq::recv_flags>(ZMQ_DONTWAIT | 0));
CHECK((zmq::recv_flags::dontwait & zmq::recv_flags::none)
== static_cast<zmq::recv_flags>(ZMQ_DONTWAIT & 0));
CHECK((zmq::recv_flags::dontwait ^ zmq::recv_flags::none)
== static_cast<zmq::recv_flags>(ZMQ_DONTWAIT ^ 0));
CHECK(~zmq::recv_flags::dontwait == static_cast<zmq::recv_flags>(~ZMQ_DONTWAIT));
CHECK((zmq::send_flags::dontwait | zmq::send_flags::sndmore)
== static_cast<zmq::send_flags>(ZMQ_DONTWAIT | ZMQ_SNDMORE));
CHECK((zmq::send_flags::dontwait & zmq::send_flags::sndmore)
== static_cast<zmq::send_flags>(ZMQ_DONTWAIT & ZMQ_SNDMORE));
CHECK((zmq::send_flags::dontwait ^ zmq::send_flags::sndmore)
== static_cast<zmq::send_flags>(ZMQ_DONTWAIT ^ ZMQ_SNDMORE));
CHECK(~zmq::send_flags::dontwait == static_cast<zmq::send_flags>(~ZMQ_DONTWAIT));
}
TEST_CASE("socket readme example", "[socket]")
{
zmq::context_t ctx;
zmq::socket_t sock(ctx, zmq::socket_type::push);
sock.bind("inproc://test");
sock.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
}
#endif
TEST_CASE("socket sends and receives const buffer", "[socket]")
{
zmq::context_t context;
zmq::socket_t sender(context, ZMQ_PAIR);
zmq::socket_t receiver(context, ZMQ_PAIR);
receiver.bind("inproc://test");
sender.connect("inproc://test");
const char *str = "Hi";
#ifdef ZMQ_CPP11
CHECK(2 == *sender.send(zmq::buffer(str, 2)));
char buf[2];
const auto res = receiver.recv(zmq::buffer(buf));
CHECK(res);
CHECK(!res->truncated());
CHECK(2 == res->size);
#else
CHECK(2 == sender.send(str, 2));
char buf[2];
CHECK(2 == receiver.recv(buf, 2));
#endif
CHECK(0 == memcmp(buf, str, 2));
}
#ifdef ZMQ_CPP11
TEST_CASE("socket send none sndmore", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::router);
s.bind("inproc://test");
std::vector<char> buf(4);
auto res = s.send(zmq::buffer(buf), zmq::send_flags::sndmore);
CHECK(res);
CHECK(*res == buf.size());
res = s.send(zmq::buffer(buf));
CHECK(res);
CHECK(*res == buf.size());
}
TEST_CASE("socket send dontwait", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::push);
s.bind("inproc://test");
std::vector<char> buf(4);
auto res = s.send(zmq::buffer(buf), zmq::send_flags::dontwait);
CHECK(!res);
res =
s.send(zmq::buffer(buf), zmq::send_flags::dontwait | zmq::send_flags::sndmore);
CHECK(!res);
zmq::message_t msg;
auto resm = s.send(msg, zmq::send_flags::dontwait);
CHECK(!resm);
CHECK(msg.size() == 0);
}
TEST_CASE("socket send exception", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::pull);
s.bind("inproc://test");
std::vector<char> buf(4);
CHECK_THROWS_AS(s.send(zmq::buffer(buf)), zmq::error_t);
}
TEST_CASE("socket recv none", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::pair);
zmq::socket_t s2(context, zmq::socket_type::pair);
s2.bind("inproc://test");
s.connect("inproc://test");
std::vector<char> sbuf(4);
const auto res_send = s2.send(zmq::buffer(sbuf));
CHECK(res_send);
CHECK(res_send.has_value());
std::vector<char> buf(2);
const auto res = s.recv(zmq::buffer(buf));
CHECK(res.has_value());
CHECK(res->truncated());
CHECK(res->untruncated_size == sbuf.size());
CHECK(res->size == buf.size());
const auto res_send2 = s2.send(zmq::buffer(sbuf));
CHECK(res_send2.has_value());
std::vector<char> buf2(10);
const auto res2 = s.recv(zmq::buffer(buf2));
CHECK(res2.has_value());
CHECK(!res2->truncated());
CHECK(res2->untruncated_size == sbuf.size());
CHECK(res2->size == sbuf.size());
}
TEST_CASE("socket send recv message_t", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::pair);
zmq::socket_t s2(context, zmq::socket_type::pair);
s2.bind("inproc://test");
s.connect("inproc://test");
zmq::message_t smsg(10);
const auto res_send = s2.send(smsg, zmq::send_flags::none);
CHECK(res_send);
CHECK(*res_send == 10);
CHECK(smsg.size() == 0);
zmq::message_t rmsg;
const auto res = s.recv(rmsg);
CHECK(res);
CHECK(*res == 10);
CHECK(res.value() == 10);
CHECK(rmsg.size() == *res);
}
TEST_CASE("socket send recv message_t by pointer", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::pair);
zmq::socket_t s2(context, zmq::socket_type::pair);
s2.bind("inproc://test");
s.connect("inproc://test");
zmq::message_t smsg(size_t{10});
const auto res_send = s2.send(smsg, zmq::send_flags::none);
CHECK(res_send);
CHECK(*res_send == 10);
CHECK(smsg.size() == 0);
zmq::message_t rmsg;
const bool res = s.recv(&rmsg);
CHECK(res);
}
TEST_CASE("socket recv dontwait", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::pull);
s.bind("inproc://test");
std::vector<char> buf(4);
constexpr auto flags = zmq::recv_flags::none | zmq::recv_flags::dontwait;
auto res = s.recv(zmq::buffer(buf), flags);
CHECK(!res);
zmq::message_t msg;
auto resm = s.recv(msg, flags);
CHECK(!resm);
CHECK_THROWS_AS(resm.value(), std::exception);
CHECK(msg.size() == 0);
}
TEST_CASE("socket recv exception", "[socket]")
{
zmq::context_t context;
zmq::socket_t s(context, zmq::socket_type::push);
s.bind("inproc://test");
std::vector<char> buf(4);
CHECK_THROWS_AS(s.recv(zmq::buffer(buf)), zmq::error_t);
}
TEST_CASE("socket proxy", "[socket]")
{
zmq::context_t context;
zmq::socket_t front(context, ZMQ_ROUTER);
zmq::socket_t back(context, ZMQ_ROUTER);
zmq::socket_t capture(context, ZMQ_DEALER);
front.bind("inproc://test1");
back.bind("inproc://test2");
capture.bind("inproc://test3");
auto f = std::async(std::launch::async, [&]() {
auto s1 = std::move(front);
auto s2 = std::move(back);
auto s3 = std::move(capture);
try {
zmq::proxy(s1, s2, zmq::socket_ref(s3));
}
catch (const zmq::error_t &e) {
return e.num() == ETERM;
}
return false;
});
context.close();
CHECK(f.get());
}
TEST_CASE("socket proxy steerable", "[socket]")
{
zmq::context_t context;
zmq::socket_t front(context, ZMQ_ROUTER);
zmq::socket_t back(context, ZMQ_ROUTER);
zmq::socket_t control(context, ZMQ_SUB);
front.bind("inproc://test1");
back.bind("inproc://test2");
control.connect("inproc://test3");
auto f = std::async(std::launch::async, [&]() {
auto s1 = std::move(front);
auto s2 = std::move(back);
auto s3 = std::move(control);
try {
zmq::proxy_steerable(s1, s2, zmq::socket_ref(), s3);
}
catch (const zmq::error_t &e) {
return e.num() == ETERM;
}
return false;
});
context.close();
CHECK(f.get());
}
#endif

View File

@@ -0,0 +1,118 @@
#include <catch2/catch.hpp>
#include <zmq.hpp>
#ifdef ZMQ_CPP11
#ifdef ZMQ_CPP17
static_assert(std::is_nothrow_swappable_v<zmq::socket_ref>);
#endif
static_assert(sizeof(zmq::socket_ref) == sizeof(void *), "size mismatch");
static_assert(alignof(zmq::socket_ref) == alignof(void *), "alignment mismatch");
static_assert(ZMQ_IS_TRIVIALLY_COPYABLE(zmq::socket_ref),
"needs to be trivially copyable");
TEST_CASE("socket_ref default init", "[socket_ref]")
{
zmq::socket_ref sr;
CHECK(!sr);
CHECK(sr == nullptr);
CHECK(nullptr == sr);
CHECK(sr.handle() == nullptr);
}
TEST_CASE("socket_ref create from nullptr", "[socket_ref]")
{
zmq::socket_ref sr = nullptr;
CHECK(sr == nullptr);
CHECK(sr.handle() == nullptr);
}
TEST_CASE("socket_ref create from handle", "[socket_ref]")
{
void *np = nullptr;
zmq::socket_ref sr{zmq::from_handle, np};
CHECK(sr == nullptr);
CHECK(sr.handle() == nullptr);
}
TEST_CASE("socket_ref compare", "[socket_ref]")
{
zmq::socket_ref sr1;
zmq::socket_ref sr2;
CHECK(sr1 == sr2);
CHECK(!(sr1 != sr2));
}
TEST_CASE("socket_ref compare from socket_t", "[socket_ref]")
{
zmq::context_t context;
zmq::socket_t s1(context, zmq::socket_type::router);
zmq::socket_t s2(context, zmq::socket_type::dealer);
zmq::socket_ref sr1 = s1;
zmq::socket_ref sr2 = s2;
CHECK(sr1);
CHECK(sr2);
CHECK(sr1 == s1);
CHECK(sr2 == s2);
CHECK(sr1.handle() == s1.handle());
CHECK(sr1 != sr2);
CHECK(sr1.handle() != sr2.handle());
CHECK(sr1 != nullptr);
CHECK(nullptr != sr1);
CHECK(sr2 != nullptr);
const bool comp1 = (sr1 < sr2) != (sr1 >= sr2);
CHECK(comp1);
const bool comp2 = (sr1 > sr2) != (sr1 <= sr2);
CHECK(comp2);
std::hash<zmq::socket_ref> hash;
CHECK(hash(sr1) != hash(sr2));
CHECK(hash(sr1) == hash(s1));
}
TEST_CASE("socket_ref assignment", "[socket_ref]")
{
zmq::context_t context;
zmq::socket_t s1(context, zmq::socket_type::router);
zmq::socket_t s2(context, zmq::socket_type::dealer);
zmq::socket_ref sr1 = s1;
zmq::socket_ref sr2 = s2;
sr1 = s2;
CHECK(sr1 == sr2);
CHECK(sr1.handle() == sr2.handle());
sr1 = std::move(sr2);
CHECK(sr1 == sr2);
CHECK(sr1.handle() == sr2.handle());
sr2 = nullptr;
CHECK(sr1 != sr2);
sr1 = nullptr;
CHECK(sr1 == sr2);
}
TEST_CASE("socket_ref swap", "[socket_ref]")
{
zmq::socket_ref sr1;
zmq::socket_ref sr2;
using std::swap;
swap(sr1, sr2);
}
TEST_CASE("socket_ref type punning", "[socket_ref]")
{
struct SVP
{
void *p;
} svp;
struct SSR
{
zmq::socket_ref sr;
} ssr;
zmq::context_t context;
zmq::socket_t socket(context, zmq::socket_type::router);
CHECK(socket.handle() != nullptr);
svp.p = socket.handle();
// static_cast to silence incorrect warning
std::memcpy(static_cast<void *>(&ssr), &svp, sizeof(ssr));
CHECK(ssr.sr == socket);
}
#endif

View File

@@ -0,0 +1,54 @@
#pragma once
#include <catch2/catch.hpp>
#include <zmq.hpp>
#if defined(ZMQ_CPP11)
inline std::string bind_ip4_loopback(zmq::socket_t &socket)
{
socket.bind("tcp://127.0.0.1:*");
std::string endpoint(100, ' ');
endpoint.resize(socket.get(zmq::sockopt::last_endpoint, zmq::buffer(endpoint)));
return endpoint;
}
struct common_server_client_setup
{
common_server_client_setup(bool initialize = true)
{
if (initialize)
init();
}
void init()
{
endpoint = bind_ip4_loopback(server);
REQUIRE_NOTHROW(client.connect(endpoint));
}
zmq::context_t context;
zmq::socket_t server{context, zmq::socket_type::pair};
zmq::socket_t client{context, zmq::socket_type::pair};
std::string endpoint;
};
#endif
#define CHECK_THROWS_ZMQ_ERROR(ecode, expr) \
do { \
try { \
expr; \
CHECK(false); \
} \
catch (const zmq::error_t &ze) { \
INFO(std::string("Unexpected error code: ") + ze.what()); \
CHECK(ze.num() == ecode); \
} \
catch (const std::exception &ex) { \
INFO(std::string("Unexpected exception: ") + ex.what()); \
CHECK(false); \
} \
catch (...) { \
CHECK(false); \
} \
} while (false)

View File

@@ -0,0 +1,110 @@
#include <catch2/catch.hpp>
#include <zmq.hpp>
#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL)
namespace test_ns
{
struct T_nr
{
};
struct T_mr
{
void *begin() const noexcept { return nullptr; }
void *end() const noexcept { return nullptr; }
};
struct T_fr
{
};
inline void *begin(const T_fr &) noexcept
{
return nullptr;
}
inline void *end(const T_fr &) noexcept
{
return nullptr;
}
struct T_mfr
{
void *begin() const noexcept { return nullptr; }
void *end() const noexcept { return nullptr; }
};
inline void *begin(const T_mfr &) noexcept
{
return nullptr;
}
inline void *end(const T_mfr &) noexcept
{
return nullptr;
}
// types with associated namespace std
struct T_assoc_ns_nr : std::exception
{
};
struct T_assoc_ns_mr : std::exception
{
void *begin() const noexcept { return nullptr; }
void *end() const noexcept { return nullptr; }
};
struct T_assoc_ns_fr : std::exception
{
};
inline void *begin(const T_assoc_ns_fr &) noexcept
{
return nullptr;
}
inline void *end(const T_assoc_ns_fr &) noexcept
{
return nullptr;
}
struct T_assoc_ns_mfr : std::exception
{
void *begin() const noexcept { return nullptr; }
void *end() const noexcept { return nullptr; }
};
inline void *begin(const T_assoc_ns_mfr &) noexcept
{
return nullptr;
}
inline void *end(const T_assoc_ns_mfr &) noexcept
{
return nullptr;
}
} // namespace test_ns
TEST_CASE("range SFINAE", "[utilities]")
{
CHECK(!zmq::detail::is_range<int>::value);
CHECK(zmq::detail::is_range<std::string>::value);
CHECK(zmq::detail::is_range<std::string &>::value);
CHECK(zmq::detail::is_range<const std::string &>::value);
CHECK(zmq::detail::is_range<decltype("hello")>::value);
CHECK(zmq::detail::is_range<std::initializer_list<int>>::value);
CHECK(!zmq::detail::is_range<test_ns::T_nr>::value);
CHECK(zmq::detail::is_range<test_ns::T_mr>::value);
CHECK(zmq::detail::is_range<test_ns::T_fr>::value);
CHECK(zmq::detail::is_range<test_ns::T_mfr>::value);
CHECK(!zmq::detail::is_range<test_ns::T_assoc_ns_nr>::value);
CHECK(zmq::detail::is_range<test_ns::T_assoc_ns_mr>::value);
CHECK(zmq::detail::is_range<test_ns::T_assoc_ns_fr>::value);
CHECK(zmq::detail::is_range<test_ns::T_assoc_ns_mfr>::value);
}
#endif