Skip to content

Commit

Permalink
Make OPTIONAL clause in SPARQL path builder optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Schultz committed Nov 16, 2020
1 parent 8909655 commit 9b3faf9
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,35 @@ object SparqlPathBuilder {
}

/**
* Builds a SPARQL pattern from a sequence of paths.
*
* @param paths The paths
* @param subject The subject e.g. ?s or <uri>
* @param valuesPrefix The value of every path will be bound to a variable of the form: valuesPrefix{path.id}
*/
def apply(paths: Seq[UntypedPath], subject: String = "?s", valuesPrefix: String = "?v", tempVarPrefix: String = "?t", filterVarPrefix: String = "?f"): String = {
* Builds a SPARQL pattern from a sequence of paths.
*
* @param paths The paths
* @param subject The subject e.g. ?s or <uri>
* @param valuesPrefix The value of every path will be bound to a variable of the form: valuesPrefix{index}
* @param useOptional Whether each path pattern should be wrapped in an OPTIONAL clause.
*/
def apply(paths: Seq[UntypedPath],
          subject: String = "?s",
          valuesPrefix: String = "?v",
          tempVarPrefix: String = "?t",
          filterVarPrefix: String = "?f",
          useOptional: Boolean): String = {
  // Shared variable-name bookkeeping for all generated patterns.
  val vars = new Vars(subject, valuesPrefix, tempVarPrefix, filterVarPrefix)
  // One SPARQL pattern per path; concatenated into a single pattern string.
  val patterns =
    for ((path, pathIndex) <- paths.zipWithIndex)
      yield buildPath(path, pathIndex, vars, useOptional)
  patterns.mkString
}

/**
* Builds a SPARQL pattern from a single Path.
*/
private def buildPath(path: UntypedPath, index: Int, vars: Vars): String = {
"OPTIONAL {\n" +
buildOperators(vars.subject, path.operators, vars).replace(vars.curTempVar, vars.newValueVar(path, index)) +
"}\n"
private def buildPath(path: UntypedPath, index: Int, vars: Vars, useOptional: Boolean): String = {
  // Generate the triple patterns for this path and rename the trailing temp
  // variable to the value variable for this path index.
  val pattern = buildOperators(vars.subject, path.operators, vars)
    .replace(vars.curTempVar, vars.newValueVar(path, index))
  // Wrap in OPTIONAL only when requested, so absent values do not drop the subject.
  if (useOptional) "OPTIONAL {\n" + pattern + "}\n" else pattern
}

/**
Expand Down Expand Up @@ -104,4 +113,4 @@ object SparqlPathBuilder {
def newValueVar(path: UntypedPath, index: Int): String = valuesPrefix + index
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class SparqlSource(params: SparqlParams, val sparqlEndpoint: SparqlEndpoint)
useDistinct = false,
graphUri = params.graph,
useOrderBy = false,
varPrefix = "v"
varPrefix = "v",
useOptional = false
)
val results = sparqlEndpoint.select(pathQuery, limit = limit.getOrElse(Int.MaxValue))
for(result <- results.bindings;
Expand Down Expand Up @@ -181,4 +182,4 @@ class SparqlSource(params: SparqlParams, val sparqlEndpoint: SparqlEndpoint)
SparqlSamplePathsCollector(sparqlEndpoint, params.graph, restriction, limit).toIndexedSeq
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package org.silkframework.plugins.dataset.rdf.sparql
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit}
import java.util.logging.{Level, Logger}

import org.silkframework.dataset.rdf.{RdfNode, Resource, SparqlEndpoint}
import org.silkframework.dataset.rdf.{RdfNode, Resource, SparqlEndpoint, SparqlResults}
import org.silkframework.entity.paths.UntypedPath
import org.silkframework.entity.rdf.{SparqlEntitySchema, SparqlPathBuilder, SparqlRestriction}
import org.silkframework.entity.{Entity, EntitySchema}
Expand Down Expand Up @@ -67,7 +67,7 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
val startTime = System.currentTimeMillis()

val pathRetrievers = for (typedPath <- entitySchema.typedPaths) yield {
new PathRetriever(entityUris, SparqlEntitySchema.fromSchema(entitySchema, entityUris), typedPath.toUntypedPath, limit)
new PathRetriever(SparqlEntitySchema.fromSchema(entitySchema, entityUris), typedPath.toUntypedPath, limit)
}

pathRetrievers.foreach(_.start())
Expand Down Expand Up @@ -120,7 +120,7 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
}
}

private class PathRetriever(entityUris: Seq[Uri], entityDesc: SparqlEntitySchema, path: UntypedPath, limit: Option[Int])
private class PathRetriever(entityDesc: SparqlEntitySchema, path: UntypedPath, limit: Option[Int])
(implicit userContext: UserContext) extends Thread {
private val queue = new LinkedBlockingQueue[PathValues](maxQueueSize)

Expand Down Expand Up @@ -166,40 +166,38 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
}
}

private def queryPath()(implicit userContext: UserContext) = {
private def queryPath()(implicit userContext: UserContext): SparqlResults = {
val sparqlQuery = ParallelEntityRetriever.pathQuery(entityDesc.variable, entityDesc.restrictions, path,
useDistinct = true, graphUri = graphUri, useOrderBy = useOrderBy, varPrefix = varPrefix)
useDistinct = true, graphUri = graphUri, useOrderBy = useOrderBy, varPrefix = varPrefix, useOptional = true)

endpoint.select(sparqlQuery, limit.getOrElse(Int.MaxValue))
}

private val QUEUE_OFFER_TIMEOUT = 3600 // 1 hour, just a high number
private def queueElement(pathValues: PathValues): Boolean = queue.offer(pathValues, QUEUE_OFFER_TIMEOUT, TimeUnit.SECONDS)

private def parseResults(sparqlResults: Traversable[Map[String, RdfNode]], fixedSubject: Option[Uri] = None): Unit = {
var currentSubject: Option[String] = fixedSubject.map(_.uri)
private def parseResults(sparqlResults: Traversable[Map[String, RdfNode]]): Unit = {
var currentSubject: Option[String] = None
var currentValues: Seq[String] = Seq.empty

for (result <- sparqlResults) {
if (canceled) {
return
}

if (fixedSubject.isEmpty) {
//Check if we are still reading values for the current subject
val subject = result.get(entityDesc.variable) match {
case Some(Resource(value)) => Some(value)
case _ => None
}
//Check if we are still reading values for the current subject
val subject = result.get(entityDesc.variable) match {
case Some(Resource(value)) => Some(value)
case _ => None
}

if (currentSubject.isEmpty) {
currentSubject = subject
} else if (subject.isDefined && subject != currentSubject) {
queueElement(PathValues(currentSubject.get, currentValues))
if (currentSubject.isEmpty) {
currentSubject = subject
} else if (subject.isDefined && subject != currentSubject) {
queueElement(PathValues(currentSubject.get, currentValues))

currentSubject = subject
currentValues = Seq.empty
}
currentSubject = subject
currentValues = Seq.empty
}

if (currentSubject.isDefined) {
Expand Down Expand Up @@ -239,7 +237,8 @@ object ParallelEntityRetriever {
useDistinct: Boolean,
graphUri: Option[String],
useOrderBy: Boolean,
varPrefix: String): String = {
varPrefix: String,
useOptional: Boolean): String = {
//Select
val sparql = new StringBuilder
sparql append "SELECT "
Expand All @@ -262,7 +261,7 @@ object ParallelEntityRetriever {
} else {
sparql append restriction.toSparql + "\n"
}
sparql append SparqlPathBuilder(path :: Nil, "?" + subjectVar, "?" + varPrefix)
sparql append SparqlPathBuilder(path :: Nil, "?" + subjectVar, "?" + varPrefix, useOptional = useOptional)

sparql append "}" // END WHERE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class SimpleEntityRetriever(endpoint: SparqlEndpoint,

addRestrictions(sparqlEntitySchema, sparql)

sparql append SparqlPathBuilder(sparqlEntitySchema.paths, "?" + sparqlEntitySchema.variable, "?" + varPrefix)
sparql append SparqlPathBuilder(sparqlEntitySchema.paths, "?" + sparqlEntitySchema.variable, "?" + varPrefix, useOptional = true)
// End GRAPH in subselect case
for (graph <- graphUri if !graph.isEmpty && useSubSelect) sparql append s"}"
sparql append "}" // END WHERE
Expand Down Expand Up @@ -180,4 +180,4 @@ class SimpleEntityRetriever(endpoint: SparqlEndpoint,

object SimpleEntityRetriever {
final val DEFAULT_PAGE_SIZE = 1000
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ abstract class EntityRetrieverBaseTest extends FlatSpec with MustMatchers with B
private val city = s"${pn}city"
private val country = s"${pn}country"
private val name = s"${pn}name"
private val age = s"${pn}age"

// Instances
private val person1 = s"${pn}Person1"
Expand Down Expand Up @@ -125,20 +126,25 @@ abstract class EntityRetrieverBaseTest extends FlatSpec with MustMatchers with B
}

it should "fetch a specific root entity" in {
val entitySchema = schema(Person, Seq(path(name)))
val entitySchema = schema(Person, Seq(path(name), path(age)))
val entities = retriever.retrieve(entitySchema, entities = Seq(person1), limit = None).toArray.toSeq
entities.map(_.uri.toString) mustBe Seq(person1)
entities.head.values mustBe IndexedSeq(Seq("John Doe"))
entities.head.values mustBe IndexedSeq(Seq("John Doe"), Seq("23"))
}

it should "fetch a specific root entity with restrictions" in {
for((personURI, expectedResult) <- Seq(person1, person2).zip(Seq(Seq("John Doe"), Seq()))) {
val entitySchema = schema(Person, Seq(path(name)), filter = Restriction.custom(s"?a <${pn}age> 23"))
for ((personURI, expectedResult) <- Seq(
person1,
person2) zip (Seq(
Seq(IndexedSeq(Seq("John Doe"), Seq("23"))),
Seq())
)) {
val entitySchema = schema(Person, Seq(path(name), path(age)), filter = Restriction.custom(s"?a <${pn}age> 23"))
val entities = retriever.retrieve(entitySchema, entities = Seq(personURI), limit = None).toArray.toSeq
entities.size mustBe expectedResult.size
if(entities.nonEmpty) {
entities.head.uri.toString mustBe personURI
entities.head.values mustBe IndexedSeq(expectedResult)
entities.head.values mustBe expectedResult.head
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.silkframework.plugins.dataset.rdf.sparql

import org.silkframework.dataset.rdf.SparqlEndpoint
import org.silkframework.dataset.rdf.{SparqlEndpoint, SparqlParams, SparqlResults}
import org.silkframework.entity.EntitySchema
import org.silkframework.entity.paths.UntypedPath
import org.silkframework.runtime.activity.UserContext

import scala.collection.mutable.ArrayBuffer

class ParallelEntityRetrieverTest extends EntityRetrieverBaseTest {
behavior of "Parallel entity retriever"
Expand All @@ -11,3 +16,21 @@ class ParallelEntityRetrieverTest extends EntityRetrieverBaseTest {
new ParallelEntityRetriever(endpoint, SimpleEntityRetriever.DEFAULT_PAGE_SIZE, graphUri, useOrderBy)
}
}

/**
  * Mock SPARQL endpoint for tests that records every SELECT query (with its limit)
  * instead of executing it, always answering with empty results.
  *
  * Access to the recorded queries is synchronized because retrievers such as
  * ParallelEntityRetriever issue queries from multiple worker threads.
  */
case class TestMockSparqlEndpoint(sparqlParams: SparqlParams) extends SparqlEndpoint {

  // Recorded (query, limit) pairs; guarded by this instance's monitor.
  private val queryQueue = new ArrayBuffer[(String, Int)]()

  override def withSparqlParams(sparqlParams: SparqlParams): SparqlEndpoint = TestMockSparqlEndpoint(sparqlParams)

  /** Records the query and returns empty results. Thread-safe. */
  override def select(query: String, limit: Int)(implicit userContext: UserContext): SparqlResults = synchronized {
    queryQueue.append((query, limit))
    SparqlResults(Seq.empty)
  }

  /** Returns an immutable snapshot of the recorded queries (not the live buffer). */
  def queries: Seq[(String, Int)] = synchronized {
    queryQueue.toList
  }

  /** Clears all recorded queries. Thread-safe. */
  def clearQueue(): Unit = synchronized {
    queryQueue.clear()
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ class SparqlPathBuilderTest extends FlatSpec with Matchers {
build(s"?a\\$p1") should be(equalIgnoringWhitespace(s"OPTIONAL { ?v0 $p1 ?s . }"))
}

"SparqlPathBuilder" should "include Filter statements" in {
// With useOptional = false the builder must emit the bare triple pattern:
// forward paths (/) as "?s <p> ?v0" and backward paths (\) as "?v0 <p> ?s",
// with no OPTIONAL wrapper around either.
it should "build SPARQL patterns without OPTIONALs" in {
build(s"?a/$p1", useOptional = false) should be(equalIgnoringWhitespace(s"?s $p1 ?v0 ."))
build(s"?a\\$p1", useOptional = false) should be(equalIgnoringWhitespace(s"?v0 $p1 ?s ."))
}

it should "include Filter statements" in {
build(s"?a/<1>[<2> = <3>]") should be(equalIgnoringWhitespace("OPTIONAL { ?s <1> ?v0 . ?v0 <2> ?f1 . FILTER(?f1 = <3>). }"))
}

def build(path: String) = SparqlPathBuilder(Seq(UntypedPath.parse(path)))
def build(path: String, useOptional: Boolean = true): String = SparqlPathBuilder(Seq(UntypedPath.parse(path)), useOptional = useOptional)

}

0 comments on commit 9b3faf9

Please sign in to comment.