From c797d33fd8aaa5f8efa39663fc864255cb67a70c Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Mon, 28 Nov 2016 19:17:15 +0100 Subject: [PATCH] Some refactoring and more optimization attempts --- .../de/hsh/inform/orientdb_project/Main.java | 23 ++++-- .../netdata/AbstractNetdataImportService.java | 24 +++++- ...anceKappaOrientDbNetdataImportService.java | 19 ++--- .../orientdb/OrientDbHelperService.java | 74 +++++++++++++------ 4 files changed, 92 insertions(+), 48 deletions(-) diff --git a/src/main/java/de/hsh/inform/orientdb_project/Main.java b/src/main/java/de/hsh/inform/orientdb_project/Main.java index b3defab..fb78031 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/Main.java +++ b/src/main/java/de/hsh/inform/orientdb_project/Main.java @@ -6,7 +6,6 @@ import java.util.concurrent.TimeoutException; import org.pcap4j.core.NotOpenException; import org.pcap4j.core.PcapNativeException; -import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService; @@ -16,24 +15,32 @@ import de.hsh.inform.orientdb_project.orientdb.OrientDbHelperService; public class Main { public static void main(String[] args) { - OrientDbHelperService odhs = new OrientDbHelperService("127.0.0.1", "hshtest", "root", "root"); + // TODO: Make this configurable or easy to exchange. + String filename = "/home/jpt/Temp/tcpdump_2"; + OrientDbHelperService odhs = new OrientDbHelperService("192.168.0.110", "hshtest", "root", "root"); + + // Clean up existing database and set up schema from scratch odhs.cleanUpServer(); odhs.setupSchema(); - - String filename = "/home/jpt/Temp/tcpdump_2"; - OrientGraphNoTx ogf = odhs.getOrientGraphFactory().getNoTx(); - ogf.setThreadMode(THREAD_MODE.MANUAL); + // Get "handle" for database to pass to import service + OrientGraphNoTx ogf = odhs.getOrientGraphNoTx(); + //AbstractNetdataImportService importService = new DummyImportService(filename); //AbstractNetdataImportService importService = new LowPerformanceOrientDbNetdataImportService(filename, ogf); AbstractNetdataImportService importService = new HighPerformanceKappaOrientDbNetdataImportService(filename, ogf); + + // Go go gadget import service! try { - System.out.println(System.currentTimeMillis() + ": Begin import of data ..."); - importService.run(); + System.out.println(System.currentTimeMillis()/1000L + ": Begin import of data ..."); + importService.partialRun(12000); System.out.println("Import of data done!"); } catch (EOFException | PcapNativeException | TimeoutException | NotOpenException e) { e.printStackTrace(); } + // Done + odhs.close(); + System.out.println("End of program."); } } diff --git a/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java index 579ca97..4da4c13 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/netdata/AbstractNetdataImportService.java @@ -25,13 +25,25 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser private String filename; + private long packetCounter; + private long packetLimit; + + private boolean limitedImportRun; + public AbstractNetdataImportService(String filename) { this.filename = filename; + this.limitedImportRun = false; + } + + public final void partialRun(long packetLimit) throws EOFException, PcapNativeException, TimeoutException, NotOpenException { + this.packetLimit = packetLimit; + this.limitedImportRun = true; + this.run(); } public final void run() throws PcapNativeException, EOFException, TimeoutException, NotOpenException { PcapHandle handle = Pcaps.openOffline(this.filename); - long packetCounter = 0; + this.packetCounter = 0; for (;;) { Packet packet = handle.getNextPacketEx(); if(packet == null) break; @@ -39,10 +51,14 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser int ms = handle.getTimestampMicros(); EthernetPacket ether = packet.get(EthernetPacket.class); this.handleEthernetPacket(ether, ts, ms); - if(packetCounter % 1000 == 0) { - System.out.println(System.currentTimeMillis() + ": " + packetCounter); + if(this.packetCounter % 1000 == 0) { + System.out.println(System.currentTimeMillis()/1000L + ": " + this.packetCounter); + } + this.packetCounter++; + if(this.limitedImportRun && this.packetCounter > this.packetLimit) { + System.out.println("Limited import run done. Breaking."); + break; } - packetCounter++; } this.afterImport(); } diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java index 5dc5833..cca48b3 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/HighPerformanceKappaOrientDbNetdataImportService.java @@ -34,8 +34,6 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe private Vertex tcpPacket; private Vertex icmpPacket; - private long packetCounter; - public HighPerformanceKappaOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) { super(filename); this.og = orientGraph; @@ -44,15 +42,6 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe } public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) { - this.packetCounter++; - if(this.packetCounter > 2000) { - for(LinkedList connList : this.knownTcpConnections.values()) { - for(TcpConnection conn : connList) { - System.out.println(conn.toString()); - } - } - System.exit(0); - } // Clean up state vars before processing the next ethernet frame this.ethernetFrame = null; this.arpPacket = null; @@ -239,7 +228,13 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe public void afterImport() { // TODO: Insert all TcpConnections! - System.out.println("Fertig!"); + System.out.println("All done. Processing collected TcpConnections ..."); + for(LinkedList connList : this.knownTcpConnections.values()) { + for(TcpConnection conn : connList) { + // TODO + System.out.println(conn.toString()); + } + } } diff --git a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java index 175c7ef..1b8329f 100644 --- a/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java +++ b/src/main/java/de/hsh/inform/orientdb_project/orientdb/OrientDbHelperService.java @@ -3,13 +3,14 @@ package de.hsh.inform.orientdb_project.orientdb; import java.io.IOException; import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.intent.OIntentMassiveInsert; import com.orientechnologies.orient.core.metadata.schema.OType; +import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE; import com.tinkerpop.blueprints.impls.orient.OrientEdgeType; import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import com.tinkerpop.blueprints.impls.orient.OrientVertexType; -import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE; public class OrientDbHelperService { @@ -29,14 +30,32 @@ public class OrientDbHelperService { this.factory = null; } + public OrientGraphFactory getOrientGraphFactory() { if(this.factory == null) { this.factory = new OrientGraphFactory(getDbUri(true), this.user, this.pass); + // Settings concerning import performance (?) this.factory.declareIntent(new OIntentMassiveInsert()); - this.factory.setThreadMode(THREAD_MODE.ALWAYS_AUTOSET); + this.factory.setThreadMode(THREAD_MODE.MANUAL); + this.factory.setAutoStartTx(false); + this.factory.setKeepInMemoryReferences(false); + this.factory.setRequireTransaction(false); + this.factory.setUseLog(false); + this.factory.setupPool(1, 1); } + // Return the factory return this.factory; } + + public void close() { + this.getOrientGraphFactory().close(); + } + + public OrientGraphNoTx getOrientGraphNoTx() { + OrientGraphNoTx ogf = this.getOrientGraphFactory().getNoTx(); + return ogf; + + } public String getDbUri(boolean withDb) { String uri = "remote:" + this.host; @@ -49,20 +68,28 @@ public class OrientDbHelperService { public void cleanUpServer() { //String storageType = "plocal"; String storageType = "memory"; - // Drop old database and re-create it + // Drop old database ... OServerAdmin admin = null; try { admin = new OServerAdmin(getDbUri(false)); admin.connect(this.user, this.pass); admin.dropDatabase(this.db, storageType); + } catch (IOException e) { + System.err.println("Could not drop database: " + this.getDbUri(true)); + e.printStackTrace(); + } finally { + admin.close(); + } + // Create new database ... + try { + admin = new OServerAdmin(getDbUri(false)); + admin.connect(this.user, this.pass); admin.createDatabase(this.db, "graph", storageType); } catch (IOException e) { - try { - admin.createDatabase(this.db, "graph", storageType); - } catch (IOException e1) { - e1.printStackTrace(); - System.exit(1); - } + System.err.println("Could not create new database: " + this.getDbUri(true)); + e.printStackTrace(); + } finally { + admin.close(); } } @@ -70,6 +97,20 @@ public class OrientDbHelperService { this.createClasses(); this.createClusters(); } + + private void createClusters() { + OServerAdmin admin = null; + try { + admin = new OServerAdmin(getDbUri(false)); + admin.connect(this.user, this.pass); + // TODO: Create clusters! + } catch (IOException e) { + System.err.println("Failed to create custom clusters!"); + e.printStackTrace(); + } finally { + admin.close(); + } + } private void createClasses() { OrientGraphNoTx og = this.getOrientGraphFactory().getNoTx(); @@ -133,19 +174,4 @@ public class OrientDbHelperService { og.shutdown(); } - private void createClusters() { - OServerAdmin admin = null; - try { - admin = new OServerAdmin(getDbUri(false)); - admin.connect(this.user, this.pass); - } catch (IOException e) { - try { - admin.createDatabase(this.db, "graph", "plocal"); - } catch (IOException e1) { - e1.printStackTrace(); - System.exit(1); - } - } - } - }