diff --git a/src/lu/jpt/csparqltest/CarStreamGenerator.java b/src/lu/jpt/csparqltest/CarStreamGenerator.java
new file mode 100644
index 0000000..33cf277
--- /dev/null
+++ b/src/lu/jpt/csparqltest/CarStreamGenerator.java
@@ -0,0 +1,46 @@
+package lu.jpt.csparqltest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.larkc.csparql.cep.api.RdfQuadruple;
+import eu.larkc.csparql.cep.api.RdfStream;
+
+public class CarStreamGenerator extends RdfStream implements Runnable {
+ protected final Logger logger = LoggerFactory.getLogger(CarStreamGenerator.class);
+
+ private int counter = 1;
+ private volatile boolean keepRunning = false;
+
+ public CarStreamGenerator(String iri) {
+ super(iri);
+ }
+
+ public void pleaseStop() {
+ keepRunning = false;
+ }
+
+ @Override
+ public void run() {
+ this.keepRunning = true;
+ while (this.keepRunning) {
+ final RdfQuadruple q = new RdfQuadruple(getIRI() + "/S" + this.counter, getIRI() + "/P" + this.counter,
+ getIRI() + "/O" + this.counter, System.currentTimeMillis());
+
+ if (this.counter % 10 == 0) {
+ logger.info(this.counter + " triples streamed so far");
+ }
+
+ this.put(q);
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ this.counter++;
+ }
+ }
+
+}
diff --git a/src/lu/jpt/csparqltest/Main.java b/src/lu/jpt/csparqltest/Main.java
index 6b9d5cb..f24ede7 100644
--- a/src/lu/jpt/csparqltest/Main.java
+++ b/src/lu/jpt/csparqltest/Main.java
@@ -1,9 +1,70 @@
package lu.jpt.csparqltest;
+import java.text.ParseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.larkc.csparql.cep.api.RdfStream;
+import eu.larkc.csparql.core.engine.ConsoleFormatter;
+import eu.larkc.csparql.core.engine.CsparqlEngine;
+import eu.larkc.csparql.core.engine.CsparqlEngineImpl;
+import eu.larkc.csparql.core.engine.CsparqlQueryResultProxy;
+
public class Main {
+ private static Logger logger = LoggerFactory.getLogger(Main.class);
+
public static void main(String[] args) {
- System.out.println("Hello");
+ logger.debug("Let's go!");
+
+ // Instantiate and initialize engine
+ CsparqlEngine engine = new CsparqlEngineImpl();
+ engine.initialize(true);
+
+ // Create and register stream generator at specific URI
+ RdfStream carStreamGenerator = new CarStreamGenerator("http://myexample.org/stream");
+ engine.registerStream(carStreamGenerator);
+
+ // Run stream generator
+ final Thread t = new Thread((Runnable) carStreamGenerator);
+ t.start();
+
+ // Now build a query to run
+ String query = "REGISTER QUERY MyQueryName AS "
+ + "PREFIX ex: "
+ + "SELECT ?s ?p ?o "
+ + "FROM STREAM [RANGE 5s STEP 1s] "
+ + "WHERE { ?s ?p ?o "
+ + "} ";
+
+ // Create a result proxy by registering the query at the engine
+ CsparqlQueryResultProxy resultProxy = null;
+ try {
+ resultProxy = engine.registerQuery(query, false);
+ } catch (ParseException e1) {
+ logger.error(e1.toString());
+ logger.error(e1.getStackTrace().toString());
+ }
+ // Add ConsoleFormatter as observer so it gets notified of every query result
+ resultProxy.addObserver(new ConsoleFormatter());
+
+ // Let it all run for a couple of seconds
+ try {
+ Thread.sleep(200000);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ // Unregister the resultProxy (and thus the query) from the engine
+ engine.unregisterQuery(resultProxy.getId());
+
+ // Softly stop the stream generator and unregister it from the engine
+ ((CarStreamGenerator) carStreamGenerator).pleaseStop();
+ engine.unregisterStream(carStreamGenerator.getIRI());
+
+ // That's it!
+ System.exit(0);
}
}