Skip to content

Commit 705775f

Browse files
walrocarlhoerberg
andauthored
Add support for mqtts to lavinmqperf (#1702)
### WHAT is this pull request doing? Adding support for mqtts to `lavinmqperf` ### HOW can this pull request be tested? `lavinmqperf mqtt throughput --uri=mqtts://<user>:<pass>@<host>` --------- Co-authored-by: Carl Hörberg <[email protected]>
1 parent 33568e3 commit 705775f

1 file changed

Lines changed: 35 additions & 6 deletions

File tree

src/lavinmqperf/mqtt/throughput.cr

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ require "json"
33
require "wait_group"
44
require "../perf"
55
require "atomic"
6+
require "openssl"
67

78
module LavinMQPerf
89
module MQTT
@@ -85,20 +86,18 @@ module LavinMQPerf
8586
@consumes = Atomic(UInt64).new(0_u64)
8687
@stopped = false
8788

88-
private def create_client(id : Int32, role : String) : {TCPSocket, LavinMQ::MQTT::IO}
89+
private def create_client(id : Int32, role : String) : {IO, LavinMQ::MQTT::IO}
8990
if @uri.host == @uri.port == nil
9091
@uri = URI.parse("mqtt://#{@uri.scheme}:#{@uri.path}")
9192
end
9293

94+
tls = @uri.scheme == "mqtts"
9395
host = @uri.host || "localhost"
94-
port = @uri.port || 1883
96+
port = @uri.port || (tls ? 8883 : 1883)
9597
user = @uri.user || "guest"
9698
password = @uri.password || "guest"
9799

98-
socket = TCPSocket.new(host, port)
99-
socket.keepalive = true
100-
socket.tcp_nodelay = false
101-
socket.sync = false
100+
socket = connect_socket(host, port, tls)
102101
io = LavinMQ::MQTT::IO.new(socket)
103102

104103
client_id = "#{role}-#{id}"
@@ -125,6 +124,32 @@ module LavinMQPerf
125124
{socket, io}
126125
end
127126

127+
private def connect_socket(host : String, port : Int32, tls : Bool) : IO
128+
tcp_socket = TCPSocket.new(host, port)
129+
tcp_socket.keepalive = true
130+
tcp_socket.tcp_nodelay = false
131+
tcp_socket.sync = false
132+
133+
if tls
134+
ssl_context = OpenSSL::SSL::Context::Client.new
135+
if verify_param = @uri.query_params["verify"]?
136+
if verify_mode = OpenSSL::SSL::VerifyMode.parse?(verify_param)
137+
ssl_context.verify_mode = verify_mode
138+
end
139+
end
140+
begin
141+
ssl_socket = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: ssl_context, sync_close: true, hostname: host)
142+
rescue ex
143+
tcp_socket.close rescue nil
144+
raise ex
145+
end
146+
ssl_socket.sync = false
147+
return ssl_socket
148+
end
149+
150+
tcp_socket
151+
end
152+
128153
def run(args = ARGV)
129154
super(args)
130155
mt = Fiber::ExecutionContext::Parallel.new("Consumer", maximum: System.cpu_count.to_i)
@@ -248,6 +273,8 @@ module LavinMQPerf
248273
end
249274
end
250275
LavinMQ::MQTT::Disconnect.new.to_io(io) if socket && !socket.closed?
276+
ensure
277+
socket.try &.close rescue nil
251278
end
252279

253280
# ameba:disable Metrics/CyclomaticComplexity
@@ -316,6 +343,8 @@ module LavinMQPerf
316343
LavinMQ::MQTT::Disconnect.new.to_io(io)
317344
io.flush
318345
end
346+
ensure
347+
socket.try &.close rescue nil
319348
end
320349

321350
private def rerun_on_exception(done, &)

0 commit comments

Comments
 (0)