Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup Maven Central
uses: actions/setup-java@v2
with:
java-version: 8.0.292+10
java-version: 17
distribution: 'adopt'

server-id: ossrh
Expand All @@ -42,7 +42,7 @@ jobs:
- name: Setup GitHub Packages
uses: actions/setup-java@v2
with:
java-version: 8.0.292+10
java-version: 17
distribution: 'adopt'

- name: Publish to GitHub Packages
Expand Down
2 changes: 1 addition & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Example Iterator will send only static records like as

[source,sh]
----
<14>1 2024-02-29T13:58:05.605Z localhost rlp_09 - - - Example rlo_09 event
<14>1 2024-02-29T13:58:05.605Z localhost rlp_09 - - - Example rlo_09 record
----

== Configurations
Expand Down
37 changes: 30 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
<url>https://teragrep.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>8</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<java.version>17</java.version>
<revision>0.0.1</revision>
<changelist>-SNAPSHOT</changelist>
<sha1/>
<rlp_01.version>4.0.1</rlp_01.version>
<rlp_03.version>0.0.2-SNAPSHOT</rlp_03.version>
<log4j2.version>2.23.1</log4j2.version>
<slf4j.version>2.0.13</slf4j.version>
</properties>
<licenses>
<license>
Expand All @@ -40,15 +42,36 @@
<!-- RELP -->
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlp_01</artifactId>
<version>${rlp_01.version}</version>
<artifactId>rlp_03</artifactId>
<version>${rlp_03.version}</version>
</dependency>
<!-- Syslog message -->
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlo_14</artifactId>
<version>1.0.1</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}-${revision}${changelist}${sha1}</finalName>
Expand Down Expand Up @@ -120,7 +143,7 @@
<goal>jar</goal>
</goals>
<configuration>
<source>8</source>
<source>17</source>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,26 @@
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Iterator;

