package lu.jpt.csparqlproject.rentacar;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.larkc.csparql.cep.api.RdfQuadruple;
import eu.larkc.csparql.cep.api.RdfStream;
import lu.jpt.csparqlproject.Main;
import lu.jpt.csparqlproject.misc.QueryContainer;
import lu.jpt.csparqlproject.util.WindowLoggingRdfStream;

/**
 * This is the main simulation part of the project.
 * It is being run within a separate thread, continuously feeding its
 * data into the rdf event streams which are registered with the engine.
 *
 * The simulation itself is not that sophisticated and based on a lot of
 * random numbers.
 *
 * Lifecycle control ({@link #pauseSimulation()}, {@link #resumeSimulation()},
 * {@link #pleaseStopSimulation()}) is coordinated through this object's
 * monitor and two volatile flags, so these methods may be called from a
 * thread other than the one executing {@link #run()}.
 */
public class RentACarSimulation implements Runnable {

    public static final Logger logger = LoggerFactory.getLogger(RentACarSimulation.class);

    public static final String BASE_URI = "http://example.org/carSim";
    public static final String BASE_ONTOLOGY_IRI = BASE_URI + "/carSimulationOntology#";
    public static final String BASE_STREAM_IRI = BASE_URI + "/stream";
    public static final String BASE_OBJECT_IRI = BASE_URI + "/objects";
    public static final String CAR_STREAM_IRI = BASE_STREAM_IRI + "/carStream";
    public static final String DRIVER_STREAM_IRI = BASE_STREAM_IRI + "/driverStream";

    /**
     * Prefix declarations shared by every query built in this class.
     * Each PREFIX needs an IRI, otherwise the query text is not valid SPARQL
     * and {@code rdf:type} cannot be resolved.
     */
    private static final String QUERY_PREFIXES =
            "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
            + "PREFIX f: <http://larkc.eu/csparql/sparql/jena/ext#> "
            + "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> "
            + "PREFIX car: <" + BASE_ONTOLOGY_IRI + "> ";

    /** Number of cars placed into the car pool. */
    private static final int NUMBER_OF_CARS = 2;

    /** Number of drivers (customers) created for the simulation. */
    private static final int NUMBER_OF_CUSTOMERS = 1;

    /** Pause between two simulation rounds, in milliseconds. */
    private static final long TICK_INTERVAL_MILLIS = 1000L;

    private final CarPool carPool;
    private final List<Driver> drivers;

    private volatile boolean runSimulation;
    private volatile boolean pauseSimulation;

    private final RdfStream carStream;
    private final RdfStream driverStream;

    /**
     * Registers the simulation's prefixes, creates the car pool, the drivers
     * and both output streams, and marks the simulation as runnable and
     * not paused.
     */
    public RentACarSimulation() {
        this.registerOwnPrefixes();

        // Create a car pool and drivers
        this.carPool = new CarPool(NUMBER_OF_CARS);
        this.drivers = new ArrayList<>(NUMBER_OF_CUSTOMERS);

        for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
            Driver driver = new Driver(i, this.carPool);
            // Parameterized logging: the driver is only rendered if debug is enabled.
            RentACarSimulation.logger.debug("{}", driver);
            this.drivers.add(driver);
        }

        // Create streams
        this.carStream = new WindowLoggingRdfStream(CAR_STREAM_IRI);
        this.driverStream = new WindowLoggingRdfStream(DRIVER_STREAM_IRI);

