diff --git a/src/main/java/lu/jpt/csparqltest/Main.java b/src/main/java/lu/jpt/csparqltest/Main.java index 2875ca7..cfac9b7 100644 --- a/src/main/java/lu/jpt/csparqltest/Main.java +++ b/src/main/java/lu/jpt/csparqltest/Main.java @@ -14,6 +14,7 @@ import eu.larkc.csparql.core.engine.CsparqlEngineImpl; import eu.larkc.csparql.core.engine.CsparqlQueryResultProxy; import lu.jpt.csparqltest.rentacar.RentACarSimulation; import lu.jpt.csparqltest.util.RandomHelper; +import lu.jpt.csparqltest.util.TestStreamGenerator; public class Main { @@ -35,12 +36,14 @@ public class Main { engine.initialize(true); // Add local background knowledge from file into graph + /* try { engine.putStaticNamedModel("http://example.org/carSimKnowledgeGraph", CsparqlUtils.serializeRDFFile("data/carSimulationABox.rdf")); } catch (Exception e) { logger.error(e.toString()); logger.error(e.getStackTrace().toString()); } + */ /* Use a SPARQL Query to update the local knowledge from code instead of using CONSTRUCT within the engine. String updateQuery = "PREFIX : " @@ -55,17 +58,21 @@ public class Main { //logger.debug("Engine from: " + engine.getClass().getProtectionDomain().getCodeSource()); // Create and register stream generator at specific URI - RentACarSimulation simulation = new RentACarSimulation(); - engine.registerStream(simulation.getCarStream()); - engine.registerStream(simulation.getDriverStream()); + //RentACarSimulation simulation = new RentACarSimulation(); + //engine.registerStream(simulation.getCarStream()); + //engine.registerStream(simulation.getDriverStream()); + TestStreamGenerator testStreamGenerator = new TestStreamGenerator("http://example.org"); + engine.registerStream(testStreamGenerator); // Run simulation that is feeding the streams - Thread simulationThread = new Thread(simulation); - simulationThread.start(); + //Thread simulationThread = new Thread(simulation); + //simulationThread.start(); + Thread testGeneratorThread = new Thread(testStreamGenerator); + testGeneratorThread.start(); // Now build a query to run - interchangeable - //String query = Main.getSPO(); - String query = RentACarSimulation.getEventUsingBackgroundKnowledge(); + String query = Main.getTestPatternQuery(); + //String query = RentACarSimulation.getEventUsingBackgroundKnowledge(); // Create a result proxy by registering the query at the engine CsparqlQueryResultProxy resultProxy = null; @@ -104,16 +111,104 @@ public class Main { engine.unregisterQuery(resultProxy.getId()); // Softly stop the simulation and unregister its streams - simulation.pleaseStopSimulation(); - simulationThread.interrupt(); - engine.unregisterStream(simulation.getCarStream().getIRI()); - engine.unregisterStream(simulation.getDriverStream().getIRI()); + //simulation.pleaseStopSimulation(); + //simulationThread.interrupt(); + //engine.unregisterStream(simulation.getCarStream().getIRI()); + //engine.unregisterStream(simulation.getDriverStream().getIRI()); + + testStreamGenerator.pleaseStop(); + testGeneratorThread.interrupt(); + engine.unregisterStream(testStreamGenerator.getIRI()); // That's it! System.exit(0); } private static String getSPO() { + return "REGISTER QUERY getSPO AS " + + "PREFIX f: " + + "PREFIX xsd: " + + "SELECT ?s ?p ?o " + + "FROM STREAM [RANGE 5s STEP 1s] " + + "WHERE { " + + " ?s ?p ?o . " + + "}"; + } + + private static String getTestPatternQuery() { + return "REGISTER QUERY matchANotBC AS " + + "PREFIX f: " + + "PREFIX rdf: " + + "PREFIX xsd: " + + "SELECT ?a ?b ?c " + + "FROM STREAM [RANGE 5s STEP 1s] " + + "WHERE { " + + " ?a a . " + + " ?c a . " + + " BIND(f:timestamp(?a,rdf:type,) AS ?tsa) " + + " BIND(f:timestamp(?c,rdf:type,) AS ?tsc) " + + " FILTER(?tsa < ?tsc) " + + " OPTIONAL {" + + " ?b a . " + + " BIND(f:timestamp(?b,rdf:type,) AS ?tsb) " + + " FILTER(?tsb < ?tsa || ?tsc < ?tsb)" + + " } " + + " BIND(BOUND(?b) AS ?bbound) " + + " BIND(?tsa < ?tsb AS ?abeforeb) " + + " BIND(?tsb < ?tsc AS ?bbeforec) " + + "}"; + } + + private static String getPatternANOTBCQuery() { + return "REGISTER QUERY matchANotBC AS " + + "PREFIX f: " + + "PREFIX rdf: " + + "PREFIX xsd: " + + "SELECT ?bbound ?abeforeb ?bbeforec ?tsa ?tsb ?tsc " + + "FROM STREAM [RANGE 5s STEP 1s] " + + "WHERE { " + + " ?a a . " + + " ?c a . " + + " BIND(f:timestamp(?a,rdf:type,) AS ?tsa) " + + " BIND(f:timestamp(?c,rdf:type,) AS ?tsc) " + + " FILTER(?tsa < ?tsc) " + + " OPTIONAL {" + + " ?b a . " + + " BIND(f:timestamp(?b,rdf:type,) AS ?tsb) " + + " FILTER(?tsb < ?tsa || ?tsc < ?tsb)" + + " } " + + " BIND(BOUND(?b) AS ?bbound) " + + " BIND(?tsa < ?tsb AS ?abeforeb) " + + " BIND(?tsb < ?tsc AS ?bbeforec) " + + "}"; + } + + + private static String getTestPatternABCQuery() { + return "REGISTER QUERY matchABC AS " + + "PREFIX f: " + + "PREFIX rdf: " + + "PREFIX xsd: " + + "SELECT ?tsa ?tsb ?tsc " + + "FROM STREAM [RANGE 5s STEP 1s] " + + "WHERE { " + + " ?a a . " + + " ?a a ?atype . " + + " OPTIONAL {" + + " ?b a . " + + " ?b a ?btype . " + + " } " + + " ?c a . " + + " ?c a ?ctype . " + + " BIND(f:timestamp(?a,rdf:type,?atype) AS ?tsa) " + + " BIND(f:timestamp(?b,rdf:type,?btype) AS ?tsb) " + + " BIND(f:timestamp(?c,rdf:type,?ctype) AS ?tsc) " + + " FILTER(f:timestamp(?a,rdf:type,?atype) < f:timestamp(?b,rdf:type,?btype)) " + + " FILTER(f:timestamp(?b,rdf:type,?btype) < f:timestamp(?c,rdf:type,?ctype)) " + + "}"; + } + + private static String getSPOFromRentACar() { return "REGISTER QUERY BasicCarInfo AS " + "PREFIX f: " + "PREFIX xsd: " diff --git a/src/main/java/lu/jpt/csparqltest/util/TestStreamGenerator.java b/src/main/java/lu/jpt/csparqltest/util/TestStreamGenerator.java new file mode 100644 index 0000000..ca2f3cb --- /dev/null +++ b/src/main/java/lu/jpt/csparqltest/util/TestStreamGenerator.java @@ -0,0 +1,48 @@ +package lu.jpt.csparqltest.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.larkc.csparql.cep.api.RdfQuadruple; +import eu.larkc.csparql.cep.api.RdfStream; + +public class TestStreamGenerator extends RdfStream implements Runnable { + protected final Logger logger = LoggerFactory.getLogger(TestStreamGenerator.class); + + private volatile boolean keepRunning = false; + + public TestStreamGenerator(String iri) { + super(iri); + } + + public void pleaseStop() { + keepRunning = false; + } + + @Override + public void run() { + String[] EVENT_TYPES = {"A", "B", "C", "D", "E"}; + this.keepRunning = true; + while (this.keepRunning) { + long currentTime = System.currentTimeMillis(); + String eventID = this.getIRI() + "/event#" + currentTime; + int randomEventNumber = RandomHelper.getRandomNumberWithin(0, EVENT_TYPES.length-1); + String eventType = EVENT_TYPES[randomEventNumber]; + this.put(new RdfQuadruple( + eventID, + "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", + this.getIRI()+"/type/"+eventType, + currentTime + ) + ); + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } + +}