Skip to content

Commit e35be0b

Browse files
authored
Fix MQTT OAuth wildcard vhost matching and user cleanup on disconnect (#1745)
### WHAT is this pull request doing? Fixes #1743 The MQTT connection factory used `user.permissions.has_key?(vhost)` for vhost permission checks, which only does exact hash key lookup. The AMQP connection factory uses `user.find_permission(vhost)` which supports wildcard matching (e.g. a scope with vhost `*` matching `/`). This caused MQTT OAuth connections to fail when JWT token scopes use wildcard vhosts, even though the same token worked for AMQP. Also adds `OAuthUser#cleanup` on MQTT client disconnect to prevent expiration monitoring fiber leaks, consistent with the AMQP client. ### HOW can this pull request be tested? Run with: `make test SPEC=spec/mqtt/oauth_spec.cr`
1 parent 8355efa commit e35be0b

3 files changed

Lines changed: 108 additions & 2 deletions

File tree

spec/mqtt/oauth_spec.cr

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
require "./spec_helper"
2+
require "../oauth_spec"
3+
4+
module MqttSpecs
5+
extend MqttHelpers
6+
extend MqttMatchers
7+
8+
describe "MQTT OAuth" do
9+
it "allows connection with wildcard vhost permissions" do
10+
with_server do |server|
11+
reader, writer = IO.pipe
12+
mqtt_io = MQTT::Protocol::IO.new(reader)
13+
conn_info = LavinMQ::ConnectionInfo.local
14+
15+
# OAuthUser with wildcard vhost "*" should match default vhost "/"
16+
permissions = {"*" => {config: /.*/, read: /.*/, write: /.*/}}
17+
user = OAuthUserHelper.create_user(RoughTime.utc + 1.hour, permissions)
18+
19+
broker = server.@mqtt_brokers.@brokers["/"]
20+
packet = MQTT::Protocol::Connect.new(
21+
client_id: "oauth-wildcard-test",
22+
clean_session: true,
23+
keepalive: 60u16,
24+
username: "testuser",
25+
password: nil,
26+
will: nil,
27+
)
28+
29+
broker.add_client(mqtt_io, conn_info, user, packet)
30+
31+
sleep 100.milliseconds
32+
33+
# Client should be connected
34+
reader.closed?.should be_false
35+
ensure
36+
reader.try &.close
37+
writer.try &.close
38+
end
39+
end
40+
41+
it "allows connection with exact vhost permissions" do
42+
with_server do |server|
43+
reader, writer = IO.pipe
44+
mqtt_io = MQTT::Protocol::IO.new(reader)
45+
conn_info = LavinMQ::ConnectionInfo.local
46+
47+
permissions = {"/" => {config: /.*/, read: /.*/, write: /.*/}}
48+
user = OAuthUserHelper.create_user(RoughTime.utc + 1.hour, permissions)
49+
50+
broker = server.@mqtt_brokers.@brokers["/"]
51+
packet = MQTT::Protocol::Connect.new(
52+
client_id: "oauth-exact-test",
53+
clean_session: true,
54+
keepalive: 60u16,
55+
username: "testuser",
56+
password: nil,
57+
will: nil,
58+
)
59+
60+
broker.add_client(mqtt_io, conn_info, user, packet)
61+
62+
sleep 100.milliseconds
63+
64+
reader.closed?.should be_false
65+
ensure
66+
reader.try &.close
67+
writer.try &.close
68+
end
69+
end
70+
71+
it "cleans up OAuthUser on disconnect" do
72+
with_server do |server|
73+
reader, writer = IO.pipe
74+
mqtt_io = MQTT::Protocol::IO.new(reader)
75+
conn_info = LavinMQ::ConnectionInfo.local
76+
77+
permissions = {"/" => {config: /.*/, read: /.*/, write: /.*/}}
78+
user = OAuthUserHelper.create_user(RoughTime.utc + 1.hour, permissions)
79+
80+
broker = server.@mqtt_brokers.@brokers["/"]
81+
packet = MQTT::Protocol::Connect.new(
82+
client_id: "oauth-cleanup-test",
83+
clean_session: true,
84+
keepalive: 60u16,
85+
username: "testuser",
86+
password: nil,
87+
will: nil,
88+
)
89+
90+
broker.add_client(mqtt_io, conn_info, user, packet)
91+
92+
# Close the socket to trigger disconnect and cleanup
93+
reader.close
94+
sleep 200.milliseconds
95+
96+
# The OAuthUser's token_updated channel should be closed by cleanup
97+
user.@token_updated.closed?.should be_true
98+
ensure
99+
writer.try &.close
100+
end
101+
end
102+
end
103+
end

src/lavinmq/mqtt/client.cr

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ module LavinMQ
8282
publish_will
8383
ensure
8484
@broker.remove_client(self)
85+
case user = @user
86+
when Auth::OAuthUser
87+
user.cleanup
88+
end
8589
@waitgroup.done
8690
close_socket
8791
@log.info { "Connection disconnected for user=#{@user.name} duration=#{duration}" }

src/lavinmq/mqtt/connection_factory.cr

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ module LavinMQ
6666

6767
user = @authenticator.authenticate(context)
6868
return unless user
69-
has_vhost_permissions = user.try &.permissions.has_key?(vhost)
70-
return unless has_vhost_permissions
69+
return unless user.find_permission(vhost)
7170
broker = @brokers[vhost]?
7271
return unless broker
7372

0 commit comments

Comments
 (0)