Squashed 'libs/libzmq/' content from commit 2b2fb9c7

git-subtree-dir: libs/libzmq
git-subtree-split: 2b2fb9c7082dbc16c1323b97040a4edcfa2b997b
This commit is contained in:
Henry Winkel
2022-10-22 14:31:55 +02:00
commit e988fee167
1118 changed files with 149390 additions and 0 deletions

View File

@@ -0,0 +1,131 @@
/*
Copyright (c) 2018 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if __cplusplus >= 201103L
#include "radix_tree.hpp"
#include "trie.hpp"
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <random>
#include <ratio>
#include <vector>
const std::size_t nkeys = 10000;
const std::size_t nqueries = 1000000;
const std::size_t warmup_runs = 10;
const std::size_t samples = 10;
const std::size_t key_length = 20;
const char *chars = "abcdefghijklmnopqrstuvwxyz0123456789";
const int chars_len = 36;
template <class T>
void benchmark_lookup (T &subscriptions_,
std::vector<unsigned char *> &queries_)
{
using namespace std::chrono;
std::vector<duration<long, std::nano> > samples_vec;
samples_vec.reserve (samples);
for (std::size_t run = 0; run < warmup_runs; ++run) {
for (auto &query : queries_)
subscriptions_.check (query, key_length);
}
for (std::size_t run = 0; run < samples; ++run) {
duration<long, std::nano> interval (0);
for (auto &query : queries_) {
auto start = steady_clock::now ();
subscriptions_.check (query, key_length);
auto end = steady_clock::now ();
interval += end - start;
}
samples_vec.push_back (interval / queries_.size ());
}
std::size_t sum = 0;
for (const auto &sample : samples_vec)
sum += sample.count ();
std::printf ("Average lookup time = %.1lf ns\n",
static_cast<double> (sum) / samples);
}
int main ()
{
// Generate input set.
std::minstd_rand rng (123456789);
std::vector<unsigned char *> input_set;
std::vector<unsigned char *> queries;
input_set.reserve (nkeys);
queries.reserve (nqueries);
for (std::size_t i = 0; i < nkeys; ++i) {
unsigned char *key = new unsigned char[key_length];
for (std::size_t j = 0; j < key_length; j++)
key[j] = static_cast<unsigned char> (chars[rng () % chars_len]);
input_set.emplace_back (key);
}
for (std::size_t i = 0; i < nqueries; ++i)
queries.push_back (input_set[rng () % nkeys]);
// Initialize both data structures.
//
// Keeping initialization out of the benchmarking function helps
// heaptrack detect peak memory consumption of the radix tree.
zmq::trie_t trie;
zmq::radix_tree_t radix_tree;
for (auto &key : input_set) {
trie.add (key, key_length);
radix_tree.add (key, key_length);
}
// Create a benchmark.
std::printf ("keys = %llu, queries = %llu, key size = %llu\n",
static_cast<unsigned long long> (nkeys),
static_cast<unsigned long long> (nqueries),
static_cast<unsigned long long> (key_length));
std::puts ("[trie]");
benchmark_lookup (trie, queries);
std::puts ("[radix_tree]");
benchmark_lookup (radix_tree, queries);
for (auto &op : input_set)
delete[] op;
}
#else
int main ()
{
}
#endif

147
perf/generate_csv.sh Executable file
View File

@@ -0,0 +1,147 @@
#!/bin/bash
#
# This script assumes that 2 machines are used to generate performance results.
# First machine is assumed to be the one where this script runs.
# Second machine is the "REMOTE_IP_SSH" machine; we assume to have passwordless SSH access.
#
# Usage example:
# export REMOTE_IP_SSH=10.0.0.1
# export LOCAL_TEST_ENDPOINT="tcp://192.168.1.1:1234"
# export REMOTE_TEST_ENDPOINT="tcp://192.168.1.2:1234"
# export REMOTE_LIBZMQ_PATH="/home/fmontorsi/libzmq/perf"
# ./generate_csv.sh
#
set -u
# configurable values (via environment variables):
REMOTE_IP_SSH=${REMOTE_IP_SSH:-127.0.0.1}
REMOTE_LIBZMQ_PATH=${REMOTE_LIBZMQ_PATH:-/root/libzmq/perf}
LOCAL_TEST_ENDPOINT=${LOCAL_TEST_ENDPOINT:-tcp://192.168.1.1:1234}
REMOTE_TEST_ENDPOINT=${REMOTE_TEST_ENDPOINT:-tcp://192.168.1.2:1234}
# constant values:
MESSAGE_SIZE_LIST="8 16 32 64 128 256 512 1024 2048 4096 8192 16384 32768 65536 131072"
OUTPUT_DIR="results"
OUTPUT_FILE_PREFIX="results.txt"
OUTPUT_FILE_CSV_PREFIX="results.csv"
# utility functions:
function verify_ssh()
{
ssh $REMOTE_IP_SSH "ls /" >/dev/null
if [ $? -ne 0 ]; then
echo "Cannot connect via SSH passwordless to the REMOTE_IP_SSH $REMOTE_IP_SSH. Please fix the problem and retry."
exit 2
fi
ssh $REMOTE_IP_SSH "ls $REMOTE_LIBZMQ_PATH" >/dev/null
if [ $? -ne 0 ]; then
echo "The folder $REMOTE_LIBZMQ_PATH is not valid. Please fix the problem and retry."
exit 2
fi
echo "SSH connection to the remote $REMOTE_IP_SSH is working fine."
}
function run_remote_perf_util()
{
local MESSAGE_SIZE_BYTES="$1"
local REMOTE_PERF_UTIL="$2"
local NUM_MESSAGES="$3"
echo "Launching on $REMOTE_IP_SSH the utility [$REMOTE_PERF_UTIL] for messages ${MESSAGE_SIZE_BYTES}B long"
ssh $REMOTE_IP_SSH "$REMOTE_LIBZMQ_PATH/$REMOTE_PERF_UTIL $TEST_ENDPOINT $MESSAGE_SIZE_BYTES $NUM_MESSAGES" &
if [ $? -ne 0 ]; then
echo "Failed to launch remote perf util."
exit 2
fi
}
function generate_output_file()
{
local LOCAL_PERF_UTIL="$1" # must be the utility generating the TXT output
local REMOTE_PERF_UTIL="$2"
local OUTPUT_FILE_PREFIX="$3"
local NUM_MESSAGES="$4"
local CSV_HEADER_LINE="$5"
# derived values:
local OUTPUT_FILE_TXT="${OUTPUT_DIR}/${OUTPUT_FILE_PREFIX}.txt" # useful just for human-friendly debugging
local OUTPUT_FILE_CSV="${OUTPUT_DIR}/${OUTPUT_FILE_PREFIX}.csv" # actually used to later produce graphs
local MESSAGE_SIZE_ARRAY=($MESSAGE_SIZE_LIST)
echo "Killing still-running ZMQ performance utils, if any"
pkill $LOCAL_PERF_UTIL # in case it's running from a previous test
if [ ! -z "$REMOTE_PERF_UTIL" ]; then
ssh $REMOTE_IP_SSH "pkill $REMOTE_PERF_UTIL" # in case it's running from a previous test
fi
echo "Resetting output file $OUTPUT_FILE_TXT and $OUTPUT_FILE_CSV"
mkdir -p ${OUTPUT_DIR}
> $OUTPUT_FILE_TXT
echo "$CSV_HEADER_LINE" > $OUTPUT_FILE_CSV
for MESSAGE_SIZE in ${MESSAGE_SIZE_ARRAY[@]}; do
echo "Launching locally the utility [$LOCAL_PERF_UTIL] for messages ${MESSAGE_SIZE}B long"
./$LOCAL_PERF_UTIL $TEST_ENDPOINT $MESSAGE_SIZE $NUM_MESSAGES >${OUTPUT_FILE_TXT}-${MESSAGE_SIZE} &
if [ ! -z "$REMOTE_PERF_UTIL" ]; then
run_remote_perf_util $MESSAGE_SIZE $REMOTE_PERF_UTIL $NUM_MESSAGES
fi
wait
# produce the complete human-readable output file:
cat ${OUTPUT_FILE_TXT}-${MESSAGE_SIZE} >>${OUTPUT_FILE_TXT}
# produce a machine-friendly file for later plotting:
local DATALINE="$(cat ${OUTPUT_FILE_TXT}-${MESSAGE_SIZE} | grep -o '[0-9.]*' | tr '\n' ',')"
echo ${DATALINE::-1} >>$OUTPUT_FILE_CSV
rm -f ${OUTPUT_FILE_TXT}-${MESSAGE_SIZE}
done
echo "All measurements completed and saved into $OUTPUT_FILE_TXT and $OUTPUT_FILE_CSV"
}
# main:
verify_ssh
THROUGHPUT_CSV_HEADER_LINE="# message_size,message_count,PPS[msg/s],throughput[Mb/s]"
# PUSH/PULL TCP throughput CSV file:
TEST_ENDPOINT="$LOCAL_TEST_ENDPOINT"
generate_output_file "local_thr" "remote_thr" \
"pushpull_tcp_thr_results" \
"1000000" \
"$THROUGHPUT_CSV_HEADER_LINE"
# PUSH/PULL INPROC throughput CSV file:
# NOTE: in this case there is no remote utility to run and no ENDPOINT to provide:
TEST_ENDPOINT="" # inproc does not require any endpoint
generate_output_file "inproc_thr" "" \
"pushpull_inproc_thr_results" \
"10000000" \
"$THROUGHPUT_CSV_HEADER_LINE"
# PUB/SUB proxy INPROC throughput CSV file:
# NOTE: in this case there is no remote utility to run and no ENDPOINT to provide:
TEST_ENDPOINT="" # inproc does not require any endpoint
generate_output_file "proxy_thr" "" \
"pubsubproxy_inproc_thr_results" \
"10000000" \
"$THROUGHPUT_CSV_HEADER_LINE"
# REQ/REP TCP latency CSV file:
# NOTE: in this case it's the remote_lat utility that prints out the data, so we swap the local/remote arguments to the bash func:
TEST_ENDPOINT="$REMOTE_TEST_ENDPOINT"
generate_output_file "remote_lat" "local_lat" \
"reqrep_tcp_lat_results" \
"10000" \
"# message_size,message_count,latency[us]"

73
perf/generate_graphs.py Executable file
View File

@@ -0,0 +1,73 @@
#!/usr/bin/python3
#
# This script assumes that the set of CSV files produced by "generate_csv.sh" is provided as input
# and that locally there is the "results" folder.
#
# results for TCP:
INPUT_FILE_PUSHPULL_TCP_THROUGHPUT="results/pushpull_tcp_thr_results.csv"
INPUT_FILE_REQREP_TCP_LATENCY="results/reqrep_tcp_lat_results.csv"
TCP_LINK_GPBS=100
# results for INPROC:
INPUT_FILE_PUSHPULL_INPROC_THROUGHPUT="results/pushpull_inproc_thr_results.csv"
INPUT_FILE_PUBSUBPROXY_INPROC_THROUGHPUT="results/pubsubproxy_inproc_thr_results.csv"
# dependencies
#
# pip3 install matplotlib
#
import matplotlib.pyplot as plt
import numpy as np
# functions
def plot_throughput(csv_filename, title, is_tcp=False):
message_size_bytes, message_count, pps, mbps = np.loadtxt(csv_filename, delimiter=',', unpack=True)
fig, ax1 = plt.subplots()
# PPS axis
color = 'tab:red'
ax1.set_xlabel('Message size [B]')
ax1.set_ylabel('PPS [Mmsg/s]', color=color)
ax1.semilogx(message_size_bytes, pps / 1e6, label='PPS [Mmsg/s]', marker='x', color=color)
ax1.tick_params(axis='y', labelcolor=color)
# GBPS axis
color = 'tab:blue'
ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis
ax2.set_ylabel('Throughput [Gb/s]', color=color)
ax2.semilogx(message_size_bytes, mbps / 1e3, label='Throughput [Gb/s]', marker='o')
if is_tcp:
ax2.set_yticks(np.arange(0, TCP_LINK_GPBS + 1, TCP_LINK_GPBS/10))
ax2.tick_params(axis='y', labelcolor=color)
ax2.grid(True)
plt.title(title)
fig.tight_layout() # otherwise the right y-label is slightly clippe
plt.savefig(csv_filename.replace('.csv', '.png'))
plt.show()
def plot_latency(csv_filename, title):
message_size_bytes, message_count, lat = np.loadtxt(csv_filename, delimiter=',', unpack=True)
plt.semilogx(message_size_bytes, lat, label='Latency [us]', marker='o')
plt.xlabel('Message size [B]')
plt.ylabel('Latency [us]')
plt.grid(True)
plt.title(title)
plt.savefig(csv_filename.replace('.csv', '.png'))
plt.show()
# main
plot_throughput(INPUT_FILE_PUSHPULL_TCP_THROUGHPUT, 'ZeroMQ PUSH/PULL socket throughput, TCP transport', is_tcp=True)
plot_throughput(INPUT_FILE_PUSHPULL_INPROC_THROUGHPUT, 'ZeroMQ PUSH/PULL socket throughput, INPROC transport')
plot_throughput(INPUT_FILE_PUBSUBPROXY_INPROC_THROUGHPUT, 'ZeroMQ PUB/SUB PROXY socket throughput, INPROC transport')
plot_latency(INPUT_FILE_REQREP_TCP_LATENCY, 'ZeroMQ REQ/REP socket latency, TCP transport')

238
perf/inproc_lat.cpp Normal file
View File

@@ -0,0 +1,238 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
static size_t message_size;
static int roundtrip_count;
#if defined ZMQ_HAVE_WINDOWS
static unsigned int __stdcall worker (void *ctx_)
#else
static void *worker (void *ctx_)
#endif
{
void *s;
int rc;
int i;
zmq_msg_t msg;
s = zmq_socket (ctx_, ZMQ_REP);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
exit (1);
}
rc = zmq_connect (s, "inproc://lat_test");
if (rc != 0) {
printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
exit (1);
}
rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
exit (1);
}
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
exit (1);
}
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
exit (1);
}
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
exit (1);
}
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
exit (1);
}
#if defined ZMQ_HAVE_WINDOWS
return 0;
#else
return NULL;
#endif
}
int main (int argc, char *argv[])
{
#if defined ZMQ_HAVE_WINDOWS
HANDLE local_thread;
#else
pthread_t local_thread;
#endif
void *ctx;
void *s;
int rc;
int i;
zmq_msg_t msg;
void *watch;
unsigned long elapsed;
double latency;
if (argc != 3) {
printf ("usage: inproc_lat <message-size> <roundtrip-count>\n");
return 1;
}
message_size = atoi (argv[1]);
roundtrip_count = atoi (argv[2]);
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_REQ);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_bind (s, "inproc://lat_test");
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}
#if defined ZMQ_HAVE_WINDOWS
local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL);
if (local_thread == 0) {
printf ("error in _beginthreadex\n");
return -1;
}
#else
rc = pthread_create (&local_thread, NULL, worker, ctx);
if (rc != 0) {
printf ("error in pthread_create: %s\n", zmq_strerror (rc));
return -1;
}
#endif
rc = zmq_msg_init_size (&msg, message_size);
if (rc != 0) {
printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
return -1;
}
memset (zmq_msg_data (&msg), 0, message_size);
printf ("message size: %d [B]\n", (int) message_size);
printf ("roundtrip count: %d\n", (int) roundtrip_count);
watch = zmq_stopwatch_start ();
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
}
elapsed = zmq_stopwatch_stop (watch);
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
latency = (double) elapsed / (roundtrip_count * 2);
#if defined ZMQ_HAVE_WINDOWS
DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
if (rc2 == WAIT_FAILED) {
printf ("error in WaitForSingleObject\n");
return -1;
}
BOOL rc3 = CloseHandle (local_thread);
if (rc3 == 0) {
printf ("error in CloseHandle\n");
return -1;
}
#else
rc = pthread_join (local_thread, NULL);
if (rc != 0) {
printf ("error in pthread_join: %s\n", zmq_strerror (rc));
return -1;
}
#endif
printf ("average latency: %.3f [us]\n", (double) latency);
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
return 0;
}

247
perf/inproc_thr.cpp Normal file
View File

@@ -0,0 +1,247 @@
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
static int message_count;
static size_t message_size;
#if defined ZMQ_HAVE_WINDOWS
static unsigned int __stdcall worker (void *ctx_)
#else
static void *worker (void *ctx_)
#endif
{
void *s;
int rc;
int i;
zmq_msg_t msg;
s = zmq_socket (ctx_, ZMQ_PUSH);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
exit (1);
}
rc = zmq_connect (s, "inproc://thr_test");
if (rc != 0) {
printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
exit (1);
}
for (i = 0; i != message_count; i++) {
rc = zmq_msg_init_size (&msg, message_size);
if (rc != 0) {
printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
exit (1);
}
#if defined ZMQ_MAKE_VALGRIND_HAPPY
memset (zmq_msg_data (&msg), 0, message_size);
#endif
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
exit (1);
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
exit (1);
}
}
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
exit (1);
}
#if defined ZMQ_HAVE_WINDOWS
return 0;
#else
return NULL;
#endif
}
int main (int argc, char *argv[])
{
#if defined ZMQ_HAVE_WINDOWS
HANDLE local_thread;
#else
pthread_t local_thread;
#endif
void *ctx;
void *s;
int rc;
int i;
zmq_msg_t msg;
void *watch;
unsigned long elapsed;
unsigned long throughput;
double megabits;
if (argc != 3) {
printf ("usage: inproc_thr <message-size> <message-count>\n");
return 1;
}
message_size = atoi (argv[1]);
message_count = atoi (argv[2]);
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_PULL);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_bind (s, "inproc://thr_test");
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}
#if defined ZMQ_HAVE_WINDOWS
local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL);
if (local_thread == 0) {
printf ("error in _beginthreadex\n");
return -1;
}
#else
rc = pthread_create (&local_thread, NULL, worker, ctx);
if (rc != 0) {
printf ("error in pthread_create: %s\n", zmq_strerror (rc));
return -1;
}
#endif
rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
watch = zmq_stopwatch_start ();
for (i = 0; i != message_count - 1; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
}
elapsed = zmq_stopwatch_stop (watch);
if (elapsed == 0)
elapsed = 1;
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
#if defined ZMQ_HAVE_WINDOWS
DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
if (rc2 == WAIT_FAILED) {
printf ("error in WaitForSingleObject\n");
return -1;
}
BOOL rc3 = CloseHandle (local_thread);
if (rc3 == 0) {
printf ("error in CloseHandle\n");
return -1;
}
#else
rc = pthread_join (local_thread, NULL);
if (rc != 0) {
printf ("error in pthread_join: %s\n", zmq_strerror (rc));
return -1;
}
#endif
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
throughput =
(unsigned long) ((double) message_count / (double) elapsed * 1000000);
megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
return 0;
}

116
perf/local_lat.cpp Normal file
View File

@@ -0,0 +1,116 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
int main (int argc, char *argv[])
{
const char *bind_to;
int roundtrip_count;
size_t message_size;
void *ctx;
void *s;
int rc;
int i;
zmq_msg_t msg;
if (argc != 4) {
printf ("usage: local_lat <bind-to> <message-size> "
"<roundtrip-count>\n");
return 1;
}
bind_to = argv[1];
message_size = atoi (argv[2]);
roundtrip_count = atoi (argv[3]);
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_REP);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_bind (s, bind_to);
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
zmq_sleep (1);
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
return 0;
}

161
perf/local_thr.cpp Normal file
View File

@@ -0,0 +1,161 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
// keys are arbitrary but must match remote_lat.cpp
const char server_prvkey[] = "{X}#>t#jRGaQ}gMhv=30r(Mw+87YGs+5%kh=i@f8";
int main (int argc, char *argv[])
{
const char *bind_to;
int message_count;
size_t message_size;
void *ctx;
void *s;
int rc;
int i;
zmq_msg_t msg;
void *watch;
unsigned long elapsed;
double throughput;
double megabits;
int curve = 0;
if (argc != 4 && argc != 5) {
printf ("usage: local_thr <bind-to> <message-size> <message-count> "
"[<enable_curve>]\n");
return 1;
}
bind_to = argv[1];
message_size = atoi (argv[2]);
message_count = atoi (argv[3]);
if (argc >= 5 && atoi (argv[4])) {
curve = 1;
}
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_PULL);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
if (curve) {
rc = zmq_setsockopt (s, ZMQ_CURVE_SECRETKEY, server_prvkey,
sizeof (server_prvkey));
if (rc != 0) {
printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
return -1;
}
int server = 1;
rc = zmq_setsockopt (s, ZMQ_CURVE_SERVER, &server, sizeof (int));
if (rc != 0) {
printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_bind (s, bind_to);
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
watch = zmq_stopwatch_start ();
for (i = 0; i != message_count - 1; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
}
elapsed = zmq_stopwatch_stop (watch);
if (elapsed == 0)
elapsed = 1;
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
throughput = ((double) message_count / (double) elapsed * 1000000);
megabits = ((double) throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
return 0;
}

412
perf/proxy_thr.cpp Normal file
View File

@@ -0,0 +1,412 @@
/*
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <stdarg.h>
#include <string.h>
#include <string>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#include <unistd.h>
#endif
/*
Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
Topology:
XPUB SUB
| |
+-----> XSUB -> XPUB -----/
| ^^^^^^^^^^^^
XPUB ZMQ proxy
All connections use "inproc" transport. The two XPUB sockets start
flooding the proxy. The throughput is computed using the bytes received
in the SUB socket.
*/
#define HWM 10000
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x))
#endif
#define TEST_ASSERT_SUCCESS_ERRNO(expr) \
test_assert_success_message_errno_helper (expr, NULL, #expr)
// This macro is used to avoid-variable warning. If used with an expression,
// the sizeof is not evaluated to avoid polluting the assembly code.
#ifdef NDEBUG
#define ASSERT_EXPR_SAFE(x) \
do { \
(void) sizeof (x); \
} while (0)
#else
#define ASSERT_EXPR_SAFE(x) assert (x)
#endif
static uint64_t message_count = 0;
static size_t message_size = 0;
typedef struct
{
void *context;
int thread_idx;
const char *frontend_endpoint[4];
const char *backend_endpoint[4];
const char *control_endpoint;
} proxy_hwm_cfg_t;
int test_assert_success_message_errno_helper (int rc_,
const char *msg_,
const char *expr_)
{
if (rc_ == -1) {
char buffer[512];
buffer[sizeof (buffer) - 1] =
0; // to ensure defined behavior with VC++ <= 2013
printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
exit (1);
}
return rc_;
}
static void set_hwm (void *skt)
{
int hwm = HWM;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
}
static void publisher_thread_main (void *pvoid)
{
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
const int idx = cfg->thread_idx;
int optval;
int rc;
void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
assert (pubsocket);
set_hwm (pubsocket);
optval = 1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
optval = 1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
// Wait before starting TX operations till 1 subscriber has subscribed
// (in this test there's 1 subscriber only)
char buffer[32] = {};
rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
if (rc != 1) {
printf ("invalid response length: expected 1, received %d", rc);
exit (1);
}
if (buffer[0] != 1) {
printf ("invalid response value: expected 1, received %d",
(int) buffer[0]);
exit (1);
}
zmq_msg_t msg_orig;
rc = zmq_msg_init_size (&msg_orig, message_size);
assert (rc == 0);
memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
uint64_t send_count = 0;
while (send_count < message_count) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_msg_copy (&msg, &msg_orig);
assert (rc == 0);
// Send the message to the socket
rc = zmq_msg_send (&msg, pubsocket, 0);
if (rc != -1) {
send_count++;
} else {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
}
}
zmq_close (pubsocket);
//printf ("publisher thread ended\n");
}
static void subscriber_thread_main (void *pvoid)
{
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
const int idx = cfg->thread_idx;
void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
assert (subsocket);
set_hwm (subsocket);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (subsocket, cfg->backend_endpoint[idx]));
// Receive message_count messages
uint64_t rxsuccess = 0;
bool success = true;
while (success) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, subsocket, 0);
if (rc != -1) {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
rxsuccess++;
}
if (rxsuccess == message_count)
break;
}
// Cleanup
zmq_close (subsocket);
//printf ("subscriber thread ended\n");
}
static void proxy_thread_main (void *pvoid)
{
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
int rc;
// FRONTEND SUB
void *frontend_xsub = zmq_socket (
cfg->context,
ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
assert (frontend_xsub);
set_hwm (frontend_xsub);
// Bind FRONTEND
for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
const char *ep = cfg->frontend_endpoint[i];
if (ep != NULL) {
assert (strlen (ep) > 5);
rc = zmq_bind (frontend_xsub, ep);
ASSERT_EXPR_SAFE (rc == 0);
}
}
// BACKEND PUB
void *backend_xpub = zmq_socket (
cfg->context,
ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
assert (backend_xpub);
int optval = 1;
rc =
zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
ASSERT_EXPR_SAFE (rc == 0);
set_hwm (backend_xpub);
// Bind BACKEND
for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
const char *ep = cfg->backend_endpoint[i];
if (ep != NULL) {
assert (strlen (ep) > 5);
rc = zmq_bind (backend_xpub, ep);
ASSERT_EXPR_SAFE (rc == 0);
}
}
// CONTROL REP
void *control_rep = zmq_socket (
cfg->context,
ZMQ_REP); // This one is used by the proxy to receive&reply to commands
assert (control_rep);
// Bind CONTROL
rc = zmq_bind (control_rep, cfg->control_endpoint);
ASSERT_EXPR_SAFE (rc == 0);
// Start proxying!
zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
zmq_close (frontend_xsub);
zmq_close (backend_xpub);
zmq_close (control_rep);
//printf ("proxy thread ended\n");
}
void terminate_proxy (const proxy_hwm_cfg_t *cfg)
{
// CONTROL REQ
void *control_req = zmq_socket (
cfg->context,
ZMQ_REQ); // This one can be used to send command to the proxy
assert (control_req);
// Connect CONTROL-REQ: a socket to which send commands
int rc = zmq_connect (control_req, cfg->control_endpoint);
ASSERT_EXPR_SAFE (rc == 0);
// Ask the proxy to exit: the subscriber has received all messages
rc = zmq_send (control_req, "TERMINATE", 9, 0);
ASSERT_EXPR_SAFE (rc == 9);
zmq_close (control_req);
}
// The main thread simply starts some publishers, a proxy,
// and a subscriber. Finish when all packets are received.
int main (int argc, char *argv[])
{
if (argc != 3) {
printf ("usage: proxy_thr <message-size> <message-count>\n");
return 1;
}
message_size = atoi (argv[1]);
message_count = atoi (argv[2]);
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
void *context = zmq_ctx_new ();
assert (context);
int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
ASSERT_EXPR_SAFE (rv == 0);
// START ALL SECONDARY THREADS
const char *pub1 = "inproc://perf_pub1";
const char *pub2 = "inproc://perf_pub2";
const char *sub1 = "inproc://perf_backend";
proxy_hwm_cfg_t cfg_global = {};
cfg_global.context = context;
cfg_global.frontend_endpoint[0] = pub1;
cfg_global.frontend_endpoint[1] = pub2;
cfg_global.backend_endpoint[0] = sub1;
cfg_global.control_endpoint = "inproc://ctrl";
// Proxy
proxy_hwm_cfg_t cfg_proxy = cfg_global;
void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
assert (proxy != 0);
// Subscriber 1
proxy_hwm_cfg_t cfg_sub1 = cfg_global;
cfg_sub1.thread_idx = 0;
void *subscriber =
zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
assert (subscriber != 0);
// Start measuring
void *watch = zmq_stopwatch_start ();
// Publisher 1
proxy_hwm_cfg_t cfg_pub1 = cfg_global;
cfg_pub1.thread_idx = 0;
void *publisher1 =
zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
assert (publisher1 != 0);
// Publisher 2
proxy_hwm_cfg_t cfg_pub2 = cfg_global;
cfg_pub2.thread_idx = 1;
void *publisher2 =
zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
assert (publisher2 != 0);
// Wait for all packets to be received
zmq_threadclose (subscriber);
// Stop measuring
unsigned long elapsed = zmq_stopwatch_stop (watch);
if (elapsed == 0)
elapsed = 1;
unsigned long throughput =
(unsigned long) ((double) message_count / (double) elapsed * 1000000);
double megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
// Wait for the end of publishers...
zmq_threadclose (publisher1);
zmq_threadclose (publisher2);
// ... then close the proxy
terminate_proxy (&cfg_proxy);
zmq_threadclose (proxy);
int rc = zmq_ctx_term (context);
ASSERT_EXPR_SAFE (rc == 0);
return 0;
}

