[TASK] Do the very basic setup and have a simple, working s p o query.
This commit is contained in:
parent
61aa0e5697
commit
0d6f4d2530
|
@ -0,0 +1,46 @@
|
||||||
|
package lu.jpt.csparqltest;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.larkc.csparql.cep.api.RdfQuadruple;
|
||||||
|
import eu.larkc.csparql.cep.api.RdfStream;
|
||||||
|
|
||||||
|
public class CarStreamGenerator extends RdfStream implements Runnable {
|
||||||
|
protected final Logger logger = LoggerFactory.getLogger(CarStreamGenerator.class);
|
||||||
|
|
||||||
|
private int counter = 1;
|
||||||
|
private volatile boolean keepRunning = false;
|
||||||
|
|
||||||
|
public CarStreamGenerator(String iri) {
|
||||||
|
super(iri);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void pleaseStop() {
|
||||||
|
keepRunning = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
this.keepRunning = true;
|
||||||
|
while (this.keepRunning) {
|
||||||
|
final RdfQuadruple q = new RdfQuadruple(getIRI() + "/S" + this.counter, getIRI() + "/P" + this.counter,
|
||||||
|
getIRI() + "/O" + this.counter, System.currentTimeMillis());
|
||||||
|
|
||||||
|
if (this.counter % 10 == 0) {
|
||||||
|
logger.info(this.counter + " triples streamed so far");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.put(q);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.counter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,9 +1,70 @@
|
||||||
package lu.jpt.csparqltest;
|
package lu.jpt.csparqltest;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.larkc.csparql.cep.api.RdfStream;
|
||||||
|
import eu.larkc.csparql.core.engine.ConsoleFormatter;
|
||||||
|
import eu.larkc.csparql.core.engine.CsparqlEngine;
|
||||||
|
import eu.larkc.csparql.core.engine.CsparqlEngineImpl;
|
||||||
|
import eu.larkc.csparql.core.engine.CsparqlQueryResultProxy;
|
||||||
|
|
||||||
public class Main {
|
public class Main {
|
||||||
|
|
||||||
|
private static Logger logger = LoggerFactory.getLogger(Main.class);
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
System.out.println("Hello");
|
logger.debug("Let's go!");
|
||||||
|
|
||||||
|
// Instantiate and initialize engine
|
||||||
|
CsparqlEngine engine = new CsparqlEngineImpl();
|
||||||
|
engine.initialize(true);
|
||||||
|
|
||||||
|
// Create and register stream generator at specific URI
|
||||||
|
RdfStream carStreamGenerator = new CarStreamGenerator("http://myexample.org/stream");
|
||||||
|
engine.registerStream(carStreamGenerator);
|
||||||
|
|
||||||
|
// Run stream generator
|
||||||
|
final Thread t = new Thread((Runnable) carStreamGenerator);
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
// Now build a query to run
|
||||||
|
String query = "REGISTER QUERY MyQueryName AS "
|
||||||
|
+ "PREFIX ex: <http://myexample.org/> "
|
||||||
|
+ "SELECT ?s ?p ?o "
|
||||||
|
+ "FROM STREAM <http://myexample.org/stream> [RANGE 5s STEP 1s] "
|
||||||
|
+ "WHERE { ?s ?p ?o "
|
||||||
|
+ "} ";
|
||||||
|
|
||||||
|
// Create a result proxy by registering the query at the engine
|
||||||
|
CsparqlQueryResultProxy resultProxy = null;
|
||||||
|
try {
|
||||||
|
resultProxy = engine.registerQuery(query, false);
|
||||||
|
} catch (ParseException e1) {
|
||||||
|
logger.error(e1.toString());
|
||||||
|
logger.error(e1.getStackTrace().toString());
|
||||||
|
}
|
||||||
|
// Add ConsoleFormatter as observer so it gets notified of every query result
|
||||||
|
resultProxy.addObserver(new ConsoleFormatter());
|
||||||
|
|
||||||
|
// Let it all run for a couple of seconds
|
||||||
|
try {
|
||||||
|
Thread.sleep(200000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.error(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unregister the resultProxy (and thus the query) from the engine
|
||||||
|
engine.unregisterQuery(resultProxy.getId());
|
||||||
|
|
||||||
|
// Softly stop the stream generator and unregister it from the engine
|
||||||
|
((CarStreamGenerator) carStreamGenerator).pleaseStop();
|
||||||
|
engine.unregisterStream(carStreamGenerator.getIRI());
|
||||||
|
|
||||||
|
// That's it!
|
||||||
|
System.exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue