From bc1110058dce5134ab2edc5978a4a4b9e1533d18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Wed, 7 Aug 2019 09:35:46 +0200 Subject: [PATCH] add systemd jounral consumer --- .../org/xbib/systemd/DefaultJournalEntry.java | 426 ++++++++++++++++++ .../java/org/xbib/systemd/JournalEntry.java | 80 ++++ .../xbib/systemd/SystemdJournalConsumer.java | 138 +++++- .../xbib/systemd/SystemdJournalListener.java | 2 +- .../systemd/SystemdJournalReaderTest.java | 8 +- 5 files changed, 636 insertions(+), 18 deletions(-) create mode 100644 src/main/java/org/xbib/systemd/DefaultJournalEntry.java create mode 100644 src/main/java/org/xbib/systemd/JournalEntry.java diff --git a/src/main/java/org/xbib/systemd/DefaultJournalEntry.java b/src/main/java/org/xbib/systemd/DefaultJournalEntry.java new file mode 100644 index 0000000..5e39be7 --- /dev/null +++ b/src/main/java/org/xbib/systemd/DefaultJournalEntry.java @@ -0,0 +1,426 @@ +package org.xbib.systemd; + +public class DefaultJournalEntry implements JournalEntry { + + private String message; + + private String messageId; + + private int priority; + + private String codeFile; + + private String codeLine; + + private String codeFunc; + + private String errno; + + private String syslogFacility; + + private String syslogIdentifier; + + private String syslogPid; + + private String syslogTimestamp; + + private String syslogRaw; + + private String pid; + + private String uid; + + private String gid; + + private String comm; + + private String exe; + + private String cmdLine; + + private String capEffective; + + private String auditSession; + + private String auditLoginUid; + + private String systemdCgroup; + + private String systemdSlice; + + private String systemdUnit; + + private String systemdUserSlice; + + private String systemdUserUnit; + + private String systemdSession; + + private String systemdOwnerUid; + + private String selinuxContext; + + private String sourceRealtimeTimestamp; + + private String bootId; + + private String machineId; + + private String systemdInvocationId; + + private String hostname; + + private String transport; + + private String streamId; + + private String lineBreak; + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String getMessage() { + return message; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + @Override + public String getMessageId() { + return messageId; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + @Override + public int getPriority() { + return priority; + } + + public void setCodeFile(String codeFile) { + this.codeFile = codeFile; + } + + @Override + public String getCodeFile() { + return codeFile; + } + + public void setCodeLine(String codeLine) { + this.codeLine = codeLine; + } + + @Override + public String getCodeLine() { + return codeLine; + } + + public void setCodeFunc(String codeFunc) { + this.codeFunc = codeFunc; + } + + @Override + public String getCodeFunc() { + return codeFunc; + } + + public void setErrno(String errno) { + this.errno = errno; + } + + @Override + public String getErrno() { + return errno; + } + + public void setSyslogFacility(String syslogFacility) { + this.syslogFacility = syslogFacility; + } + + @Override + public String getSyslogFacility() { + return syslogFacility; + } + + public void setSyslogIdentifier(String syslogIdentifier) { + this.syslogIdentifier = syslogIdentifier; + } + + @Override + public String getSyslogIdentifier() { + return syslogIdentifier; + } + + public void setSyslogPid(String syslogPid) { + this.syslogPid = syslogPid; + } + + @Override + public String getSyslogPid() { + return syslogPid; + } + + public void setSyslogTimestamp(String syslogTimestamp) { + this.syslogTimestamp = syslogTimestamp; + } + + @Override + public String getSyslogTimestamp() { + return syslogTimestamp; + } + + public void setSyslogRaw(String syslogRaw) { + this.syslogRaw = syslogRaw; + } + + @Override + public String getSyslogRaw() { + return syslogRaw; + } + + public void setPid(String pid) { + this.pid = pid; + } + + @Override + public String getPid() { + return pid; + } + + public void setUid(String uid) { + this.uid = uid; + } + + @Override + public String getUid() { + return uid; + } + + public void setGid(String gid) { + this.gid = gid; + } + + @Override + public String getGid() { + return gid; + } + + public void setComm(String comm) { + this.comm = comm; + } + + @Override + public String getComm() { + return comm; + } + + public void setExe(String exe) { + this.exe = exe; + } + + @Override + public String getExe() { + return exe; + } + + public void setCmdLine(String cmdline) { + this.cmdLine = cmdline; + } + + @Override + public String getCmdLine() { + return cmdLine; + } + + public void setCapEffective(String capEffective) { + this.capEffective = capEffective; + } + + @Override + public String getCapEffective() { + return capEffective; + } + + public void setAuditSession(String auditSession) { + this.auditSession = auditSession; + } + + @Override + public String getAuditSession() { + return auditSession; + } + + public void setAuditLoginUid(String auditLoginUid) { + this.auditLoginUid = auditLoginUid; + } + + @Override + public String getAuditLoginUid() { + return auditLoginUid; + } + + public void setSystemdCgroup(String systemdCgroup) { + this.systemdCgroup = systemdCgroup; + } + + @Override + public String getSystemdCgroup() { + return systemdCgroup; + } + + public void setSystemdSlice(String systemdSlice) { + this.systemdSlice = systemdSlice; + } + + @Override + public String getSystemdSlice() { + return systemdSlice; + } + + public void setSystemdUnit(String systemdUnit) { + this.systemdUnit = systemdUnit; + } + + @Override + public String getSystemdUnit() { + return systemdUnit; + } + + public void setSystemdUserSlice(String systemdUserSlice) { + this.systemdUserSlice = systemdUserSlice; + } + + @Override + public String getSystemdUserSlice() { + return systemdUserSlice; + } + + public void setSystemdUserUnit(String systemdUserUnit) { + this.systemdUserUnit = systemdUserUnit; + } + + @Override + public String getSystemdUserUnit() { + return systemdUserUnit; + } + + public void setSystemdSession(String systemdSession) { + this.systemdSession = systemdSession; + } + + @Override + public String getSystemdSession() { + return systemdSession; + } + + public void setSystemdOwnerUid(String systemdOwnerUid) { + this.systemdOwnerUid = systemdOwnerUid; + } + + @Override + public String getSystemdOwnerUid() { + return systemdOwnerUid; + } + + public void setSelinuxContext(String selinuxContext) { + this.selinuxContext = selinuxContext; + } + + @Override + public String getSelinuxContext() { + return selinuxContext; + } + + public void setSourceRealtimeTimestamp(String sourceRealtimeTimestamp) { + this.sourceRealtimeTimestamp = sourceRealtimeTimestamp; + } + + @Override + public String getSourceRealtimeTimestamp() { + return sourceRealtimeTimestamp; + } + + public void setBootId(String bootId) { + this.bootId = bootId; + } + + @Override + public String getBootId() { + return bootId; + } + + public void setMachineId(String machineId) { + this.machineId = machineId; + } + + @Override + public String getMachineId() { + return machineId; + } + + public void setSystemdInvocationId(String systemdInvocationId) { + this.systemdInvocationId = systemdInvocationId; + } + + @Override + public String getSystemdInvocationId() { + return systemdInvocationId; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + @Override + public String getHostname() { + return hostname; + } + + public void setTransport(String transport) { + this.transport = transport; + } + + @Override + public String getTransport() { + return transport; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + @Override + public String getStreamId() { + return streamId; + } + + public void setLineBreak(String lineBreak) { + this.lineBreak = lineBreak; + } + + @Override + public String getLineBreak() { + return lineBreak; + } + + @Override + public String getFieldValue(String fieldName) { + return null; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("priority=") + .append(this.priority) + .append(",message=") + .append(this.message); + return sb.toString(); + } +} diff --git a/src/main/java/org/xbib/systemd/JournalEntry.java b/src/main/java/org/xbib/systemd/JournalEntry.java new file mode 100644 index 0000000..d6fc899 --- /dev/null +++ b/src/main/java/org/xbib/systemd/JournalEntry.java @@ -0,0 +1,80 @@ +package org.xbib.systemd; + +public interface JournalEntry { + + String getMessage(); + + String getMessageId(); + + int getPriority(); + + String getCodeFile(); + + String getCodeLine(); + + String getCodeFunc(); + + String getErrno(); + + String getSyslogFacility(); + + String getSyslogIdentifier(); + + String getSyslogPid(); + + String getSyslogTimestamp(); + + String getSyslogRaw(); + + String getPid(); + + String getUid(); + + String getGid(); + + String getComm(); + + String getExe(); + + String getCmdLine(); + + String getCapEffective(); + + String getAuditSession(); + + String getAuditLoginUid(); + + String getSystemdCgroup(); + + String getSystemdSlice(); + + String getSystemdUnit(); + + String getSystemdUserSlice(); + + String getSystemdUserUnit(); + + String getSystemdSession(); + + String getSystemdOwnerUid(); + + String getSelinuxContext(); + + String getSourceRealtimeTimestamp(); + + String getBootId(); + + String getMachineId(); + + String getSystemdInvocationId(); + + String getHostname(); + + String getTransport(); + + String getStreamId(); + + String getLineBreak(); + + String getFieldValue(String fieldName); +} diff --git a/src/main/java/org/xbib/systemd/SystemdJournalConsumer.java b/src/main/java/org/xbib/systemd/SystemdJournalConsumer.java index 00e7c71..65194a7 100644 --- a/src/main/java/org/xbib/systemd/SystemdJournalConsumer.java +++ b/src/main/java/org/xbib/systemd/SystemdJournalConsumer.java @@ -5,6 +5,7 @@ import org.bridj.SizeT; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -19,6 +20,10 @@ public class SystemdJournalConsumer implements Runnable { private final SystemdJournalListener listener; + public SystemdJournalConsumer(String match, SystemdJournalListener listener) { + this(match, null, listener); + } + public SystemdJournalConsumer(String match, String field, SystemdJournalListener listener) { this.match = match; this.field = field; @@ -27,10 +32,14 @@ public class SystemdJournalConsumer implements Runnable { @Override public void run() { - loop(); + try { + loop(); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } } - private void loop() { + private void loop() throws IOException { Pointer> sdJournalPointer = Pointer.allocatePointer(SystemdJournalLibrary.sdJournal.class); int r = SystemdJournalLibrary.sd_journal_open(sdJournalPointer, SystemdJournalLibrary.SD_JOURNAL_LOCAL_ONLY); if (r < 0) { @@ -54,17 +63,18 @@ public class SystemdJournalConsumer implements Runnable { do { r = SystemdJournalLibrary.sd_journal_wait(sdJournal, -1); } while (r == 0); // NOP - List list = new ArrayList<>(); while (SystemdJournalLibrary.sd_journal_next(sdJournal) > 0) { if (field != null) { Pointer> dataPointer = Pointer.allocatePointer(); Pointer sizePointer = Pointer.allocateSizeT(); r = SystemdJournalLibrary.sd_journal_get_data(sdJournal, Pointer.pointerToCString(field), dataPointer, sizePointer); if (r < 0) { - logger.log(Level.WARNING, "error in get_data: " + r); + throw new IOException("error in get_data: " + r); } Pointer data = dataPointer.as(Byte.class); - list.add(data.getPointer(Byte.class).getCString()); + if (listener != null) { + listener.handleEntry(makeEntry(Collections.singletonList(data.getPointer(Byte.class).getCString()))); + } } else { cursorPointer = Pointer.allocatePointer(Byte.TYPE); SystemdJournalLibrary.sd_journal_get_cursor(sdJournal, cursorPointer); @@ -73,24 +83,124 @@ public class SystemdJournalConsumer implements Runnable { existingCursor = newCursor; Pointer> dataPointer = Pointer.allocatePointer(); Pointer sizePointer = Pointer.allocateSizeT(); + List list = new ArrayList<>(); while (SystemdJournalLibrary.sd_journal_enumerate_data(sdJournal, dataPointer, sizePointer) > 0) { Pointer data = dataPointer.as(Byte.class); String line = data.getPointer(Byte.class).getCString(); list.add(line); } SystemdJournalLibrary.sd_journal_restart_data(sdJournal); - } - } - } - if (listener != null) { - for (String string : list) { - try { - listener.handleMessage(string); - } catch (IOException e) { - logger.log(Level.WARNING, e.getMessage(), e); + if (listener != null) { + listener.handleEntry(makeEntry(list)); + } } } } } } + + private JournalEntry makeEntry(List list) { + DefaultJournalEntry journalEntry = new DefaultJournalEntry(); + for (String string : list) { + logger.log(Level.INFO, "entry: " + string); + if (string.startsWith("MESSAGE=")) { + journalEntry.setMessage(string.substring(8)); + continue; + } + if (string.startsWith("MESSAGE_ID=")) { + journalEntry.setMessageId(string.substring(11)); + continue; + } + if (string.startsWith("PRIORITY=")) { + journalEntry.setPriority(Integer.parseInt(string.substring(9))); + continue; + } + if (string.startsWith("CODE_FILE=")) { + journalEntry.setCodeFile(string.substring(10)); + continue; + } + if (string.startsWith("CODE_LINE=")) { + journalEntry.setCodeLine(string.substring(10)); + continue; + } + if (string.startsWith("CODE_FUNC=")) { + journalEntry.setCodeFunc(string.substring(10)); + continue; + } + if (string.startsWith("ERRNO=")) { + journalEntry.setErrno(string.substring(6)); + continue; + } + if (string.startsWith("SYSLOG_FACILITY=")) { + journalEntry.setSyslogFacility(string.substring(16)); + continue; + } + if (string.startsWith("SYSLOG_IDENTIFIER=")) { + journalEntry.setSyslogIdentifier(string.substring(18)); + continue; + } + if (string.startsWith("SYSLOG_PID=")) { + journalEntry.setSyslogPid(string.substring(11)); + continue; + } + if (string.startsWith("SYSLOG_TIMESTAMP=")) { + journalEntry.setSyslogTimestamp(string.substring(17)); + continue; + } + if (string.startsWith("SYSLOG_RAW=")) { + journalEntry.setSyslogRaw(string.substring(11)); + continue; + } + if (string.startsWith("_PID=")) { + journalEntry.setPid(string.substring(5)); + continue; + } + if (string.startsWith("_UID=")) { + journalEntry.setUid(string.substring(5)); + continue; + } + if (string.startsWith("_GID=")) { + journalEntry.setGid(string.substring(5)); + continue; + } + if (string.startsWith("_COMM=")) { + journalEntry.setComm(string.substring(6)); + continue; + } + if (string.startsWith("_EXE=")) { + journalEntry.setExe(string.substring(5)); + continue; + } + if (string.startsWith("_CMDLINE=")) { + journalEntry.setCmdLine(string.substring(9)); + continue; + } + if (string.startsWith("_CAP_EFFECTIVE=")) { + journalEntry.setCapEffective(string.substring(15)); + continue; + } + if (string.startsWith("_TRANSPORT=")) { + journalEntry.setTransport(string.substring(11)); + continue; + } + if (string.startsWith("_SYSTEMD_OWNER_UID=")) { + journalEntry.setSystemdOwnerUid(string.substring(19)); + continue; + } + if (string.startsWith("_SYSTEMD_UNIT=")) { + journalEntry.setSystemdUnit(string.substring(13)); + continue; + } + if (string.startsWith("_SYSTEMD_USER_SLICE=")) { + journalEntry.setSystemdUserSlice(string.substring(19)); + continue; + } + if (string.startsWith("_SYSTEMD_USER_UNIT=")) { + journalEntry.setSystemdUserUnit(string.substring(18)); + continue; + } + + } + return journalEntry; + } } diff --git a/src/main/java/org/xbib/systemd/SystemdJournalListener.java b/src/main/java/org/xbib/systemd/SystemdJournalListener.java index bff89d1..93daa59 100644 --- a/src/main/java/org/xbib/systemd/SystemdJournalListener.java +++ b/src/main/java/org/xbib/systemd/SystemdJournalListener.java @@ -4,5 +4,5 @@ import java.io.IOException; public interface SystemdJournalListener { - void handleMessage(String message) throws IOException; + void handleEntry(JournalEntry entry) throws IOException; } diff --git a/src/test/java/org/xbib/systemd/SystemdJournalReaderTest.java b/src/test/java/org/xbib/systemd/SystemdJournalReaderTest.java index af1f698..a5d2fff 100644 --- a/src/test/java/org/xbib/systemd/SystemdJournalReaderTest.java +++ b/src/test/java/org/xbib/systemd/SystemdJournalReaderTest.java @@ -5,18 +5,20 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import java.util.concurrent.Executors; +import java.util.logging.Level; import java.util.logging.Logger; @DisabledOnOs({OS.MAC, OS.WINDOWS}) -public class SystemdJournalReaderTest { +class SystemdJournalReaderTest { private static final Logger logger = Logger.getLogger(SystemdJournalReaderTest.class.getName()); @Test void testConsumer() throws InterruptedException { - SystemdJournalConsumer consumer = new SystemdJournalConsumer("SYSLOG_IDENTIFIER=su", null, - logger::info); + SystemdJournalConsumer consumer = new SystemdJournalConsumer("SYSLOG_IDENTIFIER=su", + entry -> logger.log(Level.INFO, entry.toString())); Executors.newSingleThreadExecutor().submit(consumer); + // exit after 1 minute Thread.sleep(60000L); } }