Skip to content

Commit da47f35

Browse files
authored
add postgreSQL type, tests and sample data. (#40)
* add postgreSQL type, tests and sample data. * make elems variable final in PostgreSQLType * replace try-catch with assertDoesNotThrow in PostgreSQLTypeTest and make testEvent arguments final * change appName from AppType key to properties.message->db name.
1 parent 67e09ac commit da47f35

5 files changed

Lines changed: 520 additions & 12 deletions

File tree

src/main/java/com/teragrep/nlf_01/NLFPlugin.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,17 @@ else if (jsonObject.getString("Type").equals("ContainerLogV2")) {
118118
else if (jsonObject.getString("Type").equals("Syslog")) {
119119
eventTypes.add(new SyslogType(parsedEvent, syslogExpectedProcessName, realHostname));
120120
}
121-
else {
122-
throw new PluginException(
123-
new IllegalArgumentException("Invalid event type: " + jsonObject.getString("Type"))
124-
);
121+
}
122+
else if (
123+
jsonObject.containsKey("AppType")
124+
&& jsonObject.get("AppType").getValueType().equals(JsonValue.ValueType.STRING)
125+
) {
126+
if (jsonObject.getString("AppType").equals("PostgreSQL")) {
127+
eventTypes.add(new PostgreSQLType(parsedEvent, realHostname));
125128
}
126129
}
127-
else {
130+
131+
if (eventTypes.isEmpty()) {
128132
throw new PluginException(
129133
new IllegalArgumentException("Event was not of expected log format or type was not found")
130134
);
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Teragrep Neon log format plugin for AKV_01
3+
* Copyright (C) 2025 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.nlf_01.types;
47+
48+
import com.teragrep.akv_01.event.ParsedEvent;
49+
import com.teragrep.akv_01.plugin.PluginException;
50+
import com.teragrep.nlf_01.PropertiesJson;
51+
import com.teragrep.nlf_01.util.*;
52+
import com.teragrep.rlo_14.Facility;
53+
import com.teragrep.rlo_14.SDElement;
54+
import com.teragrep.rlo_14.Severity;
55+
import jakarta.json.JsonObject;
56+
import jakarta.json.JsonValue;
57+
58+
import java.time.Instant;
59+
import java.util.HashSet;
60+
import java.util.Set;
61+
import java.util.UUID;
62+
import java.util.regex.Matcher;
63+
import java.util.regex.Pattern;
64+
65+
public final class PostgreSQLType implements EventType {
66+
67+
private final ParsedEvent parsedEvent;
68+
private final String realHostname;
69+
private final Pattern appNamePattern;
70+
71+
public PostgreSQLType(final ParsedEvent parsedEvent, final String realHostname) {
72+
this(parsedEvent, realHostname, Pattern.compile("^.*?db=(?<dbName>.*?),"));
73+
}
74+
75+
public PostgreSQLType(final ParsedEvent parsedEvent, final String realHostname, final Pattern appNamePattern) {
76+
this.parsedEvent = parsedEvent;
77+
this.realHostname = realHostname;
78+
this.appNamePattern = appNamePattern;
79+
}
80+
81+
private void assertKey(final JsonObject obj, final String key, final JsonValue.ValueType type)
82+
throws PluginException {
83+
if (!obj.containsKey(key)) {
84+
throw new PluginException(new IllegalArgumentException("Key " + key + " does not exist"));
85+
}
86+
87+
if (!obj.get(key).getValueType().equals(type)) {
88+
throw new PluginException(new IllegalArgumentException("Key " + key + " is not of type " + type));
89+
}
90+
}
91+
92+
@Override
93+
public Severity severity() throws PluginException {
94+
return Severity.NOTICE;
95+
}
96+
97+
@Override
98+
public Facility facility() throws PluginException {
99+
return Facility.AUDIT;
100+
}
101+
102+
@Override
103+
public String hostname() throws PluginException {
104+
final JsonObject record = parsedEvent.asJsonStructure().asJsonObject();
105+
106+
assertKey(record, "resourceId", JsonValue.ValueType.STRING);
107+
final String resourceId = record.getString("resourceId");
108+
109+
return new ValidRFC5424Hostname(
110+
"md5-".concat(new MD5Hash(resourceId).md5().concat("-").concat(new ASCIIString(new ResourceId(resourceId).resourceName()).withNonAsciiCharsRemoved()))
111+
).hostnameWithInvalidCharsRemoved();
112+
}
113+
114+
@Override
115+
public String appName() throws PluginException {
116+
final JsonObject record = parsedEvent.asJsonStructure().asJsonObject();
117+
118+
assertKey(record, "properties", JsonValue.ValueType.OBJECT);
119+
final JsonObject properties = record.getJsonObject("properties");
120+
assertKey(properties, "message", JsonValue.ValueType.STRING);
121+
final String message = properties.getString("message");
122+
123+
final Matcher matcher = appNamePattern.matcher(message);
124+
if (matcher.find()) {
125+
final String dbName = matcher.group("dbName");
126+
if (dbName == null) {
127+
throw new PluginException("Capture group 'dbName' was not found");
128+
}
129+
return new ValidRFC5424AppName(new ASCIIString(dbName).withNonAsciiCharsRemoved()).validAppName();
130+
}
131+
132+
throw new PluginException("Could not parse dbName from properties.message");
133+
}
134+
135+
@Override
136+
public long timestamp() throws PluginException {
137+
final JsonObject record = parsedEvent.asJsonStructure().asJsonObject();
138+
assertKey(record, "time", JsonValue.ValueType.STRING);
139+
final String time = record.getString("time");
140+
141+
return new ValidRFC5424Timestamp(time).validTimestamp();
142+
}
143+
144+
@Override
145+
public Set<SDElement> sdElements() throws PluginException {
146+
final Set<SDElement> elems = new HashSet<>();
147+
String time = "";
148+
if (!parsedEvent.enqueuedTimeUtc().isStub()) {
149+
time = parsedEvent.enqueuedTimeUtc().zonedDateTime().toString();
150+
}
151+
152+
String fullyQualifiedNamespace = "";
153+
String eventHubName = "";
154+
String partitionId = "";
155+
String consumerGroup = "";
156+
if (!parsedEvent.partitionCtx().isStub()) {
157+
fullyQualifiedNamespace = String
158+
.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("FullyQualifiedNamespace", ""));
159+
eventHubName = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("EventHubName", ""));
160+
partitionId = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("PartitionId", ""));
161+
consumerGroup = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("ConsumerGroup", ""));
162+
}
163+
164+
elems
165+
.add(new SDElement("aer_02_partition@48577").addSDParam("fully_qualified_namespace", fullyQualifiedNamespace).addSDParam("eventhub_name", eventHubName).addSDParam("partition_id", partitionId).addSDParam("consumer_group", consumerGroup));
166+
167+
elems
168+
.add(new SDElement("event_id@48577").addSDParam("uuid", UUID.randomUUID().toString()).addSDParam("hostname", realHostname).addSDParam("unixtime", Instant.now().toString()).addSDParam("id_source", "aer_02"));
169+
170+
String partitionKey = "";
171+
if (!parsedEvent.systemProperties().isStub()) {
172+
partitionKey = String.valueOf(parsedEvent.systemProperties().asMap().getOrDefault("PartitionKey", ""));
173+
}
174+
175+
String offset = "";
176+
if (!parsedEvent.offset().isStub()) {
177+
offset = parsedEvent.offset().value();
178+
}
179+
180+
elems
181+
.add(new SDElement("aer_02_event@48577").addSDParam("offset", offset).addSDParam("enqueued_time", time).addSDParam("partition_key", partitionKey).addSDParam("properties", new PropertiesJson(parsedEvent.properties()).toJsonObject().toString()));
182+
183+
elems
184+
.add(new SDElement("aer_02@48577").addSDParam("timestamp_source", time.isEmpty() ? "generated" : "timeEnqueued"));
185+
186+
elems.add(new SDElement("nlf_01@48577").addSDParam("eventType", this.getClass().getSimpleName()));
187+
188+
return elems;
189+
}
190+
191+
@Override
192+
public String msgId() throws PluginException {
193+
String sequenceNumber = "";
194+
if (!parsedEvent.systemProperties().isStub()) {
195+
sequenceNumber = String.valueOf(parsedEvent.systemProperties().asMap().getOrDefault("SequenceNumber", ""));
196+
}
197+
return sequenceNumber;
198+
}
199+
200+
@Override
201+
public String msg() throws PluginException {
202+
return parsedEvent.asString();
203+
}
204+
}