129
perf/remote_lat.cpp Normal file
View File

@@ -0,0 +1,129 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main (int argc, char *argv[])
{
const char *connect_to;
int roundtrip_count;
size_t message_size;
void *ctx;
void *s;
int rc;
int i;
zmq_msg_t msg;
void *watch;
unsigned long elapsed;
double latency;
if (argc != 4) {
printf ("usage: remote_lat <connect-to> <message-size> "
"<roundtrip-count>\n");
return 1;
}
connect_to = argv[1];
message_size = atoi (argv[2]);
roundtrip_count = atoi (argv[3]);
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_REQ);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_connect (s, connect_to);
if (rc != 0) {
printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_msg_init_size (&msg, message_size);
if (rc != 0) {
printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
return -1;
}
memset (zmq_msg_data (&msg), 0, message_size);
watch = zmq_stopwatch_start ();
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
}
elapsed = zmq_stopwatch_stop (watch);
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
latency = (double) elapsed / (roundtrip_count * 2);
printf ("message size: %d [B]\n", (int) message_size);
printf ("roundtrip count: %d\n", (int) roundtrip_count);
printf ("average latency: %.3f [us]\n", (double) latency);
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
return 0;
}

138
perf/remote_thr.cpp Normal file
View File

@@ -0,0 +1,138 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// keys are arbitrary but must match local_lat.cpp
const char server_pubkey[] = "DX4nh=yUn{-9ugra0X3Src4SU-4xTgqxcYY.+<SH";
const char client_pubkey[] = "<n^oA}I:66W+*ds3tAmi1+KJzv-}k&fC2aA5Bj0K";
const char client_prvkey[] = "9R9bV}[6z6DC-%$!jTVTKvWc=LEL{4i4gzUe$@Zx";
int main (int argc, char *argv[])
{
const char *connect_to;
int message_count;
int message_size;
void *ctx;
void *s;
int rc;
int i;
zmq_msg_t msg;
int curve = 0;
if (argc != 4 && argc != 5) {
printf ("usage: remote_thr <connect-to> <message-size> "
"<message-count> [<enable_curve>]\n");
return 1;
}
connect_to = argv[1];
message_size = atoi (argv[2]);
message_count = atoi (argv[3]);
if (argc >= 5 && atoi (argv[4])) {
curve = 1;
}
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_PUSH);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
if (curve) {
rc = zmq_setsockopt (s, ZMQ_CURVE_SECRETKEY, client_prvkey,
sizeof (client_prvkey));
if (rc != 0) {
printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_setsockopt (s, ZMQ_CURVE_PUBLICKEY, client_pubkey,
sizeof (client_pubkey));
if (rc != 0) {
printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_setsockopt (s, ZMQ_CURVE_SERVERKEY, server_pubkey,
sizeof (server_pubkey));
if (rc != 0) {
printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_connect (s, connect_to);
if (rc != 0) {
printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
return -1;
}
for (i = 0; i != message_count; i++) {
rc = zmq_msg_init_size (&msg, message_size);
if (rc != 0) {
printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
return 0;
}