From 0d6f4d25306eb4e540e7104c512ed5b5ea7d2e9b Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Tue, 24 May 2016 14:08:24 +0200 Subject: [PATCH] [TASK] Do the very basic setup and have a simple, working s p o query. --- .../jpt/csparqltest/CarStreamGenerator.java | 46 ++++++++++++++ src/lu/jpt/csparqltest/Main.java | 63 ++++++++++++++++++- 2 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 src/lu/jpt/csparqltest/CarStreamGenerator.java diff --git a/src/lu/jpt/csparqltest/CarStreamGenerator.java b/src/lu/jpt/csparqltest/CarStreamGenerator.java new file mode 100644 index 0000000..33cf277 --- /dev/null +++ b/src/lu/jpt/csparqltest/CarStreamGenerator.java @@ -0,0 +1,46 @@ +package lu.jpt.csparqltest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.larkc.csparql.cep.api.RdfQuadruple; +import eu.larkc.csparql.cep.api.RdfStream; + +public class CarStreamGenerator extends RdfStream implements Runnable { + protected final Logger logger = LoggerFactory.getLogger(CarStreamGenerator.class); + + private int counter = 1; + private volatile boolean keepRunning = false; + + public CarStreamGenerator(String iri) { + super(iri); + } + + public void pleaseStop() { + keepRunning = false; + } + + @Override + public void run() { + this.keepRunning = true; + while (this.keepRunning) { + final RdfQuadruple q = new RdfQuadruple(getIRI() + "/S" + this.counter, getIRI() + "/P" + this.counter, + getIRI() + "/O" + this.counter, System.currentTimeMillis()); + + if (this.counter % 10 == 0) { + logger.info(this.counter + " triples streamed so far"); + } + + this.put(q); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + this.counter++; + } + } + +} diff --git a/src/lu/jpt/csparqltest/Main.java b/src/lu/jpt/csparqltest/Main.java index 6b9d5cb..f24ede7 100644 --- a/src/lu/jpt/csparqltest/Main.java +++ b/src/lu/jpt/csparqltest/Main.java @@ -1,9 +1,70 @@ package lu.jpt.csparqltest; +import java.text.ParseException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.larkc.csparql.cep.api.RdfStream; +import eu.larkc.csparql.core.engine.ConsoleFormatter; +import eu.larkc.csparql.core.engine.CsparqlEngine; +import eu.larkc.csparql.core.engine.CsparqlEngineImpl; +import eu.larkc.csparql.core.engine.CsparqlQueryResultProxy; + public class Main { + private static Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) { - System.out.println("Hello"); + logger.debug("Let's go!"); + + // Instantiate and initialize engine + CsparqlEngine engine = new CsparqlEngineImpl(); + engine.initialize(true); + + // Create and register stream generator at specific URI + RdfStream carStreamGenerator = new CarStreamGenerator("http://myexample.org/stream"); + engine.registerStream(carStreamGenerator); + + // Run stream generator + final Thread t = new Thread((Runnable) carStreamGenerator); + t.start(); + + // Now build a query to run + String query = "REGISTER QUERY MyQueryName AS " + + "PREFIX ex: " + + "SELECT ?s ?p ?o " + + "FROM STREAM [RANGE 5s STEP 1s] " + + "WHERE { ?s ?p ?o " + + "} "; + + // Create a result proxy by registering the query at the engine + CsparqlQueryResultProxy resultProxy = null; + try { + resultProxy = engine.registerQuery(query, false); + } catch (ParseException e1) { + logger.error(e1.toString()); + logger.error(e1.getStackTrace().toString()); + } + // Add ConsoleFormatter as observer so it gets notified of every query result + resultProxy.addObserver(new ConsoleFormatter()); + + // Let it all run for a couple of seconds + try { + Thread.sleep(200000); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + + // Unregister the resultProxy (and thus the query) from the engine + engine.unregisterQuery(resultProxy.getId()); + + // Softly stop the stream generator and unregister it from the engine + ((CarStreamGenerator) carStreamGenerator).pleaseStop(); + engine.unregisterStream(carStreamGenerator.getIRI()); + + // That's it! + System.exit(0); } }