src/test/java/com/teragrep/nlf_01/NLFPluginTest.java

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,8 @@
6161
import com.teragrep.akv_01.plugin.PluginException;
6262
import com.teragrep.nlf_01.fakes.EmptySourceable;
6363
import com.teragrep.nlf_01.fakes.FakeSourceable;
64-
import com.teragrep.nlf_01.types.AppInsightType;
65-
import com.teragrep.nlf_01.types.ContainerType;
66-
import com.teragrep.nlf_01.types.SyslogType;
67-
import com.teragrep.rlo_14.SDElement;
68-
import com.teragrep.rlo_14.SDParam;
69-
import com.teragrep.rlo_14.SyslogMessage;
64+
import com.teragrep.nlf_01.types.*;
65+
import com.teragrep.rlo_14.*;
7066
import org.junit.jupiter.api.Assertions;
7167
import org.junit.jupiter.api.Test;
7268

@@ -313,6 +309,59 @@ void syslogType() {
313309
Assertions.assertTrue(sdElementMap.get("aer_02_event@48577").containsKey("properties"));
314310
}
315311

312+
@Test
313+
void testPostgreSQLType() {
314+
final String json = Assertions
315+
.assertDoesNotThrow(() -> Files.readString(Paths.get("src/test/resources/postgre.json")));
316+
final ParsedEvent parsedEvent = new ParsedEventFactory(
317+
new UnparsedEventImpl(json, new EventPartitionContextImpl(new HashMap<>()), new EventPropertiesImpl(new HashMap<>()), new EventSystemPropertiesImpl(new HashMap<>()), new EnqueuedTimeImpl("2020-01-01T00:00:00"), new EventOffsetImpl("0"))
318+
).parsedEvent();
319+
320+
final NLFPlugin plugin = new NLFPlugin(new FakeSourceable());
321+
final List<SyslogMessage> syslogMessages = Assertions
322+
.assertDoesNotThrow(() -> plugin.syslogMessage(parsedEvent));
323+
Assertions.assertEquals(1, syslogMessages.size());
324+
325+
final SyslogMessage syslogMessage = syslogMessages.get(0);
326+
327+
Assertions
328+
.assertEquals(
329+
"{\n" + " \"AppImage\":\"cinnamon/postgres_standalone_12_a1:12.3.456789\",\n"
330+
+ " \"AppType\":\"PostgreSQL\",\n"
331+
+ " \"AppVersion\":\"abcdefghj12_2020-01-01-12-34-56\",\n"
332+
+ " \"Region\":\"countrycentral\",\n" + " \"category\":\"PostgreSQLLogs\",\n"
333+
+ " \"location\":\"countrycentral\",\n" + " \"operationName\":\"LogEvent\",\n"
334+
+ " \"properties\":\n" + " {\n"
335+
+ " \"timestamp\":\"2020-10-01 11:59:26.256 UTC\",\n"
336+
+ " \"processId\":1234567,\n" + " \"errorLevel\":\"LOG\",\n"
337+
+ " \"sqlerrcode\":\"00000\",\n" + " \"backend_type\":\"client backend\",\n"
338+
+ " \"message\":\"2020-10-01 11:59:26 UTC-12abcd3e.4f5678-user=user012,db=dbase_maintenance,app=[unknown],client=127.0.0.1LOG: AUDIT: SESSION,4,1,WRITE,INSERT,,,\\\"insert into test.abcmover (id, update_time) select 1, now() on conflict on constraint abcmover_pk do update set id = test.abcmover.id+1, update_time=now()\\\",<not logged>\"\n"
339+
+ " },\n"
340+
+ " \"resourceId\":\"/SUBSCRIPTIONS/uuid/RESOURCEGROUPS/ab-cd-efgh-ijklmn-xx-DEV-01/PROVIDERS/postgres-db/FLEXIBLESERVERS/efgh-ijklmn-xx-DEV-01\",\n"
341+
+ " \"time\":\"2020-10-01T11:59:26.256Z\",\n" + " \"ServerType\":\"PostgreSQL\",\n"
342+
+ " \"LogicalServerName\":\"efgh-ijklmn-xx-DEV-01\",\n"
343+
+ " \"ServerVersion\":\"abcdefghj12_2020-01-01-12-34-56\",\n"
344+
+ " \"ServerLocation\":\"prod:countrycentral\",\n" + " \"ReplicaRole\":\"Primary\",\n"
345+
+ " \"OriginalPrimaryServerName\":\"efgh-ijklmn-xx-DEV-01\"\n" + "}",
346+
syslogMessage.getMsg()
347+
);
348+
Assertions
349+
.assertEquals("md5-bfd1db26c3c4f8a2936317cf4ec729ea-efgh-ijklmn-xx-DEV-01", syslogMessage.getHostname());
350+
Assertions.assertEquals("dbase_maintenance", syslogMessage.getAppName());
351+
Assertions.assertEquals("2020-10-01T11:59:26.256Z", syslogMessage.getTimestamp());
352+
353+
final Map<String, Map<String, String>> sdElementMap = syslogMessage
354+
.getSDElements()
355+
.stream()
356+
.collect(Collectors.toMap((SDElement::getSdID), (sdElem) -> sdElem.getSdParams().stream().collect(Collectors.toMap(SDParam::getParamName, SDParam::getParamValue))));
357+
358+
Assertions.assertEquals(1, sdElementMap.get("nlf_01@48577").size());
359+
Assertions
360+
.assertEquals(PostgreSQLType.class.getSimpleName(), sdElementMap.get("nlf_01@48577").get("eventType"));
361+
362+
Assertions.assertTrue(sdElementMap.get("aer_02_event@48577").containsKey("properties"));
363+
}
364+
316365
@Test
317366
void unexpectedType() {
318367
final String json = Assertions
@@ -333,7 +382,7 @@ void unexpectedType() {
333382
.assertThrows(PluginException.class, () -> plugin.syslogMessage(parsedEvent));
334383
Assertions
335384
.assertEquals(
336-
"java.lang.IllegalArgumentException: Invalid event type: unexpected",
385+
"java.lang.IllegalArgumentException: Event was not of expected log format or type was not found",
337386
pluginException.getMessage()
338387
);
339388
}

0 commit comments

Comments
 (0)