Some refactoring and more optimization attempts
This commit is contained in:
parent
67aa56fec4
commit
c797d33fd8
|
@ -6,7 +6,6 @@ import java.util.concurrent.TimeoutException;
|
||||||
import org.pcap4j.core.NotOpenException;
|
import org.pcap4j.core.NotOpenException;
|
||||||
import org.pcap4j.core.PcapNativeException;
|
import org.pcap4j.core.PcapNativeException;
|
||||||
|
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE;
|
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
|
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
|
||||||
|
|
||||||
import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService;
|
import de.hsh.inform.orientdb_project.netdata.AbstractNetdataImportService;
|
||||||
|
@ -16,24 +15,32 @@ import de.hsh.inform.orientdb_project.orientdb.OrientDbHelperService;
|
||||||
public class Main {
|
public class Main {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
OrientDbHelperService odhs = new OrientDbHelperService("127.0.0.1", "hshtest", "root", "root");
|
// TODO: Make this configurable or easy to exchange.
|
||||||
|
String filename = "/home/jpt/Temp/tcpdump_2";
|
||||||
|
OrientDbHelperService odhs = new OrientDbHelperService("192.168.0.110", "hshtest", "root", "root");
|
||||||
|
|
||||||
|
// Clean up existing database and set up schema from scratch
|
||||||
odhs.cleanUpServer();
|
odhs.cleanUpServer();
|
||||||
odhs.setupSchema();
|
odhs.setupSchema();
|
||||||
|
|
||||||
String filename = "/home/jpt/Temp/tcpdump_2";
|
// Get "handle" for database to pass to import service
|
||||||
OrientGraphNoTx ogf = odhs.getOrientGraphFactory().getNoTx();
|
OrientGraphNoTx ogf = odhs.getOrientGraphNoTx();
|
||||||
ogf.setThreadMode(THREAD_MODE.MANUAL);
|
|
||||||
|
|
||||||
//AbstractNetdataImportService importService = new DummyImportService(filename);
|
//AbstractNetdataImportService importService = new DummyImportService(filename);
|
||||||
//AbstractNetdataImportService importService = new LowPerformanceOrientDbNetdataImportService(filename, ogf);
|
//AbstractNetdataImportService importService = new LowPerformanceOrientDbNetdataImportService(filename, ogf);
|
||||||
AbstractNetdataImportService importService = new HighPerformanceKappaOrientDbNetdataImportService(filename, ogf);
|
AbstractNetdataImportService importService = new HighPerformanceKappaOrientDbNetdataImportService(filename, ogf);
|
||||||
|
|
||||||
|
// Go go gadget import service!
|
||||||
try {
|
try {
|
||||||
System.out.println(System.currentTimeMillis() + ": Begin import of data ...");
|
System.out.println(System.currentTimeMillis()/1000L + ": Begin import of data ...");
|
||||||
importService.run();
|
importService.partialRun(12000);
|
||||||
System.out.println("Import of data done!");
|
System.out.println("Import of data done!");
|
||||||
} catch (EOFException | PcapNativeException | TimeoutException | NotOpenException e) {
|
} catch (EOFException | PcapNativeException | TimeoutException | NotOpenException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
// Done
|
||||||
|
odhs.close();
|
||||||
|
System.out.println("End of program.");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,13 +25,25 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser
|
||||||
|
|
||||||
private String filename;
|
private String filename;
|
||||||
|
|
||||||
|
private long packetCounter;
|
||||||
|
private long packetLimit;
|
||||||
|
|
||||||
|
private boolean limitedImportRun;
|
||||||
|
|
||||||
public AbstractNetdataImportService(String filename) {
|
public AbstractNetdataImportService(String filename) {
|
||||||
this.filename = filename;
|
this.filename = filename;
|
||||||
|
this.limitedImportRun = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void partialRun(long packetLimit) throws EOFException, PcapNativeException, TimeoutException, NotOpenException {
|
||||||
|
this.packetLimit = packetLimit;
|
||||||
|
this.limitedImportRun = true;
|
||||||
|
this.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void run() throws PcapNativeException, EOFException, TimeoutException, NotOpenException {
|
public final void run() throws PcapNativeException, EOFException, TimeoutException, NotOpenException {
|
||||||
PcapHandle handle = Pcaps.openOffline(this.filename);
|
PcapHandle handle = Pcaps.openOffline(this.filename);
|
||||||
long packetCounter = 0;
|
this.packetCounter = 0;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Packet packet = handle.getNextPacketEx();
|
Packet packet = handle.getNextPacketEx();
|
||||||
if(packet == null) break;
|
if(packet == null) break;
|
||||||
|
@ -39,10 +51,14 @@ public abstract class AbstractNetdataImportService implements NetdataResultObser
|
||||||
int ms = handle.getTimestampMicros();
|
int ms = handle.getTimestampMicros();
|
||||||
EthernetPacket ether = packet.get(EthernetPacket.class);
|
EthernetPacket ether = packet.get(EthernetPacket.class);
|
||||||
this.handleEthernetPacket(ether, ts, ms);
|
this.handleEthernetPacket(ether, ts, ms);
|
||||||
if(packetCounter % 1000 == 0) {
|
if(this.packetCounter % 1000 == 0) {
|
||||||
System.out.println(System.currentTimeMillis() + ": " + packetCounter);
|
System.out.println(System.currentTimeMillis()/1000L + ": " + this.packetCounter);
|
||||||
|
}
|
||||||
|
this.packetCounter++;
|
||||||
|
if(this.limitedImportRun && this.packetCounter > this.packetLimit) {
|
||||||
|
System.out.println("Limited import run done. Breaking.");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
packetCounter++;
|
|
||||||
}
|
}
|
||||||
this.afterImport();
|
this.afterImport();
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,8 +34,6 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
|
||||||
private Vertex tcpPacket;
|
private Vertex tcpPacket;
|
||||||
private Vertex icmpPacket;
|
private Vertex icmpPacket;
|
||||||
|
|
||||||
private long packetCounter;
|
|
||||||
|
|
||||||
public HighPerformanceKappaOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) {
|
public HighPerformanceKappaOrientDbNetdataImportService(String filename, OrientGraphNoTx orientGraph) {
|
||||||
super(filename);
|
super(filename);
|
||||||
this.og = orientGraph;
|
this.og = orientGraph;
|
||||||
|
@ -44,15 +42,6 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) {
|
public void handleEthernetPacket(EthernetPacket ether, long ts, int ms) {
|
||||||
this.packetCounter++;
|
|
||||||
if(this.packetCounter > 2000) {
|
|
||||||
for(LinkedList<TcpConnection> connList : this.knownTcpConnections.values()) {
|
|
||||||
for(TcpConnection conn : connList) {
|
|
||||||
System.out.println(conn.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
System.exit(0);
|
|
||||||
}
|
|
||||||
// Clean up state vars before processing the next ethernet frame
|
// Clean up state vars before processing the next ethernet frame
|
||||||
this.ethernetFrame = null;
|
this.ethernetFrame = null;
|
||||||
this.arpPacket = null;
|
this.arpPacket = null;
|
||||||
|
@ -239,7 +228,13 @@ public class HighPerformanceKappaOrientDbNetdataImportService extends AbstractNe
|
||||||
|
|
||||||
public void afterImport() {
|
public void afterImport() {
|
||||||
// TODO: Insert all TcpConnections!
|
// TODO: Insert all TcpConnections!
|
||||||
System.out.println("Fertig!");
|
System.out.println("All done. Processing collected TcpConnections ...");
|
||||||
|
for(LinkedList<TcpConnection> connList : this.knownTcpConnections.values()) {
|
||||||
|
for(TcpConnection conn : connList) {
|
||||||
|
// TODO
|
||||||
|
System.out.println(conn.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,13 +3,14 @@ package de.hsh.inform.orientdb_project.orientdb;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import com.orientechnologies.orient.client.remote.OServerAdmin;
|
import com.orientechnologies.orient.client.remote.OServerAdmin;
|
||||||
|
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
|
||||||
import com.orientechnologies.orient.core.intent.OIntentMassiveInsert;
|
import com.orientechnologies.orient.core.intent.OIntentMassiveInsert;
|
||||||
import com.orientechnologies.orient.core.metadata.schema.OType;
|
import com.orientechnologies.orient.core.metadata.schema.OType;
|
||||||
|
import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE;
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientEdgeType;
|
import com.tinkerpop.blueprints.impls.orient.OrientEdgeType;
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
|
import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
|
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientVertexType;
|
import com.tinkerpop.blueprints.impls.orient.OrientVertexType;
|
||||||
import com.tinkerpop.blueprints.impls.orient.OrientConfigurableGraph.THREAD_MODE;
|
|
||||||
|
|
||||||
public class OrientDbHelperService {
|
public class OrientDbHelperService {
|
||||||
|
|
||||||
|
@ -29,15 +30,33 @@ public class OrientDbHelperService {
|
||||||
this.factory = null;
|
this.factory = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public OrientGraphFactory getOrientGraphFactory() {
|
public OrientGraphFactory getOrientGraphFactory() {
|
||||||
if(this.factory == null) {
|
if(this.factory == null) {
|
||||||
this.factory = new OrientGraphFactory(getDbUri(true), this.user, this.pass);
|
this.factory = new OrientGraphFactory(getDbUri(true), this.user, this.pass);
|
||||||
|
// Settings concerning import performance (?)
|
||||||
this.factory.declareIntent(new OIntentMassiveInsert());
|
this.factory.declareIntent(new OIntentMassiveInsert());
|
||||||
this.factory.setThreadMode(THREAD_MODE.ALWAYS_AUTOSET);
|
this.factory.setThreadMode(THREAD_MODE.MANUAL);
|
||||||
|
this.factory.setAutoStartTx(false);
|
||||||
|
this.factory.setKeepInMemoryReferences(false);
|
||||||
|
this.factory.setRequireTransaction(false);
|
||||||
|
this.factory.setUseLog(false);
|
||||||
|
this.factory.setupPool(1, 1);
|
||||||
}
|
}
|
||||||
|
// Return the factory
|
||||||
return this.factory;
|
return this.factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
this.getOrientGraphFactory().close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public OrientGraphNoTx getOrientGraphNoTx() {
|
||||||
|
OrientGraphNoTx ogf = this.getOrientGraphFactory().getNoTx();
|
||||||
|
return ogf;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public String getDbUri(boolean withDb) {
|
public String getDbUri(boolean withDb) {
|
||||||
String uri = "remote:" + this.host;
|
String uri = "remote:" + this.host;
|
||||||
if(withDb) {
|
if(withDb) {
|
||||||
|
@ -49,20 +68,28 @@ public class OrientDbHelperService {
|
||||||
public void cleanUpServer() {
|
public void cleanUpServer() {
|
||||||
//String storageType = "plocal";
|
//String storageType = "plocal";
|
||||||
String storageType = "memory";
|
String storageType = "memory";
|
||||||
// Drop old database and re-create it
|
// Drop old database ...
|
||||||
OServerAdmin admin = null;
|
OServerAdmin admin = null;
|
||||||
try {
|
try {
|
||||||
admin = new OServerAdmin(getDbUri(false));
|
admin = new OServerAdmin(getDbUri(false));
|
||||||
admin.connect(this.user, this.pass);
|
admin.connect(this.user, this.pass);
|
||||||
admin.dropDatabase(this.db, storageType);
|
admin.dropDatabase(this.db, storageType);
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.err.println("Could not drop database: " + this.getDbUri(true));
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
// Create new database ...
|
||||||
|
try {
|
||||||
|
admin = new OServerAdmin(getDbUri(false));
|
||||||
|
admin.connect(this.user, this.pass);
|
||||||
admin.createDatabase(this.db, "graph", storageType);
|
admin.createDatabase(this.db, "graph", storageType);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
try {
|
System.err.println("Could not create new database: " + this.getDbUri(true));
|
||||||
admin.createDatabase(this.db, "graph", storageType);
|
e.printStackTrace();
|
||||||
} catch (IOException e1) {
|
} finally {
|
||||||
e1.printStackTrace();
|
admin.close();
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,6 +98,20 @@ public class OrientDbHelperService {
|
||||||
this.createClusters();
|
this.createClusters();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createClusters() {
|
||||||
|
OServerAdmin admin = null;
|
||||||
|
try {
|
||||||
|
admin = new OServerAdmin(getDbUri(false));
|
||||||
|
admin.connect(this.user, this.pass);
|
||||||
|
// TODO: Create clusters!
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.err.println("Failed to create custom clusters!");
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void createClasses() {
|
private void createClasses() {
|
||||||
OrientGraphNoTx og = this.getOrientGraphFactory().getNoTx();
|
OrientGraphNoTx og = this.getOrientGraphFactory().getNoTx();
|
||||||
|
|
||||||
|
@ -133,19 +174,4 @@ public class OrientDbHelperService {
|
||||||
og.shutdown();
|
og.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createClusters() {
|
|
||||||
OServerAdmin admin = null;
|
|
||||||
try {
|
|
||||||
admin = new OServerAdmin(getDbUri(false));
|
|
||||||
admin.connect(this.user, this.pass);
|
|
||||||
} catch (IOException e) {
|
|
||||||
try {
|
|
||||||
admin.createDatabase(this.db, "graph", "plocal");
|
|
||||||
} catch (IOException e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue