diff --git a/src/lu/jpt/csparqltest/Main.java b/src/lu/jpt/csparqltest/Main.java
index 592cb79..365ebfe 100644
--- a/src/lu/jpt/csparqltest/Main.java
+++ b/src/lu/jpt/csparqltest/Main.java
@@ -24,25 +24,20 @@ public class Main {
engine.initialize(true);
// Create and register stream generator at specific URI
- RdfStream carStreamGenerator = new CarStreamGenerator("http://myexample.org/stream");
+ RdfStream carStreamGenerator = new CarStreamGenerator("http://myexample.org/cars");
engine.registerStream(carStreamGenerator);
// Run stream generator
final Thread t = new Thread((Runnable) carStreamGenerator);
t.start();
- // Now build a query to run
- String query = "REGISTER QUERY MyQueryName AS "
- + "PREFIX ex: "
- + "SELECT ?s ?p ?o "
- + "FROM STREAM [RANGE 5s STEP 1s] "
- + "WHERE { ?s ?p ?o "
- + "} ";
+ // Now build a query to run - interchangeable
+ String query = Main.getTemperatureChangeQuery();
// Create a result proxy by registering the query at the engine
CsparqlQueryResultProxy resultProxy = null;
try {
- resultProxy = engine.registerQuery(query, false);
+ resultProxy = engine.registerQuery(query, true);
} catch (ParseException e1) {
logger.error(e1.toString());
logger.error(e1.getStackTrace().toString());
@@ -52,7 +47,7 @@ public class Main {
// Let it all run for a couple of seconds (3 min, 20 seconds)
try {
- Thread.sleep(10000);
+ Thread.sleep(20000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
@@ -68,4 +63,31 @@ public class Main {
System.exit(0);
}
+ private static String getBasicCarInfoQuery() {
+ return "REGISTER QUERY BasicCarInfo AS "
+ + "PREFIX f: "
+ + "PREFIX xsd: "
+ + "PREFIX cars: "
+ + "SELECT ?car ?speed ?temp) "
+ + "FROM STREAM [RANGE 5s STEP 1s] "
+ + "WHERE { "
+ + " ?car cars:currentSpeed ?speed . "
+ + " ?car cars:currentTemperature ?temp . "
+ + "}";
+ }
+
+ private static String getTemperatureChangeQuery() {
+ return "REGISTER QUERY TemperatureChange AS "
+ + "PREFIX f: "
+ + "PREFIX xsd: "
+ + "PREFIX cars: "
+ + "SELECT ?car ?temp1 ?temp2 "
+ + "FROM STREAM [RANGE 5s STEP 1s] "
+ + "WHERE { "
+ + " ?car cars:currentTemperature ?temp1 . "
+ + " ?car cars:currentTemperature ?temp2 . "
+ + " FILTER(?temp1 != ?temp2) "
+ + "}";
+ }
+
}
diff --git a/src/lu/jpt/csparqltest/carexample/Car.java b/src/lu/jpt/csparqltest/carexample/Car.java
new file mode 100644
index 0000000..a95b082
--- /dev/null
+++ b/src/lu/jpt/csparqltest/carexample/Car.java
@@ -0,0 +1,144 @@
+package lu.jpt.csparqltest.carexample;
+
+import java.util.Random;
+
+public class Car {
+
+ private int id;
+ private int motorTemperature;
+ private int kilometersTotal;
+ private int kilometersPerHour;
+
+ private CarState state;
+
+ private enum CarState {
+ PARK, DRIVE, FAIL, POST_FAIL
+ }
+
+ private final int maxSpeed;
+
+ private static final int MAX_MOTOR_TEMPERATURE = 125;
+ private static final int MIN_MOTOR_TEMPERATURE = 20;
+
+ private static final Random rand = new Random();
+
+ public Car(int number) {
+ this.id = number;
+ this.motorTemperature = this.getRandomNumberWithin(20, 30);
+ this.kilometersTotal = this.getRandomNumberWithin(1000, 270000);
+ this.kilometersPerHour = 0;
+ this.state = CarState.PARK;
+ this.maxSpeed = this.getRandomNumberWithin(130, 210);
+ }
+
+ public void simulateNextState() {
+ switch(this.state) {
+ case PARK:
+ if(this.isLucky(0.35)) {
+ this.state = CarState.DRIVE;
+ this.kilometersPerHour = this.getRandomNumberWithin(7, 15);
+ }
+ break;
+ case DRIVE:
+ double willAccelerate = Math.abs(0.5 - this.getSpeedRatio()) + 0.2;
+ if(this.isLucky(willAccelerate)) {
+ this.kilometersPerHour += this.getRandomNumberWithin(5, 14);
+ } else {
+ this.kilometersPerHour += (-1) * this.getRandomNumberWithin(5, 14);
+ }
+ double willFail = 0.001;
+ if(this.kilometersTotal > 100000) willFail += 0.001;
+ if(this.motorTemperature > 100) willFail += 0.01;
+ if(this.isLucky(willFail)) {
+ this.state = CarState.FAIL;
+ }
+ break;
+ case FAIL:
+ if(this.kilometersPerHour == 0) {
+ this.state = CarState.POST_FAIL;
+ }
+ this.kilometersPerHour += (-1) * this.getRandomNumberWithin(15, 24);
+ break;
+ case POST_FAIL:
+ break;
+ }
+ this.enforceLimits();
+ this.adaptStateToCurrentParams();
+ this.enforceLimits();
+ }
+
+ public int getID() {
+ return this.id;
+ }
+
+ public int getMotorTemperature() {
+ return this.motorTemperature;
+ }
+
+ public int getKilometersTotal() {
+ return this.kilometersTotal;
+ }
+
+ public int getKilometersPerHour() {
+ return this.kilometersPerHour;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("[#");
+ sb.append(this.id);
+ sb.append(" STATE=");
+ sb.append(this.state);
+ sb.append(", KMH=");
+ sb.append(this.kilometersPerHour);
+ sb.append(", TEMP=");
+ sb.append(this.motorTemperature);
+ sb.append(", TOTAL_KM=");
+ sb.append(this.kilometersTotal);
+ sb.append("]");
+ return sb.toString();
+ }
+
+ private void adaptStateToCurrentParams() {
+ double speedRatio = this.getSpeedRatio();
+ int resultingHeatDelta = 0;
+ switch(this.state) {
+ case PARK:
+ resultingHeatDelta = (-1) * this.getRandomNumberWithin(2, 5);
+ break;
+ case DRIVE:
+ int heatDifficulty = (int) (5 * ( (double) this.motorTemperature / (double) Car.MAX_MOTOR_TEMPERATURE ) );
+ resultingHeatDelta = -1 * heatDifficulty;
+ resultingHeatDelta += (int) (10 * speedRatio);
+ break;
+ case FAIL:
+ resultingHeatDelta = Math.abs((int) (this.motorTemperature * (speedRatio + 0.85)));
+ break;
+ case POST_FAIL:
+ resultingHeatDelta = (-1) * this.getRandomNumberWithin(2, 4);
+ break;
+ }
+ this.motorTemperature += resultingHeatDelta;
+ this.kilometersTotal += (int) (((double) this.kilometersPerHour) / 60.0);
+ }
+
+ private int getRandomNumberWithin(int min, int max) {
+ return min + Car.rand.nextInt((max + 1) - min);
+ }
+
+ private boolean isLucky(double chance) {
+ return Car.rand.nextDouble() < chance;
+ }
+
+ private double getSpeedRatio() {
+ return ((double) this.kilometersPerHour) / ((double) this.maxSpeed);
+ }
+
+ private void enforceLimits() {
+ if(this.kilometersPerHour < 0) this.kilometersPerHour = 0;
+ if(this.kilometersPerHour > this.maxSpeed) this.kilometersPerHour = this.maxSpeed;
+ if(this.motorTemperature < Car.MIN_MOTOR_TEMPERATURE) this.motorTemperature = MIN_MOTOR_TEMPERATURE;
+ if(this.motorTemperature > Car.MAX_MOTOR_TEMPERATURE) this.motorTemperature = MAX_MOTOR_TEMPERATURE;
+ }
+
+
+}
diff --git a/src/lu/jpt/csparqltest/carexample/CarSimulator.java b/src/lu/jpt/csparqltest/carexample/CarSimulator.java
new file mode 100644
index 0000000..37be6ca
--- /dev/null
+++ b/src/lu/jpt/csparqltest/carexample/CarSimulator.java
@@ -0,0 +1,29 @@
+package lu.jpt.csparqltest.carexample;
+
+import java.util.ArrayList;
+
+public class CarSimulator {
+
+ /**
+ * Simple debugging programm to check the simulators properties
+ */
+ public static void main(String[] args) {
+ System.out.println("Let's go!");
+ ArrayList cars = new ArrayList();
+ cars.add(new Car(0));
+ while(true) {
+ for(Car currentCar : cars) {
+ currentCar.simulateNextState();
+ System.out.println(currentCar);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ }
+ }
+
+}
diff --git a/src/lu/jpt/csparqltest/carexample/CarStreamGenerator.java b/src/lu/jpt/csparqltest/carexample/CarStreamGenerator.java
index 03c4699..ee26de8 100644
--- a/src/lu/jpt/csparqltest/carexample/CarStreamGenerator.java
+++ b/src/lu/jpt/csparqltest/carexample/CarStreamGenerator.java
@@ -1,5 +1,7 @@
package lu.jpt.csparqltest.carexample;
+import java.util.ArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -9,11 +11,13 @@ import eu.larkc.csparql.cep.api.RdfStream;
public class CarStreamGenerator extends RdfStream implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(CarStreamGenerator.class);
- private int counter = 1;
private volatile boolean keepRunning = false;
+ private ArrayList cars;
+
public CarStreamGenerator(String iri) {
super(iri);
+ this.initializeCars(10);
}
public void pleaseStop() {
@@ -24,22 +28,45 @@ public class CarStreamGenerator extends RdfStream implements Runnable {
public void run() {
this.keepRunning = true;
while (this.keepRunning) {
- final RdfQuadruple q = new RdfQuadruple(getIRI() + "/S" + this.counter, getIRI() + "/P" + this.counter,
- getIRI() + "/O" + this.counter, System.currentTimeMillis());
-
- if (this.counter % 10 == 0) {
- logger.info(this.counter + " triples streamed so far");
+ this.updateCars();
+ for(Car currentCar : this.cars) {
+ long currentTime = System.currentTimeMillis();
+
+ this.put(new RdfQuadruple(
+ getIRI() + "/" + currentCar.getID(),
+ getIRI() + "#currentSpeed",
+ ""+currentCar.getKilometersPerHour() + "^^http://www.w3.org/2001/XMLSchema#integer",
+ currentTime
+ )
+ );
+ this.put(new RdfQuadruple(
+ getIRI() + "/" + currentCar.getID(),
+ getIRI() + "#currentTemperature",
+ ""+currentCar.getMotorTemperature() + "^^http://www.w3.org/2001/XMLSchema#integer",
+ currentTime
+ )
+ );
}
-
- this.put(q);
try {
- Thread.sleep(500);
+ Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
- this.counter++;
+ }
+ }
+
+ private void initializeCars(int n) {
+ this.cars = new ArrayList();
+ for(int i=0; i