Skip to content

Commit 8bba858

Browse files
authored
Disconnect MQTT clients on OAuth token expiration (#1746)
### WHAT is this pull request doing? Fixes #1744 The AMQP client sets up an `on_expiration` callback that closes the connection when an OAuth token expires. The MQTT client had no equivalent, so MQTT clients with expired tokens remained connected indefinitely. Adds the same `on_expiration` callback to the MQTT client, closing the connection with reason "token expired" when the token lifetime runs out. ### HOW can this pull request be tested? Run with: `make test SPEC=spec/mqtt/oauth_expiration_spec.cr`
1 parent fe9d3d7 commit 8bba858

2 files changed

Lines changed: 80 additions & 0 deletions

File tree

spec/mqtt/oauth_expiration_spec.cr

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
require "./spec_helper"
2+
require "../oauth_spec"
3+
4+
module MqttSpecs
5+
extend MqttHelpers
6+
extend MqttMatchers
7+
8+
describe "MQTT OAuth token expiration" do
9+
it "disconnects client when token expires" do
10+
with_server do |server|
11+
# Create a pipe pair to simulate the MQTT socket
12+
reader, writer = IO.pipe
13+
mqtt_io = MQTT::Protocol::IO.new(reader)
14+
conn_info = LavinMQ::ConnectionInfo.local
15+
16+
# Create OAuthUser with a very short-lived token
17+
permissions = {"/" => {config: /.*/, read: /.*/, write: /.*/}}
18+
user = OAuthUserHelper.create_user(RoughTime.utc + 50.milliseconds, permissions)
19+
20+
broker = server.@mqtt_brokers.@brokers["/"]
21+
packet = MQTT::Protocol::Connect.new(
22+
client_id: "oauth-expiry-test",
23+
clean_session: true,
24+
keepalive: 60u16,
25+
username: "testuser",
26+
password: nil,
27+
will: nil,
28+
)
29+
30+
broker.add_client(mqtt_io, conn_info, user, packet)
31+
32+
# Client should be connected
33+
reader.closed?.should be_false
34+
35+
# Wait for token to expire and the on_expiration callback to fire
36+
wait_for { reader.closed? }.should be_true
37+
ensure
38+
writer.try &.close
39+
end
40+
end
41+
42+
it "does not disconnect client when token is still valid" do
43+
with_server do |server|
44+
reader, writer = IO.pipe
45+
mqtt_io = MQTT::Protocol::IO.new(reader)
46+
conn_info = LavinMQ::ConnectionInfo.local
47+
48+
# Create OAuthUser with a long-lived token
49+
permissions = {"/" => {config: /.*/, read: /.*/, write: /.*/}}
50+
user = OAuthUserHelper.create_user(RoughTime.utc + 1.hour, permissions)
51+
52+
broker = server.@mqtt_brokers.@brokers["/"]
53+
packet = MQTT::Protocol::Connect.new(
54+
client_id: "oauth-valid-test",
55+
clean_session: true,
56+
keepalive: 60u16,
57+
username: "testuser",
58+
password: nil,
59+
will: nil,
60+
)
61+
62+
broker.add_client(mqtt_io, conn_info, user, packet)
63+
64+
sleep 50.milliseconds
65+
66+
# Client should still be connected
67+
reader.closed?.should be_false
68+
ensure
69+
reader.try &.close
70+
writer.try &.close
71+
end
72+
end
73+
end
74+
end

src/lavinmq/mqtt/client.cr

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ module LavinMQ
4141
@log = Logger.new(Log, metadata)
4242
@log.info { "Connection established for user=#{@user.name}" }
4343
spawn read_loop, name: "MQTT read_loop #{@connection_info.remote_address}"
44+
case user = @user
45+
when Auth::OAuthUser
46+
user.on_expiration do
47+
close("token expired")
48+
end
49+
end
4450
end
4551

4652
def client_name

0 commit comments

Comments
 (0)