Skip to content

Commit d62ef17

Browse files
authored
connection pool objects (#821)
* add ConnectionSource and LazyHikariConnectionSource * use atomic reference for hikari datasource, clean up naming * make dataSource method package private * clean up tests and add exclude filter for test password * add completely static thread safe ExecutorDataSourceRegistry, TestingConnectionSource, refactor unit tests to avoid concurrency issues, use Supplier interface * bump HikariCP 4.0.3 -> 7.0.2 * remove unused exclude filter * rename ExecutorDataSourceRegistry to ConnectionPoolSingleton * apply spotless * add DataSourceState interface to replace null checks * add abstract to all interface methods * use DataSourceState in TestingConnectionSource, clean up tests
1 parent cb0cebb commit d62ef17

13 files changed

Lines changed: 1129 additions & 0 deletions

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@
192192
<artifactId>commons-text</artifactId>
193193
<version>1.10.0</version>
194194
</dependency>
195+
<dependency>
196+
<groupId>com.zaxxer</groupId>
197+
<artifactId>HikariCP</artifactId>
198+
<version>7.0.2</version>
199+
</dependency>
195200
<!-- teragrep exec syslog stream command dependencies -->
196201
<dependency>
197202
<groupId>com.teragrep</groupId>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2026 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.steps.teragrep.connection;
47+
48+
import com.typesafe.config.Config;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
51+
52+
import java.sql.Connection;
53+
import java.sql.SQLException;
54+
55+
/**
56+
* Provides Connection objects from a static HikariCP datasource.
57+
* <p>
58+
* Methods connection() and resetForTesting() are thread locked on the class level
59+
*/
60+
public final class ConnectionPoolSingleton {
61+
62+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolSingleton.class);
63+
private static DataSourceState state = new StubDataSourceState();
64+
65+
private ConnectionPoolSingleton() {
66+
// blocks accidental initialization
67+
}
68+
69+
/**
70+
* Gets a Connection instance using a given config to instantiate a static connection pool.
71+
*
72+
* @param config config that is used to configure the connection pool, cannot change after initialization
73+
* @return Connection instance form the pool
74+
* @throws SQLException if there is an exception getting an SQL connection from the pool
75+
* @throws IllegalStateException if the config is changed after initialization
76+
*/
77+
public static synchronized Connection connection(final Config config) throws SQLException, IllegalStateException {
78+
LOGGER.debug("thread entered lock block");
79+
if (state.isStub()) {
80+
state = new InitializedDataSourceState(config);
81+
}
82+
else if (!state.config().equals(config)) {
83+
throw new IllegalStateException("Datasource was already initialized with a different config");
84+
}
85+
return state.dataSource().getConnection();
86+
}
87+
88+
// only for testing
89+
static synchronized void resetForTest() {
90+
LOGGER.warn("resetForTest() called, this should only happen in a test case");
91+
if (!state.isStub()) {
92+
state.dataSource().close();
93+
}
94+
state = new StubDataSourceState();
95+
}
96+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2026 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.steps.teragrep.connection;
47+
48+
import java.sql.Connection;
49+
import java.util.function.Supplier;
50+
51+
public interface ConnectionSource extends AutoCloseable, Supplier<Connection> {
52+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2026 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.steps.teragrep.connection;
47+
48+
import com.typesafe.config.Config;
49+
import com.zaxxer.hikari.HikariConfig;
50+
import com.zaxxer.hikari.HikariDataSource;
51+
52+
import java.util.Objects;
53+
import java.util.UUID;
54+
import java.util.function.Supplier;
55+
56+
final class DataSourceFromConfig implements Supplier<HikariDataSource> {
57+
58+
private final String sourceName;
59+
private final Config config;
60+
61+
DataSourceFromConfig(final Config config) {
62+
this("pth_10-pool-" + UUID.randomUUID(), config);
63+
}
64+
65+
DataSourceFromConfig(final String sourceName, final Config config) {
66+
this.sourceName = sourceName;
67+
this.config = config;
68+
}
69+
70+
@Override
71+
public HikariDataSource get() {
72+
final HikariConfig hikariConfig = new HikariConfig();
73+
// credentials
74+
hikariConfig.setJdbcUrl(connectionURL());
75+
hikariConfig.setUsername(connectionUsername());
76+
hikariConfig.setPassword(connectionPassword());
77+
// pool configuration
78+
hikariConfig.setMaximumPoolSize(256); // limit for safety, should not be reached
79+
hikariConfig.setMinimumIdle(0); // no hanging connections inside executor
80+
hikariConfig.setAutoCommit(true);
81+
hikariConfig.setConnectionTimeout(30000);
82+
hikariConfig.setValidationTimeout(5000);
83+
hikariConfig.setPoolName(sourceName);
84+
return new HikariDataSource(hikariConfig);
85+
}
86+
87+
private String connectionUsername() {
88+
final String username;
89+
final String BLOOMDB_USERNAME_CONFIG_ITEM = "dpl.pth_10.bloom.db.username";
90+
if (config.hasPath(BLOOMDB_USERNAME_CONFIG_ITEM)) {
91+
username = config.getString(BLOOMDB_USERNAME_CONFIG_ITEM);
92+
if (username == null || username.isEmpty()) {
93+
throw new RuntimeException("Database username not set.");
94+
}
95+
}
96+
else {
97+
throw new RuntimeException("Missing configuration item: '" + BLOOMDB_USERNAME_CONFIG_ITEM + "'.");
98+
}
99+
return username;
100+
}
101+
102+
private String connectionPassword() {
103+
final String password;
104+
final String BLOOMDB_PASSWORD_CONFIG_ITEM = "dpl.pth_10.bloom.db.password";
105+
if (config.hasPath(BLOOMDB_PASSWORD_CONFIG_ITEM)) {
106+
password = config.getString(BLOOMDB_PASSWORD_CONFIG_ITEM);
107+
if (password == null) {
108+
throw new RuntimeException("Database password not set.");
109+
}
110+
}
111+
else {
112+
throw new RuntimeException("Missing configuration item: '" + BLOOMDB_PASSWORD_CONFIG_ITEM + "'.");
113+
}
114+
return password;
115+
}
116+
117+
private String connectionURL() {
118+
final String databaseUrl;
119+
final String BLOOMDB_URL_CONFIG_ITEM = "dpl.pth_06.bloom.db.url";
120+
if (config.hasPath(BLOOMDB_URL_CONFIG_ITEM)) {
121+
databaseUrl = config.getString(BLOOMDB_URL_CONFIG_ITEM);
122+
if (databaseUrl == null || databaseUrl.isEmpty()) {
123+
throw new RuntimeException("Database url not set.");
124+
}
125+
}
126+
else {
127+
throw new RuntimeException("Missing configuration item: '" + BLOOMDB_URL_CONFIG_ITEM + "'.");
128+
}
129+
return databaseUrl;
130+
}
131+
132+
@Override
133+
public boolean equals(final Object o) {
134+
final boolean rv;
135+
if (o == null) {
136+
rv = false;
137+
}
138+
else if (getClass() != o.getClass()) {
139+
rv = false;
140+
}
141+
else {
142+
final DataSourceFromConfig that = (DataSourceFromConfig) o;
143+
rv = Objects.equals(sourceName, that.sourceName) && Objects.equals(config, that.config);
144+
}
145+
return rv;
146+
}
147+
148+
@Override
149+
public int hashCode() {
150+
return Objects.hash(sourceName, config);
151+
}
152+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2026 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.steps.teragrep.connection;
47+
48+
import com.typesafe.config.Config;
49+
import com.zaxxer.hikari.HikariDataSource;
50+
51+
public interface DataSourceState {
52+
53+
public abstract boolean isStub();
54+
55+
public abstract HikariDataSource dataSource();
56+
57+
public abstract Config config();
58+
59+
}

0 commit comments

Comments
 (0)