From e7ca2438ef934f97b8735b58479e456db661c431 Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Wed, 15 May 2024 15:39:37 +0300 Subject: [PATCH 1/7] Use rlp_03 instead, one eventloop per thread --- README.adoc | 2 +- pom.xml | 6 +- .../rlp_09/ExampleRelpFlooderIterator.java | 2 +- .../java/com/teragrep/rlp_09/RelpFlooder.java | 17 ++-- .../teragrep/rlp_09/RelpFlooderConfig.java | 22 ++++- .../com/teragrep/rlp_09/RelpFlooderTask.java | 92 ++++++++++--------- 6 files changed, 84 insertions(+), 57 deletions(-) diff --git a/README.adoc b/README.adoc index 81a8776..15607dc 100644 --- a/README.adoc +++ b/README.adoc @@ -47,7 +47,7 @@ Example Iterator will send only static records like as [source,sh] ---- -<14>1 2024-02-29T13:58:05.605Z localhost rlp_09 - - - Example rlo_09 event +<14>1 2024-02-29T13:58:05.605Z localhost rlp_09 - - - Example rlo_09 record ---- == Configurations diff --git a/pom.xml b/pom.xml index 961400d..4ca5113 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 0.0.1 -SNAPSHOT - 4.0.1 + 7.0.2 @@ -40,8 +40,8 @@ com.teragrep - rlp_01 - ${rlp_01.version} + rlp_03 + ${rlp_03.version} diff --git a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java index 203fbba..c313d15 100644 --- a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java +++ b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java @@ -62,7 +62,7 @@ public class ExampleRelpFlooderIterator implements Iterator { .withHostname("localhost") .withFacility(Facility.USER) .withSeverity(Severity.INFORMATIONAL) - .withMsg("Example rlo_09 event") + .withMsg("Example rlo_09 record") .toRfc5424SyslogMessage() .getBytes(StandardCharsets.UTF_8); @Override diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java index de436f8..7cd8081 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java @@ -49,7 +49,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -58,32 +57,32 @@ public class RelpFlooder { private final ExecutorService executorService; List relpFlooderTaskList = new ArrayList<>(); - public HashMap getRecordsSentPerThread() { - HashMap recordsSent = new HashMap<>(); + public HashMap getRecordsSentPerThread() { + HashMap recordsSent = new HashMap<>(); for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ recordsSent.put(relpFlooderTask.getThreadId(), relpFlooderTask.getRecordsSent()); } return recordsSent; } - public int getTotalRecordsSent() { - int totalrecordsSent = 0; + public long getTotalRecordsSent() { + long totalrecordsSent = 0; for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ totalrecordsSent += relpFlooderTask.getRecordsSent(); } return totalrecordsSent; } - public HashMap getTotalBytesSentPerThread() { - HashMap bytesSent = new HashMap<>(); + public HashMap getTotalBytesSentPerThread() { + HashMap bytesSent = new HashMap<>(); for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ bytesSent.put(relpFlooderTask.getThreadId(), relpFlooderTask.getBytesSent()); } return bytesSent; } - public int getTotalBytesSent() { - int totalBytesSent = 0; + public long getTotalBytesSent() { + long totalBytesSent = 0; for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ totalBytesSent += relpFlooderTask.getBytesSent(); } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java index bfb28c3..e35ccc4 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java @@ -51,6 +51,9 @@ public class RelpFlooderConfig { private String target="127.0.0.1"; private int port=601; private int threads=1; + private int connectTimeout=1; + + private boolean waitForAcks=true; public void setTarget(String target) { this.target = target; @@ -63,6 +66,12 @@ public void setPort(int port) { public void setThreads(int threads) { this.threads = threads; } + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + public void setWaitForAcks(boolean waitForAcks) { + this.waitForAcks = waitForAcks; + } public String getTarget() { return target; @@ -76,12 +85,23 @@ public int getThreads() { return threads; } + public int getConnectTimeout() { + return connectTimeout; + } + + public boolean getWaitForAcks() { + return waitForAcks; + } + public RelpFlooderConfig() { } - public RelpFlooderConfig(String target, int port, int threads) { + public RelpFlooderConfig(String target, int port, int threads, int connectTimeout, boolean waitForAcks) { this.target = target; this.port = port; this.threads = threads; + this.connectTimeout = connectTimeout; + this.waitForAcks = waitForAcks; } + } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java index 778fcfd..403e3c9 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java @@ -46,20 +46,23 @@ package com.teragrep.rlp_09; -import com.teragrep.rlp_01.RelpBatch; -import com.teragrep.rlp_01.RelpConnection; +import com.teragrep.rlp_03.channel.context.ConnectContextFactory; +import com.teragrep.rlp_03.channel.socket.PlainFactory; +import com.teragrep.rlp_03.channel.socket.SocketFactory; +import com.teragrep.rlp_03.client.Client; +import com.teragrep.rlp_03.client.ClientFactory; +import com.teragrep.rlp_03.eventloop.EventLoop; +import com.teragrep.rlp_03.eventloop.EventLoopFactory; +import com.teragrep.rlp_03.frame.RelpFrame; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; class RelpFlooderTask implements Callable { - private RelpConnection relpConnection = new RelpConnection(); - private int recordsSent = 0; - private int bytesSent = 0; + private long recordsSent = 0; + private long bytesSent = 0; private boolean stayRunning = true; private final RelpFlooderConfig relpFlooderConfig; private final int threadId; @@ -73,48 +76,53 @@ class RelpFlooderTask implements Callable { @Override public Object call() { - relpConnection = new RelpConnection(); - connect(); - while (stayRunning && iterator.hasNext()) { - byte[] message = iterator.next(); - RelpBatch relpBatch = new RelpBatch(); - relpBatch.insert(message); - try { - relpConnection.commit(relpBatch); - } catch (IOException | TimeoutException e) { - throw new RuntimeException("Can't commit batch: " + e.getMessage()); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + SocketFactory socketFactory = new PlainFactory(); + ConnectContextFactory connectContextFactory = new ConnectContextFactory(executorService, socketFactory); + EventLoopFactory eventLoopFactory = new EventLoopFactory(); + EventLoop eventLoop; + try { + eventLoop = eventLoopFactory.create(); + } catch (IOException e) { + throw new RuntimeException("Can't create EventLoop: " + e.getMessage()); + } + Thread eventLoopThread = new Thread(eventLoop); + eventLoopThread.start(); + ClientFactory clientFactory = new ClientFactory(connectContextFactory, eventLoop); + try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.getTarget(), relpFlooderConfig.getPort())).get(relpFlooderConfig.getConnectTimeout(), TimeUnit.SECONDS)) { + CompletableFuture open = client.transmit("open", "rlp_09 says hi".getBytes()); + try (RelpFrame openResponse = open.get()) { + if(!openResponse.payload().toString().startsWith("200 OK")) { + throw new RuntimeException("Got unexpected response when opening connection: " + openResponse.payload().toString()); + } } - if (!relpBatch.verifyTransactionAll()) { - throw new RuntimeException("Can't verify transactions"); + + while (stayRunning && iterator.hasNext()) { + byte[] record = iterator.next(); + CompletableFuture syslog = client.transmit("syslog", record); + if(relpFlooderConfig.getWaitForAcks()) { + try (RelpFrame syslogResponse = syslog.get()) { + if (syslogResponse.payload().toString().equals("200 OK\n")) { + throw new RuntimeException("Got unexpected when sending records: " + syslogResponse.payload().toString()); + } + } + } + recordsSent++; + bytesSent += record.length; } - recordsSent++; - bytesSent += message.length; + } catch (RuntimeException | InterruptedException | TimeoutException | ExecutionException e) { + throw new RuntimeException("Failed to run flooder: " + e.getMessage()); } - disconnect(); + eventLoop.stop(); + executorService.shutdown(); latch.countDown(); return null; } - public void connect() { - try { - relpConnection.connect(relpFlooderConfig.getTarget(), relpFlooderConfig.getPort()); - } catch (IOException | TimeoutException e) { - throw new RuntimeException("Can't connect properly: " + e.getMessage()); - } - } - - public void disconnect() { - try { - relpConnection.disconnect(); - } catch (IOException | TimeoutException e) { - throw new RuntimeException("Can't disconnect properly: ", e); - } - } - - public int getRecordsSent() { + public long getRecordsSent() { return recordsSent; } - public int getBytesSent() { + public long getBytesSent() { return bytesSent; } public int getThreadId() { From df8548065b78b70e37a29fc5e9a52d7064f65981 Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Wed, 15 May 2024 15:47:42 +0300 Subject: [PATCH 2/7] Transmit session close at the end --- src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java index 403e3c9..4ac1f4c 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.concurrent.*; @@ -110,6 +111,7 @@ public Object call() { recordsSent++; bytesSent += record.length; } + client.transmit("close", "".getBytes(StandardCharsets.UTF_8)); } catch (RuntimeException | InterruptedException | TimeoutException | ExecutionException e) { throw new RuntimeException("Failed to run flooder: " + e.getMessage()); } From 928e8cb6d3dd442d1e94626c5bf0591ecd8033bd Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Thu, 16 May 2024 12:31:32 +0300 Subject: [PATCH 3/7] Do only one EventLoop and ClientFactory, pass ClientFactory to tasks --- .../java/com/teragrep/rlp_09/RelpFlooder.java | 30 ++++++++++++++++++- .../com/teragrep/rlp_09/RelpFlooderTask.java | 23 ++------------ 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java index 7cd8081..37132d8 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java @@ -46,6 +46,14 @@ package com.teragrep.rlp_09; +import com.teragrep.rlp_03.channel.context.ConnectContextFactory; +import com.teragrep.rlp_03.channel.socket.PlainFactory; +import com.teragrep.rlp_03.channel.socket.SocketFactory; +import com.teragrep.rlp_03.client.ClientFactory; +import com.teragrep.rlp_03.eventloop.EventLoop; +import com.teragrep.rlp_03.eventloop.EventLoopFactory; + +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -104,8 +112,21 @@ public RelpFlooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFacto this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.getThreads()); } public void start() { + SocketFactory socketFactory = new PlainFactory(); + ConnectContextFactory connectContextFactory = new ConnectContextFactory(Executors.newSingleThreadExecutor(), socketFactory); + EventLoopFactory eventLoopFactory = new EventLoopFactory(); + EventLoop eventLoop; + try { + eventLoop = eventLoopFactory.create(); + } catch (IOException e) { + throw new RuntimeException("Can't create EventLoop: " + e.getMessage()); + } + Thread eventLoopThread = new Thread(eventLoop); + eventLoopThread.start(); + ClientFactory clientFactory = new ClientFactory(connectContextFactory, eventLoop); + for (int i=0; i { private final int threadId; private final CountDownLatch latch = new CountDownLatch(1); private final Iterator iterator; - RelpFlooderTask(int threadId, RelpFlooderConfig relpFlooderConfig, Iterator iterator) throws RuntimeException { + private final ClientFactory clientFactory; + RelpFlooderTask(int threadId, RelpFlooderConfig relpFlooderConfig, Iterator iterator, ClientFactory clientFactory) throws RuntimeException { this.threadId = threadId; this.iterator = iterator; this.relpFlooderConfig = relpFlooderConfig; + this.clientFactory = clientFactory; } @Override public Object call() { - ExecutorService executorService = Executors.newSingleThreadExecutor(); - SocketFactory socketFactory = new PlainFactory(); - ConnectContextFactory connectContextFactory = new ConnectContextFactory(executorService, socketFactory); - EventLoopFactory eventLoopFactory = new EventLoopFactory(); - EventLoop eventLoop; - try { - eventLoop = eventLoopFactory.create(); - } catch (IOException e) { - throw new RuntimeException("Can't create EventLoop: " + e.getMessage()); - } - Thread eventLoopThread = new Thread(eventLoop); - eventLoopThread.start(); - ClientFactory clientFactory = new ClientFactory(connectContextFactory, eventLoop); try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.getTarget(), relpFlooderConfig.getPort())).get(relpFlooderConfig.getConnectTimeout(), TimeUnit.SECONDS)) { CompletableFuture open = client.transmit("open", "rlp_09 says hi".getBytes()); try (RelpFrame openResponse = open.get()) { @@ -115,8 +100,6 @@ public Object call() { } catch (RuntimeException | InterruptedException | TimeoutException | ExecutionException e) { throw new RuntimeException("Failed to run flooder: " + e.getMessage()); } - eventLoop.stop(); - executorService.shutdown(); latch.countDown(); return null; } From 51afb55d58ac0f85a1d8772e1f0278c9302f62ea Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Thu, 16 May 2024 15:42:01 +0300 Subject: [PATCH 4/7] Final Strings --- src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java index 507a3fe..61760d3 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java @@ -75,6 +75,8 @@ class RelpFlooderTask implements Callable { @Override public Object call() { + final String eventType = "syslog"; + final String ack = "200 OK\n"; try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.getTarget(), relpFlooderConfig.getPort())).get(relpFlooderConfig.getConnectTimeout(), TimeUnit.SECONDS)) { CompletableFuture open = client.transmit("open", "rlp_09 says hi".getBytes()); try (RelpFrame openResponse = open.get()) { @@ -85,10 +87,10 @@ public Object call() { while (stayRunning && iterator.hasNext()) { byte[] record = iterator.next(); - CompletableFuture syslog = client.transmit("syslog", record); + CompletableFuture syslog = client.transmit(eventType, record); if(relpFlooderConfig.getWaitForAcks()) { try (RelpFrame syslogResponse = syslog.get()) { - if (syslogResponse.payload().toString().equals("200 OK\n")) { + if (syslogResponse.payload().toString().equals(ack)) { throw new RuntimeException("Got unexpected when sending records: " + syslogResponse.payload().toString()); } } From cf787d3142c697c27427f70928b656b5a1f3f1df Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:04:34 +0300 Subject: [PATCH 5/7] Updates rlp_03 settings --- pom.xml | 2 +- .../rlp_09/ExampleRelpFlooderIterator.java | 10 ++-- .../ExampleRelpFlooderIteratorFactory.java | 2 +- .../java/com/teragrep/rlp_09/RelpFlooder.java | 10 ++-- .../teragrep/rlp_09/RelpFlooderConfig.java | 52 ++++--------------- .../rlp_09/RelpFlooderIteratorFactory.java | 2 +- .../com/teragrep/rlp_09/RelpFlooderTask.java | 38 ++++++++------ 7 files changed, 44 insertions(+), 72 deletions(-) diff --git a/pom.xml b/pom.xml index 4ca5113..5557e9e 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 0.0.1 -SNAPSHOT - 7.0.2 + 0.0.2-SNAPSHOT diff --git a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java index c313d15..0d0fa55 100644 --- a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java +++ b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIterator.java @@ -50,12 +50,11 @@ import com.teragrep.rlo_14.Severity; import com.teragrep.rlo_14.SyslogMessage; -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Iterator; -public class ExampleRelpFlooderIterator implements Iterator { - private final byte[] record = +public class ExampleRelpFlooderIterator implements Iterator { + private final String record = new SyslogMessage() .withTimestamp(Instant.now().toEpochMilli()) .withAppName("rlp_09") @@ -63,15 +62,14 @@ public class ExampleRelpFlooderIterator implements Iterator { .withFacility(Facility.USER) .withSeverity(Severity.INFORMATIONAL) .withMsg("Example rlo_09 record") - .toRfc5424SyslogMessage() - .getBytes(StandardCharsets.UTF_8); + .toRfc5424SyslogMessage(); @Override public boolean hasNext() { return true; } @Override - public byte[] next() { + public String next() { return record; } } diff --git a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java index 3624a78..97c9952 100644 --- a/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java +++ b/src/main/java/com/teragrep/rlp_09/ExampleRelpFlooderIteratorFactory.java @@ -50,7 +50,7 @@ public class ExampleRelpFlooderIteratorFactory implements RelpFlooderIteratorFactory { @Override - public Iterator get(int threadId) { + public Iterator get(int threadId) { return new ExampleRelpFlooderIterator(); } } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java index 37132d8..a9d5656 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java @@ -46,8 +46,8 @@ package com.teragrep.rlp_09; -import com.teragrep.rlp_03.channel.context.ConnectContextFactory; import com.teragrep.rlp_03.channel.socket.PlainFactory; +import com.teragrep.rlp_03.channel.context.ConnectContextFactory; import com.teragrep.rlp_03.channel.socket.SocketFactory; import com.teragrep.rlp_03.client.ClientFactory; import com.teragrep.rlp_03.eventloop.EventLoop; @@ -106,14 +106,16 @@ public RelpFlooder() { public RelpFlooder(RelpFlooderConfig relpFlooderConfig){ this(relpFlooderConfig, new ExampleRelpFlooderIteratorFactory()); } + public RelpFlooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFactory iteratorFactory) { this.relpFlooderConfig = relpFlooderConfig; this.iteratorFactory = iteratorFactory; - this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.getThreads()); + this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.threads); } + public void start() { SocketFactory socketFactory = new PlainFactory(); - ConnectContextFactory connectContextFactory = new ConnectContextFactory(Executors.newSingleThreadExecutor(), socketFactory); + ConnectContextFactory connectContextFactory = new ConnectContextFactory(Executors.newFixedThreadPool(relpFlooderConfig.threads), socketFactory); EventLoopFactory eventLoopFactory = new EventLoopFactory(); EventLoop eventLoop; try { @@ -125,7 +127,7 @@ public void start() { eventLoopThread.start(); ClientFactory clientFactory = new ClientFactory(connectContextFactory, eventLoop); - for (int i=0; i get(int threadId); + Iterator get(int threadId); } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java index 61760d3..08f5b8d 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java @@ -46,14 +46,14 @@ package com.teragrep.rlp_09; -import com.teragrep.rlp_03.channel.context.ConnectContextFactory; import com.teragrep.rlp_03.client.Client; import com.teragrep.rlp_03.client.ClientFactory; -import com.teragrep.rlp_03.eventloop.EventLoop; import com.teragrep.rlp_03.frame.RelpFrame; +import com.teragrep.rlp_03.frame.RelpFrameFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.concurrent.*; @@ -64,42 +64,48 @@ class RelpFlooderTask implements Callable { private final RelpFlooderConfig relpFlooderConfig; private final int threadId; private final CountDownLatch latch = new CountDownLatch(1); - private final Iterator iterator; + private final Iterator iterator; private final ClientFactory clientFactory; - RelpFlooderTask(int threadId, RelpFlooderConfig relpFlooderConfig, Iterator iterator, ClientFactory clientFactory) throws RuntimeException { + private final RelpFrameFactory relpFrameFactory; + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooderTask.class); + RelpFlooderTask(int threadId, RelpFlooderConfig relpFlooderConfig, Iterator iterator, ClientFactory clientFactory) throws RuntimeException { this.threadId = threadId; this.iterator = iterator; this.relpFlooderConfig = relpFlooderConfig; this.clientFactory = clientFactory; + this.relpFrameFactory = new RelpFrameFactory(); } @Override public Object call() { final String eventType = "syslog"; - final String ack = "200 OK\n"; - try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.getTarget(), relpFlooderConfig.getPort())).get(relpFlooderConfig.getConnectTimeout(), TimeUnit.SECONDS)) { - CompletableFuture open = client.transmit("open", "rlp_09 says hi".getBytes()); + final String ack = "200 OK"; + try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.target, relpFlooderConfig.port)).get(relpFlooderConfig.connectTimeout, TimeUnit.SECONDS)) { + CompletableFuture open = client.transmit(relpFrameFactory.create("open", "open")); try (RelpFrame openResponse = open.get()) { - if(!openResponse.payload().toString().startsWith("200 OK")) { + if (!openResponse.payload().toString().startsWith(ack)) { throw new RuntimeException("Got unexpected response when opening connection: " + openResponse.payload().toString()); } } while (stayRunning && iterator.hasNext()) { - byte[] record = iterator.next(); - CompletableFuture syslog = client.transmit(eventType, record); - if(relpFlooderConfig.getWaitForAcks()) { + String record = iterator.next(); + CompletableFuture syslog = client.transmit(relpFrameFactory.create(eventType, record)); + if (relpFlooderConfig.waitForAcks) { try (RelpFrame syslogResponse = syslog.get()) { - if (syslogResponse.payload().toString().equals(ack)) { + if (!syslogResponse.payload().toString().equals(ack)) { throw new RuntimeException("Got unexpected when sending records: " + syslogResponse.payload().toString()); } } } recordsSent++; - bytesSent += record.length; + bytesSent += record.length(); } - client.transmit("close", "".getBytes(StandardCharsets.UTF_8)); - } catch (RuntimeException | InterruptedException | TimeoutException | ExecutionException e) { + CompletableFuture close = client.transmit(relpFrameFactory.create("close", "")); + close.get(); + } catch (TimeoutException ignored) { + // Ignore Timeout if server has gone down and so on. + } catch (RuntimeException | InterruptedException | ExecutionException e) { throw new RuntimeException("Failed to run flooder: " + e.getMessage()); } latch.countDown(); From 318293889cfa1772cc1db0d46030d0f8d32b0a40 Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:22:42 +0300 Subject: [PATCH 6/7] Adds a lot of logging --- pom.xml | 23 ++++++++++++ .../java/com/teragrep/rlp_09/RelpFlooder.java | 24 +++++++++++- .../teragrep/rlp_09/RelpFlooderConfig.java | 17 +++++++++ .../com/teragrep/rlp_09/RelpFlooderTask.java | 37 +++++++++++++------ 4 files changed, 88 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index 5557e9e..85b91ac 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,8 @@ -SNAPSHOT 0.0.2-SNAPSHOT + 2.23.1 + 2.0.13 @@ -49,6 +51,27 @@ rlo_14 1.0.1 + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.logging.log4j + log4j-slf4j2-impl + ${log4j2.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + ${artifactId}-${revision}${changelist}${sha1} diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java index a9d5656..5818a53 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooder.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooder.java @@ -52,6 +52,8 @@ import com.teragrep.rlp_03.client.ClientFactory; import com.teragrep.rlp_03.eventloop.EventLoop; import com.teragrep.rlp_03.eventloop.EventLoopFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -64,7 +66,7 @@ public class RelpFlooder { private final ExecutorService executorService; List relpFlooderTaskList = new ArrayList<>(); - + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooder.class); public HashMap getRecordsSentPerThread() { HashMap recordsSent = new HashMap<>(); for(RelpFlooderTask relpFlooderTask : relpFlooderTaskList){ @@ -113,42 +115,60 @@ public RelpFlooder(RelpFlooderConfig relpFlooderConfig, RelpFlooderIteratorFacto this.executorService = Executors.newFixedThreadPool(relpFlooderConfig.threads); } - public void start() { + public void start() { + LOGGER.trace("Creating a PlainFactory"); SocketFactory socketFactory = new PlainFactory(); + LOGGER.trace("Creating ConnectContextFactory"); ConnectContextFactory connectContextFactory = new ConnectContextFactory(Executors.newFixedThreadPool(relpFlooderConfig.threads), socketFactory); + LOGGER.trace("Creating EventLoopfactory"); EventLoopFactory eventLoopFactory = new EventLoopFactory(); EventLoop eventLoop; try { + LOGGER.trace("Creating EventLoop from EventLoopFactory"); eventLoop = eventLoopFactory.create(); } catch (IOException e) { throw new RuntimeException("Can't create EventLoop: " + e.getMessage()); } + LOGGER.trace("Creating EventLoopThread"); Thread eventLoopThread = new Thread(eventLoop); + LOGGER.trace("Starting EventLoopThread"); eventLoopThread.start(); + LOGGER.trace("Creating ClientFactory"); ClientFactory clientFactory = new ClientFactory(connectContextFactory, eventLoop); for (int i=0; i", i); RelpFlooderTask relpFlooderTask = new RelpFlooderTask(i, relpFlooderConfig, iteratorFactory.get(i), clientFactory); relpFlooderTaskList.add(relpFlooderTask); } + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Got <{}> tasks in RelpFlooderTaskList: <{}>", relpFlooderTaskList.size(), relpFlooderTaskList); + } try { + LOGGER.trace("Invoking all RelpFlooderTaskList tasks"); List> futures = executorService.invokeAll(relpFlooderTaskList); for(Future future : futures) { + LOGGER.trace("Waiting for future <{}>", future); future.get(); } } catch (Exception e) { throw new RuntimeException(e); } + LOGGER.trace("Stopping EventLoop"); eventLoop.stop(); try { + LOGGER.trace("Joining EventLoopThread"); eventLoopThread.join(); } catch (InterruptedException e) { throw new RuntimeException(e); } + LOGGER.trace("Shutting down ExecutorService"); executorService.shutdown(); } public void stop() { + LOGGER.trace("Entering stop(), running RelpFlooderTask::stop"); relpFlooderTaskList.parallelStream().forEach(RelpFlooderTask::stop); + LOGGER.trace("Exiting stop()"); } } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java index 9f08291..736da69 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderConfig.java @@ -46,8 +46,12 @@ package com.teragrep.rlp_09; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class RelpFlooderConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFlooderConfig.class); public final String target; public final int port; public final int threads; @@ -68,6 +72,19 @@ public RelpFlooderConfig(String target, int port, int threads, int connectTimeou this.threads = threads; this.connectTimeout = connectTimeout; this.waitForAcks = waitForAcks; + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Got RelpFlooderConfig: <{}>", this); + } } + @Override + public String toString() { + return "RelpFlooderConfig{" + + "target='" + target + '\'' + + ", port=" + port + + ", threads=" + threads + + ", connectTimeout=" + connectTimeout + + ", waitForAcks=" + waitForAcks + + '}'; + } } diff --git a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java index 08f5b8d..0fbe8ab 100644 --- a/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java +++ b/src/main/java/com/teragrep/rlp_09/RelpFlooderTask.java @@ -78,37 +78,49 @@ class RelpFlooderTask implements Callable { @Override public Object call() { + LOGGER.debug("[Thread <{}>] Entering call()", threadId); final String eventType = "syslog"; final String ack = "200 OK"; + LOGGER.trace("[Thread <{}>] Trying to create a new client", threadId); try (Client client = clientFactory.open(new InetSocketAddress(relpFlooderConfig.target, relpFlooderConfig.port)).get(relpFlooderConfig.connectTimeout, TimeUnit.SECONDS)) { + LOGGER.trace("[Thread <{}>] Creating open connection RelpFrame", threadId); CompletableFuture open = client.transmit(relpFrameFactory.create("open", "open")); - try (RelpFrame openResponse = open.get()) { - if (!openResponse.payload().toString().startsWith(ack)) { - throw new RuntimeException("Got unexpected response when opening connection: " + openResponse.payload().toString()); - } + LOGGER.trace("[Thread <{}>] Getting OpenResponse from RelpFrame", threadId); + String openResponse = open.get().payload().toString(); + LOGGER.debug("[Thread <{}>] Got OpenResponse: <{}>", threadId, openResponse); + if (!openResponse.startsWith(ack)) { + throw new RuntimeException("Got unexpected response when opening connection: " + openResponse); } + + LOGGER.trace("[Thread <{}>] Entering main flooding loop", threadId); while (stayRunning && iterator.hasNext()) { String record = iterator.next(); CompletableFuture syslog = client.transmit(relpFrameFactory.create(eventType, record)); if (relpFlooderConfig.waitForAcks) { - try (RelpFrame syslogResponse = syslog.get()) { - if (!syslogResponse.payload().toString().equals(ack)) { - throw new RuntimeException("Got unexpected when sending records: " + syslogResponse.payload().toString()); - } + String syslogResponse = syslog.get().payload().toString(); + LOGGER.debug("[Thread <{}>] Got syslog response: <{}>", threadId, syslogResponse); + if (!syslogResponse.equals(ack)) { + throw new RuntimeException("Got unexpected when sending records: " + syslogResponse); } } recordsSent++; bytesSent += record.length(); + LOGGER.trace("[Thread <{}>] Rercords sent: <{}>, bytes sent: <{}>", threadId, recordsSent, bytesSent); } + LOGGER.trace("[Thread <{}>] Creating close connection RelpFrame", threadId); CompletableFuture close = client.transmit(relpFrameFactory.create("close", "")); - close.get(); - } catch (TimeoutException ignored) { - // Ignore Timeout if server has gone down and so on. + LOGGER.trace("[Thread <{}>] Getting close Response from RelpFrame", threadId); + String closeResponse = close.get().payload().toString(); + LOGGER.debug("[Thread <{}>] Got close response: <{}>", threadId, closeResponse); + } catch (TimeoutException e) { + LOGGER.debug("[Thread <{}>] Received TimeoutException: <{}>", threadId, e.getMessage(), e); } catch (RuntimeException | InterruptedException | ExecutionException e) { throw new RuntimeException("Failed to run flooder: " + e.getMessage()); } + LOGGER.trace("[Thread <{}>] Running latch.countDown()", threadId); latch.countDown(); + LOGGER.debug("[Thread <{}>] Exiting call();", threadId); return null; } @@ -122,13 +134,16 @@ public int getThreadId() { return threadId; } public void stop() { + LOGGER.debug("[Thread <{}>] Entering stop()", threadId); stayRunning=false; try { + LOGGER.trace("[Thread <{}>] Waiting for latch to countDown", threadId); if(!latch.await(5L, TimeUnit.SECONDS)) { throw new RuntimeException("Timed out waiting for thread to shut down"); } } catch (InterruptedException e) { throw new RuntimeException(e); } + LOGGER.debug("[Thread <{}>] Exiting stop();", threadId); } } From 883973d0b76f44a9ea070032e528a2d7646f0cea Mon Sep 17 00:00:00 2001 From: StrongestNumber9 <16169054+StrongestNumber9@users.noreply.github.com> Date: Tue, 18 Jun 2024 08:58:31 +0300 Subject: [PATCH 7/7] Use java 17 --- .github/workflows/release.yaml | 4 ++-- pom.xml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index e873186..3ed29c2 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -21,7 +21,7 @@ jobs: - name: Setup Maven Central uses: actions/setup-java@v2 with: - java-version: 8.0.292+10 + java-version: 17 distribution: 'adopt' server-id: ossrh @@ -42,7 +42,7 @@ jobs: - name: Setup GitHub Packages uses: actions/setup-java@v2 with: - java-version: 8.0.292+10 + java-version: 17 distribution: 'adopt' - name: Publish to GitHub Packages diff --git a/pom.xml b/pom.xml index 85b91ac..53b00d4 100644 --- a/pom.xml +++ b/pom.xml @@ -9,9 +9,9 @@ https://teragrep.com UTF-8 - 8 - 8 - 8 + 17 + 17 + 17 0.0.1 -SNAPSHOT @@ -143,7 +143,7 @@ jar - 8 + 17