From 4355ae211081807c066b8bf4c4e296e93211fabb Mon Sep 17 00:00:00 2001 From: Andreas Schultz Date: Tue, 17 Nov 2020 14:38:30 +0100 Subject: [PATCH] Fix parallel entity retriever error handling --- .../rdf/sparql/ParallelEntityRetriever.scala | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/sparql/ParallelEntityRetriever.scala b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/sparql/ParallelEntityRetriever.scala index 44137fa86f..34a9104927 100644 --- a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/sparql/ParallelEntityRetriever.scala +++ b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/sparql/ParallelEntityRetriever.scala @@ -21,6 +21,7 @@ import org.silkframework.dataset.rdf.{RdfNode, Resource, SparqlEndpoint, SparqlR import org.silkframework.entity.paths.UntypedPath import org.silkframework.entity.rdf.{SparqlEntitySchema, SparqlPathBuilder, SparqlRestriction} import org.silkframework.entity.{Entity, EntitySchema} +import org.silkframework.plugins.dataset.rdf.sparql.ParallelEntityRetriever.{ExceptionPathValues, ExistingPathValues, PathValues, QueueEndMarker} import org.silkframework.runtime.activity.UserContext import org.silkframework.util.Uri @@ -124,7 +125,6 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint, (implicit userContext: UserContext) extends Thread { private val queue = new LinkedBlockingQueue[PathValues](maxQueueSize) - @volatile private var exception: Throwable = _ private var nextElement: Option[PathValues] = None def hasNext: Boolean = { @@ -133,21 +133,18 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint, } else { nextElement = Option(queue.take()) //Throw exceptions which occurred during querying - if (exception != null) throw exception moreEntriesAvailable } } private def moreEntriesAvailable: Boolean = nextElement match { - case Some(e) => e != EndPathValues + case Some(e) => e != QueueEndMarker case _ => false } def next(): PathValues = { - //Throw exceptions which occurred during querying - if (exception != null) throw exception nextElement match { - case Some(e) if e != EndPathValues=> + case Some(e) if e != QueueEndMarker => nextElement = None e case _ => @@ -162,7 +159,8 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint, parseResults(sparqlResults.bindings) } catch { - case ex: Throwable => exception = ex + case ex: Throwable => + queue.put(ExceptionPathValues(ex)) } } @@ -194,7 +192,7 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint, if (currentSubject.isEmpty) { currentSubject = subject } else if (subject.isDefined && subject != currentSubject) { - queueElement(PathValues(currentSubject.get, currentValues)) + queueElement(ExistingPathValues(currentSubject.get, currentValues)) currentSubject = subject currentValues = Seq.empty @@ -208,15 +206,11 @@ class ParallelEntityRetriever(endpoint: SparqlEndpoint, } for (s <- currentSubject if sparqlResults.nonEmpty) { - queueElement(PathValues(s, currentValues)) + queueElement(ExistingPathValues(s, currentValues)) } - queueElement(EndPathValues) + queueElement(QueueEndMarker) } } - - private case class PathValues(uri: String, values: Seq[String]) - private val EndPathValues = PathValues("", Seq.empty) // Signals end - } object ParallelEntityRetriever { @@ -308,4 +302,27 @@ object ParallelEntityRetriever { } sparql.toString } + + sealed trait PathValues { + def uri: String + def values: Seq[String] + def failure: Option[Throwable] + } + /** Actual values. */ + case class ExistingPathValues(uri: String, values: Seq[String]) extends PathValues { + override def failure: Option[Throwable] = None + } + /** Object that marks the end of the queue. */ + object QueueEndMarker extends PathValues { + val uri: String = "" + val values: Seq[String] = Seq.empty + val failure: Option[Throwable] = None + } + /** Failure case */ + case class ExceptionPathValues(exception: Throwable) extends PathValues { + override def uri: String = throw exception + override def values: Seq[String] = throw exception + + override def failure: Option[Throwable] = Some(exception) + } }