add syslog service

This commit is contained in:
Jörg Prante 2023-10-30 15:34:04 +01:00
parent 12fd2eee90
commit 1d03b1be86
38 changed files with 1484 additions and 1077 deletions

View file

@ -23,6 +23,7 @@ branch 4.1 as of 26 Apr 2023.
Licence: Apache 2.0 Licence: Apache 2.0
-----------------
The work in org.xbib.event.io is based upon the work in The work in org.xbib.event.io is based upon the work in
@ -38,6 +39,8 @@ branched as of 23 December, 2021
License: Apache 2.0 License: Apache 2.0
---------------
The work in org.xbib.event.bus is taken from Guava The work in org.xbib.event.bus is taken from Guava
https://github.com/google/guava https://github.com/google/guava
@ -45,3 +48,13 @@ https://github.com/google/guava
as of 27 August, 2022 as of 27 August, 2022
License: Apache 2.0 License: Apache 2.0
---------------
The work in org.xbib.event.syslog is based upon netty-codec-syslog
https://github.com/jcustenborder/netty-codec-syslog
as of 30 October, 2023
License: Apache 2.0

View file

@ -45,6 +45,7 @@ dependencies {
implementation libs.time implementation libs.time
implementation libs.datastructures.common implementation libs.datastructures.common
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
implementation libs.netty.handler
implementation libs.reactivestreams implementation libs.reactivestreams
testImplementation libs.rxjava3 testImplementation libs.rxjava3
testImplementation libs.settings.datastructures.json testImplementation libs.settings.datastructures.json

View file

@ -1,5 +1,5 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.0.4 version = 0.0.5
org.gradle.warning.mode = ALL org.gradle.warning.mode = ALL

View file

@ -18,7 +18,9 @@ dependencyResolutionManagement {
version('gradle', '8.4') version('gradle', '8.4')
version('groovy', '4.0.13') version('groovy', '4.0.13')
version('datastructures', '5.0.5') version('datastructures', '5.0.5')
version('netty', '4.1.100.Final')
version('net', '4.0.0') version('net', '4.0.0')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net') library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')
library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures')

View file

@ -13,6 +13,11 @@ module org.xbib.event {
exports org.xbib.event.util; exports org.xbib.event.util;
exports org.xbib.event.yield; exports org.xbib.event.yield;
exports org.xbib.event; exports org.xbib.event;
requires io.netty.buffer;
requires io.netty.common;
requires io.netty.transport;
requires io.netty.handler;
requires io.netty.codec;
requires org.xbib.datastructures.api; requires org.xbib.datastructures.api;
requires org.xbib.datastructures.common; requires org.xbib.datastructures.common;
requires org.xbib.datastructures.json.tiny; requires org.xbib.datastructures.json.tiny;

View file

@ -1,4 +1,6 @@
package org.xbib.event; package org.xbib.event;
public interface EventConsumer { import java.io.Closeable;
public interface EventConsumer extends Closeable {
} }

View file

@ -6,6 +6,7 @@ import org.xbib.event.bus.SubscriberExceptionHandler;
import org.xbib.event.clock.ClockEventManager; import org.xbib.event.clock.ClockEventManager;
import org.xbib.event.io.file.FileFollowEventManager; import org.xbib.event.io.file.FileFollowEventManager;
import org.xbib.event.io.path.PathEventManager; import org.xbib.event.io.path.PathEventManager;
import org.xbib.event.syslog.SyslogEventManager;
import org.xbib.event.timer.TimerEventManager; import org.xbib.event.timer.TimerEventManager;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -44,11 +45,14 @@ public final class EventManager implements Closeable {
private final PathEventManager pathEventManager; private final PathEventManager pathEventManager;
private final SyslogEventManager syslogEventManager;
private EventManager(Settings settings) { private EventManager(Settings settings) {
this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader); this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader);
this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault()); this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault());
this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader); this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader);
this.pathEventManager = new PathEventManager(settings, eventBus, executorService, classLoader); this.pathEventManager = new PathEventManager(settings, eventBus, executorService, classLoader);
this.syslogEventManager = new SyslogEventManager(settings, eventBus);
} }
public static EventManager newEventManager(Settings settings) { public static EventManager newEventManager(Settings settings) {
@ -81,6 +85,10 @@ public final class EventManager implements Closeable {
return pathEventManager; return pathEventManager;
} }
public SyslogEventManager getSyslogEventManager() {
return syslogEventManager;
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
return super.equals(obj); return super.equals(obj);
@ -88,8 +96,14 @@ public final class EventManager implements Closeable {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
for (EventConsumer eventConsumer : eventConsumers) {
eventConsumer.close();
}
clockEventManager.close(); clockEventManager.close();
timerEventManager.close();
fileFollowEventManager.close();
pathEventManager.close(); pathEventManager.close();
syslogEventManager.close();
} }
private static class EventManagerExceptionHandler implements SubscriberExceptionHandler { private static class EventManagerExceptionHandler implements SubscriberExceptionHandler {

View file

@ -2,6 +2,7 @@ package org.xbib.event.clock;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import java.io.IOException;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
@ -18,4 +19,8 @@ public class SimpleClockEventConsumer implements EventConsumer {
void onEvent(ClockEvent event) { void onEvent(ClockEvent event) {
logger.info("received demo clock event, instant = " + event.getInstant()); logger.info("received demo clock event, instant = " + event.getInstant());
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -3,6 +3,8 @@ package org.xbib.event.io.file;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -14,7 +16,7 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class FileFollowEventManager { public class FileFollowEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName()); private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName());
@ -49,7 +51,8 @@ public class FileFollowEventManager {
} }
} }
public void close() { @Override
public void close() throws IOException {
for (Map.Entry<Future<?>, FileFollowEventService> entry : eventServiceMap.entrySet()) { for (Map.Entry<Future<?>, FileFollowEventService> entry : eventServiceMap.entrySet()) {
entry.getValue().setKeepWatching(false); entry.getValue().setKeepWatching(false);
entry.getKey().cancel(true); entry.getKey().cancel(true);

View file

@ -0,0 +1,131 @@
package org.xbib.event.syslog;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
public class CEFMessageParser extends MessageParser {
private static final String CEF_PREFIX_PATTERN = "^(<(?<priority>\\d+)>)?(?<date>([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?<host>\\S+)\\s+CEF:(?<version>\\d+)\\|(?<data>.*)$";
private static final String CEF_MAIN_PATTERN = "(?<!\\\\)\\|";
private static final String PATTERN_EXTENSION = "(\\w+)=";
private final ThreadLocal<Matcher> matcherCEFPrefix;
private final ThreadLocal<Matcher> matcherCEFMain;
private final ThreadLocal<Matcher> matcherCEFExtension;
public CEFMessageParser() {
this.matcherCEFPrefix = initMatcher(CEF_PREFIX_PATTERN);
this.matcherCEFMain = initMatcher(CEF_MAIN_PATTERN);
this.matcherCEFExtension = initMatcher(PATTERN_EXTENSION);
}
List<String> splitToList(String data) {
List<String> result = new ArrayList<>(10);
Matcher matcherData = this.matcherCEFMain.get().reset(data);
int start = 0;
int end = 0;
while (matcherData.find()) {
end = matcherData.end();
String part = data.substring(start, end - 1);
start = end;
result.add(part);
}
if (data.length() > end) {
result.add(data.substring(end));
}
return result;
}
@Override
public Message parse(SyslogRequest request) {
final Matcher matcherPrefix = this.matcherCEFPrefix.get().reset(request.rawMessage());
if (!matcherPrefix.find()) {
return null;
}
final String groupPriority = matcherPrefix.group("priority");
final String groupDate = matcherPrefix.group("date");
final String groupHost = matcherPrefix.group("host");
final String groupCEFVersion = matcherPrefix.group("version");
final String groupData = matcherPrefix.group("data");
final Integer priority = (groupPriority == null || groupPriority.isEmpty()) ? null : Integer.parseInt(groupPriority);
final Integer facility = null == priority ? null : Priority.facility(priority);
final Integer level = null == priority ? null : Priority.level(priority, facility);
final LocalDateTime date = parseDate(groupDate);
final Integer cefVersion = Integer.parseInt(groupCEFVersion);
final List<String> parts = splitToList(groupData);
DefaultSyslogMessage.Builder builder = DefaultSyslogMessage.builder();
builder.type(MessageType.CEF);
builder.rawMessage(request.rawMessage());
builder.remoteAddress(request.remoteAddress());
builder.date(date);
builder.version(cefVersion);
builder.host(groupHost);
builder.level(level);
builder.facility(facility);
int index = 0;
for (String token : parts) {
token = token.replace("\\|", "|");
switch (index) {
case 0:
builder.deviceVendor(token);
break;
case 1:
builder.deviceProduct(token);
break;
case 2:
builder.deviceVersion(token);
break;
case 3:
builder.deviceEventClassId(token);
break;
case 4:
builder.name(token);
break;
case 5:
builder.severity(token);
break;
case 6:
Map<String, Object> map = parseExtension(token);
builder.map(map);
break;
default:
break;
}
index++;
}
return builder.build();
}
private Map<String, Object> parseExtension(String token) {
final Map<String, Object> result = new LinkedHashMap<>();
if (null == token || token.isEmpty()) {
return result;
}
Matcher matcher = this.matcherCEFExtension.get().reset(token);
String key = null;
String value;
int lastEnd = -1, lastStart = -1;
while (matcher.find()) {
if (lastEnd > -1) {
value = token.substring(lastEnd, matcher.start()).trim();
result.put(key, value);
}
key = matcher.group(1);
lastStart = matcher.start();
lastEnd = matcher.end();
}
if (lastStart > -1 && !result.containsKey(key)) {
value = token.substring(lastEnd).trim();
result.put(key, value);
}
return result;
}
}

View file

@ -0,0 +1,314 @@
package org.xbib.event.syslog;
import java.net.InetAddress;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
public class DefaultSyslogMessage implements SyslogMessage {
private final Builder builder;
private DefaultSyslogMessage(Builder builder) {
this.builder = builder;
}
public static Builder builder() {
return new Builder();
}
@Override
public LocalDateTime date() {
return builder.date;
}
@Override
public InetAddress remoteAddress() {
return builder.remoteAddress;
}
@Override
public String rawMessage() {
return builder.rawMessage;
}
@Override
public MessageType type() {
return builder.type;
}
@Override
public Integer level() {
return builder.level;
}
@Override
public Integer version() {
return builder.version;
}
@Override
public Integer facility() {
return builder.facility;
}
@Override
public String host() {
return builder.host;
}
@Override
public String message() {
return builder.message;
}
@Override
public String processId() {
return builder.processId;
}
@Override
public String tag() {
return builder.tag;
}
@Override
public String messageId() {
return builder.messageId;
}
@Override
public String appName() {
return builder.appName;
}
@Override
public List<org.xbib.datastructures.api.Builder> structuredData() {
return builder.structuredData;
}
@Override
public String deviceVendor() {
return builder.deviceVendor;
}
@Override
public String deviceProduct() {
return builder.deviceProduct;
}
@Override
public String deviceVersion() {
return builder.deviceVersion;
}
@Override
public String deviceEventClassId() {
return builder.deviceEventClassId;
}
@Override
public String name() {
return builder.name;
}
@Override
public String severity() {
return builder.severity;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("date=").append(builder.date)
.append(",remoteAddress=").append(builder.remoteAddress)
.append(",rawMessage=").append(builder.rawMessage)
.append(",type=").append(builder.type)
.append(",level=").append(builder.level)
.append(",version=").append(builder.version)
.append(",facility=").append(builder.facility)
.append(",host=").append(builder.host)
.append(",message=").append(builder.message)
.append(",processId=").append(builder.processId)
.append(",tag=").append(builder.tag)
.append(",messageId=").append(builder.messageId)
.append(",appName=").append(builder.appName)
.append(",structuredData=").append(builder.structuredData);
return sb.toString();
}
@Override
public void setKey(String key) {
// ignore
}
@Override
public String getKey() {
return builder.messageId;
}
@Override
public void setMap(Map<String, Object> map) {
// ignore
}
@Override
public Map<String, Object> getMap() {
return builder.map;
}
public static class Builder implements SyslogMessage.Builder {
LocalDateTime date;
InetAddress remoteAddress;
String rawMessage;
MessageType type;
Integer level;
Integer version;
Integer facility;
String host;
String message;
String processId;
String tag;
String messageId;
String appName;
List<org.xbib.datastructures.api.Builder> structuredData;
String deviceVendor;
String deviceProduct;
String deviceVersion;
String deviceEventClassId;
String name;
String severity;
Map<String, Object> map;
public Builder date(LocalDateTime date) {
this.date = date;
return this;
}
public Builder remoteAddress(InetAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return this;
}
public Builder rawMessage(String rawMessage) {
this.rawMessage = rawMessage;
return this;
}
public Builder type(MessageType type) {
this.type = type;
return this;
}
public Builder level(Integer level) {
this.level = level;
return this;
}
public Builder version(Integer version) {
this.version = version;
return this;
}
public Builder facility(Integer facility) {
this.facility = facility;
return this;
}
public Builder host(String host) {
this.host = host;
return this;
}
public Builder message(String message) {
this.message = message;
return this;
}
public Builder processId(String processId) {
this.processId = processId;
return this;
}
public Builder tag(String tag) {
this.tag = tag;
return this;
}
public Builder messageId(String messageId) {
this.messageId = messageId;
return this;
}
public Builder appName(String appName) {
this.appName = appName;
return this;
}
public Builder structuredData(List<org.xbib.datastructures.api.Builder> structuredData) {
this.structuredData = structuredData;
return this;
}
public Builder deviceVendor(String deviceVendor) {
this.deviceVendor = deviceVendor;
return this;
}
public Builder deviceProduct(String deviceProduct) {
this.deviceProduct = deviceProduct;
return this;
}
public Builder deviceVersion(String deviceVersion) {
this.deviceVersion = deviceVersion;
return this;
}
public Builder deviceEventClassId(String deviceEventClassId) {
this.deviceEventClassId = deviceEventClassId;
return this;
}
public Builder name(String name) {
this.name = name;
return this;
}
public Builder severity(String severity) {
this.severity = severity;
return this;
}
public Builder map(Map<String, Object> map) {
this.map = map;
return this;
}
@Override
public SyslogMessage build() {
return new DefaultSyslogMessage(this);
}
}
}

View file

@ -0,0 +1,63 @@
package org.xbib.event.syslog;
import java.net.InetAddress;
import java.time.LocalDateTime;
public class DefaultSyslogRequest implements SyslogRequest {
private final Builder builder;
private DefaultSyslogRequest(Builder builder) {
this.builder = builder;
}
public static Builder builder() {
return new Builder();
}
@Override
public LocalDateTime receivedDate() {
return builder.receivedDate;
}
@Override
public InetAddress remoteAddress() {
return builder.remoteAddress;
}
@Override
public String rawMessage() {
return builder.rawMessage;
}
public static class Builder implements SyslogRequest.Builder {
LocalDateTime receivedDate;
InetAddress remoteAddress;
String rawMessage;
private Builder() {
}
public Builder receivedDate(LocalDateTime receivedDate) {
this.receivedDate = receivedDate;
return this;
}
public Builder remoteAddress(InetAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return this;
}
public Builder rawMessage(String rawMessage) {
this.rawMessage = rawMessage;
return this;
}
public SyslogRequest build() {
return new DefaultSyslogRequest(this);
}
}
}

View file

@ -0,0 +1,41 @@
/**
* Copyright © 2018 Jeremy Custenborder (jcustenborder@gmail.com)
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xbib.event.syslog;
import io.netty.buffer.ByteBuf;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
class EncoderHelper {
public static final Charset CHARSET = StandardCharsets.UTF_8;
public static final byte[] LESS_THAN = "<".getBytes(CHARSET);
public static final byte[] GREATER_THAN = ">".getBytes(CHARSET);
public static final byte[] LEFT_SQUARE = "[".getBytes(CHARSET);
public static final byte[] RIGHT_SQUARE = "]".getBytes(CHARSET);
public static final byte[] SPACE = " ".getBytes(CHARSET);
public static final byte[] EQUALS = "=".getBytes(CHARSET);
public static void appendPriority(ByteBuf buffer, Message message) {
if (null != message.facility() && null != message.level()) {
Integer priority = Priority.priority(message.level(), message.facility());
buffer.writeBytes(LESS_THAN);
buffer.writeCharSequence(priority.toString(), CHARSET);
buffer.writeBytes(GREATER_THAN);
}
}
}

View file

@ -1,193 +0,0 @@
package org.xbib.event.syslog;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
/**
* Syslog facility as defined in <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 - The Syslog Protocol</a>.
* See <a href="http://tools.ietf.org/html/rfc5427">RFC 5427 - Textual Conventions for Syslog Management</a> for the {@link #label}.
*/
public enum Facility implements Comparable<Facility> {
/**
* Kernel messages, numerical code 0.
*/
KERN(0, "KERN"),
/**
* User-level messages, numerical code 1.
*/
USER(1, "USER"),
/**
* Mail system, numerical code 2.
*/
MAIL(2, "MAIL"),
/**
* System daemons, numerical code 3.
*/
DAEMON(3, "DAEMON"),
/**
* Security/authorization messages, numerical code 4.
*/
AUTH(4, "AUTH"),
/**
* Messages generated internally by syslogd, numerical code 5.
*/
SYSLOG(5, "SYSLOG"),
/**
* Line printer subsystem, numerical code 6.
*/
LPR(6, "LPR"),
/**
* Network news subsystem, numerical code 7.
*/
NEWS(7, "NEWS"),
/**
* UUCP subsystem, numerical code 8
*/
UUCP(8, "UUCP"),
/**
* Clock daemon, numerical code 9.
*/
CRON(9, "CRON"),
/**
* Security/authorization messages, numerical code 10.
*/
AUTHPRIV(10, "AUTHPRIV"),
/**
* FTP daemon, numerical code 11.
*/
FTP(11, "FTP"),
/**
* NTP subsystem, numerical code 12.
*/
NTP(12, "NTP"),
/**
* Log audit, numerical code 13.
*/
AUDIT(13, "AUDIT"),
/**
* Log alert, numerical code 14.
*/
ALERT(14, "ALERT"),
/**
* Clock daemon, numerical code 15.
*/
CLOCK(15, "CLOCK"),
/**
* Reserved for local use, numerical code 16.
*/
LOCAL0(16, "LOCAL0"),
/**
* Reserved for local use, numerical code 17.
*/
LOCAL1(17, "LOCAL1"),
/**
* Reserved for local use, numerical code 18.
*/
LOCAL2(18, "LOCAL2"),
/**
* Reserved for local use, numerical code 19.
*/
LOCAL3(19, "LOCAL3"),
/**
* Reserved for local use, numerical code 20.
*/
LOCAL4(20, "LOCAL4"),
/**
* Reserved for local use, numerical code 21.
*/
LOCAL5(21, "LOCAL5"),
/**
* Reserved for local use, numerical code 22.
*/
LOCAL6(22, "LOCAL6"),
/**
* Reserved for local use, numerical code 23.
*/
LOCAL7(23, "LOCAL7");
private final static Map<String, Facility> facilityFromLabel = new HashMap<String, Facility>();
private final static Map<Integer, Facility> facilityFromNumericalCode = new HashMap<Integer, Facility>();
static {
for (Facility facility : Facility.values()) {
facilityFromLabel.put(facility.label, facility);
facilityFromNumericalCode.put(facility.numericalCode, facility);
}
}
/**
* Syslog facility numerical code
*/
private final int numericalCode;
/**
* Syslog facility textual code. Not {@code null}
*/
private final String label;
private Facility(int numericalCode, String label) {
this.numericalCode = numericalCode;
this.label = label;
}
/**
* @param numericalCode Syslog facility numerical code
* @return Syslog facility, not {@code null}
* @throws IllegalArgumentException the given numericalCode is not a valid Syslog facility numerical code
*/
public static Facility fromNumericalCode(int numericalCode) throws IllegalArgumentException {
Facility facility = facilityFromNumericalCode.get(numericalCode);
if (facility == null) {
throw new IllegalArgumentException("Invalid facility '" + numericalCode + "'");
}
return facility;
}
/**
* @param label Syslog facility textual code. {@code null} or empty returns {@code null}
* @return Syslog facility, {@code null} if given value is {@code null}
* @throws IllegalArgumentException the given value is not a valid Syslog facility textual code
*/
public static Facility fromLabel(String label) throws IllegalArgumentException {
if (label == null || label.isEmpty()) {
return null;
}
Facility facility = facilityFromLabel.get(label);
if (facility == null) {
throw new IllegalArgumentException("Invalid facility '" + label + "'");
}
return facility;
}
/**
* Syslog facility numerical code
* @return numerical code
*/
public int numericalCode() {
return numericalCode;
}
/**
* Syslog facility textual code. Not {@code null}.
* @return label
*/
public String label() {
return label;
}
/**
* Compare on {@link Facility#numericalCode()}
* @return comparator for facilities
*/
public static Comparator<Facility> comparator() {
return new Comparator<Facility>() {
@Override
public int compare(Facility f1, Facility f2) {
return Integer.compare(f1.numericalCode, f2.numericalCode);
}
};
}
}

View file

@ -1,346 +0,0 @@
package org.xbib.event.syslog;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
class JsonParser {
private static final int DEFAULT_BUFFER_SIZE = 1024;
private final Reader reader;
private final char[] buf;
private int index;
private int fill;
private int ch;
private StringBuilder sb;
private int start;
public JsonParser(Reader reader) {
this(reader, DEFAULT_BUFFER_SIZE);
}
public JsonParser(Reader reader, int buffersize) {
this.reader = reader;
buf = new char[buffersize];
start = -1;
}
public Object parse() throws IOException {
read();
skipBlank();
Object result = parseValue();
skipBlank();
if (ch != -1) {
throw new IOException("unexpected character: " + ch);
}
return result;
}
private Object parseValue() throws IOException {
switch (ch) {
case 'n':
return parseNull();
case 't':
return parseTrue();
case 'f':
return parseFalse();
case '"':
return parseString();
case '[':
return parseList();
case '{':
return parseMap();
case '-':
case '+':
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
return parseNumber();
}
throw new IOException("value");
}
private List<Object> parseList() throws IOException {
read();
List<Object> list = new ArrayList<Object>();
skipBlank();
if (parseChar(']')) {
return list;
}
do {
skipBlank();
list.add(parseValue());
skipBlank();
} while (parseChar(','));
if (!parseChar(']')) {
expected("',' or ']'");
}
return list;
}
private Map<String, Object> parseMap() throws IOException {
read();
Map<String, Object> object = new LinkedHashMap<String, Object>();
skipBlank();
if (parseChar('}')) {
return object;
}
do {
skipBlank();
if (ch != '"') {
expected("name");
}
String name = parseString();
skipBlank();
if (!parseChar(':')) {
expected("':'");
}
skipBlank();
object.put(name, parseValue());
skipBlank();
} while (parseChar(','));
if (!parseChar('}')) {
expected("',' or '}'");
}
return object;
}
private Object parseNull() throws IOException {
read();
checkForChar('u');
checkForChar('l');
checkForChar('l');
return null;
}
private Object parseTrue() throws IOException {
read();
checkForChar('r');
checkForChar('u');
checkForChar('e');
return Boolean.TRUE;
}
private Object parseFalse() throws IOException {
read();
checkForChar('a');
checkForChar('l');
checkForChar('s');
checkForChar('e');
return Boolean.FALSE;
}
private void checkForChar(char ch) throws IOException {
if (!parseChar(ch)) {
expected("'" + ch + "'");
}
}
private String parseString() throws IOException {
read();
startCapture();
while (ch != '"') {
if (ch == '\\') {
pauseCapture();
parseEscaped();
startCapture();
} else if (ch < 0x20) {
expected("valid string character");
} else {
read();
}
}
String s = endCapture();
read();
return s;
}
private void parseEscaped() throws IOException {
read();
switch (ch) {
case '"':
case '/':
case '\\':
sb.append((char) ch);
break;
case 'b':
sb.append('\b');
break;
case 't':
sb.append('\t');
break;
case 'f':
sb.append('\f');
break;
case 'n':
sb.append('\n');
break;
case 'r':
sb.append('\r');
break;
case 'u':
char[] hex = new char[4];
for (int i = 0; i < 4; i++) {
read();
if (!isHexDigit()) {
expected("hexadecimal digit");
}
hex[i] = (char) ch;
}
sb.append((char) Integer.parseInt(String.valueOf(hex), 16));
break;
default:
expected("valid escape sequence");
}
read();
}
private Object parseNumber() throws IOException {
startCapture();
parseChar('-');
int firstDigit = ch;
if (!parseDigit()) {
expected("digit");
}
if (firstDigit != '0') {
while (parseDigit()) {
}
}
parseFraction();
parseExponent();
return endCapture();
}
private boolean parseFraction() throws IOException {
if (!parseChar('.')) {
return false;
}
if (!parseDigit()) {
expected("digit");
}
while (parseDigit()) {
}
return true;
}
private boolean parseExponent() throws IOException {
if (!parseChar('e') && !parseChar('E')) {
return false;
}
if (!parseChar('+')) {
parseChar('-');
}
if (!parseDigit()) {
expected("digit");
}
while (parseDigit()) {
}
return true;
}
private boolean parseChar(char ch) throws IOException {
if (this.ch != ch) {
return false;
}
read();
return true;
}
private boolean parseDigit() throws IOException {
if (!isDigit()) {
return false;
}
read();
return true;
}
private void skipBlank() throws IOException {
while (isWhiteSpace()) {
read();
}
}
private void read() throws IOException {
if (ch == -1) {
throw new IOException("unexpected end of input");
}
if (index == fill) {
if (start != -1) {
sb.append(buf, start, fill - start);
start = 0;
}
fill = reader.read(buf, 0, buf.length);
index = 0;
if (fill == -1) {
ch = -1;
return;
}
}
ch = buf[index++];
}
private void startCapture() {
if (sb == null) {
sb = new StringBuilder();
}
start = index - 1;
}
private void pauseCapture() {
int end = ch == -1 ? index : index - 1;
sb.append(buf, start, end - start);
start = -1;
}
private String endCapture() {
int end = ch == -1 ? index : index - 1;
String captured;
if (sb.length() > 0) {
sb.append(buf, start, end - start);
captured = sb.toString();
sb.setLength(0);
} else {
captured = new String(buf, start, end - start);
}
start = -1;
return captured;
}
private boolean isWhiteSpace() {
return ch == ' ' || ch == '\t' || ch == '\n' || ch == '\r';
}
private boolean isDigit() {
return ch >= '0' && ch <= '9';
}
private boolean isHexDigit() {
return ch >= '0' && ch <= '9'
|| ch >= 'a' && ch <= 'f'
|| ch >= 'A' && ch <= 'F';
}
private void expected(String expected) throws IOException {
if (ch == -1) {
throw new IOException("unexpected end of input");
}
throw new IOException("expected " + expected);
}
}

View file

@ -0,0 +1,94 @@
package org.xbib.event.syslog;
import org.xbib.datastructures.api.Builder;
import org.xbib.event.Event;
import java.net.InetAddress;
import java.time.LocalDateTime;
import java.util.List;
/**
* Represents a standard syslog message.
*/
public interface Message extends Event {
/**
* Date of the message. This is the parsed date from the client.
*
* @return date of the message
*/
LocalDateTime date();
/**
* IP Address for the sender of the message.
*
* @return sender IP Address
*/
InetAddress remoteAddress();
/**
* Unprocessed copy of the message.
*
* @return Unprocessed message
*/
String rawMessage();
MessageType type();
/**
* Level for the message. Parsed from the message.
*
* @return message Level
*/
Integer level();
/**
* Version of the message.
*
* @return message version
*/
Integer version();
/**
* Facility of the message.
*
* @return message facility
*/
Integer facility();
/**
* Host of the message. This is the value from the message.
*
* @return message host
*/
String host();
/**
* Message part of the overall syslog message.
*
* @return message part of the overall syslog message
*/
String message();
String processId();
String tag();
String messageId();
String appName();
List<Builder> structuredData();
String deviceVendor();
String deviceProduct();
String deviceVersion();
String deviceEventClassId();
String name();
String severity();
}

View file

@ -0,0 +1,125 @@
package org.xbib.event.syslog;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
public class MessageEncoder extends MessageToMessageEncoder<Message> {
final DateTimeFormatter cefDateFormat;
final Charset charset;
final byte[] cef;
final byte[] pipe;
public MessageEncoder(DateTimeFormatter cefDateFormat) {
this.cefDateFormat = cefDateFormat;
this.charset = StandardCharsets.UTF_8;
this.cef = "CEF:0".getBytes(charset);
this.pipe = "|".getBytes(charset);
}
@Override
protected void encode(ChannelHandlerContext context, Message message, List<Object> list) {
switch (message.type()) {
case CEF:
encodeCEF(context, message, list);
break;
case RFC3164:
encodeRFC3164(context, message, list);
break;
case RFC5424:
encodeRFC5424(context, message, list);
break;
default:
break;
}
}
private void encodeCEF(ChannelHandlerContext context, Message message, List<Object> output) {
final ByteBuf buffer = context.alloc().buffer();
EncoderHelper.appendPriority(buffer, message);
buffer.writeCharSequence(cefDateFormat.format(message.date()), charset);
buffer.writeBytes(EncoderHelper.SPACE);
buffer.writeCharSequence(message.host(), charset);
buffer.writeBytes(EncoderHelper.SPACE);
buffer.writeBytes(cef);
buffer.writeBytes(pipe);
buffer.writeCharSequence(message.deviceVendor(), charset);
buffer.writeBytes(pipe);
buffer.writeCharSequence(message.deviceProduct(), charset);
buffer.writeBytes(pipe);
buffer.writeCharSequence(message.deviceVersion(), charset);
buffer.writeBytes(pipe);
buffer.writeCharSequence(message.deviceEventClassId(), charset);
buffer.writeBytes(pipe);
buffer.writeCharSequence(message.name(), charset);
buffer.writeBytes(pipe);
buffer.writeCharSequence(message.severity(), charset);
buffer.writeBytes(pipe);
int index = 0;
for (Map.Entry<String, Object> kvp : message.getMap().entrySet()) {
if (index > 0) {
buffer.writeBytes(EncoderHelper.SPACE);
}
buffer.writeCharSequence(kvp.getKey(), charset);
buffer.writeBytes(EncoderHelper.EQUALS);
buffer.writeCharSequence(kvp.getValue().toString(), charset);
index++;
}
output.add(buffer);
}
private void encodeRFC3164(ChannelHandlerContext context, Message message, List<Object> output) {
ByteBuf buffer = context.alloc().buffer();
EncoderHelper.appendPriority(buffer, message);
buffer.writeCharSequence(message.date().format(cefDateFormat), charset);
buffer.writeCharSequence(" ", charset);
buffer.writeCharSequence(message.host(), charset);
buffer.writeCharSequence(" ", charset);
buffer.writeCharSequence(message.tag(), charset);
if (message.processId() != null) {
buffer.writeCharSequence("[", charset);
buffer.writeCharSequence(message.processId(), charset);
buffer.writeCharSequence("]", charset);
}
buffer.writeCharSequence(": ", charset);
buffer.writeCharSequence(message.message(), charset);
output.add(buffer);
}
private void encodeRFC5424(ChannelHandlerContext context, Message message, List<Object> output) {
final ByteBuf buffer = context.alloc().buffer();
EncoderHelper.appendPriority(buffer, message);
if (message.version() != null) {
buffer.writeCharSequence(message.version().toString(), charset);
}
buffer.writeBytes(EncoderHelper.SPACE);
buffer.writeCharSequence(message.date().format(cefDateFormat), charset);
buffer.writeCharSequence(" ", charset);
buffer.writeCharSequence(message.host(), charset);
buffer.writeCharSequence(" ", charset);
if (message.appName() != null) {
buffer.writeCharSequence(message.appName(), charset);
}
if (message.processId() != null) {
buffer.writeCharSequence(message.processId(), charset);
} else {
buffer.writeCharSequence(" -", charset);
}
if (message.messageId() != null) {
buffer.writeCharSequence(message.messageId(), charset);
} else {
buffer.writeCharSequence(" -", charset);
}
buffer.writeCharSequence(" - ", charset);
buffer.writeCharSequence(message.message(), charset);
output.add(buffer);
}
}

View file

@ -0,0 +1,7 @@
package org.xbib.event.syslog;
public interface MessageKey {
String remoteAddress();
String host();
}

View file

@ -1,231 +1,140 @@
package org.xbib.event.syslog; package org.xbib.event.syslog;
import org.xbib.datastructures.api.Builder;
import org.xbib.datastructures.json.tiny.JsonBuilder;
import java.io.IOException; import java.io.IOException;
import java.io.StringReader; import java.time.DateTimeException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit; import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalAccessor;
import java.util.HashMap; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.Arrays;
import java.util.Map; import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.xbib.datastructures.api.Builder;
/** public abstract class MessageParser {
* Parses a syslog message with RFC 3164 or RFC 5424 date format
*/
public class MessageParser {
private final static Pattern TWO_SPACES = Pattern.compile(" "); private static final Logger log = Logger.getLogger(MessageParser.class.getName());
private final static DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME; private static final String NULL_TOKEN = "-";
private final static DateTimeFormatter rfc3164Format = protected final List<DateTimeFormatter> dateFormatters;
DateTimeFormatter.ofPattern("MMM d HH:mm:ss").withZone(ZoneId.of("UTC"));
private final static int RFC3164_LEN = 15; private final ThreadLocal<Matcher> matcherStructuredData;
private final static int RFC5424_PREFIX_LEN = 19; private final ThreadLocal<Matcher> matcherKeyValue;
private final DateTimeFormatter timeParser; private final ZoneId zoneId;
private final Map<String, LocalDateTime> timestampCache;
private final Map<String, String> fieldNames = new HashMap<>() {{
put("host", "host");
put("facility", "facility");
put("severity", "severity");
put("timestamp", "timestamp");
put("message", "message");
}};
private Map<String, Pattern> patterns;
public MessageParser() { public MessageParser() {
timeParser = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(ZoneId.of("UTC")); this(ZoneId.of("UTC"));
timestampCache = new LinkedHashMap<>();
/*CacheBuilder.newBuilder().maximumSize(1000).build(
new CacheLoader<>() {
@Override
public LocalDateTime load(String key) {
return LocalDateTime.parse(key, timeParser);
}
});*/
} }
public MessageParser setPatterns(Map<String, Pattern> patterns) { public MessageParser(ZoneId zoneId) {
this.patterns = patterns; this.zoneId = zoneId;
return this; this.dateFormatters = Arrays.asList(DateTimeFormatter.ISO_OFFSET_DATE_TIME,
new DateTimeFormatterBuilder()
.appendPattern("MMM dd")
.optionalStart()
.appendPattern("[ yyyy]")
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1)
.optionalEnd()
.appendPattern(" HH:mm:ss")
.toFormatter(Locale.ROOT)
);
this.matcherStructuredData = initMatcher("\\[([^\\]]+)\\]");
this.matcherKeyValue = initMatcher("(?<key>\\S+)=\"(?<value>[^\"]+)\"|(?<id>\\S+)");
} }
public MessageParser setFieldName(String name, String newName) { /**
fieldNames.put(name, newName); * Method is used to parse an incoming syslog message.
return this; *
* @param request Incoming syslog request.
* @return Object to pass along the pipeline. Null if could not be parsed.
*/
public abstract Message parse(SyslogRequest request) throws IOException;
protected final ThreadLocal<Matcher> initMatcher(String pattern) {
return initMatcher(pattern, 0);
} }
@SuppressWarnings("unchecked") protected final ThreadLocal<Matcher> initMatcher(String pattern, int flags) {
public void parseMessage(String msg, Builder builder) throws IOException { final Pattern p = Pattern.compile(pattern, flags);
int msgLen = msg.length(); return new MatcherInheritableThreadLocal(p);
int pos = 0;
if (msg.charAt(pos) != '<') {
throw new IllegalArgumentException("bad format: invalid priority: cannot find open bracket '<' " + msg);
} }
int end = msg.indexOf('>');
if (end < 0 || end > 6) {
throw new IllegalArgumentException("bad format: invalid priority: cannot find end bracket '>' " + msg);
}
int pri = Integer.parseInt(msg.substring(1, end));
Facility facility = Facility.fromNumericalCode(pri / 8);
Severity severity = Severity.fromNumericalCode(pri % 8);
builder.field(fieldNames.get("facility"), facility.label())
.field(fieldNames.get("severity"), severity.label());
if (msgLen <= end + 1) {
throw new IllegalArgumentException("bad format: no data except priority " + msg);
}
pos = end + 1;
if (msgLen > pos + 2 && "1 ".equals(msg.substring(pos, pos + 2))) {
pos += 2;
}
TemporalAccessor timestamp;
char ch = msg.charAt(pos);
if (ch == '-') {
timestamp = LocalDateTime.now();
if (msgLen <= pos + 2) {
throw new IllegalArgumentException("bad syslog format (missing hostname)");
}
pos += 2;
} else if (ch >= 'A' && ch <= 'Z') {
if (msgLen <= pos + RFC3164_LEN) {
throw new IllegalArgumentException("bad timestamp format");
}
timestamp = parseRFC3164Time(msg.substring(pos, pos + RFC3164_LEN));
pos += RFC3164_LEN + 1;
} else {
int sp = msg.indexOf(' ', pos);
if (sp == -1) {
throw new IllegalArgumentException("bad timestamp format");
}
timestamp = parseRFC5424Date(msg.substring(pos, sp));
pos = sp + 1;
}
builder.field(fieldNames.get("timestamp"), formatter.format(timestamp));
int ns = msg.indexOf(' ', pos);
if (ns == -1) {
throw new IllegalArgumentException("bad syslog format (missing hostname)");
}
String hostname = msg.substring(pos, ns);
builder.field(fieldNames.get("host"), hostname);
String data; protected String nullableString(String groupText) {
if (msgLen > ns + 1) { return NULL_TOKEN.equals(groupText) ? null : groupText;
pos = ns + 1;
data = msg.substring(pos);
} else {
data = msg;
} }
protected LocalDateTime parseDate(String date) {
final String cleanDate = date.replaceAll("\\s+", " ");
LocalDateTime result = null;
for (DateTimeFormatter formatter : dateFormatters) {
try { try {
if (data.startsWith("@cee:")) { TemporalAccessor temporal = formatter.parseBest(cleanDate, OffsetDateTime::from, LocalDateTime::from);
data = data.substring(5); if (temporal instanceof LocalDateTime) {
result = ((LocalDateTime) temporal);
} else {
result = ((OffsetDateTime) temporal).withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime();
} }
JsonParser parser = new JsonParser(new StringReader(data)); if (result.getLong(ChronoField.YEAR_OF_ERA) == 1) {
Map<String,Object> map = (Map<String,Object>)parser.parse(); result = result.withYear(LocalDateTime.now(this.zoneId).getYear());
builder.buildMap(map); }
} catch (Throwable t) { break;
} catch (DateTimeException e) {
// ignore // ignore
} }
String message = fieldNames.get("message");
builder.field(message, data);
if (patterns != null) {
for (Map.Entry<String, Pattern> entry : patterns.entrySet()) {
Matcher m = entry.getValue().matcher(data);
if (m.find()) {
builder.field(entry.getKey(), m.group(1));
}
} }
if (result == null) {
log.log(Level.SEVERE, "could not parse date " + cleanDate);
} }
return result;
} }
private LocalDateTime parseRFC5424Date(String msg) { protected List<Builder> parseStructuredData(String structuredData) throws IOException {
int len = msg.length(); final Matcher matcher = matcherStructuredData.get().reset(structuredData);
if (len <= RFC5424_PREFIX_LEN) { final List<Builder> result = new ArrayList<>();
throw new IllegalArgumentException("bad format: not a valid RFC5424 timestamp: " + msg); while (matcher.find()) {
} final String input = matcher.group(1);
String timestampPrefix = msg.substring(0, RFC5424_PREFIX_LEN); Builder builder = JsonBuilder.builder();
LocalDateTime timestamp = timestampCache.get(timestampPrefix); final Matcher kvpMatcher = matcherKeyValue.get().reset(input);
int pos = RFC5424_PREFIX_LEN; while (kvpMatcher.find()) {
if (timestamp == null) { final String key = kvpMatcher.group("key");
throw new IllegalArgumentException("parse error: timestamp is null"); final String value = kvpMatcher.group("value");
} final String id = kvpMatcher.group("id");
if (msg.charAt(pos) == '.') { if (null != id && !id.isEmpty()) {
boolean found = false; builder.field("id", id);
int end = pos + 1;
if (len <= end) {
throw new IllegalArgumentException("bad timestamp format (no TZ)");
}
while (!found) {
char ch = msg.charAt(end);
if (ch >= '0' && ch <= '9') {
end++;
} else { } else {
found = true; builder.fieldIfNotNull(key, value);
} }
} }
if (end - (pos + 1) > 0) { result.add(builder);
long milliseconds = (long) (Double.parseDouble(msg.substring(pos, end)) * 1000.0);
timestamp.plus(milliseconds, ChronoUnit.MILLIS);
} else {
throw new IllegalArgumentException("bad format: invalid timestamp (fractional portion): " + msg);
} }
pos = end; return result;
}
char ch = msg.charAt(pos);
if (ch != 'Z') {
if (ch == '+' || ch == '-') {
if (len <= pos + 5) {
throw new IllegalArgumentException("bad format: invalid timezone: " + msg);
}
int sign = ch == '+' ? +1 : -1;
char[] hourzone = new char[5];
for (int i = 0; i < 5; i++) {
hourzone[i] = msg.charAt(pos + 1 + i);
}
if (hourzone[0] >= '0' && hourzone[0] <= '9'
&& hourzone[1] >= '0' && hourzone[1] <= '9'
&& hourzone[2] == ':'
&& hourzone[3] >= '0' && hourzone[3] <= '9'
&& hourzone[4] >= '0' && hourzone[4] <= '9') {
int hourOffset = Integer.parseInt(msg.substring(pos + 1, pos + 3));
int minOffset = Integer.parseInt(msg.substring(pos + 4, pos + 6));
timestamp.minus(sign * ((hourOffset * 60L) + minOffset) * 60000, ChronoUnit.MILLIS);
} else {
throw new IllegalArgumentException("bad format: invalid timezone: " + msg);
}
}
}
return timestamp;
} }
private LocalDateTime parseRFC3164Time(String timestamp) { static class MatcherInheritableThreadLocal extends InheritableThreadLocal<Matcher> {
LocalDateTime now = LocalDateTime.now(); private final Pattern pattern;
int year = now.getYear();
timestamp = TWO_SPACES.matcher(timestamp).replaceFirst(" "); MatcherInheritableThreadLocal(Pattern pattern) {
LocalDateTime date; this.pattern = pattern;
try {
date = LocalDateTime.parse(timestamp, rfc3164Format);
} catch (Exception e) {
return LocalDateTime.MIN;
} }
LocalDateTime fixed = date.withYear(year);
if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { @Override
fixed = date.withYear(year - 1); protected Matcher initialValue() {
} else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { return this.pattern.matcher("");
fixed = date.withYear(year + 1);
} }
return fixed;
} }
} }

View file

@ -0,0 +1,20 @@
package org.xbib.event.syslog;
public enum MessageType {
/**
* Message type for a message that could not be parsed.
*/
UNKNOWN,
/**
* Message type for a CEF message.
*/
CEF,
/**
* Message type for a rfc 3164 message.
*/
RFC3164,
/**
* Message type for a rfc 5424 message.
*/
RFC5424
}

View file

@ -0,0 +1,18 @@
package org.xbib.event.syslog;
class Priority {
private Priority() {
}
public static int facility(int priority) {
return priority >> 3;
}
public static int level(int priority, int facility) {
return priority - (facility << 3);
}
public static int priority(int level, int facility) {
return (facility * 8) + level;
}
}

View file

@ -0,0 +1,46 @@
package org.xbib.event.syslog;
import java.time.LocalDateTime;
import java.util.regex.Matcher;
public class RFC3164MessageParser extends MessageParser {
private static final String PATTERN = "^(<(?<priority>\\d+)>)?(?<date>([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?<host>\\S+)\\s+((?<tag>[^\\[\\s\\]]+)(\\[(?<procid>\\d+)\\])?:)*\\s*(?<message>.+)$";
private final ThreadLocal<Matcher> matcherThreadLocal;
public RFC3164MessageParser() {
this.matcherThreadLocal = initMatcher(PATTERN);
}
@Override
public Message parse(SyslogRequest request) {
final Matcher matcher = matcherThreadLocal.get().reset(request.rawMessage());
if (!matcher.find()) {
return null;
}
final String groupPriority = matcher.group("priority");
final String groupDate = matcher.group("date");
final String groupHost = matcher.group("host");
final String groupMessage = matcher.group("message");
final String groupTag = matcher.group("tag");
final String groupProcId = matcher.group("procid");
final String processId = (groupProcId == null || groupProcId.isEmpty()) ? null : groupProcId;
final Integer priority = (groupPriority == null || groupPriority.isEmpty()) ? null : Integer.parseInt(groupPriority);
final Integer facility = null == priority ? null : Priority.facility(priority);
final Integer level = null == priority ? null : Priority.level(priority, facility);
final LocalDateTime date = parseDate(groupDate);
return DefaultSyslogMessage.builder()
.type(MessageType.RFC3164)
.rawMessage(request.rawMessage())
.remoteAddress(request.remoteAddress())
.date(date)
.host(groupHost)
.level(level)
.facility(facility)
.message(groupMessage)
.tag(groupTag)
.processId(processId)
.build();
}
}

View file

@ -0,0 +1,60 @@
package org.xbib.event.syslog;
import org.xbib.datastructures.api.Builder;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.regex.Matcher;
public class RFC5424MessageParser extends MessageParser {
private static final String PATTERN = "^<(?<priority>\\d+)>(?<version>\\d{1,3})\\s*(?<date>[0-9:+-TZ]+)\\s*(?<host>\\S+)\\s*(?<appname>\\S+)\\s*(?<procid>\\S+)\\s*(?<msgid>\\S+)\\s*(?<structureddata>(-|\\[.+\\]))\\s*(?<message>.+)$";
private final ThreadLocal<Matcher> matcherThreadLocal;
public RFC5424MessageParser() {
this.matcherThreadLocal = initMatcher(PATTERN);
}
@Override
public Message parse(SyslogRequest request) throws IOException {
final Matcher matcher = matcherThreadLocal.get().reset(request.rawMessage());
if (!matcher.find()) {
return null;
}
final String groupPriority = matcher.group("priority");
final String groupVersion = matcher.group("version");
final String groupDate = matcher.group("date");
final String groupHost = matcher.group("host");
final String groupAppName = matcher.group("appname");
final String groupProcID = matcher.group("procid");
final String groupMessageID = matcher.group("msgid");
final String groupStructuredData = matcher.group("structureddata");
final String groupMessage = matcher.group("message");
final int priority = Integer.parseInt(groupPriority);
final int facility = Priority.facility(priority);
final LocalDateTime date = parseDate(groupDate);
final int level = Priority.level(priority, facility);
final Integer version = Integer.parseInt(groupVersion);
final String appName = nullableString(groupAppName);
final String procID = nullableString(groupProcID);
final String messageID = nullableString(groupMessageID);
final List<Builder> structuredData = parseStructuredData(groupStructuredData);
return DefaultSyslogMessage.builder()
.type(MessageType.RFC5424)
.rawMessage(request.rawMessage())
.remoteAddress(request.remoteAddress())
.date(date)
.host(groupHost)
.level(level)
.facility(facility)
.message(groupMessage)
.version(version)
.processId(procID)
.messageId(messageID)
.structuredData(structuredData)
.appName(appName)
.build();
}
}

View file

@ -1,123 +0,0 @@
package org.xbib.event.syslog;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
/**
* Syslog severity as defined in <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 - The Syslog Protocol</a>.
*/
public enum Severity {
/**
* Emergency: system is unusable, numerical code 0.
*/
EMERGENCY(0, "EMERGENCY"),
/**
* Alert: action must be taken immediately, numerical code 1.
*/
ALERT(1, "ALERT"),
/**
* Critical: critical conditions, numerical code 2.
*/
CRITICAL(2, "CRITICAL"),
/**
* Error: error conditions, numerical code 3.
*/
ERROR(3, "ERROR"),
/**
* Warning: warning conditions, numerical code 4.
*/
WARNING(4, "WARNING"),
/**
* Notice: normal but significant condition, numerical code 5.
*/
NOTICE(5, "NOTICE"),
/**
* Informational: informational messages, numerical code 6.
*/
INFORMATIONAL(6, "INFORMATIONAL"),
/**
* Debug: debug-level messages, numerical code 7.
*/
DEBUG(7, "DEBUG");
private final static Map<String, Severity> severityFromLabel = new HashMap<String, Severity>();
private final static Map<Integer, Severity> severityFromNumericalCode = new HashMap<Integer, Severity>();
static {
for (Severity severity : Severity.values()) {
severityFromLabel.put(severity.label, severity);
severityFromNumericalCode.put(severity.numericalCode, severity);
}
}
private final int numericalCode;
private final String label;
private Severity(int numericalCode, String label) {
this.numericalCode = numericalCode;
this.label = label;
}
/**
* @param numericalCode Syslog severity numerical code
* @return Syslog severity, not {@code null}
* @throws IllegalArgumentException the given numericalCode is not a valid Syslog severity numerical code
*/
public static Severity fromNumericalCode(int numericalCode) throws IllegalArgumentException {
Severity severity = severityFromNumericalCode.get(numericalCode);
if (severity == null) {
throw new IllegalArgumentException("Invalid severity '" + numericalCode + "'");
}
return severity;
}
/**
* @param label Syslog severity textual code. {@code null} or empty returns {@code null}
* @return Syslog severity, {@code null} if given value is {@code null}
* @throws IllegalArgumentException the given value is not a valid Syslog severity textual code
*/
public static Severity fromLabel(String label) throws IllegalArgumentException {
if (label == null || label.isEmpty()) {
return null;
}
Severity severity = severityFromLabel.get(label);
if (severity == null) {
throw new IllegalArgumentException("Invalid severity '" + label + "'");
}
return severity;
}
/**
* Syslog severity numerical code
* @return numerical code
*/
public int numericalCode() {
return numericalCode;
}
/**
* Syslog severity textual code. Not {@code null}.
* @return the severity label
*/
public String label() {
return label;
}
/**
* Compare on {@link Severity#numericalCode()}
* @return comparator for severities
*/
public static Comparator<Severity> comparator() {
return new Comparator<Severity>() {
@Override
public int compare(Severity s1, Severity s2) {
return Integer.compare(s1.numericalCode, s2.numericalCode);
}
};
}
}

View file

@ -0,0 +1,43 @@
package org.xbib.event.syslog;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.io.file.FileFollowEventService;
import org.xbib.settings.Settings;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class SyslogEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(SyslogEventManager.class.getName());
private final List<SyslogService> syslogServices;
public SyslogEventManager(Settings settings,
AsyncEventBus eventBus) {
this.syslogServices = new ArrayList<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) {
Settings definition = entry.getValue();
if (definition.getAsBoolean("enabled", true)) {
try {
SyslogService syslogService = new SyslogService(definition, eventBus);
syslogServices.add(syslogService);
logger.log(Level.INFO, "syslog service " + entry.getKey() + " added");
} catch (Exception e) {
logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e);
}
}
}
}
@Override
public void close() throws IOException {
for (SyslogService syslogService : syslogServices) {
syslogService.close();
}
}
}

View file

@ -0,0 +1,37 @@
package org.xbib.event.syslog;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
public class SyslogFrameDecoder extends LineBasedFrameDecoder {
final static ByteProcessor INTEGER = b -> b >= ((byte) 48) && b <= ((byte) 57);
public SyslogFrameDecoder(int maxLength) {
super(maxLength, true, false);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf b) throws Exception {
ByteBuf buffer = b.retain();
int lengthIndex = buffer.forEachByte(INTEGER);
final int digitCount = lengthIndex - buffer.readerIndex();
if (digitCount > 0) {
buffer.markReaderIndex();
final String frameLength = buffer.getCharSequence(buffer.readerIndex(), digitCount, CharsetUtil.UTF_8).toString();
buffer.skipBytes(digitCount + 1);
int length = Integer.parseInt(frameLength);
if (b.readerIndex() + length > b.writerIndex()) {
buffer.resetReaderIndex();
return null;
}
return buffer.slice(digitCount + 1, length);
} else {
return super.decode(ctx, buffer);
}
}
}

View file

@ -0,0 +1,25 @@
package org.xbib.event.syslog;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
@ChannelHandler.Sharable
class SyslogIdleStateHandler extends ChannelDuplexHandler {
public static final SyslogIdleStateHandler INSTANCE = new SyslogIdleStateHandler();
private SyslogIdleStateHandler() {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent e) {
if (e.state() == IdleState.WRITER_IDLE) {
ctx.close();
}
}
}
}

