Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,8 @@ test_apps += tests/test_poller \
tests/test_hiccup_msg \
tests/test_zmq_ppoll_fd \
tests/test_xsub_verbose \
tests/test_pubsub_topics_count
tests/test_pubsub_topics_count \
tests/test_msg_ffn_external_storage

tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
Expand Down Expand Up @@ -1149,6 +1150,10 @@ tests_test_pubsub_topics_count_SOURCES = tests/test_pubsub_topics_count.cpp
tests_test_pubsub_topics_count_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_pubsub_topics_count_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

tests_test_msg_ffn_external_storage_SOURCES = tests/test_msg_ffn_external_storage.cpp
tests_test_msg_ffn_external_storage_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_msg_ffn_external_storage_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

if HAVE_FORK
test_apps += tests/test_zmq_ppoll_signals

Expand Down
11 changes: 11 additions & 0 deletions builds/gyp/project-tests.gypi
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@
'libzmq'
],
},
{
'target_name': 'test_msg_ffn_external_storage',
'type': 'executable',
'sources': [
'../../tests/test_msg_ffn_external_storage.cpp',
'../../tests/testutil.hpp'
],
'dependencies': [
'libzmq'
],
},
{
'target_name': 'test_msg_init',
'type': 'executable',
Expand Down
1 change: 1 addition & 0 deletions builds/gyp/project-tests.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<test name = "test_invalid_rep" />
<test name = "test_msg_flags" />
<test name = "test_msg_ffn" />
<test name = "test_msg_ffn_external_storage" />
<test name = "test_msg_init" />
<test name = "test_connect_resolve" />
<test name = "test_immediate" />
Expand Down
24 changes: 24 additions & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,30 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
ZMQ_EXPORT int
zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_);

/* Draft Msg control block type for init_external_storage. */
typedef struct zmq_msg_content_t
{
#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_ARM64))
__declspec (align (8)) unsigned char _[64];
#elif defined(_MSC_VER) \
&& (defined(_M_IX86) || defined(_M_ARM_ARMV7VE) || defined(_M_ARM))
__declspec (align (4)) unsigned char _[64];
#elif defined(__GNUC__) || defined(__INTEL_COMPILER) \
|| (defined(__SUNPRO_C) && __SUNPRO_C >= 0x590) \
|| (defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x590)
unsigned char _[64] __attribute__ ((aligned (sizeof (void *))));
#else
unsigned char _[64];
#endif
} zmq_msg_content_t;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you adding a new type that is identical to the existing one? Just reuse that?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @bluca Thanks a bunch for looking at this,

I just changed this for zmq_msg_content_t to be a typedef alias of the zmq_msg_t.

I wanted them to be different types because conceptually they are - They just happen to be the same size/alignment requirements but they are used differently internally. One is a block for a zeroMQ message to be built in while the other is for a control block. I did not want to get confused about what types were going where/ what they were being used for.

Hence the desire for at least differently named types (even if they are just aliases now)

(I tested the change on our code base, and there are no issues as one would expect - Though I am not sure about what is going on with the MacOS CI here...)


ZMQ_EXPORT int zmq_msg_init_external_storage (zmq_msg_t *msg_,
zmq_msg_content_t *content_,
void *data_,
size_t size_,
zmq_free_fn *ffn_,
void *hint_);

/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
Expand Down
18 changes: 18 additions & 0 deletions src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ struct iovec
typedef char
check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];

// Compile time check whether msg_t::content_t fits into zmq_msg_content_t.
// It is expected to be larger.
typedef char check_msg_content_size
[sizeof (zmq::msg_t::content_t) <= sizeof (zmq_msg_content_t) ? 1 : -1];

