diff --git a/pom.xml b/pom.xml
index 6311831..bd49522 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
eu.larkc.csparql
csparql-core
- 0.9.7
+ 0.9.6
it.polimi.deib
diff --git a/src/main/java/lu/jpt/csparqlproject/SimulationContext.java b/src/main/java/lu/jpt/csparqlproject/SimulationContext.java
index 9f7dbaf..239dd62 100644
--- a/src/main/java/lu/jpt/csparqlproject/SimulationContext.java
+++ b/src/main/java/lu/jpt/csparqlproject/SimulationContext.java
@@ -11,9 +11,11 @@ import eu.larkc.csparql.cep.api.RdfStream;
import eu.larkc.csparql.core.engine.CsparqlEngine;
import eu.larkc.csparql.core.engine.CsparqlEngineImpl;
import eu.larkc.csparql.core.engine.CsparqlQueryResultProxy;
+import eu.larkc.csparql.core.engine.RDFStreamFormatter;
import lu.jpt.csparqlproject.gui.FancyTextObserverWindow;
import lu.jpt.csparqlproject.rentacar.RentACarSimulation;
import lu.jpt.csparqlproject.util.CsparqlQueryHelper;
+import lu.jpt.csparqlproject.util.CsparqlQueryHelper.CsparqlQueryInfo;
/**
* This class encapsulates the use of the C-SPARQL Engine. Here, the RentACarSimulation,
@@ -153,23 +155,38 @@ public class SimulationContext {
RdfStream driverStream = simulation.getDriverStream();
this.engine.registerStream(driverStream);
this.registeredStreams.add(driverStream);
- // Register all the queries and add result observers!
Collection queriesToRegister = new ArrayList();
- queriesToRegister.add(RentACarSimulation.getEventsQuery());
- queriesToRegister.add(RentACarSimulation.getAverageDataByCar());
+ // Collect the queries to use!
+ //queriesToRegister.add(RentACarSimulation.getEventsQuery());
+ queriesToRegister.add(RentACarSimulation.getAverageDataByCarAsStream());
+ queriesToRegister.add(RentACarSimulation.selectFromRegisteredStream());
+ // Now register each query appropriately!
for(String query : queriesToRegister) {
- String queryName = CsparqlQueryHelper.getQueryName(query);
+ CsparqlQueryInfo queryInfo = CsparqlQueryHelper.getQueryInfo(query);
CsparqlQueryResultProxy resultProxy = null;
try {
- resultProxy = this.engine.registerQuery(query, true);
+ resultProxy = this.engine.registerQuery(query, false);
+ // Take care of streams and queries differently.
+ if(queryInfo.isStream) {
+ // If the query is a stream, we need additional components to feed it back into the engine.
+ String streamUri = "http://example.org/stream/"+queryInfo.name;
+ RDFStreamFormatter rdfStreamFormatter = new RDFStreamFormatter(streamUri);
+ engine.registerStream(rdfStreamFormatter);
+ resultProxy.addObserver(rdfStreamFormatter);
+ Observer resultObserver = this.createResultObserverWindow(queryInfo.name);
+ resultProxy.addObserver(resultObserver);
+ } else {
+ // If it is a regular query, just attach a fitting observer
+ Observer resultObserver = this.createResultObserverWindow(queryInfo.name);
+ resultProxy.addObserver(resultObserver);
+ }
+ this.queryResultProxies.add(resultProxy);
+ SimulationContext.logger.info("Successfully registered query " + queryInfo.name + ": " + query);
} catch (Exception e) {
SimulationContext.logger.error(e.toString());
- SimulationContext.logger.error("Could not register query "+queryName);
+ SimulationContext.logger.error("Could not register query "+queryInfo.name);
SimulationContext.logger.error(query);
}
- this.queryResultProxies.add(resultProxy);
- Observer resultObserver = this.createResultObserverWindow(queryName);
- resultProxy.addObserver(resultObserver);
}
// Setup complete, ready to run.
SimulationContext.logger.info("Simulation set up and ready to go!");
diff --git a/src/main/java/lu/jpt/csparqlproject/rentacar/RentACarSimulation.java b/src/main/java/lu/jpt/csparqlproject/rentacar/RentACarSimulation.java
index 5179e3d..16a5e72 100644
--- a/src/main/java/lu/jpt/csparqlproject/rentacar/RentACarSimulation.java
+++ b/src/main/java/lu/jpt/csparqlproject/rentacar/RentACarSimulation.java
@@ -42,7 +42,7 @@ public class RentACarSimulation implements Runnable {
public RentACarSimulation() {
this.registerOwnPrefixes();
- int numberOfCars = 5;
+ int numberOfCars = 2;
int numberOfCustomers = 1;
// Create a car pool and drivers
this.carPool = new CarPool(numberOfCars);
@@ -204,20 +204,44 @@ public class RentACarSimulation implements Runnable {
+ "}";
}
- public static String getAverageDataByCar() {
- return "REGISTER QUERY getAverageSpeedByCar AS "
+ public static String getAverageDataByCarAsStream() {
+ return "REGISTER STREAM getAverageSpeedByCar AS "
+ "PREFIX rdf: "
+ "PREFIX f: "
+ "PREFIX xsd: "
+ "PREFIX car: <"+RentACarSimulation.BASE_ONTOLOGY_IRI+"> "
- + "SELECT ?car (AVG(?speed) AS ?avgSpeed) (AVG(?rpm) AS ?avgRPM) "
+ + "CONSTRUCT { "
+ + " [] rdf:type car:AverageSpeedEvent . "
+ + " [] car:relatedCar ?car . "
+ + " [] car:averageSpeed ?avgSpeed . "
+ + "} "
+ "FROM STREAM <"+RentACarSimulation.CAR_STREAM_IRI+"> [RANGE 5s STEP 1s] "
+ "WHERE { "
- + " ?e rdf:type car:CarStatusEvent . "
- + " ?e car:relatedCar ?car . "
- + " ?e car:motorRPM ?rpm . "
- + " ?e car:speed ?speed . "
- + "} "
- + "GROUP BY (?car) ";
+ + " { "
+ + " SELECT (AVG(?speed) AS ?avgSpeed) "
+ + " WHERE { "
+ + " ?e rdf:type car:CarStatusEvent . "
+ + " ?e car:relatedCar ?car . "
+ + " ?e car:speed ?speed . "
+ + " } "
+ + " GROUP BY (?car) "
+ + " }"
+ + "} ";
+ }
+
+ /**
+ * IMPORTANT NOTE: STREAM gets matched everywhere, so don't call stuff "stream"!
+ */
+ public static String selectFromRegisteredStream() {
+ return "REGISTER Query askRegisteredStrm AS "
+ + "PREFIX rdf: "
+ + "PREFIX f: "
+ + "PREFIX xsd: "
+ + "PREFIX car: <"+RentACarSimulation.BASE_ONTOLOGY_IRI+"> "
+ + "SELECT ?s ?p ?o "
+ + "FROM STREAM [RANGE 5s STEP 1s] "
+ + "WHERE { "
+ + " ?s ?p ?o . "
+ + "} ";
}
}
diff --git a/src/main/java/lu/jpt/csparqlproject/util/CsparqlQueryHelper.java b/src/main/java/lu/jpt/csparqlproject/util/CsparqlQueryHelper.java
index 5c9a8cc..102284b 100644
--- a/src/main/java/lu/jpt/csparqlproject/util/CsparqlQueryHelper.java
+++ b/src/main/java/lu/jpt/csparqlproject/util/CsparqlQueryHelper.java
@@ -8,13 +8,26 @@ import java.util.StringTokenizer;
*/
public class CsparqlQueryHelper {
+ /**
+ * Small inner class containing query info
+ */
+ public class CsparqlQueryInfo {
+ public boolean isStream;
+ public String name;
+
+ public CsparqlQueryInfo() {
+ this.name = "[UNRESOLVED NAME]";
+ this.isStream = false;
+ }
+ }
+
/**
* Returns the csparql query name stated within the query.
* @param query given csparql query
- * @return name of query
+ * @return query info using class CsparqlQueryInfo
*/
- public static String getQueryName(String query) {
- String queryName = "[UNRESOLVED NAME]";
+ public static CsparqlQueryInfo getQueryInfo(String query) {
+ CsparqlQueryHelper.CsparqlQueryInfo result = new CsparqlQueryHelper().new CsparqlQueryInfo();
StringTokenizer tokenizer = new StringTokenizer(query);
boolean gotName = false;
int stateCounter = 0;
@@ -29,18 +42,19 @@ public class CsparqlQueryHelper {
break;
case 1:
if(trimmedLowerToken.equals("query") || trimmedLowerToken.equals("stream")) {
+ if(trimmedLowerToken.equals("stream")) result.isStream = true;
stateCounter++;
}
break;
case 2:
- queryName = token.trim();
+ result.name = token.trim();
gotName = true;
break;
default:
break;
}
}
- return queryName;
+ return result;
}
}