Skip to content

Commit

Permalink
Fix parallel entity retriever error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Schultz committed Nov 17, 2020
1 parent d578218 commit 4355ae2
Showing 1 changed file with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.silkframework.dataset.rdf.{RdfNode, Resource, SparqlEndpoint, SparqlR
import org.silkframework.entity.paths.UntypedPath
import org.silkframework.entity.rdf.{SparqlEntitySchema, SparqlPathBuilder, SparqlRestriction}
import org.silkframework.entity.{Entity, EntitySchema}
import org.silkframework.plugins.dataset.rdf.sparql.ParallelEntityRetriever.{ExceptionPathValues, ExistingPathValues, PathValues, QueueEndMarker}
import org.silkframework.runtime.activity.UserContext
import org.silkframework.util.Uri

Expand Down Expand Up @@ -124,7 +125,6 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
(implicit userContext: UserContext) extends Thread {
private val queue = new LinkedBlockingQueue[PathValues](maxQueueSize)

@volatile private var exception: Throwable = _
private var nextElement: Option[PathValues] = None

def hasNext: Boolean = {
Expand All @@ -133,21 +133,18 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
} else {
nextElement = Option(queue.take())
//Throw exceptions which occurred during querying
if (exception != null) throw exception
moreEntriesAvailable
}
}

private def moreEntriesAvailable: Boolean = nextElement match {
case Some(e) => e != EndPathValues
case Some(e) => e != QueueEndMarker
case _ => false
}

def next(): PathValues = {
//Throw exceptions which occurred during querying
if (exception != null) throw exception
nextElement match {
case Some(e) if e != EndPathValues=>
case Some(e) if e != QueueEndMarker =>
nextElement = None
e
case _ =>
Expand All @@ -162,7 +159,8 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
parseResults(sparqlResults.bindings)
}
catch {
case ex: Throwable => exception = ex
case ex: Throwable =>
queue.put(ExceptionPathValues(ex))
}
}

Expand Down Expand Up @@ -194,7 +192,7 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
if (currentSubject.isEmpty) {
currentSubject = subject
} else if (subject.isDefined && subject != currentSubject) {
queueElement(PathValues(currentSubject.get, currentValues))
queueElement(ExistingPathValues(currentSubject.get, currentValues))

currentSubject = subject
currentValues = Seq.empty
Expand All @@ -208,15 +206,11 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint,
}

for (s <- currentSubject if sparqlResults.nonEmpty) {
queueElement(PathValues(s, currentValues))
queueElement(ExistingPathValues(s, currentValues))
}
queueElement(EndPathValues)
queueElement(QueueEndMarker)
}
}

private case class PathValues(uri: String, values: Seq[String])
private val EndPathValues = PathValues("", Seq.empty) // Signals end

}

object ParallelEntityRetriever {
Expand Down Expand Up @@ -308,4 +302,27 @@ object ParallelEntityRetriever {
}
sparql.toString
}

sealed trait PathValues {
def uri: String
def values: Seq[String]
def failure: Option[Throwable]
}
/** Actual values. */
case class ExistingPathValues(uri: String, values: Seq[String]) extends PathValues {
override def failure: Option[Throwable] = None
}
/** Object that marks the end of the queue. */
object QueueEndMarker extends PathValues {
val uri: String = ""
val values: Seq[String] = Seq.empty
val failure: Option[Throwable] = None
}
/** Failure case */
case class ExceptionPathValues(exception: Throwable) extends PathValues {
override def uri: String = throw exception
override def values: Seq[String] = throw exception

override def failure: Option[Throwable] = Some(exception)
}
}

0 comments on commit 4355ae2

Please sign in to comment.