View file

@ -0,0 +1,9 @@
package org.xbib.event.syslog;
public interface SyslogMessage extends Message {
interface Builder {
SyslogMessage build();
}
}

View file

@ -0,0 +1,48 @@
package org.xbib.event.syslog;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@ChannelHandler.Sharable
public class SyslogMessageHandler extends SimpleChannelInboundHandler<SyslogRequest> {
private final List<MessageParser> parsers;
public SyslogMessageHandler() {
this(Arrays.asList(new CEFMessageParser(), new RFC5424MessageParser(), new RFC3164MessageParser()));
}
public SyslogMessageHandler(List<MessageParser> parsers) {
this.parsers = parsers;
}
@Override
protected void channelRead0(ChannelHandlerContext context, SyslogRequest request) {
context.executor().submit(() -> {
for (MessageParser parser : parsers) {
try {
Object result = parser.parse(request);
if (result != null) {
context.fireChannelRead(result);
return;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
context.write(DefaultSyslogMessage.builder()
.type(MessageType.UNKNOWN)
.date(LocalDateTime.now())
.rawMessage(request.rawMessage())
.remoteAddress(request.remoteAddress())
.build());
});
}
}

View file

@ -0,0 +1,4 @@
package org.xbib.event.syslog;
public interface SyslogMessageKey extends MessageKey {
}

View file

@ -0,0 +1,39 @@
package org.xbib.event.syslog;
import java.net.InetAddress;
import java.time.LocalDateTime;
/**
* Interface represents an incoming syslog request. This interface acts as an intermediary between
* the TCPSyslogMessageDecoder and UDPSyslogMessageDecoder
*
* @see TCPSyslogMessageDecoder
* @see UDPSyslogMessageDecoder
*/
public interface SyslogRequest {
/**
* The time the message was received by Netty.
*
* @return The time the message was received by Netty.
*/
LocalDateTime receivedDate();
/**
* IP Address for the sender of the message.
*
* @return Sender IP Address
*/
InetAddress remoteAddress();
/**
* The raw message that was delivered
*
* @return Raw message.
*/
String rawMessage();
interface Builder {
SyslogRequest build();
}
}

View file

@ -1,249 +1,126 @@
package org.xbib.event.syslog; package org.xbib.event.syslog;
import java.io.IOException; import io.netty.bootstrap.Bootstrap;
import java.net.BindException; import io.netty.bootstrap.ServerBootstrap;
import java.net.InetAddress; import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress; import io.netty.channel.ChannelHandler;
import java.net.SocketAddress; import io.netty.channel.ChannelHandlerContext;
import java.time.format.DateTimeFormatter; import io.netty.channel.ChannelInitializer;
import java.util.HashMap; import io.netty.channel.ChannelOption;
import java.util.Map; import io.netty.channel.ChannelPipeline;
import java.util.concurrent.Executors; import io.netty.channel.EventLoopGroup;
import java.util.concurrent.atomic.AtomicReference; import io.netty.channel.SimpleChannelInboundHandler;
import java.util.logging.Logger; import io.netty.channel.nio.NioEventLoopGroup;
import java.util.regex.Pattern; import io.netty.channel.socket.DatagramChannel;
import org.xbib.datastructures.api.Builder; import io.netty.channel.socket.SocketChannel;
import org.xbib.datastructures.api.ByteSizeUnit; import io.netty.channel.socket.nio.NioDatagramChannel;
import org.xbib.datastructures.api.ByteSizeValue; import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.xbib.datastructures.api.TimeValue; import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
public class SyslogService { import java.io.Closeable;
/* import java.io.IOException;
private static final Logger logger = Logger.getLogger(SyslogService.class.getName()); import java.util.logging.Level;
import java.util.logging.Logger;
private final static String SYSLOG_HOST = "syslog.host"; public class SyslogService implements Closeable {
private final static String SYSLOG_PORT = "syslog.port"; private final int port;
private final static String SYSLOG_RECEIVE_BUFFER_SIZE = "receive_buffer_size"; private final Handler handler;
private final static String SYSLOG_PATTERNS = "patterns"; private final EventBus eventBus;
private final static String SYSLOG_FIELD_NAMES = "field_names"; private EventLoopGroup group;
private final String host; private EventLoopGroup bossGroup;
private final String port; private EventLoopGroup workerGroup;
private final ByteSizeValue receiveBufferSize; private ChannelFuture channelFuture;
private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; public SyslogService(Settings settings,
EventBus eventBus) {
private final MessageParser messageParser; this.port = settings.getAsInt("port", 1514);
this.handler = new Handler();
private DateTimeFormatter formatter; this.eventBus = eventBus;
private ConnectionlessBootstrap udpBootstrap;
private ServerBootstrap tcpBootstrap;
private Channel udpChannel;
private Channel tcpChannel;
public SyslogService(Settings settings) {
this.host = settings.get(SYSLOG_HOST, "127.0.0.1");
this.port = settings.get(SYSLOG_PORT, "9500-9600");
this.receiveBufferSize = settings.getAsBytesSize(SYSLOG_RECEIVE_BUFFER_SIZE, new ByteSizeValue(10, ByteSizeUnit.MB));
this.receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory(receiveBufferSize.bytesAsInt());
Map<String, Object> map = (Map<String, Object>) settings.getAsStructuredMap().get(SYSLOG_PATTERNS);
Map<String, Pattern> patterns = new HashMap<>();
if (map != null) {
for (String key : map.keySet()) {
patterns.put(key, Pattern.compile((String) map.get(key)));
}
}
this.messageParser = new MessageParser().setPatterns(patterns);
map = (Map<String, Object>) settings.getAsStructuredMap().get(SYSLOG_FIELD_NAMES);
if (map != null) {
for (String key : map.keySet()) {
messageParser.setFieldName(key, (String) map.get(key));
}
}
logger.info("syslog server: host [" + host + "], port [" + port + "]");
} }
protected void doStart() throws Exception { public void startUdp() throws InterruptedException {
initializeUDP(); group = new NioEventLoopGroup();
initializeTCP(); Bootstrap bootstrap = new Bootstrap();
logger.info("syslog server up"); bootstrap.group(group)
} .channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
protected void doStop() throws ElasticsearchException { .handler(new ChannelInitializer<DatagramChannel>() {
if (udpChannel != null) {
udpChannel.close().awaitUninterruptibly();
}
if (udpBootstrap != null) {
udpBootstrap.releaseExternalResources();
}
if (tcpChannel != null) {
tcpChannel.close().awaitUninterruptibly();
}
if (tcpBootstrap != null) {
tcpBootstrap.releaseExternalResources();
}
bulkProcessor.close();
logger.info("syslog server down");
}
private void initializeUDP() {
udpBootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
Executors.newCachedThreadPool(), 4));
udpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt());
udpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
udpBootstrap.setOption("broadcast", "false");
udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() throws Exception { protected void initChannel(DatagramChannel datagramChannel) {
return Channels.pipeline(new Handler("udp")); ChannelPipeline channelPipeline = datagramChannel.pipeline();
channelPipeline.addLast(
new UDPSyslogMessageDecoder(),
new SyslogMessageHandler(),
handler
);
} }
}); });
InetAddress address; this.channelFuture = bootstrap.bind(port).sync();
try {
address = NetworkUtils.resolveInetAddress(host, null);
} catch (IOException e) {
logger.warn("failed to resolve host {}", e, host);
return;
} }
final InetAddress hostAddress = address;
PortsRange portsRange = new PortsRange(port); public void startTcp() throws InterruptedException {
final AtomicReference<Exception> lastException = new AtomicReference<>(); bossGroup = new NioEventLoopGroup();
boolean success = portsRange.iterate(new PortsRange.PortCallback() { workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap= new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public boolean onPortNumber(int portNumber) { public void initChannel(SocketChannel ch) {
try { ch.pipeline().addLast(
udpChannel = udpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); new DelimiterBasedFrameDecoder(2000, true, Delimiters.lineDelimiter()),
} catch (Exception e) { new TCPSyslogMessageDecoder(),
lastException.set(e); new SyslogMessageHandler(),
return false; handler
);
} }
return true; })
} .option(ChannelOption.SO_BACKLOG, 128)
}); .childOption(ChannelOption.SO_KEEPALIVE, true);
if (!success) { this.channelFuture = serverBootstrap.bind(port).sync();
logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port);
return;
}
logger.info("UDP listener running, address {}", udpChannel.getLocalAddress());
}
private void initializeTCP() {
tcpBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
settings.getAsInt("tcp.worker", 4)));
tcpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt());
tcpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
tcpBootstrap.setOption("reuseAddress", settings.getAsBoolean("tcp.reuse_address", true));
tcpBootstrap.setOption("tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true));
tcpBootstrap.setOption("keepAlive", settings.getAsBoolean("tcp.keep_alive", true));
tcpBootstrap.setOption("child.receiveBufferSize", receiveBufferSize.bytesAsInt());
tcpBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
tcpBootstrap.setOption("child.reuseAddress", settings.getAsBoolean("tcp.reuse_address", true));
tcpBootstrap.setOption("child.tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true));
tcpBootstrap.setOption("child.keepAlive", settings.getAsBoolean("tcp.keep_alive", true));
tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Handler("tcp"));
}
});
InetAddress address;
try {
address = SyslogNetworkUtils.resolveInetAddress(host, null);
} catch (IOException e) {
logger.warn("failed to resolve host {}", e, host);
return;
}
final InetAddress hostAddress = address;
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
try {
tcpChannel = tcpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port);
return;
}
logger.info("TCP listener running, address {}", tcpChannel.getLocalAddress());
}
class Handler extends SimpleChannelUpstreamHandler {
private final String protocol;
Handler(String protocol) {
this.protocol = protocol;
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void close() throws IOException {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); if (group != null) {
XContentBuilder builder = jsonBuilder(); group.shutdownGracefully();
parse(ctx, buffer, builder);
IndexRequest indexRequest = new IndexRequest(isTimeWindow ? formatter.print(new DateTime()) : index)
.type(type)
.opType(IndexRequest.OpType.INDEX)
.source(builder);
try {
bulkProcessor.add(indexRequest);
} catch (Exception e1) {
logger.warn("failed to execute bulk request", e1);
} }
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
if (channelFuture != null) {
channelFuture.channel().close();
}
}
@ChannelHandler.Sharable
private class Handler extends SimpleChannelInboundHandler<Message> {
private static final Logger logger = Logger.getLogger(Handler.class.getName());
public Handler() {
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
if (e.getCause() instanceof BindException) { if (eventBus != null) {
// ignore, this happens when we retry binding to several ports, its fine if we fail... eventBus.post(msg);
return; } else {
} logger.log(Level.INFO, msg.toString());
logger.warn("failure caught", e.getCause());
throw new IOException(e.getCause());
}
private void parse(ChannelHandlerContext ctx, ChannelBuffer buffer, Builder builder) throws IOException {
SocketAddress localAddress = ctx.getChannel().getLocalAddress();
SocketAddress remoteAddress = ctx.getChannel().getRemoteAddress();
ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer);
try {
builder.startObject();
builder.field("protocol", protocol);
if (localAddress != null) {
builder.field("local", localAddress.toString());
}
if (remoteAddress != null) {
builder.field("remote", remoteAddress.toString());
}
messageParser.parseMessage(ref.toUtf8(), builder);
builder.endObject();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} }
} }
} }
*/
} }

View file

@ -0,0 +1,39 @@
package org.xbib.event.syslog;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
@ChannelHandler.Sharable
public class TCPSyslogMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
private final Charset charset;
public TCPSyslogMessageDecoder(Charset charset) {
this.charset = charset;
}
public TCPSyslogMessageDecoder() {
this(StandardCharsets.UTF_8);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> output) {
final InetSocketAddress socketAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
final String rawMessage = byteBuf.toString(this.charset);
output.add(DefaultSyslogRequest.builder()
.receivedDate(LocalDateTime.now())
.remoteAddress(socketAddress.getAddress())
.rawMessage(rawMessage)
.build()
);
}
}

View file

@ -0,0 +1,40 @@
package org.xbib.event.syslog;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
@ChannelHandler.Sharable
public class UDPSyslogMessageDecoder extends MessageToMessageDecoder<DatagramPacket> {
private final Charset charset;
public UDPSyslogMessageDecoder(Charset charset) {
this.charset = charset;
}
public UDPSyslogMessageDecoder() {
this(StandardCharsets.UTF_8);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
DatagramPacket datagramPacket,
List<Object> output) {
final String rawMessage = datagramPacket.content().toString(this.charset);
final InetAddress inetAddress = datagramPacket.sender().getAddress();
output.add(DefaultSyslogRequest.builder()
.receivedDate(LocalDateTime.now())
.rawMessage(rawMessage)
.remoteAddress(inetAddress)
.build()
);
}
}

View file

@ -4,6 +4,7 @@ import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -17,4 +18,8 @@ public class TestClockEventConsumer implements EventConsumer {
void onEvent(TestClockEvent event) { void onEvent(TestClockEvent event) {
logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant()); logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant());
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -5,6 +5,7 @@ import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import org.xbib.event.timer.TestTimerEvent; import org.xbib.event.timer.TestTimerEvent;
import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -16,6 +17,10 @@ public class TestFileFollowEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(TestFileFollowEvent event) { void onEvent(TestFileFollowEvent event) {
logger.log(Level.INFO, "received filefollw event path = " + event.getPath() + " content = " + event.getContent()); logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getContent());
}
@Override
public void close() throws IOException {
} }
} }

View file

@ -0,0 +1,20 @@
package org.xbib.event.syslog;
import org.junit.jupiter.api.Test;
import org.xbib.settings.Settings;
import java.io.IOException;
public class SyslogServiceTest {
@Test
public void testSyslogService() throws InterruptedException, IOException {
Settings settings = Settings.settingsBuilder()
.put("port", 1514)
.build();
SyslogService syslogService = new SyslogService(settings, null);
syslogService.startTcp();
Thread.sleep(60000L);
syslogService.close();
}
}

View file

@ -4,6 +4,7 @@ import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -17,4 +18,8 @@ public class TestTimerEventConsumer implements EventConsumer {
void onEvent(TestTimerEvent event) { void onEvent(TestTimerEvent event) {
logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant()); logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant());
} }
@Override
public void close() throws IOException {
}
} }