void zmq_version (int *major_, int *minor_, int *patch_)
{
Expand Down Expand Up @@ -606,6 +610,20 @@ int zmq_msg_init_data (
->init_data (data_, size_, ffn_, hint_);
}

int zmq_msg_init_external_storage (zmq_msg_t *msg_,
zmq_msg_content_t *content_,
void *data_,
size_t size_,
zmq_free_fn *ffn_,
void *hint_)
{
return (reinterpret_cast<zmq::msg_t *> (msg_))
->init_external_storage (
reinterpret_cast<zmq::msg_t::content_t *> (content_), data_, size_,
ffn_, hint_);
}


int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
Expand Down
24 changes: 24 additions & 0 deletions src/zmq_draft.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_);
const char *zmq_msg_group (zmq_msg_t *msg_);
int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_);

/* Draft Msg control block type for init_external_storage. */
typedef struct zmq_msg_content_t
{
#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_ARM64))
__declspec (align (8)) unsigned char _[64];
#elif defined(_MSC_VER) \
&& (defined(_M_IX86) || defined(_M_ARM_ARMV7VE) || defined(_M_ARM))
__declspec (align (4)) unsigned char _[64];
#elif defined(__GNUC__) || defined(__INTEL_COMPILER) \
|| (defined(__SUNPRO_C) && __SUNPRO_C >= 0x590) \
|| (defined(__SUNPRO_CC) && __SUNPRO_CC >= 0x590)
unsigned char _[64] __attribute__ ((aligned (sizeof (void *))));
#else
unsigned char _[64];
#endif
} zmq_msg_content_t;

int zmq_msg_init_external_storage (zmq_msg_t *msg_,
zmq_msg_content_t *content_,
void *data_,
size_t size_,
zmq_free_fn *ffn_,
void *hint_);

/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ if(ENABLE_DRAFTS)
test_zmq_ppoll_fd
test_xsub_verbose
test_pubsub_topics_count
test_msg_ffn_external_storage
)

if(HAVE_FORK)
Expand Down
102 changes: 102 additions & 0 deletions tests/test_msg_ffn_external_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* SPDX-License-Identifier: MPL-2.0 */

#include "testutil.hpp"
#include "testutil_unity.hpp"

#include <string.h>

SETUP_TEARDOWN_TESTCONTEXT

void ffn (void *data_, void *hint_)
{
// Signal that ffn has been called by writing "freed" to hint
(void) data_; // Suppress 'unused' warnings at compile time
memcpy (hint_, (void *) "freed", 5);
}

void test_msg_init_ffn_external_storage ()
{
// Create the infrastructure
char my_endpoint[MAX_SOCKET_STRING];

void *router = test_context_socket (ZMQ_ROUTER);
bind_loopback_ipv4 (router, my_endpoint, sizeof my_endpoint);

void *dealer = test_context_socket (ZMQ_DEALER);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));

// Test that creating and closing a message triggers ffn
zmq_msg_content_t *content = new zmq_msg_content_t;
zmq_msg_t msg;
char hint[5];
char data[255];
memset (data, 0, 255);
memcpy (data, (void *) "data", 4);
memcpy (hint, (void *) "hint", 4);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);
memcpy (hint, (void *) "hint", 4);

// Making and closing a copy triggers ffn
zmq_msg_t msg2;
zmq_msg_init (&msg2);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_copy (&msg2, &msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);
memcpy (hint, (void *) "hint", 4);

// Test that sending a message triggers ffn
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));

zmq_msg_send (&msg, dealer, 0);
char buf[255];
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_INT (255, zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_STRING_LEN (data, buf, 4);

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);
memcpy (hint, (void *) "hint", 4);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

// Sending a copy of a message triggers ffn
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_external_storage (
&msg, content, (void *) data, 255, ffn, (void *) hint));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_copy (&msg2, &msg));

zmq_msg_send (&msg, dealer, 0);
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_INT (255, zmq_recv (router, buf, 255, 0));
TEST_ASSERT_EQUAL_STRING_LEN (data, buf, 4);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

msleep (SETTLE_TIME);
TEST_ASSERT_EQUAL_STRING_LEN ("freed", hint, 5);

// Deallocate the infrastructure.
test_context_socket_close (router);
test_context_socket_close (dealer);

delete content;
}

int main (void)
{
setup_test_environment ();

UNITY_BEGIN ();
RUN_TEST (test_msg_init_ffn_external_storage);
return UNITY_END ();
}
Loading