251 lines
9.8 KiB
Java

package lu.jpt.csparqlproject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Observer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.larkc.csparql.cep.api.RdfStream;
import eu.larkc.csparql.common.utils.CsparqlUtils;
import eu.larkc.csparql.core.engine.CsparqlEngine;
import eu.larkc.csparql.core.engine.CsparqlEngineImpl;
import eu.larkc.csparql.core.engine.CsparqlQueryResultProxy;
import eu.larkc.csparql.core.engine.RDFStreamFormatter;
import lu.jpt.csparqlproject.gui.FancyTextObserverWindow;
import lu.jpt.csparqlproject.misc.QueryContainer;
import lu.jpt.csparqlproject.rentacar.RentACarSimulation;
/**
* This class encapsulates the use of the C-SPARQL engine. The RentACarSimulation,
* which provides the required event streams, is initialized here, and its event
* streams are registered with the engine.
*
* The necessary C-SPARQL queries are also fetched from the RentACarSimulation and
* registered with the engine. This way, everything you may need to know
* about how to use the engine itself lies within here.
*
* Take a look at the methods setup()
* and tearDown(), since they contain the most important parts.
*/
public class SimulationContext {

    public static Logger logger = LoggerFactory.getLogger(SimulationContext.class);

    /** Lifecycle states this context moves through. */
    public enum SimulationState {UNINITIALIZED, INITIALIZED, RUNNING, PAUSED, SUSPENDED, TORNDOWN, ERROR}

    /** Named graph that receives the local (static) domain knowledge. */
    private static final String STATIC_KNOWLEDGE_GRAPH_IRI = "http://example.org/carSimKnowledgeGraph";

    /** RDF file holding the local domain knowledge, relative to the working directory. */
    private static final String STATIC_KNOWLEDGE_FILE = "data/carSimulationABox.rdf";

    /** IRI prefix for streams that feed query results back into the engine. */
    private static final String FEEDBACK_STREAM_IRI_PREFIX = "http://example.org/stream/";

    /** Message of the exception thrown when an action is requested in the wrong state. */
    private static final String ILLEGAL_STATE_MESSAGE = "Action not allowed in this state!";

    // Internal simulation context
    private SimulationState currentState;

    // C-SPARQL related
    private CsparqlEngine engine;
    private Collection<RdfStream> registeredStreams;
    private Collection<CsparqlQueryResultProxy> queryResultProxies;

    // Custom simulation related
    private RentACarSimulation simulation;
    private Thread simulationThread;

    /**
     * Creates the context and immediately runs {@link #setup()}, which leaves the
     * context in state {@link SimulationState#INITIALIZED}.
     */
    public SimulationContext() {
        this.currentState = SimulationState.UNINITIALIZED;
        this.setup();
    }

    /**
     * Starts the simulation on a new thread.
     *
     * @throws IllegalStateException if the context is not {@code INITIALIZED}
     */
    public void start() {
        this.requireState(SimulationState.INITIALIZED);
        this.simulationThread = new Thread(this.simulation);
        this.simulationThread.start();
        logger.info("Simulation started!");
        this.currentState = SimulationState.RUNNING;
    }

    /**
     * Asks the running simulation to pause.
     *
     * @throws IllegalStateException if the context is not {@code RUNNING}
     */
    public void pause() {
        this.requireState(SimulationState.RUNNING);
        this.simulation.pauseSimulation();
        logger.info("Simulation paused!");
        this.currentState = SimulationState.PAUSED;
    }

    /**
     * Asks the paused simulation to resume.
     *
     * @throws IllegalStateException if the context is not {@code PAUSED}
     */
    public void resume() {
        this.requireState(SimulationState.PAUSED);
        this.simulation.resumeSimulation();
        logger.info("Simulation resumed!");
        this.currentState = SimulationState.RUNNING;
    }

    /**
     * Asks the simulation to stop and interrupts its thread.
     *
     * @throws IllegalStateException if the context is neither {@code RUNNING} nor {@code PAUSED}
     */
    public void suspend() {
        this.requireState(SimulationState.RUNNING, SimulationState.PAUSED);
        this.simulation.pleaseStopSimulation();
        this.simulationThread.interrupt();
        logger.info("Simulation suspended!");
        this.currentState = SimulationState.SUSPENDED;
    }

