Skip to content

Commit c1d2a4d

Browse files
authored
Connection pool (#70)
* add pool from lsh_01 * RenewableRelpConnection * move managed client code to com.teragrep.rlp_01.client * add PoolTest * improve PoolTest * extract Pool interface from UnboundPool * refactor RelpConnectionFactory to support SSLEngine provision * remove unused import from RelpConnectionFactory * SendMessageTest from rlp_03, it rather belongs here * test connection pooling * include cleanup to ManagedConnectionTest * change Period to Duration in RelpConfig * add testPooledRenewedConnections and testPooledReboundConnections * change rebound managed connections to disconnect cleanly by adding new method reconnect() to IManagedRelpConnection, improve tests * change assertAll -> assertDoesNotThrow. in SendMessageTest use AfterEach for messageList cleanup * remove Thread.sleep from trace-logging in SendMessageTest
1 parent 65c8f55 commit c1d2a4d

20 files changed

Lines changed: 1452 additions & 0 deletions

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,18 @@
6464
<scope>test</scope>
6565
<version>${junit.version}</version>
6666
</dependency>
67+
<dependency>
68+
<groupId>com.teragrep</groupId>
69+
<artifactId>rlp_03</artifactId>
70+
<version>9.0.0</version>
71+
<scope>test</scope>
72+
</dependency>
73+
<dependency>
74+
<groupId>org.slf4j</groupId>
75+
<artifactId>slf4j-simple</artifactId>
76+
<version>2.0.13</version>
77+
<scope>test</scope>
78+
</dependency>
6779
</dependencies>
6880

6981
<build>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Java Reliable Event Logging Protocol Library RLP-01
3+
Copyright (C) 2021-2024 Suomen Kanuuna Oy
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package com.teragrep.rlp_01.client;
18+
19+
import com.teragrep.rlp_01.pool.Poolable;
20+
21+
import java.io.IOException;
22+
23+
public interface IManagedRelpConnection extends Poolable {
24+
void reconnect();
25+
26+
void connect() throws IOException;
27+
void forceReconnect();
28+
void ensureSent(byte[] bytes);
29+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Java Reliable Event Logging Protocol Library RLP-01
3+
Copyright (C) 2021-2024 Suomen Kanuuna Oy
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package com.teragrep.rlp_01.client;
18+
19+
import com.teragrep.rlp_01.RelpBatch;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.TimeoutException;
23+
24+
// TODO refactor RelpConnection into an interface and RelpConnectionImpl and remove this
25+
public interface IRelpConnection {
26+
27+
int getReadTimeout();
28+
29+
void setReadTimeout(int readTimeout);
30+
31+
int getWriteTimeout();
32+
33+
void setWriteTimeout(int writeTimeout);
34+
35+
int getConnectionTimeout();
36+
37+
void setConnectionTimeout(int timeout);
38+
39+
void setKeepAlive(boolean on);
40+
41+
int getRxBufferSize();
42+
43+
void setRxBufferSize(int size);
44+
45+
int getTxBufferSize();
46+
47+
void setTxBufferSize(int size);
48+
49+
boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException;
50+
51+
void tearDown();
52+
53+
boolean disconnect() throws IOException, IllegalStateException, TimeoutException;
54+
55+
void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException;
56+
57+
RelpConfig relpConfig();
58+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Java Reliable Event Logging Protocol Library RLP-01
3+
Copyright (C) 2021-2024 Suomen Kanuuna Oy
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package com.teragrep.rlp_01.client;
18+
19+
import com.teragrep.rlp_01.RelpBatch;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.TimeoutException;
23+
24+
public class ManagedRelpConnection implements IManagedRelpConnection {
25+
26+
private final IRelpConnection relpConnection;
27+
private boolean hasConnected;
28+
29+
30+
public ManagedRelpConnection(IRelpConnection relpConnection) {
31+
this.relpConnection = relpConnection;
32+
this.hasConnected = false;
33+
}
34+
35+
@Override
36+
public void forceReconnect() {
37+
tearDown();
38+
connect();
39+
}
40+
41+
@Override
42+
public void reconnect() {
43+
close();
44+
connect();
45+
}
46+
47+
@Override
48+
public void connect() {
49+
boolean connected = false;
50+
while (!connected) {
51+
try {
52+
this.hasConnected = true;
53+
connected = relpConnection
54+
.connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort);
55+
}
56+
catch (Exception e) {
57+
System.err.println(
58+
"Failed to connect to relp server <["+relpConnection.relpConfig().relpTarget+"]>:<["+relpConnection.relpConfig().relpPort+"]>: <"+e.getMessage()+">");
59+
60+
try {
61+
Thread.sleep(relpConnection.relpConfig().relpReconnectInterval);
62+
}
63+
catch (InterruptedException exception) {
64+
System.err.println("Reconnection timer interrupted, reconnecting now");
65+
}
66+
}
67+
}
68+
}
69+
70+
private void tearDown() {
71+
/*
72+
TODO remove: wouldn't need a check hasConnected but there is a bug in RLP-01 tearDown()
73+
see https://github.com/teragrep/rlp_01/issues/63 for further info
74+
*/
75+
if (hasConnected) {
76+
relpConnection.tearDown();
77+
}
78+
}
79+
80+
@Override
81+
public void ensureSent(byte[] bytes) {
82+
// avoid unnecessary exception for fresh connections
83+
if (!hasConnected) {
84+
connect();
85+
}
86+
87+
final RelpBatch relpBatch = new RelpBatch();
88+
relpBatch.insert(bytes);
89+
boolean notSent = true;
90+
while (notSent) {
91+
try {
92+
relpConnection.commit(relpBatch);
93+
}
94+
catch (IllegalStateException | IOException | TimeoutException e) {
95+
System.err.println("Exception <"+e.getMessage()+"> while sending relpBatch. Will retry");
96+
}
97+
if (!relpBatch.verifyTransactionAll()) {
98+
relpBatch.retryAllFailed();
99+
this.tearDown();
100+
this.connect();
101+
}
102+
else {
103+
notSent = false;
104+
}
105+
}
106+
}
107+
108+
@Override
109+
public boolean isStub() {
110+
return false;
111+
}
112+
113+
@Override
114+
public void close() {
115+
try {
116+
this.relpConnection.disconnect();
117+
}
118+
catch (IllegalStateException | IOException | TimeoutException e) {
119+
System.err.println("Forcefully closing connection due to exception <"+e.getMessage()+">");
120+
}
121+
finally {
122+
tearDown();
123+
}
124+
}
125+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
Java Reliable Event Logging Protocol Library RLP-01
3+
Copyright (C) 2021-2024 Suomen Kanuuna Oy
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package com.teragrep.rlp_01.client;
18+
19+
import java.io.IOException;
20+
21+
public class ManagedRelpConnectionStub implements IManagedRelpConnection {
22+
23+
@Override
24+
public void reconnect() {
25+
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
26+
}
27+
28+
@Override
29+
public void connect() throws IOException {
30+
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
31+
}
32+
33+
@Override
34+
public void forceReconnect() {
35+
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
36+
}
37+
38+
@Override
39+
public void ensureSent(byte[] bytes) {
40+
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
41+
}
42+
43+
@Override
44+
public boolean isStub() {
45+
return true;
46+
}
47+
48+
@Override
49+
public void close() {
50+
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
51+
}
52+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Java Reliable Event Logging Protocol Library RLP-01
3+
Copyright (C) 2021-2024 Suomen Kanuuna Oy
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package com.teragrep.rlp_01.client;
18+
19+
import java.io.IOException;
20+
21+
public class RebindableRelpConnection implements IManagedRelpConnection {
22+
private final IManagedRelpConnection managedRelpConnection;
23+
private int recordsSent;
24+
private final int rebindRequestAmount;
25+
26+
public RebindableRelpConnection(IManagedRelpConnection managedRelpConnection, int rebindRequestAmount) {
27+
this.managedRelpConnection = managedRelpConnection;
28+
this.recordsSent = 0;
29+
this.rebindRequestAmount = rebindRequestAmount;
30+
}
31+
32+
@Override
33+
public void reconnect() {
34+
managedRelpConnection.reconnect();
35+
}
36+
37+
@Override
38+
public void connect() throws IOException {
39+
managedRelpConnection.connect();
40+
}
41+
42+
@Override
43+
public void forceReconnect() {
44+
managedRelpConnection.forceReconnect();
45+
}
46+
47+
@Override
48+
public void ensureSent(byte[] bytes) {
49+
if (recordsSent >= rebindRequestAmount) {
50+
reconnect();
51+
recordsSent = 0;
52+
}
53+
managedRelpConnection.ensureSent(bytes);
54+
recordsSent++;
55+
}
56+
57+
@Override
58+
public boolean isStub() {
59+
return false;
60+
}
61+
62+
@Override
63+
public void close() throws IOException {
64+
managedRelpConnection.close();
65+
}
66+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.teragrep.rlp_01.client;
2+
3+
import java.time.Duration;
4+
5+
public class RelpConfig {
6+
public final String relpTarget;
7+
public final int relpPort;
8+
public final int relpReconnectInterval;
9+
public final int rebindRequestAmount;
10+
public final boolean rebindEnabled;
11+
public final Duration maxIdle;
12+
public final boolean maxIdleEnabled;
13+
14+
public RelpConfig(String relpTarget, int relpPort, int relpReconnectInterval, int rebindRequestAmount, boolean rebindEnabled, Duration maxIdle, boolean maxIdleEnabled) {
15+
this.relpTarget = relpTarget;
16+
this.relpPort = relpPort;
17+
this.relpReconnectInterval = relpReconnectInterval;
18+
this.rebindRequestAmount = rebindRequestAmount;
19+
this.rebindEnabled = rebindEnabled;
20+
this.maxIdle = maxIdle;
21+
this.maxIdleEnabled = maxIdleEnabled;
22+
}
23+
}

0 commit comments

Comments
 (0)