From 1d03b1be8652d5dffcf7789ffd26e28fa4ef8035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Mon, 30 Oct 2023 15:34:04 +0100 Subject: [PATCH] add syslog service --- NOTICE.txt | 13 + build.gradle | 1 + gradle.properties | 2 +- settings.gradle | 2 + src/main/java/module-info.java | 5 + .../java/org/xbib/event/EventConsumer.java | 4 +- .../java/org/xbib/event/EventManager.java | 14 + .../event/clock/SimpleClockEventConsumer.java | 5 + .../event/io/file/FileFollowEventManager.java | 7 +- .../xbib/event/syslog/CEFMessageParser.java | 131 +++++++ .../event/syslog/DefaultSyslogMessage.java | 314 ++++++++++++++++ .../event/syslog/DefaultSyslogRequest.java | 63 ++++ .../org/xbib/event/syslog/EncoderHelper.java | 41 +++ .../java/org/xbib/event/syslog/Facility.java | 193 ---------- .../org/xbib/event/syslog/JsonParser.java | 346 ------------------ .../java/org/xbib/event/syslog/Message.java | 94 +++++ .../org/xbib/event/syslog/MessageEncoder.java | 125 +++++++ .../org/xbib/event/syslog/MessageKey.java | 7 + .../org/xbib/event/syslog/MessageParser.java | 287 +++++---------- .../org/xbib/event/syslog/MessageType.java | 20 + .../java/org/xbib/event/syslog/Priority.java | 18 + .../event/syslog/RFC3164MessageParser.java | 46 +++ .../event/syslog/RFC5424MessageParser.java | 60 +++ .../java/org/xbib/event/syslog/Severity.java | 123 ------- .../xbib/event/syslog/SyslogEventManager.java | 43 +++ .../xbib/event/syslog/SyslogFrameDecoder.java | 37 ++ .../event/syslog/SyslogIdleStateHandler.java | 25 ++ .../org/xbib/event/syslog/SyslogMessage.java | 9 + .../event/syslog/SyslogMessageHandler.java | 48 +++ .../xbib/event/syslog/SyslogMessageKey.java | 4 + .../org/xbib/event/syslog/SyslogRequest.java | 39 ++ .../org/xbib/event/syslog/SyslogService.java | 307 +++++----------- .../event/syslog/TCPSyslogMessageDecoder.java | 39 ++ .../event/syslog/UDPSyslogMessageDecoder.java | 40 ++ .../event/clock/TestClockEventConsumer.java | 5 + .../io/file/TestFileFollowEventConsumer.java | 7 +- .../xbib/event/syslog/SyslogServiceTest.java | 20 + .../event/timer/TestTimerEventConsumer.java | 5 + 38 files changed, 1478 insertions(+), 1071 deletions(-) create mode 100644 src/main/java/org/xbib/event/syslog/CEFMessageParser.java create mode 100644 src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java create mode 100644 src/main/java/org/xbib/event/syslog/DefaultSyslogRequest.java create mode 100644 src/main/java/org/xbib/event/syslog/EncoderHelper.java delete mode 100644 src/main/java/org/xbib/event/syslog/Facility.java delete mode 100644 src/main/java/org/xbib/event/syslog/JsonParser.java create mode 100644 src/main/java/org/xbib/event/syslog/Message.java create mode 100644 src/main/java/org/xbib/event/syslog/MessageEncoder.java create mode 100644 src/main/java/org/xbib/event/syslog/MessageKey.java create mode 100644 src/main/java/org/xbib/event/syslog/MessageType.java create mode 100644 src/main/java/org/xbib/event/syslog/Priority.java create mode 100644 src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java create mode 100644 src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java delete mode 100644 src/main/java/org/xbib/event/syslog/Severity.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogEventManager.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogFrameDecoder.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogIdleStateHandler.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogMessage.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogMessageHandler.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogMessageKey.java create mode 100644 src/main/java/org/xbib/event/syslog/SyslogRequest.java create mode 100644 src/main/java/org/xbib/event/syslog/TCPSyslogMessageDecoder.java create mode 100644 src/main/java/org/xbib/event/syslog/UDPSyslogMessageDecoder.java create mode 100644 src/test/java/org/xbib/event/syslog/SyslogServiceTest.java diff --git a/NOTICE.txt b/NOTICE.txt index 94ae1f0..9e1ddba 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -23,6 +23,7 @@ branch 4.1 as of 26 Apr 2023. Licence: Apache 2.0 +----------------- The work in org.xbib.event.io is based upon the work in @@ -38,6 +39,8 @@ branched as of 23 December, 2021 License: Apache 2.0 +--------------- + The work in org.xbib.event.bus is taken from Guava https://github.com/google/guava @@ -45,3 +48,13 @@ https://github.com/google/guava as of 27 August, 2022 License: Apache 2.0 + +--------------- + +The work in org.xbib.event.syslog is based upon netty-codec-syslog + +https://github.com/jcustenborder/netty-codec-syslog + +as of 30 October, 2023 + +License: Apache 2.0 diff --git a/build.gradle b/build.gradle index 286b2b2..786b372 100644 --- a/build.gradle +++ b/build.gradle @@ -45,6 +45,7 @@ dependencies { implementation libs.time implementation libs.datastructures.common implementation libs.datastructures.json.tiny + implementation libs.netty.handler implementation libs.reactivestreams testImplementation libs.rxjava3 testImplementation libs.settings.datastructures.json diff --git a/gradle.properties b/gradle.properties index 607456a..7cc3384 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group = org.xbib name = event -version = 0.0.4 +version = 0.0.5 org.gradle.warning.mode = ALL diff --git a/settings.gradle b/settings.gradle index 563fc0d..cddbd06 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,7 +18,9 @@ dependencyResolutionManagement { version('gradle', '8.4') version('groovy', '4.0.13') version('datastructures', '5.0.5') + version('netty', '4.1.100.Final') version('net', '4.0.0') + library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('net', 'org.xbib', 'net').versionRef('net') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index a3c4e4e..e296def 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -13,6 +13,11 @@ module org.xbib.event { exports org.xbib.event.util; exports org.xbib.event.yield; exports org.xbib.event; + requires io.netty.buffer; + requires io.netty.common; + requires io.netty.transport; + requires io.netty.handler; + requires io.netty.codec; requires org.xbib.datastructures.api; requires org.xbib.datastructures.common; requires org.xbib.datastructures.json.tiny; diff --git a/src/main/java/org/xbib/event/EventConsumer.java b/src/main/java/org/xbib/event/EventConsumer.java index 1aa7d09..65171e7 100644 --- a/src/main/java/org/xbib/event/EventConsumer.java +++ b/src/main/java/org/xbib/event/EventConsumer.java @@ -1,4 +1,6 @@ package org.xbib.event; -public interface EventConsumer { +import java.io.Closeable; + +public interface EventConsumer extends Closeable { } diff --git a/src/main/java/org/xbib/event/EventManager.java b/src/main/java/org/xbib/event/EventManager.java index 09695e6..e298fa5 100644 --- a/src/main/java/org/xbib/event/EventManager.java +++ b/src/main/java/org/xbib/event/EventManager.java @@ -6,6 +6,7 @@ import org.xbib.event.bus.SubscriberExceptionHandler; import org.xbib.event.clock.ClockEventManager; import org.xbib.event.io.file.FileFollowEventManager; import org.xbib.event.io.path.PathEventManager; +import org.xbib.event.syslog.SyslogEventManager; import org.xbib.event.timer.TimerEventManager; import org.xbib.settings.Settings; @@ -44,11 +45,14 @@ public final class EventManager implements Closeable { private final PathEventManager pathEventManager; + private final SyslogEventManager syslogEventManager; + private EventManager(Settings settings) { this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader); this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault()); this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader); this.pathEventManager = new PathEventManager(settings, eventBus, executorService, classLoader); + this.syslogEventManager = new SyslogEventManager(settings, eventBus); } public static EventManager newEventManager(Settings settings) { @@ -81,6 +85,10 @@ public final class EventManager implements Closeable { return pathEventManager; } + public SyslogEventManager getSyslogEventManager() { + return syslogEventManager; + } + @Override public boolean equals(Object obj) { return super.equals(obj); @@ -88,8 +96,14 @@ public final class EventManager implements Closeable { @Override public void close() throws IOException { + for (EventConsumer eventConsumer : eventConsumers) { + eventConsumer.close(); + } clockEventManager.close(); + timerEventManager.close(); + fileFollowEventManager.close(); pathEventManager.close(); + syslogEventManager.close(); } private static class EventManagerExceptionHandler implements SubscriberExceptionHandler { diff --git a/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java b/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java index f7f75e8..30c8624 100644 --- a/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java +++ b/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java @@ -2,6 +2,7 @@ package org.xbib.event.clock; import org.xbib.event.EventConsumer; +import java.io.IOException; import java.util.logging.Logger; import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.Subscribe; @@ -18,4 +19,8 @@ public class SimpleClockEventConsumer implements EventConsumer { void onEvent(ClockEvent event) { logger.info("received demo clock event, instant = " + event.getInstant()); } + + @Override + public void close() throws IOException { + } } diff --git a/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java b/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java index 9b25962..affaf07 100644 --- a/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java +++ b/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java @@ -3,6 +3,8 @@ package org.xbib.event.io.file; import org.xbib.event.bus.AsyncEventBus; import org.xbib.settings.Settings; +import java.io.Closeable; +import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedHashMap; @@ -14,7 +16,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; -public class FileFollowEventManager { +public class FileFollowEventManager implements Closeable { private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName()); @@ -49,7 +51,8 @@ public class FileFollowEventManager { } } - public void close() { + @Override + public void close() throws IOException { for (Map.Entry, FileFollowEventService> entry : eventServiceMap.entrySet()) { entry.getValue().setKeepWatching(false); entry.getKey().cancel(true); diff --git a/src/main/java/org/xbib/event/syslog/CEFMessageParser.java b/src/main/java/org/xbib/event/syslog/CEFMessageParser.java new file mode 100644 index 0000000..ac42de8 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/CEFMessageParser.java @@ -0,0 +1,131 @@ +package org.xbib.event.syslog; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; + +public class CEFMessageParser extends MessageParser { + + private static final String CEF_PREFIX_PATTERN = "^(<(?\\d+)>)?(?([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?\\S+)\\s+CEF:(?\\d+)\\|(?.*)$"; + + private static final String CEF_MAIN_PATTERN = "(? matcherCEFPrefix; + + private final ThreadLocal matcherCEFMain; + + private final ThreadLocal matcherCEFExtension; + + public CEFMessageParser() { + this.matcherCEFPrefix = initMatcher(CEF_PREFIX_PATTERN); + this.matcherCEFMain = initMatcher(CEF_MAIN_PATTERN); + this.matcherCEFExtension = initMatcher(PATTERN_EXTENSION); + } + + List splitToList(String data) { + List result = new ArrayList<>(10); + Matcher matcherData = this.matcherCEFMain.get().reset(data); + int start = 0; + int end = 0; + while (matcherData.find()) { + end = matcherData.end(); + String part = data.substring(start, end - 1); + start = end; + result.add(part); + } + if (data.length() > end) { + result.add(data.substring(end)); + } + return result; + } + + @Override + public Message parse(SyslogRequest request) { + final Matcher matcherPrefix = this.matcherCEFPrefix.get().reset(request.rawMessage()); + if (!matcherPrefix.find()) { + return null; + } + final String groupPriority = matcherPrefix.group("priority"); + final String groupDate = matcherPrefix.group("date"); + final String groupHost = matcherPrefix.group("host"); + final String groupCEFVersion = matcherPrefix.group("version"); + final String groupData = matcherPrefix.group("data"); + final Integer priority = (groupPriority == null || groupPriority.isEmpty()) ? null : Integer.parseInt(groupPriority); + final Integer facility = null == priority ? null : Priority.facility(priority); + final Integer level = null == priority ? null : Priority.level(priority, facility); + final LocalDateTime date = parseDate(groupDate); + final Integer cefVersion = Integer.parseInt(groupCEFVersion); + final List parts = splitToList(groupData); + DefaultSyslogMessage.Builder builder = DefaultSyslogMessage.builder(); + builder.type(MessageType.CEF); + builder.rawMessage(request.rawMessage()); + builder.remoteAddress(request.remoteAddress()); + builder.date(date); + builder.version(cefVersion); + builder.host(groupHost); + builder.level(level); + builder.facility(facility); + int index = 0; + for (String token : parts) { + token = token.replace("\\|", "|"); + switch (index) { + case 0: + builder.deviceVendor(token); + break; + case 1: + builder.deviceProduct(token); + break; + case 2: + builder.deviceVersion(token); + break; + case 3: + builder.deviceEventClassId(token); + break; + case 4: + builder.name(token); + break; + case 5: + builder.severity(token); + break; + case 6: + Map map = parseExtension(token); + builder.map(map); + break; + default: + break; + } + index++; + } + return builder.build(); + } + + private Map parseExtension(String token) { + final Map result = new LinkedHashMap<>(); + if (null == token || token.isEmpty()) { + return result; + } + Matcher matcher = this.matcherCEFExtension.get().reset(token); + String key = null; + String value; + int lastEnd = -1, lastStart = -1; + while (matcher.find()) { + if (lastEnd > -1) { + value = token.substring(lastEnd, matcher.start()).trim(); + result.put(key, value); + } + key = matcher.group(1); + lastStart = matcher.start(); + lastEnd = matcher.end(); + } + if (lastStart > -1 && !result.containsKey(key)) { + value = token.substring(lastEnd).trim(); + result.put(key, value); + } + return result; + } +} diff --git a/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java b/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java new file mode 100644 index 0000000..18a0bc7 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java @@ -0,0 +1,314 @@ +package org.xbib.event.syslog; + +import java.net.InetAddress; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; + +public class DefaultSyslogMessage implements SyslogMessage { + + private final Builder builder; + + private DefaultSyslogMessage(Builder builder) { + this.builder = builder; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public LocalDateTime date() { + return builder.date; + } + + @Override + public InetAddress remoteAddress() { + return builder.remoteAddress; + } + + @Override + public String rawMessage() { + return builder.rawMessage; + } + + @Override + public MessageType type() { + return builder.type; + } + + @Override + public Integer level() { + return builder.level; + } + + @Override + public Integer version() { + return builder.version; + } + + @Override + public Integer facility() { + return builder.facility; + } + + @Override + public String host() { + return builder.host; + } + + @Override + public String message() { + return builder.message; + } + + @Override + public String processId() { + return builder.processId; + } + + @Override + public String tag() { + return builder.tag; + } + + @Override + public String messageId() { + return builder.messageId; + } + + @Override + public String appName() { + return builder.appName; + } + + @Override + public List structuredData() { + return builder.structuredData; + } + + @Override + public String deviceVendor() { + return builder.deviceVendor; + } + + @Override + public String deviceProduct() { + return builder.deviceProduct; + } + + @Override + public String deviceVersion() { + return builder.deviceVersion; + } + + @Override + public String deviceEventClassId() { + return builder.deviceEventClassId; + } + + @Override + public String name() { + return builder.name; + } + + @Override + public String severity() { + return builder.severity; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("date=").append(builder.date) + .append(",remoteAddress=").append(builder.remoteAddress) + .append(",rawMessage=").append(builder.rawMessage) + .append(",type=").append(builder.type) + .append(",level=").append(builder.level) + .append(",version=").append(builder.version) + .append(",facility=").append(builder.facility) + .append(",host=").append(builder.host) + .append(",message=").append(builder.message) + .append(",processId=").append(builder.processId) + .append(",tag=").append(builder.tag) + .append(",messageId=").append(builder.messageId) + .append(",appName=").append(builder.appName) + .append(",structuredData=").append(builder.structuredData); + return sb.toString(); + } + + @Override + public void setKey(String key) { + // ignore + } + + @Override + public String getKey() { + return builder.messageId; + } + + @Override + public void setMap(Map map) { + // ignore + } + + @Override + public Map getMap() { + return builder.map; + } + + public static class Builder implements SyslogMessage.Builder { + + LocalDateTime date; + + InetAddress remoteAddress; + + String rawMessage; + + MessageType type; + + Integer level; + + Integer version; + + Integer facility; + + String host; + + String message; + + String processId; + + String tag; + + String messageId; + + String appName; + + List structuredData; + + String deviceVendor; + + String deviceProduct; + + String deviceVersion; + + String deviceEventClassId; + + String name; + + String severity; + + Map map; + + public Builder date(LocalDateTime date) { + this.date = date; + return this; + } + + public Builder remoteAddress(InetAddress remoteAddress) { + this.remoteAddress = remoteAddress; + return this; + } + + public Builder rawMessage(String rawMessage) { + this.rawMessage = rawMessage; + return this; + } + + public Builder type(MessageType type) { + this.type = type; + return this; + } + + public Builder level(Integer level) { + this.level = level; + return this; + } + + public Builder version(Integer version) { + this.version = version; + return this; + } + + public Builder facility(Integer facility) { + this.facility = facility; + return this; + } + + public Builder host(String host) { + this.host = host; + return this; + } + + public Builder message(String message) { + this.message = message; + return this; + } + + public Builder processId(String processId) { + this.processId = processId; + return this; + } + + public Builder tag(String tag) { + this.tag = tag; + return this; + } + + public Builder messageId(String messageId) { + this.messageId = messageId; + return this; + } + + public Builder appName(String appName) { + this.appName = appName; + return this; + } + + public Builder structuredData(List structuredData) { + this.structuredData = structuredData; + return this; + } + + public Builder deviceVendor(String deviceVendor) { + this.deviceVendor = deviceVendor; + return this; + } + + public Builder deviceProduct(String deviceProduct) { + this.deviceProduct = deviceProduct; + return this; + } + + public Builder deviceVersion(String deviceVersion) { + this.deviceVersion = deviceVersion; + return this; + } + + public Builder deviceEventClassId(String deviceEventClassId) { + this.deviceEventClassId = deviceEventClassId; + return this; + } + + public Builder name(String name) { + this.name = name; + return this; + } + + public Builder severity(String severity) { + this.severity = severity; + return this; + } + + public Builder map(Map map) { + this.map = map; + return this; + } + + @Override + public SyslogMessage build() { + return new DefaultSyslogMessage(this); + } + } +} diff --git a/src/main/java/org/xbib/event/syslog/DefaultSyslogRequest.java b/src/main/java/org/xbib/event/syslog/DefaultSyslogRequest.java new file mode 100644 index 0000000..06b608d --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/DefaultSyslogRequest.java @@ -0,0 +1,63 @@ +package org.xbib.event.syslog; + +import java.net.InetAddress; +import java.time.LocalDateTime; + +public class DefaultSyslogRequest implements SyslogRequest { + + private final Builder builder; + + private DefaultSyslogRequest(Builder builder) { + this.builder = builder; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public LocalDateTime receivedDate() { + return builder.receivedDate; + } + + @Override + public InetAddress remoteAddress() { + return builder.remoteAddress; + } + + @Override + public String rawMessage() { + return builder.rawMessage; + } + + public static class Builder implements SyslogRequest.Builder { + + LocalDateTime receivedDate; + + InetAddress remoteAddress; + + String rawMessage; + + private Builder() { + } + + public Builder receivedDate(LocalDateTime receivedDate) { + this.receivedDate = receivedDate; + return this; + } + + public Builder remoteAddress(InetAddress remoteAddress) { + this.remoteAddress = remoteAddress; + return this; + } + + public Builder rawMessage(String rawMessage) { + this.rawMessage = rawMessage; + return this; + } + + public SyslogRequest build() { + return new DefaultSyslogRequest(this); + } + } +} diff --git a/src/main/java/org/xbib/event/syslog/EncoderHelper.java b/src/main/java/org/xbib/event/syslog/EncoderHelper.java new file mode 100644 index 0000000..a18c96a --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/EncoderHelper.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2018 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.xbib.event.syslog; + +import io.netty.buffer.ByteBuf; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +class EncoderHelper { + public static final Charset CHARSET = StandardCharsets.UTF_8; + public static final byte[] LESS_THAN = "<".getBytes(CHARSET); + public static final byte[] GREATER_THAN = ">".getBytes(CHARSET); + public static final byte[] LEFT_SQUARE = "[".getBytes(CHARSET); + public static final byte[] RIGHT_SQUARE = "]".getBytes(CHARSET); + public static final byte[] SPACE = " ".getBytes(CHARSET); + public static final byte[] EQUALS = "=".getBytes(CHARSET); + + + public static void appendPriority(ByteBuf buffer, Message message) { + if (null != message.facility() && null != message.level()) { + Integer priority = Priority.priority(message.level(), message.facility()); + buffer.writeBytes(LESS_THAN); + buffer.writeCharSequence(priority.toString(), CHARSET); + buffer.writeBytes(GREATER_THAN); + } + } +} diff --git a/src/main/java/org/xbib/event/syslog/Facility.java b/src/main/java/org/xbib/event/syslog/Facility.java deleted file mode 100644 index 334e4a4..0000000 --- a/src/main/java/org/xbib/event/syslog/Facility.java +++ /dev/null @@ -1,193 +0,0 @@ -package org.xbib.event.syslog; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; - -/** - * Syslog facility as defined in RFC 5424 - The Syslog Protocol. - * See RFC 5427 - Textual Conventions for Syslog Management for the {@link #label}. - */ -public enum Facility implements Comparable { - - /** - * Kernel messages, numerical code 0. - */ - KERN(0, "KERN"), - /** - * User-level messages, numerical code 1. - */ - USER(1, "USER"), - /** - * Mail system, numerical code 2. - */ - MAIL(2, "MAIL"), - /** - * System daemons, numerical code 3. - */ - DAEMON(3, "DAEMON"), - /** - * Security/authorization messages, numerical code 4. - */ - AUTH(4, "AUTH"), - /** - * Messages generated internally by syslogd, numerical code 5. - */ - SYSLOG(5, "SYSLOG"), - /** - * Line printer subsystem, numerical code 6. - */ - LPR(6, "LPR"), - /** - * Network news subsystem, numerical code 7. - */ - NEWS(7, "NEWS"), - /** - * UUCP subsystem, numerical code 8 - */ - UUCP(8, "UUCP"), - /** - * Clock daemon, numerical code 9. - */ - CRON(9, "CRON"), - /** - * Security/authorization messages, numerical code 10. - */ - AUTHPRIV(10, "AUTHPRIV"), - /** - * FTP daemon, numerical code 11. - */ - FTP(11, "FTP"), - /** - * NTP subsystem, numerical code 12. - */ - NTP(12, "NTP"), - /** - * Log audit, numerical code 13. - */ - AUDIT(13, "AUDIT"), - /** - * Log alert, numerical code 14. - */ - ALERT(14, "ALERT"), - /** - * Clock daemon, numerical code 15. - */ - CLOCK(15, "CLOCK"), - /** - * Reserved for local use, numerical code 16. - */ - LOCAL0(16, "LOCAL0"), - /** - * Reserved for local use, numerical code 17. - */ - LOCAL1(17, "LOCAL1"), - /** - * Reserved for local use, numerical code 18. - */ - LOCAL2(18, "LOCAL2"), - /** - * Reserved for local use, numerical code 19. - */ - LOCAL3(19, "LOCAL3"), - /** - * Reserved for local use, numerical code 20. - */ - LOCAL4(20, "LOCAL4"), - /** - * Reserved for local use, numerical code 21. - */ - LOCAL5(21, "LOCAL5"), - /** - * Reserved for local use, numerical code 22. - */ - LOCAL6(22, "LOCAL6"), - /** - * Reserved for local use, numerical code 23. - */ - LOCAL7(23, "LOCAL7"); - - private final static Map facilityFromLabel = new HashMap(); - - private final static Map facilityFromNumericalCode = new HashMap(); - - static { - for (Facility facility : Facility.values()) { - facilityFromLabel.put(facility.label, facility); - facilityFromNumericalCode.put(facility.numericalCode, facility); - } - } - - /** - * Syslog facility numerical code - */ - private final int numericalCode; - /** - * Syslog facility textual code. Not {@code null} - */ - private final String label; - - private Facility(int numericalCode, String label) { - this.numericalCode = numericalCode; - this.label = label; - } - - /** - * @param numericalCode Syslog facility numerical code - * @return Syslog facility, not {@code null} - * @throws IllegalArgumentException the given numericalCode is not a valid Syslog facility numerical code - */ - public static Facility fromNumericalCode(int numericalCode) throws IllegalArgumentException { - Facility facility = facilityFromNumericalCode.get(numericalCode); - if (facility == null) { - throw new IllegalArgumentException("Invalid facility '" + numericalCode + "'"); - } - return facility; - } - - /** - * @param label Syslog facility textual code. {@code null} or empty returns {@code null} - * @return Syslog facility, {@code null} if given value is {@code null} - * @throws IllegalArgumentException the given value is not a valid Syslog facility textual code - */ - public static Facility fromLabel(String label) throws IllegalArgumentException { - if (label == null || label.isEmpty()) { - return null; - } - - Facility facility = facilityFromLabel.get(label); - if (facility == null) { - throw new IllegalArgumentException("Invalid facility '" + label + "'"); - } - return facility; - } - - /** - * Syslog facility numerical code - * @return numerical code - */ - public int numericalCode() { - return numericalCode; - } - - /** - * Syslog facility textual code. Not {@code null}. - * @return label - */ - public String label() { - return label; - } - - /** - * Compare on {@link Facility#numericalCode()} - * @return comparator for facilities - */ - public static Comparator comparator() { - return new Comparator() { - @Override - public int compare(Facility f1, Facility f2) { - return Integer.compare(f1.numericalCode, f2.numericalCode); - } - }; - } -} diff --git a/src/main/java/org/xbib/event/syslog/JsonParser.java b/src/main/java/org/xbib/event/syslog/JsonParser.java deleted file mode 100644 index c87cbf4..0000000 --- a/src/main/java/org/xbib/event/syslog/JsonParser.java +++ /dev/null @@ -1,346 +0,0 @@ -package org.xbib.event.syslog; - -import java.io.IOException; -import java.io.Reader; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -class JsonParser { - - private static final int DEFAULT_BUFFER_SIZE = 1024; - - private final Reader reader; - - private final char[] buf; - - private int index; - - private int fill; - - private int ch; - - private StringBuilder sb; - - private int start; - - public JsonParser(Reader reader) { - this(reader, DEFAULT_BUFFER_SIZE); - } - - public JsonParser(Reader reader, int buffersize) { - this.reader = reader; - buf = new char[buffersize]; - start = -1; - } - - public Object parse() throws IOException { - read(); - skipBlank(); - Object result = parseValue(); - skipBlank(); - if (ch != -1) { - throw new IOException("unexpected character: " + ch); - } - return result; - } - - private Object parseValue() throws IOException { - switch (ch) { - case 'n': - return parseNull(); - case 't': - return parseTrue(); - case 'f': - return parseFalse(); - case '"': - return parseString(); - case '[': - return parseList(); - case '{': - return parseMap(); - case '-': - case '+': - case '0': - case '1': - case '2': - case '3': - case '4': - case '5': - case '6': - case '7': - case '8': - case '9': - return parseNumber(); - } - throw new IOException("value"); - } - - private List parseList() throws IOException { - read(); - List list = new ArrayList(); - skipBlank(); - if (parseChar(']')) { - return list; - } - do { - skipBlank(); - list.add(parseValue()); - skipBlank(); - } while (parseChar(',')); - if (!parseChar(']')) { - expected("',' or ']'"); - } - return list; - } - - private Map parseMap() throws IOException { - read(); - Map object = new LinkedHashMap(); - skipBlank(); - if (parseChar('}')) { - return object; - } - do { - skipBlank(); - if (ch != '"') { - expected("name"); - } - String name = parseString(); - skipBlank(); - if (!parseChar(':')) { - expected("':'"); - } - skipBlank(); - object.put(name, parseValue()); - skipBlank(); - } while (parseChar(',')); - if (!parseChar('}')) { - expected("',' or '}'"); - } - return object; - } - - private Object parseNull() throws IOException { - read(); - checkForChar('u'); - checkForChar('l'); - checkForChar('l'); - return null; - } - - private Object parseTrue() throws IOException { - read(); - checkForChar('r'); - checkForChar('u'); - checkForChar('e'); - return Boolean.TRUE; - } - - private Object parseFalse() throws IOException { - read(); - checkForChar('a'); - checkForChar('l'); - checkForChar('s'); - checkForChar('e'); - return Boolean.FALSE; - } - - private void checkForChar(char ch) throws IOException { - if (!parseChar(ch)) { - expected("'" + ch + "'"); - } - } - - private String parseString() throws IOException { - read(); - startCapture(); - while (ch != '"') { - if (ch == '\\') { - pauseCapture(); - parseEscaped(); - startCapture(); - } else if (ch < 0x20) { - expected("valid string character"); - } else { - read(); - } - } - String s = endCapture(); - read(); - return s; - } - - private void parseEscaped() throws IOException { - read(); - switch (ch) { - case '"': - case '/': - case '\\': - sb.append((char) ch); - break; - case 'b': - sb.append('\b'); - break; - case 't': - sb.append('\t'); - break; - case 'f': - sb.append('\f'); - break; - case 'n': - sb.append('\n'); - break; - case 'r': - sb.append('\r'); - break; - case 'u': - char[] hex = new char[4]; - for (int i = 0; i < 4; i++) { - read(); - if (!isHexDigit()) { - expected("hexadecimal digit"); - } - hex[i] = (char) ch; - } - sb.append((char) Integer.parseInt(String.valueOf(hex), 16)); - break; - default: - expected("valid escape sequence"); - } - read(); - } - - private Object parseNumber() throws IOException { - startCapture(); - parseChar('-'); - int firstDigit = ch; - if (!parseDigit()) { - expected("digit"); - } - if (firstDigit != '0') { - while (parseDigit()) { - } - } - parseFraction(); - parseExponent(); - return endCapture(); - } - - private boolean parseFraction() throws IOException { - if (!parseChar('.')) { - return false; - } - if (!parseDigit()) { - expected("digit"); - } - while (parseDigit()) { - } - return true; - } - - private boolean parseExponent() throws IOException { - if (!parseChar('e') && !parseChar('E')) { - return false; - } - if (!parseChar('+')) { - parseChar('-'); - } - if (!parseDigit()) { - expected("digit"); - } - while (parseDigit()) { - } - return true; - } - - private boolean parseChar(char ch) throws IOException { - if (this.ch != ch) { - return false; - } - read(); - return true; - } - - private boolean parseDigit() throws IOException { - if (!isDigit()) { - return false; - } - read(); - return true; - } - - private void skipBlank() throws IOException { - while (isWhiteSpace()) { - read(); - } - } - - private void read() throws IOException { - if (ch == -1) { - throw new IOException("unexpected end of input"); - } - if (index == fill) { - if (start != -1) { - sb.append(buf, start, fill - start); - start = 0; - } - fill = reader.read(buf, 0, buf.length); - index = 0; - if (fill == -1) { - ch = -1; - return; - } - } - ch = buf[index++]; - } - - private void startCapture() { - if (sb == null) { - sb = new StringBuilder(); - } - start = index - 1; - } - - private void pauseCapture() { - int end = ch == -1 ? index : index - 1; - sb.append(buf, start, end - start); - start = -1; - } - - private String endCapture() { - int end = ch == -1 ? index : index - 1; - String captured; - if (sb.length() > 0) { - sb.append(buf, start, end - start); - captured = sb.toString(); - sb.setLength(0); - } else { - captured = new String(buf, start, end - start); - } - start = -1; - return captured; - } - - private boolean isWhiteSpace() { - return ch == ' ' || ch == '\t' || ch == '\n' || ch == '\r'; - } - - private boolean isDigit() { - return ch >= '0' && ch <= '9'; - } - - private boolean isHexDigit() { - return ch >= '0' && ch <= '9' - || ch >= 'a' && ch <= 'f' - || ch >= 'A' && ch <= 'F'; - } - - private void expected(String expected) throws IOException { - if (ch == -1) { - throw new IOException("unexpected end of input"); - } - throw new IOException("expected " + expected); - } -} diff --git a/src/main/java/org/xbib/event/syslog/Message.java b/src/main/java/org/xbib/event/syslog/Message.java new file mode 100644 index 0000000..3d2f9bb --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/Message.java @@ -0,0 +1,94 @@ +package org.xbib.event.syslog; + +import org.xbib.datastructures.api.Builder; +import org.xbib.event.Event; + +import java.net.InetAddress; +import java.time.LocalDateTime; +import java.util.List; + +/** + * Represents a standard syslog message. + */ +public interface Message extends Event { + /** + * Date of the message. This is the parsed date from the client. + * + * @return date of the message + */ + LocalDateTime date(); + + /** + * IP Address for the sender of the message. + * + * @return sender IP Address + */ + InetAddress remoteAddress(); + + /** + * Unprocessed copy of the message. + * + * @return Unprocessed message + */ + String rawMessage(); + + MessageType type(); + + /** + * Level for the message. Parsed from the message. + * + * @return message Level + */ + Integer level(); + + /** + * Version of the message. + * + * @return message version + */ + Integer version(); + + /** + * Facility of the message. + * + * @return message facility + */ + Integer facility(); + + /** + * Host of the message. This is the value from the message. + * + * @return message host + */ + String host(); + + /** + * Message part of the overall syslog message. + * + * @return message part of the overall syslog message + */ + String message(); + + String processId(); + + String tag(); + + String messageId(); + + String appName(); + + List structuredData(); + + String deviceVendor(); + + String deviceProduct(); + + String deviceVersion(); + + String deviceEventClassId(); + + String name(); + + String severity(); + +} diff --git a/src/main/java/org/xbib/event/syslog/MessageEncoder.java b/src/main/java/org/xbib/event/syslog/MessageEncoder.java new file mode 100644 index 0000000..ada63e3 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/MessageEncoder.java @@ -0,0 +1,125 @@ +package org.xbib.event.syslog; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; + +public class MessageEncoder extends MessageToMessageEncoder { + + final DateTimeFormatter cefDateFormat; + final Charset charset; + final byte[] cef; + final byte[] pipe; + + public MessageEncoder(DateTimeFormatter cefDateFormat) { + this.cefDateFormat = cefDateFormat; + this.charset = StandardCharsets.UTF_8; + this.cef = "CEF:0".getBytes(charset); + this.pipe = "|".getBytes(charset); + } + + @Override + protected void encode(ChannelHandlerContext context, Message message, List list) { + switch (message.type()) { + case CEF: + encodeCEF(context, message, list); + break; + case RFC3164: + encodeRFC3164(context, message, list); + break; + case RFC5424: + encodeRFC5424(context, message, list); + break; + default: + break; + } + } + + private void encodeCEF(ChannelHandlerContext context, Message message, List output) { + final ByteBuf buffer = context.alloc().buffer(); + EncoderHelper.appendPriority(buffer, message); + buffer.writeCharSequence(cefDateFormat.format(message.date()), charset); + buffer.writeBytes(EncoderHelper.SPACE); + buffer.writeCharSequence(message.host(), charset); + buffer.writeBytes(EncoderHelper.SPACE); + buffer.writeBytes(cef); + buffer.writeBytes(pipe); + buffer.writeCharSequence(message.deviceVendor(), charset); + buffer.writeBytes(pipe); + buffer.writeCharSequence(message.deviceProduct(), charset); + buffer.writeBytes(pipe); + buffer.writeCharSequence(message.deviceVersion(), charset); + buffer.writeBytes(pipe); + buffer.writeCharSequence(message.deviceEventClassId(), charset); + buffer.writeBytes(pipe); + buffer.writeCharSequence(message.name(), charset); + buffer.writeBytes(pipe); + buffer.writeCharSequence(message.severity(), charset); + buffer.writeBytes(pipe); + int index = 0; + for (Map.Entry kvp : message.getMap().entrySet()) { + if (index > 0) { + buffer.writeBytes(EncoderHelper.SPACE); + } + buffer.writeCharSequence(kvp.getKey(), charset); + buffer.writeBytes(EncoderHelper.EQUALS); + buffer.writeCharSequence(kvp.getValue().toString(), charset); + index++; + } + + output.add(buffer); + } + + private void encodeRFC3164(ChannelHandlerContext context, Message message, List output) { + ByteBuf buffer = context.alloc().buffer(); + EncoderHelper.appendPriority(buffer, message); + buffer.writeCharSequence(message.date().format(cefDateFormat), charset); + buffer.writeCharSequence(" ", charset); + buffer.writeCharSequence(message.host(), charset); + buffer.writeCharSequence(" ", charset); + buffer.writeCharSequence(message.tag(), charset); + if (message.processId() != null) { + buffer.writeCharSequence("[", charset); + buffer.writeCharSequence(message.processId(), charset); + buffer.writeCharSequence("]", charset); + } + buffer.writeCharSequence(": ", charset); + buffer.writeCharSequence(message.message(), charset); + output.add(buffer); + } + + private void encodeRFC5424(ChannelHandlerContext context, Message message, List output) { + final ByteBuf buffer = context.alloc().buffer(); + EncoderHelper.appendPriority(buffer, message); + if (message.version() != null) { + buffer.writeCharSequence(message.version().toString(), charset); + } + buffer.writeBytes(EncoderHelper.SPACE); + buffer.writeCharSequence(message.date().format(cefDateFormat), charset); + buffer.writeCharSequence(" ", charset); + buffer.writeCharSequence(message.host(), charset); + buffer.writeCharSequence(" ", charset); + if (message.appName() != null) { + buffer.writeCharSequence(message.appName(), charset); + } + if (message.processId() != null) { + buffer.writeCharSequence(message.processId(), charset); + } else { + buffer.writeCharSequence(" -", charset); + } + if (message.messageId() != null) { + buffer.writeCharSequence(message.messageId(), charset); + } else { + buffer.writeCharSequence(" -", charset); + } + buffer.writeCharSequence(" - ", charset); + buffer.writeCharSequence(message.message(), charset); + output.add(buffer); + } +} diff --git a/src/main/java/org/xbib/event/syslog/MessageKey.java b/src/main/java/org/xbib/event/syslog/MessageKey.java new file mode 100644 index 0000000..42bab4d --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/MessageKey.java @@ -0,0 +1,7 @@ +package org.xbib.event.syslog; + +public interface MessageKey { + String remoteAddress(); + + String host(); +} diff --git a/src/main/java/org/xbib/event/syslog/MessageParser.java b/src/main/java/org/xbib/event/syslog/MessageParser.java index 973856a..31bd442 100644 --- a/src/main/java/org/xbib/event/syslog/MessageParser.java +++ b/src/main/java/org/xbib/event/syslog/MessageParser.java @@ -1,231 +1,140 @@ package org.xbib.event.syslog; +import org.xbib.datastructures.api.Builder; +import org.xbib.datastructures.json.tiny.JsonBuilder; + import java.io.IOException; -import java.io.StringReader; +import java.time.DateTimeException; import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; import java.time.temporal.TemporalAccessor; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.xbib.datastructures.api.Builder; -/** - * Parses a syslog message with RFC 3164 or RFC 5424 date format - */ -public class MessageParser { +public abstract class MessageParser { - private final static Pattern TWO_SPACES = Pattern.compile(" "); + private static final Logger log = Logger.getLogger(MessageParser.class.getName()); - private final static DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME; + private static final String NULL_TOKEN = "-"; - private final static DateTimeFormatter rfc3164Format = - DateTimeFormatter.ofPattern("MMM d HH:mm:ss").withZone(ZoneId.of("UTC")); + protected final List dateFormatters; - private final static int RFC3164_LEN = 15; + private final ThreadLocal matcherStructuredData; - private final static int RFC5424_PREFIX_LEN = 19; + private final ThreadLocal matcherKeyValue; - private final DateTimeFormatter timeParser; + private final ZoneId zoneId; - private final Map timestampCache; + public MessageParser() { + this(ZoneId.of("UTC")); + } - private final Map fieldNames = new HashMap<>() {{ - put("host", "host"); - put("facility", "facility"); - put("severity", "severity"); - put("timestamp", "timestamp"); - put("message", "message"); - }}; + public MessageParser(ZoneId zoneId) { + this.zoneId = zoneId; + this.dateFormatters = Arrays.asList(DateTimeFormatter.ISO_OFFSET_DATE_TIME, + new DateTimeFormatterBuilder() + .appendPattern("MMM dd") + .optionalStart() + .appendPattern("[ yyyy]") + .parseDefaulting(ChronoField.YEAR_OF_ERA, 1) + .optionalEnd() + .appendPattern(" HH:mm:ss") + .toFormatter(Locale.ROOT) + ); + this.matcherStructuredData = initMatcher("\\[([^\\]]+)\\]"); + this.matcherKeyValue = initMatcher("(?\\S+)=\"(?[^\"]+)\"|(?\\S+)"); + } - private Map patterns; + /** + * Method is used to parse an incoming syslog message. + * + * @param request Incoming syslog request. + * @return Object to pass along the pipeline. Null if could not be parsed. + */ + public abstract Message parse(SyslogRequest request) throws IOException; - public MessageParser() { - timeParser = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(ZoneId.of("UTC")); - timestampCache = new LinkedHashMap<>(); - /*CacheBuilder.newBuilder().maximumSize(1000).build( - new CacheLoader<>() { - - @Override - public LocalDateTime load(String key) { - return LocalDateTime.parse(key, timeParser); - } - });*/ + protected final ThreadLocal initMatcher(String pattern) { + return initMatcher(pattern, 0); } - public MessageParser setPatterns(Map patterns) { - this.patterns = patterns; - return this; + protected final ThreadLocal initMatcher(String pattern, int flags) { + final Pattern p = Pattern.compile(pattern, flags); + return new MatcherInheritableThreadLocal(p); } - public MessageParser setFieldName(String name, String newName) { - fieldNames.put(name, newName); - return this; + protected String nullableString(String groupText) { + return NULL_TOKEN.equals(groupText) ? null : groupText; } - @SuppressWarnings("unchecked") - public void parseMessage(String msg, Builder builder) throws IOException { - int msgLen = msg.length(); - int pos = 0; - if (msg.charAt(pos) != '<') { - throw new IllegalArgumentException("bad format: invalid priority: cannot find open bracket '<' " + msg); - } - int end = msg.indexOf('>'); - if (end < 0 || end > 6) { - throw new IllegalArgumentException("bad format: invalid priority: cannot find end bracket '>' " + msg); - } - int pri = Integer.parseInt(msg.substring(1, end)); - Facility facility = Facility.fromNumericalCode(pri / 8); - Severity severity = Severity.fromNumericalCode(pri % 8); - builder.field(fieldNames.get("facility"), facility.label()) - .field(fieldNames.get("severity"), severity.label()); - if (msgLen <= end + 1) { - throw new IllegalArgumentException("bad format: no data except priority " + msg); - } - pos = end + 1; - if (msgLen > pos + 2 && "1 ".equals(msg.substring(pos, pos + 2))) { - pos += 2; - } - TemporalAccessor timestamp; - char ch = msg.charAt(pos); - if (ch == '-') { - timestamp = LocalDateTime.now(); - if (msgLen <= pos + 2) { - throw new IllegalArgumentException("bad syslog format (missing hostname)"); - } - pos += 2; - } else if (ch >= 'A' && ch <= 'Z') { - if (msgLen <= pos + RFC3164_LEN) { - throw new IllegalArgumentException("bad timestamp format"); - } - timestamp = parseRFC3164Time(msg.substring(pos, pos + RFC3164_LEN)); - pos += RFC3164_LEN + 1; - } else { - int sp = msg.indexOf(' ', pos); - if (sp == -1) { - throw new IllegalArgumentException("bad timestamp format"); - } - timestamp = parseRFC5424Date(msg.substring(pos, sp)); - pos = sp + 1; - } - builder.field(fieldNames.get("timestamp"), formatter.format(timestamp)); - int ns = msg.indexOf(' ', pos); - if (ns == -1) { - throw new IllegalArgumentException("bad syslog format (missing hostname)"); - } - String hostname = msg.substring(pos, ns); - builder.field(fieldNames.get("host"), hostname); - - String data; - if (msgLen > ns + 1) { - pos = ns + 1; - data = msg.substring(pos); - } else { - data = msg; - } - try { - if (data.startsWith("@cee:")) { - data = data.substring(5); - } - JsonParser parser = new JsonParser(new StringReader(data)); - Map map = (Map)parser.parse(); - builder.buildMap(map); - } catch (Throwable t) { - // ignore - } - String message = fieldNames.get("message"); - builder.field(message, data); - if (patterns != null) { - for (Map.Entry entry : patterns.entrySet()) { - Matcher m = entry.getValue().matcher(data); - if (m.find()) { - builder.field(entry.getKey(), m.group(1)); + protected LocalDateTime parseDate(String date) { + final String cleanDate = date.replaceAll("\\s+", " "); + LocalDateTime result = null; + for (DateTimeFormatter formatter : dateFormatters) { + try { + TemporalAccessor temporal = formatter.parseBest(cleanDate, OffsetDateTime::from, LocalDateTime::from); + if (temporal instanceof LocalDateTime) { + result = ((LocalDateTime) temporal); + } else { + result = ((OffsetDateTime) temporal).withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime(); } + if (result.getLong(ChronoField.YEAR_OF_ERA) == 1) { + result = result.withYear(LocalDateTime.now(this.zoneId).getYear()); + } + break; + } catch (DateTimeException e) { + // ignore } } + if (result == null) { + log.log(Level.SEVERE, "could not parse date " + cleanDate); + } + return result; } - private LocalDateTime parseRFC5424Date(String msg) { - int len = msg.length(); - if (len <= RFC5424_PREFIX_LEN) { - throw new IllegalArgumentException("bad format: not a valid RFC5424 timestamp: " + msg); - } - String timestampPrefix = msg.substring(0, RFC5424_PREFIX_LEN); - LocalDateTime timestamp = timestampCache.get(timestampPrefix); - int pos = RFC5424_PREFIX_LEN; - if (timestamp == null) { - throw new IllegalArgumentException("parse error: timestamp is null"); - } - if (msg.charAt(pos) == '.') { - boolean found = false; - int end = pos + 1; - if (len <= end) { - throw new IllegalArgumentException("bad timestamp format (no TZ)"); - } - while (!found) { - char ch = msg.charAt(end); - if (ch >= '0' && ch <= '9') { - end++; + protected List parseStructuredData(String structuredData) throws IOException { + final Matcher matcher = matcherStructuredData.get().reset(structuredData); + final List result = new ArrayList<>(); + while (matcher.find()) { + final String input = matcher.group(1); + Builder builder = JsonBuilder.builder(); + final Matcher kvpMatcher = matcherKeyValue.get().reset(input); + while (kvpMatcher.find()) { + final String key = kvpMatcher.group("key"); + final String value = kvpMatcher.group("value"); + final String id = kvpMatcher.group("id"); + if (null != id && !id.isEmpty()) { + builder.field("id", id); } else { - found = true; + builder.fieldIfNotNull(key, value); } } - if (end - (pos + 1) > 0) { - long milliseconds = (long) (Double.parseDouble(msg.substring(pos, end)) * 1000.0); - timestamp.plus(milliseconds, ChronoUnit.MILLIS); - } else { - throw new IllegalArgumentException("bad format: invalid timestamp (fractional portion): " + msg); - } - pos = end; + result.add(builder); } - char ch = msg.charAt(pos); - if (ch != 'Z') { - if (ch == '+' || ch == '-') { - if (len <= pos + 5) { - throw new IllegalArgumentException("bad format: invalid timezone: " + msg); - } - int sign = ch == '+' ? +1 : -1; - char[] hourzone = new char[5]; - for (int i = 0; i < 5; i++) { - hourzone[i] = msg.charAt(pos + 1 + i); - } - if (hourzone[0] >= '0' && hourzone[0] <= '9' - && hourzone[1] >= '0' && hourzone[1] <= '9' - && hourzone[2] == ':' - && hourzone[3] >= '0' && hourzone[3] <= '9' - && hourzone[4] >= '0' && hourzone[4] <= '9') { - int hourOffset = Integer.parseInt(msg.substring(pos + 1, pos + 3)); - int minOffset = Integer.parseInt(msg.substring(pos + 4, pos + 6)); - timestamp.minus(sign * ((hourOffset * 60L) + minOffset) * 60000, ChronoUnit.MILLIS); - } else { - throw new IllegalArgumentException("bad format: invalid timezone: " + msg); - } - } - } - return timestamp; + return result; } - private LocalDateTime parseRFC3164Time(String timestamp) { - LocalDateTime now = LocalDateTime.now(); - int year = now.getYear(); - timestamp = TWO_SPACES.matcher(timestamp).replaceFirst(" "); - LocalDateTime date; - try { - date = LocalDateTime.parse(timestamp, rfc3164Format); - } catch (Exception e) { - return LocalDateTime.MIN; + static class MatcherInheritableThreadLocal extends InheritableThreadLocal { + private final Pattern pattern; + + MatcherInheritableThreadLocal(Pattern pattern) { + this.pattern = pattern; } - LocalDateTime fixed = date.withYear(year); - if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { - fixed = date.withYear(year - 1); - } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { - fixed = date.withYear(year + 1); + + @Override + protected Matcher initialValue() { + return this.pattern.matcher(""); } - return fixed; } } diff --git a/src/main/java/org/xbib/event/syslog/MessageType.java b/src/main/java/org/xbib/event/syslog/MessageType.java new file mode 100644 index 0000000..d831e90 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/MessageType.java @@ -0,0 +1,20 @@ +package org.xbib.event.syslog; + +public enum MessageType { + /** + * Message type for a message that could not be parsed. + */ + UNKNOWN, + /** + * Message type for a CEF message. + */ + CEF, + /** + * Message type for a rfc 3164 message. + */ + RFC3164, + /** + * Message type for a rfc 5424 message. + */ + RFC5424 +} diff --git a/src/main/java/org/xbib/event/syslog/Priority.java b/src/main/java/org/xbib/event/syslog/Priority.java new file mode 100644 index 0000000..93fc13a --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/Priority.java @@ -0,0 +1,18 @@ +package org.xbib.event.syslog; + +class Priority { + private Priority() { + } + + public static int facility(int priority) { + return priority >> 3; + } + + public static int level(int priority, int facility) { + return priority - (facility << 3); + } + + public static int priority(int level, int facility) { + return (facility * 8) + level; + } +} diff --git a/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java b/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java new file mode 100644 index 0000000..c0c696d --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java @@ -0,0 +1,46 @@ +package org.xbib.event.syslog; + +import java.time.LocalDateTime; +import java.util.regex.Matcher; + +public class RFC3164MessageParser extends MessageParser { + + private static final String PATTERN = "^(<(?\\d+)>)?(?([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?\\S+)\\s+((?[^\\[\\s\\]]+)(\\[(?\\d+)\\])?:)*\\s*(?.+)$"; + + private final ThreadLocal matcherThreadLocal; + + public RFC3164MessageParser() { + this.matcherThreadLocal = initMatcher(PATTERN); + } + + @Override + public Message parse(SyslogRequest request) { + final Matcher matcher = matcherThreadLocal.get().reset(request.rawMessage()); + if (!matcher.find()) { + return null; + } + final String groupPriority = matcher.group("priority"); + final String groupDate = matcher.group("date"); + final String groupHost = matcher.group("host"); + final String groupMessage = matcher.group("message"); + final String groupTag = matcher.group("tag"); + final String groupProcId = matcher.group("procid"); + final String processId = (groupProcId == null || groupProcId.isEmpty()) ? null : groupProcId; + final Integer priority = (groupPriority == null || groupPriority.isEmpty()) ? null : Integer.parseInt(groupPriority); + final Integer facility = null == priority ? null : Priority.facility(priority); + final Integer level = null == priority ? null : Priority.level(priority, facility); + final LocalDateTime date = parseDate(groupDate); + return DefaultSyslogMessage.builder() + .type(MessageType.RFC3164) + .rawMessage(request.rawMessage()) + .remoteAddress(request.remoteAddress()) + .date(date) + .host(groupHost) + .level(level) + .facility(facility) + .message(groupMessage) + .tag(groupTag) + .processId(processId) + .build(); + } +} diff --git a/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java b/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java new file mode 100644 index 0000000..81e51fb --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java @@ -0,0 +1,60 @@ +package org.xbib.event.syslog; + +import org.xbib.datastructures.api.Builder; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.List; +import java.util.regex.Matcher; + +public class RFC5424MessageParser extends MessageParser { + + private static final String PATTERN = "^<(?\\d+)>(?\\d{1,3})\\s*(?[0-9:+-TZ]+)\\s*(?\\S+)\\s*(?\\S+)\\s*(?\\S+)\\s*(?\\S+)\\s*(?(-|\\[.+\\]))\\s*(?.+)$"; + + private final ThreadLocal matcherThreadLocal; + + public RFC5424MessageParser() { + this.matcherThreadLocal = initMatcher(PATTERN); + } + + @Override + public Message parse(SyslogRequest request) throws IOException { + final Matcher matcher = matcherThreadLocal.get().reset(request.rawMessage()); + if (!matcher.find()) { + return null; + } + final String groupPriority = matcher.group("priority"); + final String groupVersion = matcher.group("version"); + final String groupDate = matcher.group("date"); + final String groupHost = matcher.group("host"); + final String groupAppName = matcher.group("appname"); + final String groupProcID = matcher.group("procid"); + final String groupMessageID = matcher.group("msgid"); + final String groupStructuredData = matcher.group("structureddata"); + final String groupMessage = matcher.group("message"); + final int priority = Integer.parseInt(groupPriority); + final int facility = Priority.facility(priority); + final LocalDateTime date = parseDate(groupDate); + final int level = Priority.level(priority, facility); + final Integer version = Integer.parseInt(groupVersion); + final String appName = nullableString(groupAppName); + final String procID = nullableString(groupProcID); + final String messageID = nullableString(groupMessageID); + final List structuredData = parseStructuredData(groupStructuredData); + return DefaultSyslogMessage.builder() + .type(MessageType.RFC5424) + .rawMessage(request.rawMessage()) + .remoteAddress(request.remoteAddress()) + .date(date) + .host(groupHost) + .level(level) + .facility(facility) + .message(groupMessage) + .version(version) + .processId(procID) + .messageId(messageID) + .structuredData(structuredData) + .appName(appName) + .build(); + } +} diff --git a/src/main/java/org/xbib/event/syslog/Severity.java b/src/main/java/org/xbib/event/syslog/Severity.java deleted file mode 100644 index d851da2..0000000 --- a/src/main/java/org/xbib/event/syslog/Severity.java +++ /dev/null @@ -1,123 +0,0 @@ -package org.xbib.event.syslog; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; - -/** - * Syslog severity as defined in RFC 5424 - The Syslog Protocol. - */ -public enum Severity { - /** - * Emergency: system is unusable, numerical code 0. - */ - EMERGENCY(0, "EMERGENCY"), - /** - * Alert: action must be taken immediately, numerical code 1. - */ - ALERT(1, "ALERT"), - /** - * Critical: critical conditions, numerical code 2. - */ - CRITICAL(2, "CRITICAL"), - /** - * Error: error conditions, numerical code 3. - */ - ERROR(3, "ERROR"), - /** - * Warning: warning conditions, numerical code 4. - */ - WARNING(4, "WARNING"), - /** - * Notice: normal but significant condition, numerical code 5. - */ - NOTICE(5, "NOTICE"), - /** - * Informational: informational messages, numerical code 6. - */ - INFORMATIONAL(6, "INFORMATIONAL"), - /** - * Debug: debug-level messages, numerical code 7. - */ - DEBUG(7, "DEBUG"); - - private final static Map severityFromLabel = new HashMap(); - - private final static Map severityFromNumericalCode = new HashMap(); - - static { - for (Severity severity : Severity.values()) { - severityFromLabel.put(severity.label, severity); - severityFromNumericalCode.put(severity.numericalCode, severity); - } - } - - private final int numericalCode; - - private final String label; - - private Severity(int numericalCode, String label) { - this.numericalCode = numericalCode; - this.label = label; - } - - /** - * @param numericalCode Syslog severity numerical code - * @return Syslog severity, not {@code null} - * @throws IllegalArgumentException the given numericalCode is not a valid Syslog severity numerical code - */ - public static Severity fromNumericalCode(int numericalCode) throws IllegalArgumentException { - Severity severity = severityFromNumericalCode.get(numericalCode); - if (severity == null) { - throw new IllegalArgumentException("Invalid severity '" + numericalCode + "'"); - } - return severity; - } - - /** - * @param label Syslog severity textual code. {@code null} or empty returns {@code null} - * @return Syslog severity, {@code null} if given value is {@code null} - * @throws IllegalArgumentException the given value is not a valid Syslog severity textual code - */ - public static Severity fromLabel(String label) throws IllegalArgumentException { - if (label == null || label.isEmpty()) { - return null; - } - - Severity severity = severityFromLabel.get(label); - if (severity == null) { - throw new IllegalArgumentException("Invalid severity '" + label + "'"); - } - return severity; - } - - /** - * Syslog severity numerical code - * @return numerical code - */ - public int numericalCode() { - return numericalCode; - } - - /** - * Syslog severity textual code. Not {@code null}. - * @return the severity label - */ - public String label() { - return label; - } - - /** - * Compare on {@link Severity#numericalCode()} - * @return comparator for severities - */ - public static Comparator comparator() { - return new Comparator() { - @Override - public int compare(Severity s1, Severity s2) { - return Integer.compare(s1.numericalCode, s2.numericalCode); - } - }; - } -} - diff --git a/src/main/java/org/xbib/event/syslog/SyslogEventManager.java b/src/main/java/org/xbib/event/syslog/SyslogEventManager.java new file mode 100644 index 0000000..46047a8 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogEventManager.java @@ -0,0 +1,43 @@ +package org.xbib.event.syslog; + +import org.xbib.event.bus.AsyncEventBus; +import org.xbib.event.io.file.FileFollowEventService; +import org.xbib.settings.Settings; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class SyslogEventManager implements Closeable { + private static final Logger logger = Logger.getLogger(SyslogEventManager.class.getName()); + + private final List syslogServices; + + public SyslogEventManager(Settings settings, + AsyncEventBus eventBus) { + this.syslogServices = new ArrayList<>(); + for (Map.Entry entry : settings.getGroups("event.syslog").entrySet()) { + Settings definition = entry.getValue(); + if (definition.getAsBoolean("enabled", true)) { + try { + SyslogService syslogService = new SyslogService(definition, eventBus); + syslogServices.add(syslogService); + logger.log(Level.INFO, "syslog service " + entry.getKey() + " added"); + } catch (Exception e) { + logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e); + } + } + } + } + + @Override + public void close() throws IOException { + for (SyslogService syslogService : syslogServices) { + syslogService.close(); + } + } +} diff --git a/src/main/java/org/xbib/event/syslog/SyslogFrameDecoder.java b/src/main/java/org/xbib/event/syslog/SyslogFrameDecoder.java new file mode 100644 index 0000000..fd0c0d2 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogFrameDecoder.java @@ -0,0 +1,37 @@ +package org.xbib.event.syslog; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.util.ByteProcessor; +import io.netty.util.CharsetUtil; + +public class SyslogFrameDecoder extends LineBasedFrameDecoder { + final static ByteProcessor INTEGER = b -> b >= ((byte) 48) && b <= ((byte) 57); + + public SyslogFrameDecoder(int maxLength) { + super(maxLength, true, false); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf b) throws Exception { + ByteBuf buffer = b.retain(); + int lengthIndex = buffer.forEachByte(INTEGER); + final int digitCount = lengthIndex - buffer.readerIndex(); + if (digitCount > 0) { + buffer.markReaderIndex(); + final String frameLength = buffer.getCharSequence(buffer.readerIndex(), digitCount, CharsetUtil.UTF_8).toString(); + buffer.skipBytes(digitCount + 1); + int length = Integer.parseInt(frameLength); + + if (b.readerIndex() + length > b.writerIndex()) { + buffer.resetReaderIndex(); + return null; + } + return buffer.slice(digitCount + 1, length); + } else { + return super.decode(ctx, buffer); + } + } +} + diff --git a/src/main/java/org/xbib/event/syslog/SyslogIdleStateHandler.java b/src/main/java/org/xbib/event/syslog/SyslogIdleStateHandler.java new file mode 100644 index 0000000..cfa7821 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogIdleStateHandler.java @@ -0,0 +1,25 @@ +package org.xbib.event.syslog; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; + +@ChannelHandler.Sharable +class SyslogIdleStateHandler extends ChannelDuplexHandler { + + public static final SyslogIdleStateHandler INSTANCE = new SyslogIdleStateHandler(); + + private SyslogIdleStateHandler() { + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent e) { + if (e.state() == IdleState.WRITER_IDLE) { + ctx.close(); + } + } + } +} diff --git a/src/main/java/org/xbib/event/syslog/SyslogMessage.java b/src/main/java/org/xbib/event/syslog/SyslogMessage.java new file mode 100644 index 0000000..6d79475 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogMessage.java @@ -0,0 +1,9 @@ +package org.xbib.event.syslog; + +public interface SyslogMessage extends Message { + + interface Builder { + + SyslogMessage build(); + } +} diff --git a/src/main/java/org/xbib/event/syslog/SyslogMessageHandler.java b/src/main/java/org/xbib/event/syslog/SyslogMessageHandler.java new file mode 100644 index 0000000..369808c --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogMessageHandler.java @@ -0,0 +1,48 @@ +package org.xbib.event.syslog; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +@ChannelHandler.Sharable +public class SyslogMessageHandler extends SimpleChannelInboundHandler { + + private final List parsers; + + public SyslogMessageHandler() { + this(Arrays.asList(new CEFMessageParser(), new RFC5424MessageParser(), new RFC3164MessageParser())); + } + + public SyslogMessageHandler(List parsers) { + this.parsers = parsers; + } + + @Override + protected void channelRead0(ChannelHandlerContext context, SyslogRequest request) { + context.executor().submit(() -> { + for (MessageParser parser : parsers) { + try { + Object result = parser.parse(request); + if (result != null) { + context.fireChannelRead(result); + return; + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + context.write(DefaultSyslogMessage.builder() + .type(MessageType.UNKNOWN) + .date(LocalDateTime.now()) + .rawMessage(request.rawMessage()) + .remoteAddress(request.remoteAddress()) + .build()); + }); + } +} diff --git a/src/main/java/org/xbib/event/syslog/SyslogMessageKey.java b/src/main/java/org/xbib/event/syslog/SyslogMessageKey.java new file mode 100644 index 0000000..ca87cb5 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogMessageKey.java @@ -0,0 +1,4 @@ +package org.xbib.event.syslog; + +public interface SyslogMessageKey extends MessageKey { +} diff --git a/src/main/java/org/xbib/event/syslog/SyslogRequest.java b/src/main/java/org/xbib/event/syslog/SyslogRequest.java new file mode 100644 index 0000000..7922d47 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogRequest.java @@ -0,0 +1,39 @@ +package org.xbib.event.syslog; + +import java.net.InetAddress; +import java.time.LocalDateTime; + +/** + * Interface represents an incoming syslog request. This interface acts as an intermediary between + * the TCPSyslogMessageDecoder and UDPSyslogMessageDecoder + * + * @see TCPSyslogMessageDecoder + * @see UDPSyslogMessageDecoder + */ +public interface SyslogRequest { + /** + * The time the message was received by Netty. + * + * @return The time the message was received by Netty. + */ + LocalDateTime receivedDate(); + + /** + * IP Address for the sender of the message. + * + * @return Sender IP Address + */ + InetAddress remoteAddress(); + + /** + * The raw message that was delivered + * + * @return Raw message. + */ + String rawMessage(); + + interface Builder { + + SyslogRequest build(); + } +} diff --git a/src/main/java/org/xbib/event/syslog/SyslogService.java b/src/main/java/org/xbib/event/syslog/SyslogService.java index 36d381c..5ee97d8 100644 --- a/src/main/java/org/xbib/event/syslog/SyslogService.java +++ b/src/main/java/org/xbib/event/syslog/SyslogService.java @@ -1,249 +1,126 @@ package org.xbib.event.syslog; -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; -import java.util.regex.Pattern; -import org.xbib.datastructures.api.Builder; -import org.xbib.datastructures.api.ByteSizeUnit; -import org.xbib.datastructures.api.ByteSizeValue; -import org.xbib.datastructures.api.TimeValue; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import org.xbib.event.bus.EventBus; import org.xbib.settings.Settings; -public class SyslogService { -/* - private static final Logger logger = Logger.getLogger(SyslogService.class.getName()); - - private final static String SYSLOG_HOST = "syslog.host"; - - private final static String SYSLOG_PORT = "syslog.port"; - - private final static String SYSLOG_RECEIVE_BUFFER_SIZE = "receive_buffer_size"; - - private final static String SYSLOG_PATTERNS = "patterns"; - - private final static String SYSLOG_FIELD_NAMES = "field_names"; - - private final String host; - - private final String port; +import java.io.Closeable; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; - private final ByteSizeValue receiveBufferSize; +public class SyslogService implements Closeable { - private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; + private final int port; - private final MessageParser messageParser; + private final Handler handler; - private DateTimeFormatter formatter; + private final EventBus eventBus; - private ConnectionlessBootstrap udpBootstrap; + private EventLoopGroup group; - private ServerBootstrap tcpBootstrap; + private EventLoopGroup bossGroup; - private Channel udpChannel; + private EventLoopGroup workerGroup; - private Channel tcpChannel; + private ChannelFuture channelFuture; - public SyslogService(Settings settings) { - this.host = settings.get(SYSLOG_HOST, "127.0.0.1"); - this.port = settings.get(SYSLOG_PORT, "9500-9600"); - this.receiveBufferSize = settings.getAsBytesSize(SYSLOG_RECEIVE_BUFFER_SIZE, new ByteSizeValue(10, ByteSizeUnit.MB)); - this.receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory(receiveBufferSize.bytesAsInt()); - Map map = (Map) settings.getAsStructuredMap().get(SYSLOG_PATTERNS); - Map patterns = new HashMap<>(); - if (map != null) { - for (String key : map.keySet()) { - patterns.put(key, Pattern.compile((String) map.get(key))); - } - } - this.messageParser = new MessageParser().setPatterns(patterns); - map = (Map) settings.getAsStructuredMap().get(SYSLOG_FIELD_NAMES); - if (map != null) { - for (String key : map.keySet()) { - messageParser.setFieldName(key, (String) map.get(key)); - } - } - logger.info("syslog server: host [" + host + "], port [" + port + "]"); + public SyslogService(Settings settings, + EventBus eventBus) { + this.port = settings.getAsInt("port", 1514); + this.handler = new Handler(); + this.eventBus = eventBus; } - protected void doStart() throws Exception { - initializeUDP(); - initializeTCP(); - logger.info("syslog server up"); + public void startUdp() throws InterruptedException { + group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioDatagramChannel.class) + .option(ChannelOption.SO_BROADCAST, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(DatagramChannel datagramChannel) { + ChannelPipeline channelPipeline = datagramChannel.pipeline(); + channelPipeline.addLast( + new UDPSyslogMessageDecoder(), + new SyslogMessageHandler(), + handler + ); + } + }); + this.channelFuture = bootstrap.bind(port).sync(); } - protected void doStop() throws ElasticsearchException { - if (udpChannel != null) { - udpChannel.close().awaitUninterruptibly(); - } - if (udpBootstrap != null) { - udpBootstrap.releaseExternalResources(); - } - if (tcpChannel != null) { - tcpChannel.close().awaitUninterruptibly(); - } - if (tcpBootstrap != null) { - tcpBootstrap.releaseExternalResources(); - } - bulkProcessor.close(); - logger.info("syslog server down"); + public void startTcp() throws InterruptedException { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + ServerBootstrap serverBootstrap= new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new DelimiterBasedFrameDecoder(2000, true, Delimiters.lineDelimiter()), + new TCPSyslogMessageDecoder(), + new SyslogMessageHandler(), + handler + ); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + this.channelFuture = serverBootstrap.bind(port).sync(); } - private void initializeUDP() { - udpBootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory( - Executors.newCachedThreadPool(), 4)); - udpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt()); - udpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); - udpBootstrap.setOption("broadcast", "false"); - udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new Handler("udp")); - } - }); - InetAddress address; - try { - address = NetworkUtils.resolveInetAddress(host, null); - } catch (IOException e) { - logger.warn("failed to resolve host {}", e, host); - return; + @Override + public void close() throws IOException { + if (group != null) { + group.shutdownGracefully(); } - final InetAddress hostAddress = address; - PortsRange portsRange = new PortsRange(port); - final AtomicReference lastException = new AtomicReference<>(); - boolean success = portsRange.iterate(new PortsRange.PortCallback() { - @Override - public boolean onPortNumber(int portNumber) { - try { - udpChannel = udpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - } - }); - if (!success) { - logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port); - return; + if (bossGroup != null) { + bossGroup.shutdownGracefully(); } - logger.info("UDP listener running, address {}", udpChannel.getLocalAddress()); - } - - private void initializeTCP() { - tcpBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), - settings.getAsInt("tcp.worker", 4))); - - tcpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt()); - tcpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); - tcpBootstrap.setOption("reuseAddress", settings.getAsBoolean("tcp.reuse_address", true)); - tcpBootstrap.setOption("tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true)); - tcpBootstrap.setOption("keepAlive", settings.getAsBoolean("tcp.keep_alive", true)); - - tcpBootstrap.setOption("child.receiveBufferSize", receiveBufferSize.bytesAsInt()); - tcpBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); - tcpBootstrap.setOption("child.reuseAddress", settings.getAsBoolean("tcp.reuse_address", true)); - tcpBootstrap.setOption("child.tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true)); - tcpBootstrap.setOption("child.keepAlive", settings.getAsBoolean("tcp.keep_alive", true)); - - tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new Handler("tcp")); - } - }); - - InetAddress address; - try { - address = SyslogNetworkUtils.resolveInetAddress(host, null); - } catch (IOException e) { - logger.warn("failed to resolve host {}", e, host); - return; + if (workerGroup != null) { + workerGroup.shutdownGracefully(); } - final InetAddress hostAddress = address; - PortsRange portsRange = new PortsRange(port); - final AtomicReference lastException = new AtomicReference<>(); - boolean success = portsRange.iterate(new PortsRange.PortCallback() { - @Override - public boolean onPortNumber(int portNumber) { - try { - tcpChannel = tcpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - } - }); - if (!success) { - logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port); - return; + if (channelFuture != null) { + channelFuture.channel().close(); } - logger.info("TCP listener running, address {}", tcpChannel.getLocalAddress()); } - class Handler extends SimpleChannelUpstreamHandler { - - private final String protocol; + @ChannelHandler.Sharable + private class Handler extends SimpleChannelInboundHandler { - Handler(String protocol) { - this.protocol = protocol; - } + private static final Logger logger = Logger.getLogger(Handler.class.getName()); - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); - XContentBuilder builder = jsonBuilder(); - parse(ctx, buffer, builder); - IndexRequest indexRequest = new IndexRequest(isTimeWindow ? formatter.print(new DateTime()) : index) - .type(type) - .opType(IndexRequest.OpType.INDEX) - .source(builder); - try { - bulkProcessor.add(indexRequest); - } catch (Exception e1) { - logger.warn("failed to execute bulk request", e1); - } + public Handler() { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - if (e.getCause() instanceof BindException) { - // ignore, this happens when we retry binding to several ports, its fine if we fail... - return; - } - logger.warn("failure caught", e.getCause()); - throw new IOException(e.getCause()); - } - - private void parse(ChannelHandlerContext ctx, ChannelBuffer buffer, Builder builder) throws IOException { - SocketAddress localAddress = ctx.getChannel().getLocalAddress(); - SocketAddress remoteAddress = ctx.getChannel().getRemoteAddress(); - ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer); - try { - builder.startObject(); - builder.field("protocol", protocol); - if (localAddress != null) { - builder.field("local", localAddress.toString()); - } - if (remoteAddress != null) { - builder.field("remote", remoteAddress.toString()); - } - messageParser.parseMessage(ref.toUtf8(), builder); - builder.endObject(); - } catch (Exception e) { - logger.error(e.getMessage(), e); + protected void channelRead0(ChannelHandlerContext ctx, Message msg) { + if (eventBus != null) { + eventBus.post(msg); + } else { + logger.log(Level.INFO, msg.toString()); } } } -*/ } diff --git a/src/main/java/org/xbib/event/syslog/TCPSyslogMessageDecoder.java b/src/main/java/org/xbib/event/syslog/TCPSyslogMessageDecoder.java new file mode 100644 index 0000000..7e24f32 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/TCPSyslogMessageDecoder.java @@ -0,0 +1,39 @@ +package org.xbib.event.syslog; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.List; + +@ChannelHandler.Sharable +public class TCPSyslogMessageDecoder extends MessageToMessageDecoder { + + private final Charset charset; + + public TCPSyslogMessageDecoder(Charset charset) { + this.charset = charset; + } + + public TCPSyslogMessageDecoder() { + this(StandardCharsets.UTF_8); + } + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List output) { + final InetSocketAddress socketAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress(); + final String rawMessage = byteBuf.toString(this.charset); + output.add(DefaultSyslogRequest.builder() + .receivedDate(LocalDateTime.now()) + .remoteAddress(socketAddress.getAddress()) + .rawMessage(rawMessage) + .build() + ); + } + +} diff --git a/src/main/java/org/xbib/event/syslog/UDPSyslogMessageDecoder.java b/src/main/java/org/xbib/event/syslog/UDPSyslogMessageDecoder.java new file mode 100644 index 0000000..b9b65f9 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/UDPSyslogMessageDecoder.java @@ -0,0 +1,40 @@ +package org.xbib.event.syslog; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.MessageToMessageDecoder; + +import java.net.InetAddress; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.List; + +@ChannelHandler.Sharable +public class UDPSyslogMessageDecoder extends MessageToMessageDecoder { + + private final Charset charset; + + public UDPSyslogMessageDecoder(Charset charset) { + this.charset = charset; + } + + public UDPSyslogMessageDecoder() { + this(StandardCharsets.UTF_8); + } + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, + DatagramPacket datagramPacket, + List output) { + final String rawMessage = datagramPacket.content().toString(this.charset); + final InetAddress inetAddress = datagramPacket.sender().getAddress(); + output.add(DefaultSyslogRequest.builder() + .receivedDate(LocalDateTime.now()) + .rawMessage(rawMessage) + .remoteAddress(inetAddress) + .build() + ); + } +} diff --git a/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java b/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java index d907dde..0578ba8 100644 --- a/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java +++ b/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java @@ -4,6 +4,7 @@ import org.xbib.event.EventConsumer; import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.Subscribe; +import java.io.IOException; import java.time.Instant; import java.util.logging.Level; import java.util.logging.Logger; @@ -17,4 +18,8 @@ public class TestClockEventConsumer implements EventConsumer { void onEvent(TestClockEvent event) { logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant()); } + + @Override + public void close() throws IOException { + } } diff --git a/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java b/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java index 3883ed3..b183318 100644 --- a/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java +++ b/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java @@ -5,6 +5,7 @@ import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.Subscribe; import org.xbib.event.timer.TestTimerEvent; +import java.io.IOException; import java.time.Instant; import java.util.logging.Level; import java.util.logging.Logger; @@ -16,6 +17,10 @@ public class TestFileFollowEventConsumer implements EventConsumer { @Subscribe @AllowConcurrentEvents void onEvent(TestFileFollowEvent event) { - logger.log(Level.INFO, "received filefollw event path = " + event.getPath() + " content = " + event.getContent()); + logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getContent()); + } + + @Override + public void close() throws IOException { } } diff --git a/src/test/java/org/xbib/event/syslog/SyslogServiceTest.java b/src/test/java/org/xbib/event/syslog/SyslogServiceTest.java new file mode 100644 index 0000000..0ab3f81 --- /dev/null +++ b/src/test/java/org/xbib/event/syslog/SyslogServiceTest.java @@ -0,0 +1,20 @@ +package org.xbib.event.syslog; + +import org.junit.jupiter.api.Test; +import org.xbib.settings.Settings; + +import java.io.IOException; + +public class SyslogServiceTest { + + @Test + public void testSyslogService() throws InterruptedException, IOException { + Settings settings = Settings.settingsBuilder() + .put("port", 1514) + .build(); + SyslogService syslogService = new SyslogService(settings, null); + syslogService.startTcp(); + Thread.sleep(60000L); + syslogService.close(); + } +} diff --git a/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java b/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java index 4199d7b..85feb3b 100644 --- a/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java +++ b/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java @@ -4,6 +4,7 @@ import org.xbib.event.EventConsumer; import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.Subscribe; +import java.io.IOException; import java.time.Instant; import java.util.logging.Level; import java.util.logging.Logger; @@ -17,4 +18,8 @@ public class TestTimerEventConsumer implements EventConsumer { void onEvent(TestTimerEvent event) { logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant()); } + + @Override + public void close() throws IOException { + } }