From f1838d8ce3ca7e68646093944baab33386d47376 Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Thu, 24 Nov 2016 14:04:50 +0100 Subject: [PATCH] Add working connection tracking --- ...anceKappaOrientDbNetdataImportService.java | 108 +++++++++++++----- .../orientdb/TcpConnection.java | 30 +++++ 2 files changed, 108 insertions(+), 30 deletions(-) diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java index dbc80bb..5dc5833 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java @@ -34,6 +34,8 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe private Vertex tcpPacket; private Vertex icmpPacket; + private long packetCounter; + public HighPerformanceKappaOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) { super(filename); this.og = orientGraph; @@ -42,6 +44,15 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe } public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) { + this.packetCounter++; + if(this.packetCounter > 2000) { + for(LinkedList connList : this.knownTcpConnections.values()) { + for(TcpConnection conn : connList) { + System.out.println(conn.toString()); + } + } + System.exit(0); + } // Clean up state vars before processing the next ethernet frame this.ethernetFrame = null; this.arpPacket = null; @@ -117,6 +128,27 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe // Wire up to its ip packet Edge containsEdge = this.og.addEdge("class:contains", this.ipPacket, this.tcpPacket, "contains"); Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", this.tcpPacket, this.ipPacket, "isContainedIn"); + // Track tcp connections + TcpConnection tcpConnection = this.getTcpConnectionFor(tcp); + // If connection exists and still "up to date" aka time difference < 1s + if(tcpConnection != null && (ts - tcpConnection.endTs <= 1)) { + // Update tcpConnection data + if(tcpConnection.sourceIp.equals(this.ipPacket.getProperty("sourceIp"))) { + // SourceIp -> TargetIp + tcpConnection.addVolumeSourceToTarget(tcp.getRawData().length - tcp.getHeader().length()); + } else { + // TargetIp -> SourceIp + tcpConnection.addVolumeTargetToSource(tcp.getRawData().length - tcp.getHeader().length()); + } + tcpConnection.setEnd(ts, ms); + } else { + // Else create a new one and add it to the list. + String sourceIp = this.ipPacket.getProperty("sourceIp"); + String targetIp = this.ipPacket.getProperty("targetIp"); + tcpConnection = new TcpConnection(tcp, sourceIp, targetIp, ts, ms); + this.addKnownTcpConnectionFor(tcpConnection, tcp); + } + } public void handleIcmpPacket(IcmpV4CommonPacket icmp, long ts, int ms) { @@ -148,47 +180,63 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe } } - private TcpConnection getOrCreateTcpConnectionFor(TcpPacket tcp) { + private String buildConnectionKey(String source, String target) { + int comp = source.compareTo(target); + if(comp > 0) { + return source + "-" + target; + } else if(comp < 0) { + return target + "-" + source; + } else { + // This should NEVER happen! + throw new RuntimeException("I told you so, this was not impossible!"); + } + } + + private TcpConnection getTcpConnectionFor(TcpPacket tcp) { + String source = ""; + String target = ""; String sourceIp = this.ipPacket.getProperty("sourceIp"); String targetIp = this.ipPacket.getProperty("targetIp"); - String sourcePort = tcp.getHeader().getSrcPort().toString(); - String targetPort = tcp.getHeader().getDstPort().toString(); - StringBuilder sb = new StringBuilder(); - sb.append(sourceIp); - sb.append(":"); - sb.append(sourcePort); - sb.append("-"); - sb.append(targetIp); - sb.append(":"); - sb.append(targetPort); - String connectionKey = sb.toString(); + String sourcePort = tcp.getHeader().getSrcPort().valueAsString(); + String targetPort = tcp.getHeader().getDstPort().valueAsString(); + source = sourceIp + ":" + sourcePort; + target = targetIp + ":" + targetPort; + String connectionKey = this.buildConnectionKey(source, target); TcpConnection tcpConnection = null; LinkedList connectionList = null; // Get or create tcp connection list for connection key if(this.knownTcpConnections.containsKey(connectionKey)) { connectionList = this.knownTcpConnections.get(connectionKey); - } else { - connectionList = new LinkedList(); - } - // Get last connection from list and check if it is still ongoing (aka endTs = 0 and endMs = 0) - if(!connectionList.isEmpty() && connectionList.getLast() != null && connectionList.getLast().endTs == 0 && connectionList.getLast().endMs == 0) { - // Use existing connection if not ended yet - tcpConnection = connectionList.getLast(); - } else { - // Else create a new one and add it to the list. - tcpConnection = new TcpConnection(); - tcpConnection.setStart(this.ethernetFrame.getProperty("timestamp"), this.ethernetFrame.getProperty("milliseconds")); - tcpConnection.sourceIp = sourceIp; - tcpConnection.sourcePort = tcp.getHeader().getSrcPort().valueAsInt(); - tcpConnection.targetIp = targetIp; - tcpConnection.targetPort = tcp.getHeader().getDstPort().valueAsInt(); - connectionList.add(tcpConnection); - // Put connection into list of known tcp connections - this.knownTcpConnections.put(connectionKey, connectionList); + // Get last connection from list + if(!connectionList.isEmpty()) { + // Use existing connection if not ended yet + tcpConnection = connectionList.getLast(); + } } return tcpConnection; } + private void addKnownTcpConnectionFor(TcpConnection tcpConnection, TcpPacket tcp) { + String source = ""; + String target = ""; + String sourceIp = this.ipPacket.getProperty("sourceIp"); + String targetIp = this.ipPacket.getProperty("targetIp"); + String sourcePort = tcp.getHeader().getSrcPort().valueAsString(); + String targetPort = tcp.getHeader().getDstPort().valueAsString(); + source = sourceIp + ":" + sourcePort; + target = targetIp + ":" + targetPort; + String connectionKey = this.buildConnectionKey(source, target); + LinkedList connectionList = null; + if(this.knownTcpConnections.containsKey(connectionKey)) { + connectionList = this.knownTcpConnections.get(connectionKey); + } else { + connectionList = new LinkedList(); + this.knownTcpConnections.put(connectionKey, connectionList); + } + // Put connection into list of known tcp connections + connectionList.addLast(tcpConnection); + } + public void afterImport() { // TODO: Insert all TcpConnections! System.out.println("Fertig!"); diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java index a8e61df..d65468d 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java @@ -1,5 +1,7 @@ package de.hsh.inform.orientdb_project.orientdb; +import org.pcap4j.packet.TcpPacket; + public class TcpConnection { public long startTs; @@ -18,6 +20,15 @@ public class TcpConnection { public long volumeTargetToSource; + public TcpConnection(TcpPacket tcp, String sourceIp, String targetIp, long ts, int ms) { + this.setStart(ts, ms); + this.setEnd(ts, ms); + this.sourceIp = sourceIp; + this.sourcePort = tcp.getHeader().getSrcPort().valueAsInt(); + this.targetIp = targetIp; + this.targetPort = tcp.getHeader().getDstPort().valueAsInt(); + } + public void setStart(long ts, int ms) { this.startTs = ts; this.startMs = ms; @@ -39,5 +50,24 @@ public class TcpConnection { public long getTotalVolume() { return this.volumeSourceToTarget + this.volumeTargetToSource; } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append(this.startTs); + sb.append("."); + sb.append(this.startMs); + sb.append("] "); + sb.append(this.sourceIp); + sb.append(":"); + sb.append(this.sourcePort); + sb.append(" -> "); + sb.append(this.targetIp); + sb.append(":"); + sb.append(this.targetPort); + sb.append(" -- "); + sb.append(this.getTotalVolume()); + return sb.toString(); + } }