@@ -89,6 +89,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
8989 // Apply the subscription to the trie
9090 unsigned char *const data = (unsigned char *) sub.data ();
9191 const size_t size = sub.size ();
92+ metadata_t * metadata = sub.metadata ();
9293 if (size > 0 && (*data == 0 || *data == 1 )) {
9394 if (manual)
9495 {
@@ -100,7 +101,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
100101
101102 pending_pipes.push_back (pipe_);
102103 pending_data.push_back (blob_t (data, size));
103- pending_metadata.push_back (sub.metadata ());
104+ if (metadata)
105+ metadata->add_ref ();
106+ pending_metadata.push_back (metadata);
104107 pending_flags.push_back (0 );
105108 }
106109 else
@@ -117,15 +120,19 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
117120 if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
118121 (*data == 0 && verbose_unsubs && verbose_subs))) {
119122 pending_data.push_back (blob_t (data, size));
120- pending_metadata.push_back (sub.metadata ());
123+ if (metadata)
124+ metadata->add_ref ();
125+ pending_metadata.push_back (metadata);
121126 pending_flags.push_back (0 );
122127 }
123128 }
124129 }
125130 else {
126131 // Process user message coming upstream from xsub socket
127132 pending_data.push_back (blob_t (data, size));
128- pending_metadata.push_back (sub.metadata ());
133+ if (metadata)
134+ metadata->add_ref ();
135+ pending_metadata.push_back (metadata);
129136 pending_flags.push_back (sub.flags ());
130137 }
131138 sub.close ();
@@ -280,6 +287,8 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
280287 // set metadata only if there is some
281288 if (metadata_t * metadata = pending_metadata.front ()) {
282289 msg_->set_metadata (metadata);
290+ // Remove ref corresponding to vector placement
291+ metadata->drop_ref ();
283292 }
284293
285294 msg_->set_flags (pending_flags.front ());
0 commit comments