Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
java-version: 17
distribution: 'temurin'
server-id: github
settings-path: ${{ github.workspace }}
Expand Down
16 changes: 13 additions & 3 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Standalone version of https://github.com/teragrep/rlp_09[rlp_09]

- Simple to use
- Scales to as many threads as wanted
- Supports infinite flooding and sending specific amount of events
- Supports infinite flooding and sending specific amount of records, both per thread and total

== Limitations

Expand Down Expand Up @@ -35,12 +35,22 @@ Standalone version of https://github.com/teragrep/rlp_09[rlp_09]
|port|1601|RELP target port
|threads|4|RELP Flooder thread count
|useTls|false|Is TLS used for connections (Note: Not implemented)
|payloadSize|10|Record message extra padding
|payloadSize|10|Record extra padding
|reportInterval|10|How often report should be printed
|maxRecordsSent|-1|How many records should be sent (-1 for infinity). By default, this is per-thread but with usePerThreadIterator=false it is the total amount.
|usePerThreadIterator|true|Should each thread act as an independent iterator (will send maxRecordsSent * threads amount of records)
|connectTimeout|5|Seconds to wait while connecting
|waitForAcks|true|Whether to wait for acks or not. Recommended to keep on.
|mode|simple|Which mode to use. See modes for more information
|===

=== Modes

simple: Will flood identical static messages until each thread has sent total of maxRecordsSent. Fastest.

dynamic: Will flood dynamic messages until each thread has sent total of maxRecordsSent. Slower than simple but more dynamic content.

dynamic-shared: Will flood dynamic messages until total of maxRecordsSent records are sent across all threads. Slowest mode.

== Contributing

You can involve yourself with our project by https://github.com/teragrep/rlp_10/issues/new/choose[opening an issue] or submitting a pull request.
Expand Down
28 changes: 17 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
<name>rlp_10</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>8</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<java.version>17</java.version>
<revision>0.0.1</revision>
<changelist>-SNAPSHOT</changelist>
<sha1/>
<dropwizard-metrics.version>4.2.25</dropwizard-metrics.version>
<rlp_09.version>2.0.4</rlp_09.version>
<log4j2.version>2.22.1</log4j2.version>
<slf4j.version>2.0.5</slf4j.version>
<rlp_09.version>0.0.2-SNAPSHOT</rlp_09.version>
<log4j2.version>2.23.1</log4j2.version>
<slf4j.version>2.0.13</slf4j.version>
</properties>
<dependencies>
<!-- dropwizard metrics -->
Expand All @@ -32,22 +32,26 @@
<version>${rlp_09.version}</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

