From fc72c8f2530614a1527d959d2ab6838ad204ca6c Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Sun, 6 Nov 2016 21:30:45 +0100 Subject: [PATCH] Further improvements, still need for optimization --- .../orientdb_project/DummyImportService.java | 39 +++++++ .../de/hsh/inform/orientdb_project/Main.java | 10 +- .../netdata/AbstractNetdataImportService.java | 7 +- ...rformanceOrientDbNetdataImportService.java | 105 ++++++++++++++++++ .../orientdb/OrientDbHelperService.java | 15 ++- .../OrientDbNetdataImportService.java | 5 +- 6 files changed, 172 insertions(+), 9 deletions(-) create mode 100644 src/main/java/de/hsh/inform/orientdb_project/DummyImportService.java create mode 100644 src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java diff --git a/src/main/java/de/hsh/inform/orientdb_project/DummyImportService.java b/src/main/java/de/hsh/inform/orientdb_project/DummyImportService.java new file mode 100644 index 0000000..9be8daa --- /dev/null +++ b/src/main/java/de/hsh/inform/orientdb_project/DummyImportService.java @@ -0,0 +1,39 @@ +package de.hsh.inform.orientdb_project; + +import org.pcap4j.packet.ArpPacket; +import org.pcap4j.packet.EthernetPacket; +import org.pcap4j.packet.IcmpV4CommonPacket; +import org.pcap4j.packet.IpV4Packet; +import org.pcap4j.packet.TcpPacket; +import org.pcap4j.packet.UdpPacket; + +import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; + +/** + * For benchmarking purposes only. Does not do a thing! + */ +public class DummyImportService extends AbstractNetdataImportService { + + + public DummyImportService(String filename) { + super(filename); + } + + public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) { + super.handleEthernetPacket(ether, ts, ms); + } + + public void handleArpPacket(ArpPacket arp, long ts, int ms) {} + + public void handleIpV4Packet(IpV4Packet ipv4, long ts, int ms) { + super.handleIpV4Packet(ipv4, ts, ms); + } + + public void handleUdpPacket(UdpPacket udp, long ts, int ms) {} + + public void handleTcpPacket(TcpPacket tcp, long ts, int ms) {} + + public void handleIcmpPacket(IcmpV4CommonPacket icmp, long ts, int ms) {} + + +} diff --git a/src/main/java/de/hsh/inform/orientdb_project/Main.java b/src/main/java/de/hsh/inform/orientdb_project/Main.java index c2ee046..42e9aca 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/Main.java +++ b/src/main/java/de/hsh/inform/orientdb_project/Main.java @@ -6,8 +6,9 @@ import java.util.concurrent.TimeoutException; import org.pcap4j.core.NotOpenException; import org.pcap4j.core.PcapNativeException; +import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; +import de.hsh.inform.orientdb_project.orientdb.LowPerformanceOrientDbNetdataImportService; import de.hsh.inform.orientdb_project.orientdb.OrientDbHelperService; -import de.hsh.inform.orientdb_project.orientdb.OrientDbNetdataImportService; public class Main { @@ -17,10 +18,11 @@ public class Main { odhs.setupSchema(); String filename = "/home/jpt/Temp/tcpdump_2"; - OrientDbNetdataImportService odbis = new OrientDbNetdataImportService(filename, odhs.getOrientGraphFactory().getNoTx()); + //AbstractNetdataImportService importService = new DummyImportService(filename); + AbstractNetdataImportService importService = new LowPerformanceOrientDbNetdataImportService(filename, odhs.getOrientGraphFactory().getNoTx()); try { - System.out.println("Begin import of data ..."); - odbis.run(); + System.out.println(System.currentTimeMillis() + ": Begin import of data ..."); + importService.run(); System.out.println("Import of data done!"); } catch (EOFException | PcapNativeException | TimeoutException | NotOpenException e) { e.printStackTrace(); diff --git a/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java index 30a3128..6c57fb5 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java @@ -31,6 +31,7 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser public final void run() throws PcapNativeException, EOFException, TimeoutException, NotOpenException { PcapHandle handle = Pcaps.openOffline(this.filename); + long packetCounter = 0; for (;;) { Packet packet = handle.getNextPacketEx(); if(packet == null) break; @@ -38,6 +39,10 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser int ms = handle.getTimestampMicros(); EthernetPacket ether = packet.get(EthernetPacket.class); this.handleEthernetPacket(ether, ts, ms); + if(packetCounter % 1000 == 0) { + System.out.println(System.currentTimeMillis() + ": " + packetCounter); + } + packetCounter++; } } @@ -57,7 +62,7 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser public void handleIpV4Packet(IpV4Packet ipv4, long ts, int ms) { IpNumber ipnum = ipv4.getHeader().getProtocol(); if (ipv4.getPayload() instanceof FragmentedPacket) { - System.out.println("Fragmented IP Packet!"); + //System.out.println("Fragmented IP Packet!"); } else if (ipnum.equals(IpNumber.TCP)) { TcpPacket tcp = ipv4.getPayload().get(TcpPacket.class); this.handleTcpPacket(tcp, ts, ms); diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java new file mode 100644 index 0000000..9e55e6e --- /dev/null +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/LowPerformanceOrientDbNetdataImportService.java @@ -0,0 +1,105 @@ +package de.hsh.inform.orientdb_project.orientdb; + +import org.pcap4j.packet.ArpPacket; +import org.pcap4j.packet.EthernetPacket; +import org.pcap4j.packet.IcmpV4CommonPacket; +import org.pcap4j.packet.IpV4Packet; +import org.pcap4j.packet.TcpPacket; +import org.pcap4j.packet.UdpPacket; + +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; + +import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; + +public class LowPerformanceOrientDbNetdataImportService extends AbstractNetdataImportService { + + private OrientGraphNoTx og; + + public LowPerformanceOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) { + super(filename); + this.og = orientGraph; + } + + public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) { + Vertex ethernetFrame = this.og.addVertex("class:EthernetFrame"); + ethernetFrame.setProperty("sourceMac", ether.getHeader().getSrcAddr().toString()); + ethernetFrame.setProperty("targetMac", ether.getHeader().getDstAddr().toString()); + ethernetFrame.setProperty("rawData", ether.getRawData()); + ethernetFrame.setProperty("size", ether.getRawData().length); + ethernetFrame.setProperty("payloadSize", ether.getRawData().length - ether.getHeader().length()); + ethernetFrame.setProperty("timestamp", ts); + ethernetFrame.setProperty("microseconds", ms); + super.handleEthernetPacket(ether, ts, ms); + } + + public void handleArpPacket(ArpPacket arp, long ts, int ms) { + Vertex arpPacket = this.og.addVertex("class:ArpPacket"); + arpPacket.setProperty("size", arp.getRawData().length); + // TODO: Not finished yet! + arpPacket.setProperty("payloadSize", arp.getRawData().length - arp.getHeader().length()); + // Wire up to its ethernet frame + Iterable result = this.og.getVertices("EthernetFrame", new String[]{"microseconds", "timestamp"}, new Object[]{ms, ts}); + Vertex ethernetFrame = result.iterator().next(); + Edge containsEdge = this.og.addEdge("class:contains", ethernetFrame, arpPacket, "contains"); + Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", arpPacket, ethernetFrame, "isContainedIn"); + } + + public void handleIpV4Packet(IpV4Packet ipv4, long ts, int ms) { + Vertex ipPacket = this.og.addVertex("class:IpPacket"); + ipPacket.setProperty("sourceIp", ipv4.getHeader().getSrcAddr().toString().split("/")[1]); + ipPacket.setProperty("targetIp", ipv4.getHeader().getDstAddr().toString().split("/")[1]); + ipPacket.setProperty("size", ipv4.getRawData().length); + ipPacket.setProperty("payloadSize", ipv4.getRawData().length - ipv4.getHeader().length()); + // Wire up to its ethernet frame + Iterable result = this.og.getVertices("EthernetFrame", new String[]{"microseconds", "timestamp"}, new Object[]{ms, ts}); + Vertex ethernetFrame = result.iterator().next(); + Edge containsEdge = this.og.addEdge("class:contains", ethernetFrame, ipPacket, "contains"); + Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", ipPacket, ethernetFrame, "isContainedIn"); + super.handleIpV4Packet(ipv4, ts, ms); + } + + public void handleUdpPacket(UdpPacket udp, long ts, int ms) { + Vertex udpPacket = this.og.addVertex("class:UdpPacket"); + udpPacket.setProperty("sourcePort", udp.getHeader().getSrcPort().valueAsInt()); + udpPacket.setProperty("targetPort", udp.getHeader().getDstPort().valueAsInt()); + udpPacket.setProperty("size", udp.getRawData().length); + udpPacket.setProperty("payloadSize", udp.getRawData().length - udp.getHeader().length()); + // Wire up to its ip packet + Iterable result = this.og.getVertices("EthernetFrame", new String[]{"microseconds", "timestamp"}, new Object[]{ms, ts}); + Vertex ethernetFrame = result.iterator().next(); + Vertex ipPacket = ethernetFrame.getEdges(Direction.OUT, "contains").iterator().next().getVertex(Direction.IN); + Edge containsEdge = this.og.addEdge("class:contains", ipPacket, udpPacket, "contains"); + Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", udpPacket, ipPacket, "isContainedIn"); + } + + public void handleTcpPacket(TcpPacket tcp, long ts, int ms) { + Vertex tcpPacket = this.og.addVertex("class:TcpPacket"); + tcpPacket.setProperty("sourcePort", tcp.getHeader().getSrcPort().valueAsInt()); + tcpPacket.setProperty("targetPort", tcp.getHeader().getDstPort().valueAsInt()); + tcpPacket.setProperty("size", tcp.getRawData().length); + tcpPacket.setProperty("payloadSize", tcp.getRawData().length - tcp.getHeader().length()); + // Wire up to its ip packet + Iterable result = this.og.getVertices("EthernetFrame", new String[]{"microseconds", "timestamp"}, new Object[]{ms, ts}); + Vertex ethernetFrame = result.iterator().next(); + Vertex ipPacket = ethernetFrame.getEdges(Direction.OUT, "contains").iterator().next().getVertex(Direction.IN); + Edge containsEdge = this.og.addEdge("class:contains", ipPacket, tcpPacket, "contains"); + Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", tcpPacket, ipPacket, "isContainedIn"); + } + + public void handleIcmpPacket(IcmpV4CommonPacket icmp, long ts, int ms) { + Vertex icmpPacket = this.og.addVertex("class:IcmpPacket"); + icmpPacket.setProperty("size", icmp.getRawData().length); + icmpPacket.setProperty("payloadSize", icmp.getRawData().length - icmp.getHeader().length()); + // Wire up to its ip packet + Iterable result = this.og.getVertices("EthernetFrame", new String[]{"microseconds", "timestamp"}, new Object[]{ms, ts}); + Vertex ethernetFrame = result.iterator().next(); + Vertex ipPacket = ethernetFrame.getEdges(Direction.OUT, "contains").iterator().next().getVertex(Direction.IN); + Edge containsEdge = this.og.addEdge("class:contains", ipPacket, icmpPacket, "contains"); + Edge isContainedInEdge = this.og.addEdge("class:isContainedIn", icmpPacket, ipPacket, "isContainedIn"); + } + + +} diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java index 49198a4..7dc0525 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java @@ -3,11 +3,13 @@ package de.hsh.inform.orientdb_project.orientdb; import java.io.IOException; import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.core.intent.OIntentMassiveInsert; import com.orientechnologies.orient.core.metadata.schema.OType; import com.tinkerpop.blueprints.impls.orient.OrientEdgeType; import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import com.tinkerpop.blueprints.impls.orient.OrientVertexType; +import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE; public class OrientDbHelperService { @@ -30,6 +32,8 @@ public class OrientDbHelperService { public OrientGraphFactory getOrientGraphFactory() { if(this.factory == null) { this.factory = new OrientGraphFactory(getDbUri(true), this.user, this.pass); + this.factory.declareIntent(new OIntentMassiveInsert()); + this.factory.setThreadMode(THREAD_MODE.ALWAYS_AUTOSET); } return this.factory; } @@ -46,18 +50,25 @@ public class OrientDbHelperService { //String storageType = "plocal"; String storageType = "memory"; // Drop old database and re-create it + OServerAdmin admin = null; try { - OServerAdmin admin = new OServerAdmin(getDbUri(false)); + admin = new OServerAdmin(getDbUri(false)); admin.connect(this.user, this.pass); admin.dropDatabase(this.db, storageType); admin.createDatabase(this.db, "graph", storageType); } catch (IOException e) { - e.printStackTrace(); + try { + admin.createDatabase(this.db, "graph", storageType); + } catch (IOException e1) { + e1.printStackTrace(); + System.exit(1); + } } } public void setupSchema() { OrientGraphNoTx og = this.getOrientGraphFactory().getNoTx(); + OrientVertexType ethernetFrameType = og.createVertexType("EthernetFrame", "V"); ethernetFrameType.createProperty("sourceMac", OType.STRING); ethernetFrameType.createProperty("targetMac", OType.STRING); diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbNetdataImportService.java index 498f475..ff7ac1b 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbNetdataImportService.java @@ -17,6 +17,7 @@ import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; public class OrientDbNetdataImportService extends AbstractNetdataImportService { private OrientGraphNoTx og; + public OrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) { super(filename); @@ -49,8 +50,8 @@ public class OrientDbNetdataImportService extends AbstractNetdataImportService { public void handleIpV4Packet(IpV4Packet ipv4, long ts, int ms) { Vertex ipPacket = this.og.addVertex("class:IpPacket"); - ipPacket.setProperty("sourceIp", ipv4.getHeader().getSrcAddr().getAddress().toString()); - ipPacket.setProperty("targetIp", ipv4.getHeader().getDstAddr().getAddress().toString()); + ipPacket.setProperty("sourceIp", ipv4.getHeader().getSrcAddr().toString().split("/")[1]); + ipPacket.setProperty("targetIp", ipv4.getHeader().getDstAddr().toString().split("/")[1]); ipPacket.setProperty("size", ipv4.getRawData().length); ipPacket.setProperty("payloadSize", ipv4.getRawData().length - ipv4.getHeader().length()); // Wire up to its ethernet frame