        // Green light for simulation
        this.runSimulation = true;
        this.pauseSimulation = false;
    }

    /**
     * Registers the prefixes "event", "carOnt", "car" and "driver" with the
     * application-wide prefix manager.
     */
    private void registerOwnPrefixes() {
        Main.prefixManager.registerPrefix("event", RentACarSimulation.BASE_OBJECT_IRI + "/event#");
        Main.prefixManager.registerPrefix("carOnt", RentACarSimulation.BASE_ONTOLOGY_IRI);
        Main.prefixManager.registerPrefix("car", RentACarSimulation.BASE_OBJECT_IRI + "/Car#");
        Main.prefixManager.registerPrefix("driver", RentACarSimulation.BASE_OBJECT_IRI + "/Driver#");
    }

    /**
     * @return the stream that receives the quadruples produced by the cars
     */
    public RdfStream getCarStream() {
        return this.carStream;
    }

    /**
     * @return the stream that receives the quadruples produced by the drivers
     */
    public RdfStream getDriverStream() {
        return this.driverStream;
    }

    /**
     * All the simulation is happening within its own Thread.
     *
     * Each round ticks every driver and every car, pushes their quadruples
     * into the streams and then sleeps for a short while. The loop ends once
     * {@link #pleaseStopSimulation()} has been called.
     */
    @Override
    public void run() {
        // Run the simulation as long as needed
        while (this.runSimulation) {
            if (this.pauseSimulation) {
                this.waitForSimulationToResume();

                // A stop request may have ended the pause; do not run another round then.
                if (!this.runSimulation) {
                    break;
                }
            }

            // A driver will autonomously pick a car, use it for some time and leave it.
            for (Driver driver : this.drivers) {
                driver.tick();
            }

            for (Car car : this.carPool.getCars()) {
                car.tick();
            }

            // Push the new data into the stream(s)
            this.pushDataToStream();

            // Now wait a short while
            try {
                Thread.sleep(TICK_INTERVAL_MILLIS);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: an interrupt only shortens this pause.
                // Stopping is requested through pleaseStopSimulation().
            }
        }
    }

    /**
     * Push all the data into the corresponding streams for processing.
     */
    private void pushDataToStream() {
        for (Driver d : this.drivers) {
            List<RdfQuadruple> quads = d.getQuadruples();

            for (RdfQuadruple q : quads) {
                this.driverStream.put(q);
            }
        }

        for (Car c : this.carPool.getCars()) {
            List<RdfQuadruple> quads = c.getQuadruples();

            for (RdfQuadruple q : quads) {
                this.carStream.put(q);
            }
        }
    }

    /**
     * Blocks the calling thread while the simulation is paused.
     * Returns when the simulation is resumed or when a stop was requested.
     */
    private synchronized void waitForSimulationToResume() {
        while (this.pauseSimulation && this.runSimulation) {
            try {
                this.wait();
            } catch (InterruptedException ignored) {
                // No matter what, just wait again: the try sits inside the loop
                // so that an interrupt re-checks the flags instead of ending the pause.
            }
        }
    }

    /**
     * Pauses the simulation; takes effect at the start of the next round.
     */
    public synchronized void pauseSimulation() {
        this.pauseSimulation = true;
    }

    /**
     * Resumes a paused simulation and wakes up the waiting simulation thread.
     */
    public synchronized void resumeSimulation() {
        this.pauseSimulation = false;
        this.notifyAll();
    }

    /**
     * Tells the simulation to stop.
     * Also wakes up a paused simulation thread, which would otherwise keep
     * waiting and never observe the stop request.
     */
    public synchronized void pleaseStopSimulation() {
        this.runSimulation = false;
        this.notifyAll();
    }

    /**
     * Builds the query "getCarStatusEvents", which selects the car and all
     * status values of every CarStatusEvent.
     *
     * @return the query container, set to use the observer window
     */
    public static QueryContainer getEventsQuery() {
        String query = "REGISTER QUERY getCarStatusEvents AS "
                + QUERY_PREFIXES
                + "SELECT ?car ?locked ?on ?speed ?rpm ?handbrake ?tirePressure1 ?tirePressure2 ?tirePressure3 ?tirePressure4 "
                + "FROM STREAM <" + RentACarSimulation.CAR_STREAM_IRI + "> [RANGE 5s STEP 1s] "
                + "FROM STREAM <" + RentACarSimulation.DRIVER_STREAM_IRI + "> [RANGE 15s STEP 1s] "
                + "WHERE { "
                + " ?e rdf:type car:CarStatusEvent . "
                + " ?e car:relatedCar ?car . "
                + " ?e car:locked ?locked . "
                + " ?e car:motorOn ?on . "
                + " ?e car:speed ?speed . "
                + " ?e car:motorRPM ?rpm . "
                + " ?e car:handbrakeEngaged ?handbrake . "
                + " ?e car:tirePressure1 ?tirePressure1 . "
                + " ?e car:tirePressure2 ?tirePressure2 . "
                + " ?e car:tirePressure3 ?tirePressure3 . "
                + " ?e car:tirePressure4 ?tirePressure4 . "
                + "}";

        QueryContainer queryContainer = new QueryContainer("getCarStatusEvents", query, false);
        queryContainer.useObserverWindow();

        return queryContainer;
    }

    /**
     * Builds the query "getEventsCombinedWithBackgroundKnowledge", which
     * selects the car of each CarStatusEvent together with the name and
     * phone number of its driver.
     *
     * @return the query container, set to use the observer window
     */
    public static QueryContainer getEventUsingBackgroundKnowledge() {
        // NOTE(review): the bare "FROM " clause below has no graph IRI. It looks like the
        // IRI of the background knowledge graph was lost; it cannot be reconstructed from
        // this file and must be restored before this query can be registered.
        String query = "REGISTER QUERY getEventsCombinedWithBackgroundKnowledge AS "
                + QUERY_PREFIXES
                + "SELECT ?car ?driverName ?driverPhone "
                + "FROM STREAM <" + RentACarSimulation.CAR_STREAM_IRI + "> [RANGE 5s STEP 1s] "
                + "FROM STREAM <" + RentACarSimulation.DRIVER_STREAM_IRI + "> [RANGE 15s STEP 1s] "
                + "FROM "
                + "WHERE { "
                + " ?e rdf:type car:CarStatusEvent . "
                + " ?e car:relatedCar ?car . "
                + " ?car car:isDrivenBy ?driver . "
                + " ?driver car:hasName ?driverName . "
                + " ?driver car:hasPhoneNumber ?driverPhone . "
                + "}";

        QueryContainer queryContainer = new QueryContainer("getEventsCombinedWithBackgroundKnowledge", query, false);
        queryContainer.useObserverWindow();

        return queryContainer;
    }

    /**
     * Builds the query "getSpeedByCar", which selects the car and the speed
     * of every CarStatusEvent on the car stream.
     *
     * @return the query container, set to use the observer window
     */
    public static QueryContainer getSpeedByCar() {
        String query = "REGISTER QUERY getSpeedByCar AS "
                + QUERY_PREFIXES
                + "SELECT ?car ?speed "
                + "FROM STREAM <" + RentACarSimulation.CAR_STREAM_IRI + "> [RANGE 5s STEP 1s] "
                + "WHERE { "
                + " ?e rdf:type car:CarStatusEvent . "
                + " ?e car:relatedCar ?car . "
                + " ?e car:speed ?speed . "
                + "}";

        QueryContainer queryContainer = new QueryContainer("getSpeedByCar", query, false);
        queryContainer.useObserverWindow();

        return queryContainer;
    }

    /**
     * Builds the stream query "getAverageSpeedByCar", which constructs one
     * AverageSpeedEvent per car carrying the average of its speed values.
     *
     * @return the query container; unlike the other queries it is created
     *         with {@code true} as third constructor argument (presumably
     *         marking it as a stream query - confirm against QueryContainer)
     *         and does not use the observer window
     */
    public static QueryContainer getAverageDataByCarAsStream() {
        String query = "REGISTER STREAM getAverageSpeedByCar AS "
                + QUERY_PREFIXES
                + "CONSTRUCT { "
                + " [] rdf:type car:AverageSpeedEvent "
                + " ; car:relatedCar ?car "
                + " ; car:averageSpeed ?avgSpeed . "
                + "} "
                + "FROM STREAM <" + RentACarSimulation.CAR_STREAM_IRI + "> [RANGE 5s STEP 1s] "
                + "WHERE { "
                + " { "
                + " SELECT ?car (AVG(?speed) AS ?avgSpeed) "
                + " WHERE { "
                + " ?e rdf:type car:CarStatusEvent . "
                + " ?e car:relatedCar ?car . "
                + " ?e car:speed ?speed . "
                + " } "
                + " GROUP BY (?car) "
                + " }"
                + "} ";

        QueryContainer queryContainer = new QueryContainer("getAverageSpeedByCar", query, true);

        return queryContainer;
    }

    /**
     * Builds the query "askRegisteredStrm", which selects every triple of a
     * stream.
     *
     * IMPORTANT NOTE: STREAM gets matched everywhere, so don't call stuff "stream"!
     *
     * @return the query container, set to use the observer window
     */
    public static QueryContainer selectFromRegisteredStream() {
        // NOTE(review): the "FROM STREAM" clause below has no stream IRI. It looks like the
        // IRI of the registered stream was lost; it cannot be reconstructed from this file
        // and must be restored before this query can be registered.
        String query = "REGISTER Query askRegisteredStrm AS "
                + QUERY_PREFIXES
                + "SELECT ?s ?p ?o "
                + "FROM STREAM [RANGE 5s STEP 1s] "
                + "WHERE { "
                + " ?s ?p ?o . "
                + "} ";

        QueryContainer queryContainer = new QueryContainer("askRegisteredStrm", query, false);
        queryContainer.useObserverWindow();

        return queryContainer;
    }
}