</dependencies>
<build>
<finalName>${artifactId}-${revision}${changelist}${sha1}</finalName>
Expand Down Expand Up @@ -92,6 +96,8 @@
<exclude>rpm/**</exclude>
<!-- assembly file -->
<exclude>src/main/assembly/jar-with-dependencies.xml</exclude>
<!-- log4j2 -->
<exclude>src/main/resources/log4j2.xml</exclude>
<!-- readme -->
<exclude>README.adoc</exclude>
</excludes>
Expand Down
42 changes: 33 additions & 9 deletions src/main/java/com/teragrep/rlp_10/Flooder.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.teragrep.rlp_09.RelpFlooder;
import com.teragrep.rlp_09.RelpFlooderConfig;
import com.teragrep.rlp_09.RelpFlooderIteratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.HashMap;
Expand All @@ -65,19 +67,20 @@ class Flooder {
private final ConsoleReporter consoleReporter;
private Instant startTime;
private final int reportInterval;
private static final Logger LOGGER = LoggerFactory.getLogger(Flooder.class);
public Flooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFactory relpFlooderIteratorFactory, int reportInterval) {
this.relpFlooder = new RelpFlooder(relpFlooderConfig, relpFlooderIteratorFactory);
MetricRegistry metricRegistry = new MetricRegistry();
// Records sent total
metricRegistry.register(name( "records","sent", "total"), (Gauge<Integer>) relpFlooder::getTotalRecordsSent);
metricRegistry.register(name("records", "sent", "total", "perThread"), (Gauge<HashMap<Integer, Integer>>) relpFlooder::getRecordsSentPerThread);
metricRegistry.register(name( "records","sent", "total"), (Gauge<Long>) relpFlooder::getTotalRecordsSent);
metricRegistry.register(name("records", "sent", "total", "perThread"), (Gauge<HashMap<Integer, Long>>) relpFlooder::getRecordsSentPerThread);
// Records sent per second
metricRegistry.register(name("records", "sent", "perSecond"), (Gauge<Float>) this::reportRecordsPerSecond);
metricRegistry.register(name("records", "sent", "perSecond", "perThread"), (Gauge<HashMap<Integer, Float>>) this::reportRecordsPerSecondPerThread);
// Bytes sent total
metricRegistry.register(name("bytes", "sent", "total"), (Gauge<Integer>) relpFlooder::getTotalBytesSent);
metricRegistry.register(name("bytes", "sent", "total"), (Gauge<Long>) relpFlooder::getTotalBytesSent);
metricRegistry.register(name("bytes", "sent", "total", "MB"), (Gauge<Float>) this::reportTotalMegaBytesSent);
metricRegistry.register(name("bytes", "sent", "total", "perThread"), (Gauge<HashMap<Integer, Integer>>) relpFlooder::getTotalBytesSentPerThread);
metricRegistry.register(name("bytes", "sent", "total", "perThread"), (Gauge<HashMap<Integer, Long>>) relpFlooder::getTotalBytesSentPerThread);
// Bytes second record
metricRegistry.register(name("bytes", "sent", "perSecond"), (Gauge<Float>) this::reportBytesPerSecond);
metricRegistry.register(name("bytes", "sent", "perSecond", "MB"), (Gauge<Float>) this::reportMegaBytesSentPerSecond);
Expand All @@ -92,64 +95,85 @@ public Flooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFactory r
}

private String reportElapsed() {
LOGGER.trace("Reporting Elapsed");
float elapsed = reportElapsedSeconds();
return String.format("%d:%02d", (int) Math.floor(elapsed/60), (int) elapsed%60);
}
private float reportElapsedSeconds() {
LOGGER.trace("Reporting ElapsedSeconds");
return (Instant.now().toEpochMilli()-startTime.toEpochMilli())/1000f;
}

private float reportTotalMegaBytesSent() {
LOGGER.trace("Reporting TotalMegabytesSent");
return relpFlooder.getTotalBytesSent()/1024f/1024f;
}


private float reportMegaBytesSentPerSecond() {
LOGGER.trace("Reporting TotalMegabytesSentPerSecond");
Instant now = Instant.now();
float elapsed = (now.toEpochMilli() - startTime.toEpochMilli()) / 1000f;
return relpFlooder.getTotalBytesSent()/1024f/1024f/elapsed;
}

private Float reportRecordsPerSecond() {
LOGGER.trace("Reporting RecordsPerSecond");
Instant now = Instant.now();
float elapsed = (now.toEpochMilli() - startTime.toEpochMilli()) / 1000f;
return relpFlooder.getTotalRecordsSent()/elapsed;
}

private HashMap<Integer, Float> reportRecordsPerSecondPerThread() {
LOGGER.trace("Reporting RecordsPerSecondPerThread");
Instant now = Instant.now();
float elapsed = (now.toEpochMilli() - startTime.toEpochMilli())/1000f;
HashMap<Integer, Float> recordsPerThread = new HashMap<>();
for(Map.Entry<Integer, Integer> entry : relpFlooder.getRecordsSentPerThread().entrySet()) {
recordsPerThread.put(entry.getKey(), entry.getValue()/elapsed);
for(Map.Entry<Integer, Long> entry : relpFlooder.getRecordsSentPerThread().entrySet()) {
int key = entry.getKey();
float value = entry.getValue()/elapsed;
LOGGER.debug("Adding key <{}>, value <{}> to RecordsPerSecondPerThread", key, value);
recordsPerThread.put(key, value);
}
return recordsPerThread;
}

private Float reportBytesPerSecond() {
LOGGER.trace("Reporting BytesPerSecond");
Instant now = Instant.now();
float elapsed = (now.toEpochMilli() - startTime.toEpochMilli()) / 1000f;
return relpFlooder.getTotalBytesSent()/elapsed;
}

private HashMap<Integer, Float> reportBytesPerSecondPerThread() {
LOGGER.trace("Reporting BytesPerSecondPerThread");
Instant now = Instant.now();
float elapsed = (now.toEpochMilli() - startTime.toEpochMilli())/1000f;
HashMap<Integer, Float> bytesPerThread = new HashMap<>();
for(Map.Entry<Integer, Integer> entry : relpFlooder.getTotalBytesSentPerThread().entrySet()) {
bytesPerThread.put(entry.getKey(), entry.getValue()/elapsed);
for(Map.Entry<Integer, Long> entry : relpFlooder.getTotalBytesSentPerThread().entrySet()) {
int key = entry.getKey();
float value = entry.getValue()/elapsed;
LOGGER.debug("Adding key <{}>, value <{}> to BytesPerSecondPerThread", key, value);
bytesPerThread.put(key, value);
}
return bytesPerThread;
}

public void flood() {
LOGGER.trace("Entering flood()");
startTime = Instant.now();
consoleReporter.start(reportInterval, TimeUnit.SECONDS);
LOGGER.trace("Running relpFlooder.start()");
relpFlooder.start();
LOGGER.trace("Exiting flood()");
}

void stop() throws InterruptedException {
void stop() {
LOGGER.trace("Entering stop()");
LOGGER.trace("Stopping RelpFlooder");
relpFlooder.stop();
LOGGER.trace("Stopping ConsoleReporter");
consoleReporter.stop();
LOGGER.trace("Exiting stop()");
}
}
41 changes: 29 additions & 12 deletions src/main/java/com/teragrep/rlp_10/FlooderConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@

package com.teragrep.rlp_10;

import org.apache.logging.log4j.Level;

import java.util.Properties;

class FlooderConfig {
public final String hostname;
public final String appname;
Expand All @@ -55,18 +59,31 @@ class FlooderConfig {
public final boolean useTls;
public final int payloadSize;
public final int reportInterval;
public final long maxMessagesSent;
public final boolean usePerThreadIterator;
public final long maxRecordsSent;
public final int connectTimeout;
public final boolean waitForAcks;
public final String mode;
public final Level selfLogging;
public final Level libLogging;
public final Level globalLogging;
public FlooderConfig() {
this.hostname = System.getProperty("hostname", "localhost");
this.appname = System.getProperty("appname", "rlp_10");
this.target = System.getProperty("target", "127.0.0.1");
this.port = Integer.parseInt(System.getProperty("port", "1601"));
this.threads = Integer.parseInt(System.getProperty("threads", "4"));
this.useTls = Boolean.parseBoolean(System.getProperty("useTls", "false"));
this.payloadSize = Integer.parseInt(System.getProperty("payloadSize", "10"));
this.reportInterval = Integer.parseInt(System.getProperty("reportInterval", "10"));
this.maxMessagesSent = Long.parseLong(System.getProperty("maxMessagesSent", "-1"));
this.usePerThreadIterator = Boolean.parseBoolean(System.getProperty("usePerThreadIterator", "true"));
this(System.getProperties());
}
public FlooderConfig(Properties properties) {
this.hostname = properties.getProperty("hostname", "localhost");
this.appname = properties.getProperty("appname", "rlp_10");
this.target = properties.getProperty("target", "127.0.0.1");
this.port = Integer.parseInt(properties.getProperty("port", "1601"));
this.threads = Integer.parseInt(properties.getProperty("threads", "4"));
this.useTls = Boolean.parseBoolean(properties.getProperty("useTls", "false"));
this.payloadSize = Integer.parseInt(properties.getProperty("payloadSize", "10"));
this.reportInterval = Integer.parseInt(properties.getProperty("reportInterval", "10"));
this.maxRecordsSent = Long.parseLong(properties.getProperty("maxRecordsSent", "-1"));
this.connectTimeout = Integer.parseInt(properties.getProperty("connectTimeout", "5"));
this.waitForAcks = Boolean.parseBoolean(properties.getProperty("waitForAcks", "true"));
this.mode = properties.getProperty("mode", "simple");
this.selfLogging = Level.toLevel(properties.getProperty("selfLogging", "info"), Level.INFO);
this.libLogging = Level.toLevel(properties.getProperty("libLogging", "info"), Level.INFO);
this.globalLogging = Level.toLevel(properties.getProperty("globalLogging", "info"), Level.INFO);
}
}
52 changes: 41 additions & 11 deletions src/main/java/com/teragrep/rlp_10/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,43 +47,73 @@
package com.teragrep.rlp_10;

import com.teragrep.rlp_09.RelpFlooderConfig;
import com.teragrep.rlp_09.RelpFlooderIteratorFactory;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) {
FlooderConfig flooderConfig = new FlooderConfig();
RelpFlooderConfig relpFlooderConfig = new RelpFlooderConfig(flooderConfig.target, flooderConfig.port, flooderConfig.threads);
Configurator.setLevel("com.teragrep.rlp_10", flooderConfig.selfLogging);
Configurator.setLevel("com.teragrep.rlp_09", flooderConfig.libLogging);
Configurator.setRootLevel(flooderConfig.globalLogging);
LOGGER.info("Using self logging level <[{}]>, lib logging level <[{}]>, global logging level <[{}]>", flooderConfig.selfLogging, flooderConfig.libLogging, flooderConfig.globalLogging);
RelpFlooderConfig relpFlooderConfig = new RelpFlooderConfig(flooderConfig.target, flooderConfig.port, flooderConfig.threads, flooderConfig.connectTimeout, flooderConfig.waitForAcks);
LOGGER.info("Using hostname <[{}]>", flooderConfig.hostname);
LOGGER.info("Using appname <[{}]>", flooderConfig.appname);
LOGGER.info("Adding <[{}]> characters to payload size", flooderConfig.payloadSize);
LOGGER.info("Sending records to: <[{}]:[{}]>", flooderConfig.target, flooderConfig.port);
Flooder flooder;
if(flooderConfig.usePerThreadIterator) {
LOGGER.info("Sending <[{}]> records per thread, total of <[{}]> records", flooderConfig.maxMessagesSent, flooderConfig.maxMessagesSent * flooderConfig.threads);
flooder = new Flooder(relpFlooderConfig, new PerThreadMessageIteratorFactory(flooderConfig), flooderConfig.reportInterval);
} else {
LOGGER.info("Sending total of <[{}]> records across all threads", flooderConfig.maxMessagesSent);
flooder = new Flooder(relpFlooderConfig, new SharedTotalMessageIteratorFactory(flooderConfig), flooderConfig.reportInterval);
LOGGER.info("Using <[{}]> mode with <[{}]> threads", flooderConfig.mode, flooderConfig.threads);
RelpFlooderIteratorFactory relpFlooderIteratorFactory;
switch(flooderConfig.mode) {
case "simple":
LOGGER.info(
"Sending <[{}]> static records per thread, total of <[{}]> records",
flooderConfig.maxRecordsSent < 0 ? "infinite" : flooderConfig.maxRecordsSent,
flooderConfig.maxRecordsSent < 0 ? "infinite" : flooderConfig.maxRecordsSent * flooderConfig.threads
);
relpFlooderIteratorFactory = new SimpleRecordIteratorFactory(flooderConfig);
break;
case "dynamic":
LOGGER.info(
"Sending <[{}]> dynamic records per thread, total of <[{}]> records",
flooderConfig.maxRecordsSent < 0 ? "infinite" : flooderConfig.maxRecordsSent,
flooderConfig.maxRecordsSent < 0 ? "infinite" : flooderConfig.maxRecordsSent * flooderConfig.threads
);
relpFlooderIteratorFactory = new PerThreadRecordIteratorFactory(flooderConfig);
break;
case "dynamic-shared":
LOGGER.info(
"Sending total of <[{}]> dynamic records across all threads",
flooderConfig.maxRecordsSent < 0 ? "infinite" : flooderConfig.maxRecordsSent * flooderConfig.threads
);
relpFlooderIteratorFactory = new SharedTotalRecordIteratorFactory(flooderConfig);
break;
default:
LOGGER.error("Invalid mode <[{}]> selected", flooderConfig.mode);
throw new IllegalStateException("Unexpected mode: " + flooderConfig.mode);
}
Flooder flooder = new Flooder(relpFlooderConfig, relpFlooderIteratorFactory, flooderConfig.reportInterval);
LOGGER.info("Waiting for acks: <[{}]>", flooderConfig.waitForAcks);
LOGGER.info("TLS enabled (FIXME: Implement): <[{}]>", flooderConfig.useTls);
LOGGER.info("Reporting stats every <[{}]> seconds", flooderConfig.reportInterval);

Thread shutdownHook = new Thread(() -> {
LOGGER.info("Shutting down...");
try {
flooder.stop();
} catch (InterruptedException | RuntimeException e) {
LOGGER.error("Failed to stop properly: {}", e.getMessage());
} catch (RuntimeException e) {
LOGGER.error("Failed to stop properly: <{}>", e.getMessage());
}
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
try {
flooder.flood();
}
catch (Exception e){
LOGGER.error("Caught an error while flooding: {}", e.getMessage());
LOGGER.error("Caught an error while flooding: <{}>", e.getMessage());
}
System.exit(0);
}
Expand Down
Loading