Add working connection tracking

This commit is contained in:
Jan Philipp Timme 2016-11-24 14:04:50 +01:00
parent 2662a810fb
commit f1838d8ce3
Signed by untrusted user: JPT
GPG Key ID: 5F2C85EC6F3754B7
2 changed files with 108 additions and 30 deletions

View File

@ -34,6 +34,8 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
private Vertex tcpPacket;
private Vertex icmpPacket;
private long packetCounter;
public HighPerformanceKappaOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) {
super(filename);
this.og = orientGraph;
@ -42,6 +44,15 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
}
public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) {
this.packetCounter++;
if(this.packetCounter > 2000) {
for(LinkedList<TcpConnection> connList : this.knownTcpConnections.values()) {
for(TcpConnection conn : connList) {
System.out.println(conn.toString());
}
}
System.exit(0);
}
// Clean up state vars before processing the next ethernet frame
this.ethernetFrame = null;
this.arpPacket = null;
@ -117,6 +128,27 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
// Wire up to its ip packet
Edge containsEdge = this.og.addEdge("class:contains", this.ipPacket, this.tcpPacket, "contains");
Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", this.tcpPacket, this.ipPacket, "isContainedIn");
// Track tcp connections
TcpConnection tcpConnection = this.getTcpConnectionFor(tcp);
// If connection exists and still "up to date" aka time difference < 1s
if(tcpConnection != null && (ts - tcpConnection.endTs <= 1)) {
// Update tcpConnection data
if(tcpConnection.sourceIp.equals(this.ipPacket.getProperty("sourceIp"))) {
// SourceIp -> TargetIp
tcpConnection.addVolumeSourceToTarget(tcp.getRawData().length - tcp.getHeader().length());
} else {
// TargetIp -> SourceIp
tcpConnection.addVolumeTargetToSource(tcp.getRawData().length - tcp.getHeader().length());
}
tcpConnection.setEnd(ts, ms);
} else {
// Else create a new one and add it to the list.
String sourceIp = this.ipPacket.getProperty("sourceIp");
String targetIp = this.ipPacket.getProperty("targetIp");
tcpConnection = new TcpConnection(tcp, sourceIp, targetIp, ts, ms);
this.addKnownTcpConnectionFor(tcpConnection, tcp);
}
}
public void handleIcmpPacket(IcmpV4CommonPacket icmp, long ts, int ms) {
@ -148,47 +180,63 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
}
}
private TcpConnection getOrCreateTcpConnectionFor(TcpPacket tcp) {
private String buildConnectionKey(String source, String target) {
int comp = source.compareTo(target);
if(comp > 0) {
return source + "-" + target;
} else if(comp < 0) {
return target + "-" + source;
} else {
// This should NEVER happen!
throw new RuntimeException("I told you so, this was not impossible!");
}
}
private TcpConnection getTcpConnectionFor(TcpPacket tcp) {
String source = "";
String target = "";
String sourceIp = this.ipPacket.getProperty("sourceIp");
String targetIp = this.ipPacket.getProperty("targetIp");
String sourcePort = tcp.getHeader().getSrcPort().toString();
String targetPort = tcp.getHeader().getDstPort().toString();
StringBuilder sb = new StringBuilder();
sb.append(sourceIp);
sb.append(":");
sb.append(sourcePort);
sb.append("-");
sb.append(targetIp);
sb.append(":");
sb.append(targetPort);
String connectionKey = sb.toString();
String sourcePort = tcp.getHeader().getSrcPort().valueAsString();
String targetPort = tcp.getHeader().getDstPort().valueAsString();
source = sourceIp + ":" + sourcePort;
target = targetIp + ":" + targetPort;
String connectionKey = this.buildConnectionKey(source, target);
TcpConnection tcpConnection = null;
LinkedList<TcpConnection> connectionList = null;
// Get or create tcp connection list for connection key
if(this.knownTcpConnections.containsKey(connectionKey)) {
connectionList = this.knownTcpConnections.get(connectionKey);
} else {
connectionList = new LinkedList<TcpConnection>();
}
// Get last connection from list and check if it is still ongoing (aka endTs = 0 and endMs = 0)
if(!connectionList.isEmpty() && connectionList.getLast() != null && connectionList.getLast().endTs == 0 && connectionList.getLast().endMs == 0) {
// Use existing connection if not ended yet
tcpConnection = connectionList.getLast();
} else {
// Else create a new one and add it to the list.
tcpConnection = new TcpConnection();
tcpConnection.setStart(this.ethernetFrame.getProperty("timestamp"), this.ethernetFrame.getProperty("milliseconds"));
tcpConnection.sourceIp = sourceIp;
tcpConnection.sourcePort = tcp.getHeader().getSrcPort().valueAsInt();
tcpConnection.targetIp = targetIp;
tcpConnection.targetPort = tcp.getHeader().getDstPort().valueAsInt();
connectionList.add(tcpConnection);
// Put connection into list of known tcp connections
this.knownTcpConnections.put(connectionKey, connectionList);
// Get last connection from list
if(!connectionList.isEmpty()) {
// Use existing connection if not ended yet
tcpConnection = connectionList.getLast();
}
}
return tcpConnection;
}
private void addKnownTcpConnectionFor(TcpConnection tcpConnection, TcpPacket tcp) {
String source = "";
String target = "";
String sourceIp = this.ipPacket.getProperty("sourceIp");
String targetIp = this.ipPacket.getProperty("targetIp");
String sourcePort = tcp.getHeader().getSrcPort().valueAsString();
String targetPort = tcp.getHeader().getDstPort().valueAsString();
source = sourceIp + ":" + sourcePort;
target = targetIp + ":" + targetPort;
String connectionKey = this.buildConnectionKey(source, target);
LinkedList<TcpConnection> connectionList = null;
if(this.knownTcpConnections.containsKey(connectionKey)) {
connectionList = this.knownTcpConnections.get(connectionKey);
} else {
connectionList = new LinkedList<TcpConnection>();
this.knownTcpConnections.put(connectionKey, connectionList);
}
// Put connection into list of known tcp connections
connectionList.addLast(tcpConnection);
}
public void afterImport() {
// TODO: Insert all TcpConnections!
System.out.println("Fertig!");

View File

@ -1,5 +1,7 @@
package de.hsh.inform.orientdb_project.orientdb;
import org.pcap4j.packet.TcpPacket;
public class TcpConnection {
public long startTs;
@ -18,6 +20,15 @@ public class TcpConnection {
public long volumeTargetToSource;
public TcpConnection(TcpPacket tcp, String sourceIp, String targetIp, long ts, int ms) {
this.setStart(ts, ms);
this.setEnd(ts, ms);
this.sourceIp = sourceIp;
this.sourcePort = tcp.getHeader().getSrcPort().valueAsInt();
this.targetIp = targetIp;
this.targetPort = tcp.getHeader().getDstPort().valueAsInt();
}
public void setStart(long ts, int ms) {
this.startTs = ts;
this.startMs = ms;
@ -40,4 +51,23 @@ public class TcpConnection {
return this.volumeSourceToTarget + this.volumeTargetToSource;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append(this.startTs);
sb.append(".");
sb.append(this.startMs);
sb.append("] ");
sb.append(this.sourceIp);
sb.append(":");
sb.append(this.sourcePort);
sb.append(" -> ");
sb.append(this.targetIp);
sb.append(":");
sb.append(this.targetPort);
sb.append(" -- ");
sb.append(this.getTotalVolume());
return sb.toString();
}
}