    /**
     * Unregisters every tracked query and stream from the engine and then
     * terminates the JVM with exit code 0.
     *
     * @throws IllegalStateException if the context is not {@code SUSPENDED}
     */
    public void tearDown() {
        this.requireState(SimulationState.SUSPENDED);

        // Queries go first, then the streams they were registered against.
        for (CsparqlQueryResultProxy resultProxy : this.queryResultProxies) {
            this.engine.unregisterQuery(resultProxy.getId());
        }
        for (RdfStream eventStream : this.registeredStreams) {
            this.engine.unregisterStream(eventStream.getIRI());
        }

        logger.info("Simulation shut down!");
        this.currentState = SimulationState.TORNDOWN;
        System.exit(0);
    }

    /**
     * @return the current lifecycle state of this context
     */
    public SimulationState getSimulationState() {
        return this.currentState;
    }

    /**
     * Ensures the context is in one of the given states.
     *
     * @param allowedStates states in which the requested action is permitted
     * @throws IllegalStateException if the current state is none of {@code allowedStates}
     */
    private void requireState(SimulationState... allowedStates) {
        for (SimulationState allowed : allowedStates) {
            if (this.currentState == allowed) {
                return;
            }
        }
        throw new IllegalStateException(ILLEGAL_STATE_MESSAGE);
    }

    /**
     * Initializes the engine, loads the static knowledge, creates the simulation and
     * registers its event streams and queries. Ends in state {@code INITIALIZED}.
     */
    private void setup() {
        // Prepare collections for registered streams and query result proxies
        this.registeredStreams = new ArrayList<RdfStream>();
        this.queryResultProxies = new ArrayList<CsparqlQueryResultProxy>();

        // Instantiate and initialize engine.
        // Initialize with true to allow use of timestamp function.
        this.engine = new CsparqlEngineImpl();
        this.engine.initialize(true);

        // Debugging output
        logger.debug("CWD: {}", System.getProperty("user.dir"));
        logger.debug("Engine from: {}", this.engine.getClass().getProtectionDomain().getCodeSource());

        this.loadStaticKnowledge();

        // DEBUGGING ONLY
        //ReasoningTester rt = new ReasoningTester(this.engine);

        // Spawn the whole simulation - this takes care of simulation-specific things
        this.simulation = new RentACarSimulation(this.engine);

        this.registerEventStream(this.simulation.getCarStream());
        this.registerEventStream(this.simulation.getDriverStream());

        for (QueryContainer queryContainer : this.collectQueries()) {
            this.registerQueryContainer(queryContainer);
        }

        // Setup complete, ready to run.
        logger.info("Simulation set up and ready to go!");
        this.currentState = SimulationState.INITIALIZED;
    }

    /**
     * Loads the local domain knowledge into its target graph. A failure is logged
     * and setup continues without the static knowledge (best effort).
     */
    private void loadStaticKnowledge() {
        try {
            this.engine.putStaticNamedModel(
                    STATIC_KNOWLEDGE_GRAPH_IRI,
                    CsparqlUtils.serializeRDFFile(STATIC_KNOWLEDGE_FILE));
        } catch (Exception e) {
            // Pass the exception itself so the logger prints the real stack trace.
            logger.error("Could not load static knowledge from " + STATIC_KNOWLEDGE_FILE, e);
        }
        // From the C-SPARQL-ReadyToGoPack:
        // Note: This is how local domain knowledge can be updated during runtime:
        /* Use a SPARQL Query to update the local knowledge from code instead of using CONSTRUCT within the engine.
        String updateQuery = "PREFIX : <http://www.streamreasoning.org/ontologies/sr4ld2014-onto#> "
        + "INSERT DATA "
        + "{ GRAPH <http://streamreasoning.org/roomConnection> { :room :isConnectedTo :room2 } }";
        this.engine.execUpdateQueryOverDatasource(updateQuery);
        */
    }

    /**
     * Registers a stream with the engine and tracks it so that
     * {@link #tearDown()} can unregister it again.
     *
     * @param stream the stream to register
     */
    private void registerEventStream(RdfStream stream) {
        this.engine.registerStream(stream);
        this.registeredStreams.add(stream);
    }

