From 2662a810fbf30585bd7d006fcedc7ce377147a47 Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Thu, 24 Nov 2016 11:52:29 +0100 Subject: [PATCH] Start implementing tcp connection tracking --- .../netdata/AbstractNetdataImportService.java | 3 +- .../netdata/NetdataResultObserver.java | 2 + ...anceKappaOrientDbNetdataImportService.java | 87 ++++++++++++++++++- ...rformanceOrientDbNetdataImportService.java | 3 + .../orientdb/OrientDbHelperService.java | 6 +- .../orientdb/TcpConnection.java | 43 +++++++++ 6 files changed, 136 insertions(+), 8 deletions(-) create mode 100644 src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java diff --git a/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java index 6c57fb5..579ca97 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java @@ -43,7 +43,8 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser System.out.println(System.currentTimeMillis() + ": " + packetCounter); } packetCounter++; - } + } + this.afterImport(); } public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) { diff --git a/src/main/java/de/hsh/inform/orientdb_project/netdata/NetdataResultObserver.java b/src/main/java/de/hsh/inform/orientdb_project/netdata/NetdataResultObserver.java index a2537d2..db77f90 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/netdata/NetdataResultObserver.java +++ b/src/main/java/de/hsh/inform/orientdb_project/netdata/NetdataResultObserver.java @@ -21,4 +21,6 @@ public interface NetdataResultObserver { public abstract void handleIpV4Packet(IpV4Packet ipv4, long ts, int ms); + public abstract void afterImport(); + } diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java index 54ab7b0..dbc80bb 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java @@ -1,5 +1,9 @@ package de.hsh.inform.orientdb_project.orientdb; +import java.net.Inet4Address; +import java.util.HashMap; +import java.util.LinkedList; + import org.pcap4j.packet.ArpPacket; import org.pcap4j.packet.EthernetPacket; import org.pcap4j.packet.IcmpV4CommonPacket; @@ -7,11 +11,9 @@ import org.pcap4j.packet.IpV4Packet; import org.pcap4j.packet.TcpPacket; import org.pcap4j.packet.UdpPacket; -import com.tinkerpop.blueprints.Direction; import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; -import com.tinkerpop.blueprints.impls.orient.OrientVertex; import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; @@ -19,6 +21,12 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe private OrientGraphNoTx og; + // HashMap that contains all already known hosts (aka already inserted into database) + private HashMap knownHosts; + + // To keep track of tcp connections + private HashMap> knownTcpConnections; + private Vertex ethernetFrame; private Vertex arpPacket; private Vertex ipPacket; @@ -29,6 +37,8 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe public HighPerformanceKappaOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) { super(filename); this.og = orientGraph; + this.knownHosts = new HashMap(); + this.knownTcpConnections = new HashMap>(); } public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) { @@ -65,9 +75,14 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe } public void handleIpV4Packet(IpV4Packet ipv4, long ts, int ms) { + Inet4Address sourceIp = ipv4.getHeader().getSrcAddr(); + Inet4Address targetIp = ipv4.getHeader().getDstAddr(); + // Add hosts to database if new + this.addHostIfNew(sourceIp); + this.addHostIfNew(targetIp); Object[] arguments = { - "sourceIp", ipv4.getHeader().getSrcAddr().toString().split("/")[1], - "targetIp", ipv4.getHeader().getDstAddr().toString().split("/")[1], + "sourceIp", sourceIp.toString().split("/")[1], + "targetIp", targetIp.toString().split("/")[1], "size", ipv4.getRawData().length, "payloadSize", ipv4.getRawData().length - ipv4.getHeader().length(), }; @@ -114,6 +129,70 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe Edge containsEdge = this.og.addEdge("class:contains", this.ipPacket, this.icmpPacket, "contains"); Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", this.icmpPacket, this.ipPacket, "isContainedIn"); } + + private void addHostIfNew(Inet4Address ipAddress) { + if(this.knownHosts.containsKey(ipAddress)) { + // Host already known, nothing to do! + return; + } else { + // Check internal/external by IP + boolean isInternal = ipAddress.isSiteLocalAddress(); // TODO: VERIFY IF THIS IS CORRECT! + // Create Vertex and add to HashMap + String ipAddressStr = ipAddress.toString().split("/")[1]; + Object[] arguments = { + "ipAddress", ipAddressStr, + "internal", isInternal, + }; + Vertex host = this.og.addVertex("class:Host", arguments); + this.knownHosts.put(ipAddressStr, host); + } + } + + private TcpConnection getOrCreateTcpConnectionFor(TcpPacket tcp) { + String sourceIp = this.ipPacket.getProperty("sourceIp"); + String targetIp = this.ipPacket.getProperty("targetIp"); + String sourcePort = tcp.getHeader().getSrcPort().toString(); + String targetPort = tcp.getHeader().getDstPort().toString(); + StringBuilder sb = new StringBuilder(); + sb.append(sourceIp); + sb.append(":"); + sb.append(sourcePort); + sb.append("-"); + sb.append(targetIp); + sb.append(":"); + sb.append(targetPort); + String connectionKey = sb.toString(); + TcpConnection tcpConnection = null; + LinkedList connectionList = null; + // Get or create tcp connection list for connection key + if(this.knownTcpConnections.containsKey(connectionKey)) { + connectionList = this.knownTcpConnections.get(connectionKey); + } else { + connectionList = new LinkedList(); + } + // Get last connection from list and check if it is still ongoing (aka endTs = 0 and endMs = 0) + if(!connectionList.isEmpty() && connectionList.getLast() != null && connectionList.getLast().endTs == 0 && connectionList.getLast().endMs == 0) { + // Use existing connection if not ended yet + tcpConnection = connectionList.getLast(); + } else { + // Else create a new one and add it to the list. + tcpConnection = new TcpConnection(); + tcpConnection.setStart(this.ethernetFrame.getProperty("timestamp"), this.ethernetFrame.getProperty("milliseconds")); + tcpConnection.sourceIp = sourceIp; + tcpConnection.sourcePort = tcp.getHeader().getSrcPort().valueAsInt(); + tcpConnection.targetIp = targetIp; + tcpConnection.targetPort = tcp.getHeader().getDstPort().valueAsInt(); + connectionList.add(tcpConnection); + // Put connection into list of known tcp connections + this.knownTcpConnections.put(connectionKey, connectionList); + } + return tcpConnection; + } + + public void afterImport() { + // TODO: Insert all TcpConnections! + System.out.println("Fertig!"); + } } diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java index 9e55e6e..44e950d 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java @@ -14,6 +14,9 @@ import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; +/** + * This service is incomplete! DO NOT USE! + */ public class LowPerformanceOrientDbNetdataImportService extends AbstractNetdataImportService { private OrientGraphNoTx og; diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java index 6125ea5..175c7ef 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java @@ -121,9 +121,9 @@ public class OrientDbHelperService { tcpConnectionType.createProperty("end", OType.DATETIME); tcpConnectionType.createProperty("sourcePort", OType.INTEGER); tcpConnectionType.createProperty("targetPort", OType.INTEGER); - tcpConnectionType.createProperty("volumeSourceToTarget", OType.INTEGER); - tcpConnectionType.createProperty("volumeTargetToSource", OType.INTEGER); - tcpConnectionType.createProperty("totalVolume", OType.INTEGER); + tcpConnectionType.createProperty("volumeSourceToTarget", OType.LONG); + tcpConnectionType.createProperty("volumeTargetToSource", OType.LONG); + tcpConnectionType.createProperty("totalVolume", OType.LONG); OrientEdgeType isContainedInType = og.createEdgeType("isContainedIn", "E"); isContainedInType.setDescription("isContainedIn"); diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java new file mode 100644 index 0000000..a8e61df --- /dev/null +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/TcpConnection.java @@ -0,0 +1,43 @@ +package de.hsh.inform.orientdb_project.orientdb; + +public class TcpConnection { + + public long startTs; + public int startMs; + + public long endTs; + public int endMs; + + public String sourceIp; + public int sourcePort; + + public String targetIp; + public int targetPort; + + public long volumeSourceToTarget; + public long volumeTargetToSource; + + + public void setStart(long ts, int ms) { + this.startTs = ts; + this.startMs = ms; + } + + public void setEnd(long ts, int ms) { + this.endTs = ts; + this.endMs = ms; + } + + public void addVolumeSourceToTarget(long vol) { + this.volumeSourceToTarget += vol; + } + + public void addVolumeTargetToSource(long vol) { + this.volumeTargetToSource += vol; + } + + public long getTotalVolume() { + return this.volumeSourceToTarget + this.volumeTargetToSource; + } + +}