diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index e873186..3ed29c2 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -21,7 +21,7 @@ jobs: - name: Setup Maven Central uses: actions/setup-java@v2 with: - java-version: 8.0.292+10 + java-version: 17 distribution: 'adopt' server-id: ossrh @@ -42,7 +42,7 @@ jobs: - name: Setup GitHub Packages uses: actions/setup-java@v2 with: - java-version: 8.0.292+10 + java-version: 17 distribution: 'adopt' - name: Publish to GitHub Packages diff --git a/README.adoc b/README.adoc index 81a8776..15607dc 100644 --- a/README.adoc +++ b/README.adoc @@ -47,7 +47,7 @@ Example Iterator will send only static records like as [source,sh] ---- -<14>1 2024-02-29T13:58:05.605Z localhost rlp_09 - - - Example rlo_09 event +<14>1 2024-02-29T13:58:05.605Z localhost rlp_09 - - - Example rlo_09 record ---- == Configurations diff --git a/pom.xml b/pom.xml index 961400d..53b00d4 100644 --- a/pom.xml +++ b/pom.xml @@ -9,13 +9,15 @@ https://teragrep.com UTF-8 - 8 - 8 - 8 + 17 + 17 + 17 0.0.1 -SNAPSHOT - 4.0.1 + 0.0.2-SNAPSHOT + 2.23.1 + 2.0.13 @@ -40,8 +42,8 @@ com.teragrep - rlp_01 - ${rlp_01.version} + rlp_03 + ${rlp_03.version} @@ -49,6 +51,27 @@ rlo_14 1.0.1 + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.logging.log4j + log4j-slf4j2-impl + ${log4j2.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + ${artifactId}-${revision}${changelist}${sha1} @@ -120,7 +143,7 @@ jar - 8 + 17 diff --git a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java index 203fbba..0d0fa55 100644 --- a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java +++ b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java @@ -50,28 +50,26 @@ import com.teragrep.rlo_14.Severity; import com.teragrep.rlo_14.SyslogMessage; -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Iterator; -public class ExampleRelpFlooderIterator implements Iterator { - private final byte[] record = +public class ExampleRelpFlooderIterator implements Iterator { + private final String record = new SyslogMessage() .withTimestamp(Instant.now().toEpochMilli()) .withAppName("rlp_09") .withHostname("localhost") .withFacility(Facility.USER) .withSeverity(Severity.INFORMATIONAL) - .withMsg("Example rlo_09 event") - .toRfc5424SyslogMessage() - .getBytes(StandardCharsets.UTF_8); + .withMsg("Example rlo_09 record") + .toRfc5424SyslogMessage(); @Override public boolean hasNext() { return true; } @Override - public byte[] next() { + public String next() { return record; } } diff --git a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java index 3624a78..97c9952 100644 --- a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java +++ b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java @@ -50,7 +50,7 @@ public class ExampleRelpFlooderIteratorFactory implements RelpFlooderIteratorFactory { @Override - public Iterator get(int threadId) { + public Iterator get(int threadId) { return new ExampleRelpFlooderIterator(); } } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java index de436f8..5818a53 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java @@ -46,10 +46,19 @@ package com.teragrep.rlp_09; +import com.teragrep.rlp_03.channel.socket.PlainFactory; +import com.teragrep.rlp_03.channel.context.ConnectContextFactory; +import com.teragrep.rlp_03.channel.socket.SocketFactory; +import com.teragrep.rlp_03.client.ClientFactory; +import com.teragrep.rlp_03.eventloop.EventLoop; +import com.teragrep.rlp_03.eventloop.EventLoopFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -57,33 +66,33 @@ public class RelpFlooder { private final ExecutorService executorService; List relpFlooderTaskList = new ArrayList<>(); - - public HashMap getRecordsSentPerThread() { - HashMap recordsSent = new HashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooder.class); + public HashMap getRecordsSentPerThread() { + HashMap recordsSent = new HashMap<>(); for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ recordsSent.put(relpFlooderTask.getThreadId(), relpFlooderTask.getRecordsSent()); } return recordsSent; } - public int getTotalRecordsSent() { - int totalrecordsSent = 0; + public long getTotalRecordsSent() { + long totalrecordsSent = 0; for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ totalrecordsSent += relpFlooderTask.getRecordsSent(); } return totalrecordsSent; } - public HashMap getTotalBytesSentPerThread() { - HashMap bytesSent = new HashMap<>(); + public HashMap getTotalBytesSentPerThread() { + HashMap bytesSent = new HashMap<>(); for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ bytesSent.put(relpFlooderTask.getThreadId(), relpFlooderTask.getBytesSent()); } return bytesSent; } - public int getTotalBytesSent() { - int totalBytesSent = 0; + public long getTotalBytesSent() { + long totalBytesSent = 0; for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ totalBytesSent += relpFlooderTask.getBytesSent(); } @@ -99,27 +108,67 @@ public RelpFlooder() { public RelpFlooder(RelpFlooderConfig relpFlooderConfig){ this(relpFlooderConfig, new ExampleRelpFlooderIteratorFactory()); } + public RelpFlooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFactory iteratorFactory) { this.relpFlooderConfig = relpFlooderConfig; this.iteratorFactory = iteratorFactory; - this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.getThreads()); + this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.threads); } - public void start() { - for (int i=0; i", i); + RelpFlooderTask relpFlooderTask = new RelpFlooderTask(i, relpFlooderConfig, iteratorFactory.get(i), clientFactory); relpFlooderTaskList.add(relpFlooderTask); } + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Got <{}> tasks in RelpFlooderTaskList: <{}>", relpFlooderTaskList.size(), relpFlooderTaskList); + } try { + LOGGER.trace("Invoking all RelpFlooderTaskList tasks"); List> futures = executorService.invokeAll(relpFlooderTaskList); for(Future future : futures) { + LOGGER.trace("Waiting for future <{}>", future); future.get(); } } catch (Exception e) { throw new RuntimeException(e); } + LOGGER.trace("Stopping EventLoop"); + eventLoop.stop(); + try { + LOGGER.trace("Joining EventLoopThread"); + eventLoopThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + LOGGER.trace("Shutting down ExecutorService"); + executorService.shutdown(); } public void stop() { + LOGGER.trace("Entering stop(), running RelpFlooderTask::stop"); relpFlooderTaskList.parallelStream().forEach(RelpFlooderTask::stop); + LOGGER.trace("Exiting stop()"); } } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java index bfb28c3..736da69 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java @@ -46,42 +46,45 @@ package com.teragrep.rlp_09; -public class RelpFlooderConfig { - - private String target="127.0.0.1"; - private int port=601; - private int threads=1; - - public void setTarget(String target) { - this.target = target; - } - - public void setPort(int port) { - this.port = port; - } - - public void setThreads(int threads) { - this.threads = threads; - } +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - public String getTarget() { - return target; - } +public class RelpFlooderConfig { - public int getPort() { - return port; - } + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooderConfig.class); + public final String target; + public final int port; + public final int threads; + public final int connectTimeout; + public final boolean waitForAcks; - public int getThreads() { - return threads; + public RelpFlooderConfig() { + this("127.0.0.1", 601); } - public RelpFlooderConfig() { + public RelpFlooderConfig(String target, int port) { + this(target, port, 1, 1, true); } - public RelpFlooderConfig(String target, int port, int threads) { + public RelpFlooderConfig(String target, int port, int threads, int connectTimeout, boolean waitForAcks) { this.target = target; this.port = port; this.threads = threads; + this.connectTimeout = connectTimeout; + this.waitForAcks = waitForAcks; + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Got RelpFlooderConfig: <{}>", this); + } + } + + @Override + public String toString() { + return "RelpFlooderConfig{" + + "target='" + target + '\'' + + ", port=" + port + + ", threads=" + threads + + ", connectTimeout=" + connectTimeout + + ", waitForAcks=" + waitForAcks + + '}'; } } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderIteratorFactory.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderIteratorFactory.java index 8c063d8..5f2c27c 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderIteratorFactory.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderIteratorFactory.java @@ -48,5 +48,5 @@ import java.util.Iterator; public interface RelpFlooderIteratorFactory { - Iterator get(int threadId); + Iterator get(int threadId); } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java index 778fcfd..0fbe8ab 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java @@ -46,88 +46,104 @@ package com.teragrep.rlp_09; -import com.teragrep.rlp_01.RelpBatch; -import com.teragrep.rlp_01.RelpConnection; +import com.teragrep.rlp_03.client.Client; +import com.teragrep.rlp_03.client.ClientFactory; +import com.teragrep.rlp_03.frame.RelpFrame; +import com.teragrep.rlp_03.frame.RelpFrameFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; class RelpFlooderTask implements Callable { - private RelpConnection relpConnection = new RelpConnection(); - private int recordsSent = 0; - private int bytesSent = 0; + private long recordsSent = 0; + private long bytesSent = 0; private boolean stayRunning = true; private final RelpFlooderConfig relpFlooderConfig; private final int threadId; private final CountDownLatch latch = new CountDownLatch(1); - private final Iterator iterator; - RelpFlooderTask(int threadId, RelpFlooderConfig relpFlooderConfig, Iterator iterator) throws RuntimeException { + private final Iterator iterator; + private final ClientFactory clientFactory; + private final RelpFrameFactory relpFrameFactory; + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooderTask.class); + RelpFlooderTask(int threadId, RelpFlooderConfig relpFlooderConfig, Iterator iterator, ClientFactory clientFactory) throws RuntimeException { this.threadId = threadId; this.iterator = iterator; this.relpFlooderConfig = relpFlooderConfig; + this.clientFactory = clientFactory; + this.relpFrameFactory = new RelpFrameFactory(); } @Override public Object call() { - relpConnection = new RelpConnection(); - connect(); - while (stayRunning && iterator.hasNext()) { - byte[] message = iterator.next(); - RelpBatch relpBatch = new RelpBatch(); - relpBatch.insert(message); - try { - relpConnection.commit(relpBatch); - } catch (IOException | TimeoutException e) { - throw new RuntimeException("Can't commit batch: " + e.getMessage()); + LOGGER.debug("[Thread <{}>] Entering call()", threadId); + final String eventType = "syslog"; + final String ack = "200 OK"; + LOGGER.trace("[Thread <{}>] Trying to create a new client", threadId); + try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.target, relpFlooderConfig.port)).get(relpFlooderConfig.connectTimeout, TimeUnit.SECONDS)) { + LOGGER.trace("[Thread <{}>] Creating open connection RelpFrame", threadId); + CompletableFuture open = client.transmit(relpFrameFactory.create("open", "open")); + LOGGER.trace("[Thread <{}>] Getting OpenResponse from RelpFrame", threadId); + String openResponse = open.get().payload().toString(); + LOGGER.debug("[Thread <{}>] Got OpenResponse: <{}>", threadId, openResponse); + if (!openResponse.startsWith(ack)) { + throw new RuntimeException("Got unexpected response when opening connection: " + openResponse); } - if (!relpBatch.verifyTransactionAll()) { - throw new RuntimeException("Can't verify transactions"); + + + LOGGER.trace("[Thread <{}>] Entering main flooding loop", threadId); + while (stayRunning && iterator.hasNext()) { + String record = iterator.next(); + CompletableFuture syslog = client.transmit(relpFrameFactory.create(eventType, record)); + if (relpFlooderConfig.waitForAcks) { + String syslogResponse = syslog.get().payload().toString(); + LOGGER.debug("[Thread <{}>] Got syslog response: <{}>", threadId, syslogResponse); + if (!syslogResponse.equals(ack)) { + throw new RuntimeException("Got unexpected when sending records: " + syslogResponse); + } + } + recordsSent++; + bytesSent += record.length(); + LOGGER.trace("[Thread <{}>] Rercords sent: <{}>, bytes sent: <{}>", threadId, recordsSent, bytesSent); } - recordsSent++; - bytesSent += message.length; + LOGGER.trace("[Thread <{}>] Creating close connection RelpFrame", threadId); + CompletableFuture close = client.transmit(relpFrameFactory.create("close", "")); + LOGGER.trace("[Thread <{}>] Getting close Response from RelpFrame", threadId); + String closeResponse = close.get().payload().toString(); + LOGGER.debug("[Thread <{}>] Got close response: <{}>", threadId, closeResponse); + } catch (TimeoutException e) { + LOGGER.debug("[Thread <{}>] Received TimeoutException: <{}>", threadId, e.getMessage(), e); + } catch (RuntimeException | InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to run flooder: " + e.getMessage()); } - disconnect(); + LOGGER.trace("[Thread <{}>] Running latch.countDown()", threadId); latch.countDown(); + LOGGER.debug("[Thread <{}>] Exiting call();", threadId); return null; } - public void connect() { - try { - relpConnection.connect(relpFlooderConfig.getTarget(), relpFlooderConfig.getPort()); - } catch (IOException | TimeoutException e) { - throw new RuntimeException("Can't connect properly: " + e.getMessage()); - } - } - - public void disconnect() { - try { - relpConnection.disconnect(); - } catch (IOException | TimeoutException e) { - throw new RuntimeException("Can't disconnect properly: ", e); - } - } - - public int getRecordsSent() { + public long getRecordsSent() { return recordsSent; } - public int getBytesSent() { + public long getBytesSent() { return bytesSent; } public int getThreadId() { return threadId; } public void stop() { + LOGGER.debug("[Thread <{}>] Entering stop()", threadId); stayRunning=false; try { + LOGGER.trace("[Thread <{}>] Waiting for latch to countDown", threadId); if(!latch.await(5L, TimeUnit.SECONDS)) { throw new RuntimeException("Timed out waiting for thread to shut down"); } } catch (InterruptedException e) { throw new RuntimeException(e); } + LOGGER.debug("[Thread <{}>] Exiting stop();", threadId); } }