    /**
     * Collects the queries the simulation offers, in registration order.
     *
     * @return the query containers to register with the engine
     */
    private Collection<QueryContainer> collectQueries() {
        Collection<QueryContainer> queriesToRegister = new ArrayList<QueryContainer>();
        queriesToRegister.add(this.simulation.getStandingCarsStream());
        queriesToRegister.add(this.simulation.getMovingCarsStream());
        queriesToRegister.add(this.simulation.getStronglyAcceleratingCarsStream());
        queriesToRegister.add(this.simulation.getStronglyBrakingCarsStream());
        queriesToRegister.add(this.simulation.getEngineWearStream());
        queriesToRegister.add(this.simulation.getBrakeWearStream());
        queriesToRegister.add(this.simulation.getHandbrakeWearStream());
        queriesToRegister.add(this.simulation.getTireWearStream());
        queriesToRegister.add(this.simulation.getCarTakenEventsQuery());
        queriesToRegister.add(this.simulation.getCarReturnedEventsQuery());
        return queriesToRegister;
    }

    /**
     * Registers one query with the engine and attaches its observers. A failure is
     * logged and does not prevent the remaining queries from being registered.
     *
     * @param queryContainer the query together with its registration settings
     */
    private void registerQueryContainer(QueryContainer queryContainer) {
        try {
            CsparqlQueryResultProxy resultProxy =
                    this.engine.registerQuery(queryContainer.query, queryContainer.reasoningEnabled);
            if (queryContainer.reasoningEnabled) {
                this.enableReasoningForResultProxy(resultProxy, queryContainer);
            }

            // If the query is a stream, we need an additional component that
            // feeds its results back into the engine.
            if (queryContainer.isStream) {
                RDFStreamFormatter feedbackStream =
                        new RDFStreamFormatter(FEEDBACK_STREAM_IRI_PREFIX + queryContainer.name);
                // Tracked like any other stream so that tearDown() unregisters it too.
                this.registerEventStream(feedbackStream);
                resultProxy.addObserver(feedbackStream);
            }

            // Streams and regular queries alike get a window showing their results.
            resultProxy.addObserver(this.createResultObserverWindow(queryContainer.name));

            // Attach custom observers to resultProxy if available.
            // These are used for SPARQL UPDATE queries on local domain knowledge.
            if (queryContainer.customObserver != null) {
                resultProxy.addObserver(queryContainer.customObserver);
            }

            this.queryResultProxies.add(resultProxy);
            logger.info("Successfully registered query {}: {}", queryContainer.name, queryContainer);
        } catch (Exception e) {
            logger.error("Could not register query " + queryContainer.name, e);
            logger.error("Failing query: {}", queryContainer.query);
        }
    }

    /**
     * Builds the observer that displays the results of a query.
     *
     * @param queryName name of the query, shown in the window title
     * @return Observer for query result
     */
    private Observer createResultObserverWindow(String queryName) {
        return new FancyTextObserverWindow("[ResultProxy] " + queryName);
    }

    /**
     * Configures the reasoner for a registered query using the rule set, chaining
     * type and TBox carried by the query container. A failure is logged and
     * otherwise ignored, so the query stays registered without the updated reasoner.
     *
     * @param resultProxy proxy of the already registered query
     * @param queryContainer container providing the reasoner settings
     */
    private void enableReasoningForResultProxy(CsparqlQueryResultProxy resultProxy, QueryContainer queryContainer) {
        try {
            /* If not using the queryContainer, these would be valid parameters:
            resultProxy.getSparqlQueryId(),
            CsparqlUtils.fileToString("data/rdfs.rules"),
            ReasonerChainingType.HYBRID,
            CsparqlUtils.serializeRDFFile("data/carSimulationTBox.rdf")
            */
            this.engine.updateReasoner(
                    resultProxy.getSparqlQueryId(),
                    queryContainer.ruleSet,
                    queryContainer.reasonerChainingType,
                    queryContainer.TBox
            );
        } catch (Exception e) {
            logger.error("Could not enable reasoning for query " + queryContainer.name, e);
        }
    }
}