public class ExampleRelpFlooderIterator implements Iterator<byte[]> {
private final byte[] record =
public class ExampleRelpFlooderIterator implements Iterator<String> {
private final String record =
new SyslogMessage()
.withTimestamp(Instant.now().toEpochMilli())
.withAppName("rlp_09")
.withHostname("localhost")
.withFacility(Facility.USER)
.withSeverity(Severity.INFORMATIONAL)
.withMsg("Example rlo_09 event")
.toRfc5424SyslogMessage()
.getBytes(StandardCharsets.UTF_8);
.withMsg("Example rlo_09 record")
.toRfc5424SyslogMessage();
@Override
public boolean hasNext() {
return true;
}

@Override
public byte[] next() {
public String next() {
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

public class ExampleRelpFlooderIteratorFactory implements RelpFlooderIteratorFactory {
@Override
public Iterator<byte[]> get(int threadId) {
public Iterator<String> get(int threadId) {
return new ExampleRelpFlooderIterator();
}
}
77 changes: 63 additions & 14 deletions src/main/java/com/teragrep/rlp_09/RelpFlooder.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,53 @@

package com.teragrep.rlp_09;

import com.teragrep.rlp_03.channel.socket.PlainFactory;
import com.teragrep.rlp_03.channel.context.ConnectContextFactory;
import com.teragrep.rlp_03.channel.socket.SocketFactory;
import com.teragrep.rlp_03.client.ClientFactory;
import com.teragrep.rlp_03.eventloop.EventLoop;
import com.teragrep.rlp_03.eventloop.EventLoopFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RelpFlooder {
private final ExecutorService executorService;
List<RelpFlooderTask> relpFlooderTaskList = new ArrayList<>();

public HashMap<Integer, Integer> getRecordsSentPerThread() {
HashMap<Integer, Integer> recordsSent = new HashMap<>();
private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooder.class);
public HashMap<Integer, Long> getRecordsSentPerThread() {
HashMap<Integer, Long> recordsSent = new HashMap<>();
for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){
recordsSent.put(relpFlooderTask.getThreadId(), relpFlooderTask.getRecordsSent());
}
return recordsSent;
}

public int getTotalRecordsSent() {
int totalrecordsSent = 0;
public long getTotalRecordsSent() {
long totalrecordsSent = 0;
for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){
totalrecordsSent += relpFlooderTask.getRecordsSent();
}
return totalrecordsSent;
}

public HashMap<Integer, Integer> getTotalBytesSentPerThread() {
HashMap<Integer, Integer> bytesSent = new HashMap<>();
public HashMap<Integer, Long> getTotalBytesSentPerThread() {
HashMap<Integer, Long> bytesSent = new HashMap<>();
for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){
bytesSent.put(relpFlooderTask.getThreadId(), relpFlooderTask.getBytesSent());
}
return bytesSent;
}

public int getTotalBytesSent() {
int totalBytesSent = 0;
public long getTotalBytesSent() {
long totalBytesSent = 0;
for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){
totalBytesSent += relpFlooderTask.getBytesSent();
}
Expand All @@ -99,27 +108,67 @@ public RelpFlooder() {
public RelpFlooder(RelpFlooderConfig relpFlooderConfig){
this(relpFlooderConfig, new ExampleRelpFlooderIteratorFactory());
}

public RelpFlooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFactory iteratorFactory) {
this.relpFlooderConfig = relpFlooderConfig;
this.iteratorFactory = iteratorFactory;
this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.getThreads());
this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.threads);
}
public void start() {
for (int i=0; i<relpFlooderConfig.getThreads(); i++) {
RelpFlooderTask relpFlooderTask = new RelpFlooderTask(i, relpFlooderConfig, iteratorFactory.get(i));

public void start() {
LOGGER.trace("Creating a PlainFactory");
SocketFactory socketFactory = new PlainFactory();
LOGGER.trace("Creating ConnectContextFactory");
ConnectContextFactory connectContextFactory = new ConnectContextFactory(Executors.newFixedThreadPool(relpFlooderConfig.threads), socketFactory);
LOGGER.trace("Creating EventLoopfactory");
EventLoopFactory eventLoopFactory = new EventLoopFactory();
EventLoop eventLoop;
try {
LOGGER.trace("Creating EventLoop from EventLoopFactory");
eventLoop = eventLoopFactory.create();
} catch (IOException e) {
throw new RuntimeException("Can't create EventLoop: " + e.getMessage());
}
LOGGER.trace("Creating EventLoopThread");
Thread eventLoopThread = new Thread(eventLoop);
LOGGER.trace("Starting EventLoopThread");
eventLoopThread.start();
LOGGER.trace("Creating ClientFactory");
ClientFactory clientFactory = new ClientFactory(connectContextFactory, eventLoop);

for (int i=0; i<relpFlooderConfig.threads; i++) {
LOGGER.trace("Creating RelpFlooderTask number <{}>", i);
RelpFlooderTask relpFlooderTask = new RelpFlooderTask(i, relpFlooderConfig, iteratorFactory.get(i), clientFactory);
relpFlooderTaskList.add(relpFlooderTask);
}
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Got <{}> tasks in RelpFlooderTaskList: <{}>", relpFlooderTaskList.size(), relpFlooderTaskList);
}
try {
LOGGER.trace("Invoking all RelpFlooderTaskList tasks");
List<Future<Object>> futures = executorService.invokeAll(relpFlooderTaskList);
for(Future<Object> future : futures) {
LOGGER.trace("Waiting for future <{}>", future);
future.get();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
LOGGER.trace("Stopping EventLoop");
eventLoop.stop();
try {
LOGGER.trace("Joining EventLoopThread");
eventLoopThread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LOGGER.trace("Shutting down ExecutorService");
executorService.shutdown();
}

public void stop() {
LOGGER.trace("Entering stop(), running RelpFlooderTask::stop");
relpFlooderTaskList.parallelStream().forEach(RelpFlooderTask::stop);
LOGGER.trace("Exiting stop()");
}
}
57 changes: 30 additions & 27 deletions src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,42 +46,45 @@

package com.teragrep.rlp_09;

public class RelpFlooderConfig {

private String target="127.0.0.1";
private int port=601;
private int threads=1;

public void setTarget(String target) {
this.target = target;
}

public void setPort(int port) {
this.port = port;
}

public void setThreads(int threads) {
this.threads = threads;
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public String getTarget() {
return target;
}
public class RelpFlooderConfig {

public int getPort() {
return port;
}
private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooderConfig.class);
public final String target;
public final int port;
public final int threads;
public final int connectTimeout;
public final boolean waitForAcks;

public int getThreads() {
return threads;
public RelpFlooderConfig() {
this("127.0.0.1", 601);
}

public RelpFlooderConfig() {
public RelpFlooderConfig(String target, int port) {
this(target, port, 1, 1, true);
}

public RelpFlooderConfig(String target, int port, int threads) {
public RelpFlooderConfig(String target, int port, int threads, int connectTimeout, boolean waitForAcks) {
this.target = target;
this.port = port;
this.threads = threads;
this.connectTimeout = connectTimeout;
this.waitForAcks = waitForAcks;
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Got RelpFlooderConfig: <{}>", this);
}
}

@Override
public String toString() {
return "RelpFlooderConfig{" +
"target='" + target + '\'' +
", port=" + port +
", threads=" + threads +
", connectTimeout=" + connectTimeout +
", waitForAcks=" + waitForAcks +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@

import java.util.Iterator;
public interface RelpFlooderIteratorFactory {
Iterator<byte[]> get(int threadId);
Iterator<String> get(int threadId);
}
Loading