Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/pip/pipeline/inferrer/common/guni…
Browse files Browse the repository at this point in the history
…corn-22.0.0
  • Loading branch information
StepanBrychta authored Oct 2, 2024
2 parents a40cbdc + 2813c24 commit ddf7665
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 140 deletions.
10 changes: 0 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,3 @@ lazy val tei_adapter = setupProject(
localDependencies = Seq(source_model, flows),
externalDependencies = CatalogueDependencies.teiAdapterServiceDependencies
)
// AWS Credentials to read from S3

// Sets `s3CredentialsProvider` to a function that ignores its argument and
// returns a credentials provider built via STS assume-role.
s3CredentialsProvider := {
_ =>
// First argument is the ARN of the IAM role to assume; the second is a
// freshly generated random UUID string (presumably the role session
// name -- confirm against the AWS SDK Builder signature).
val builder = new STSAssumeRoleSessionCredentialsProvider.Builder(
"arn:aws:iam::760097843905:role/terraform-20210811133135108800000001",
UUID.randomUUID().toString
)
builder.build()
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package weco.pipeline.merger.services

import grizzled.slf4j.Logging
import weco.catalogue.internal_model.work.WorkState.Identified
import weco.catalogue.internal_model.work.Work
import weco.pipeline.matcher.models.WorkIdentifier
Expand All @@ -9,7 +10,7 @@ import scala.concurrent.{ExecutionContext, Future}

class IdentifiedWorkLookup(retriever: Retriever[Work[Identified]])(
implicit ec: ExecutionContext
) {
) extends Logging {
def fetchAllWorks(
workIdentifiers: Seq[WorkIdentifier]
): Future[Seq[Option[Work[Identified]]]] = {
Expand All @@ -28,14 +29,21 @@ class IdentifiedWorkLookup(retriever: Retriever[Work[Identified]])(
.map {
case WorkIdentifier(id, version) =>
val work = works(id.toString)
// We only want to get the exact versions of the works specified
// by the matcher.
//
// e.g. if the matcher said "combine Av1 and Bv2", and we look
// in the retriever and find {Av2, Bv3}, we shouldn't merge
// these -- we should wait for the matcher to confirm we should
// still be merging these two works.
if (work.version == version) Some(work) else None
// Being asked to merge non-matching versions is incorrect,
// but it is possible for the version in the identified index
// to be higher than the version in the graph store.
//
// When the versions disagree we have to choose between:
// (a) a complete failure where no works are merged
// (b) a partial failure where only the works with matching versions are merged
// (c) a partial success where all works are merged, accepting the risk of inconsistency
// We choose (c) as the least disruptive option, and log a warning so
// that the mismatch remains visible.
if (work.version != version) {
  // `work.version` is the version actually returned by the retriever;
  // `version` is the version named in the WorkIdentifier we were given.
  warn(
    "Matching version inconsistent! " +
      s"Found work $work with version ${work.version}, expected $version"
  )
}
Some(work)
}
case RetrieverMultiResult(_, notFound) =>
throw new RuntimeException(s"Works not found: $notFound")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class IdentifiedWorkLookupTest
}
}

it("returns None if the stored version has a higher version") {
it("returns Some even if the stored version has a higher version") {
val oldWork = identifiedWork()
val newWork = oldWork.withVersion(oldWork.version + 1)

Expand All @@ -50,7 +50,7 @@ class IdentifiedWorkLookupTest
)

whenReady(fetchAllWorks(retriever = retriever, oldWork)) {
_ shouldBe Seq(None)
_ shouldBe Seq(Some(newWork))
}
}

Expand All @@ -72,14 +72,8 @@ class IdentifiedWorkLookupTest
}: _*)
)

val expectedLookupResult =
unchangedWorks.map { Some(_) } ++ (4 to 5).map {
_ =>
None
}

whenReady(fetchAllWorks(retriever = retriever, lookupWorks: _*)) {
_ shouldBe expectedLookupResult
_ shouldBe storedWorks.map(Some(_))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import weco.pipeline.matcher.models.MatcherResult._
import weco.pipeline.merger.fixtures.{MatcherResultFixture, MergerFixtures}
import weco.pipeline_storage.memory.MemoryRetriever

import scala.List
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
Expand Down Expand Up @@ -136,18 +137,17 @@ class MergerWorkerServiceTest
}
}

it("always sends the highest version of a Work") {
it("sends a Work even if the matched version is not the latest") {
withMergerWorkerServiceFixtures {
case (retriever, QueuePair(queue, dlq), senders, _, index) =>
val work = identifiedWork()
val olderWork = identifiedWork()
val newerWork =
identifiedWork(canonicalId = olderWork.state.canonicalId)
.withVersion(olderWork.version + 1)

val matcherResult = createMatcherResultWith(Set(Set(work, olderWork)))
val workA = identifiedWork()
val workB = identifiedWork()
val newerWorkB =
identifiedWork(canonicalId = workB.state.canonicalId)
.withVersion(workB.version + 1)
val matcherResult = createMatcherResultWith(Set(Set(workA, workB)))

retriever.index ++= Map(work.id -> work, newerWork.id -> newerWork)
retriever.index ++= Map(workA.id -> workA, workB.id -> newerWorkB)

sendNotificationToSQS(
queue = queue,
Expand All @@ -157,46 +157,15 @@ class MergerWorkerServiceTest
eventually {
assertQueueEmpty(queue)
assertQueueEmpty(dlq)
getWorksSent(senders) should contain only work.id
getWorksSent(senders) should contain allOf (workA.id, workB.id)
index shouldBe Map(
work.id -> Left(work.transition[Merged](matcherResult.createdTime))
)
}
}
}

it("discards Works with version 0") {
withMergerWorkerServiceFixtures {
case (retriever, QueuePair(queue, dlq), senders, metrics, index) =>
val versionZeroWork =
identifiedWork()
.withVersion(0)

val work =
identifiedWork(canonicalId = versionZeroWork.state.canonicalId)
.withVersion(1)

val matcherResult =
createMatcherResultWith(Set(Set(work, versionZeroWork)))

retriever.index ++= Map(work.id -> work)

sendNotificationToSQS(
queue = queue,
message = matcherResult
)

eventually {
assertQueueEmpty(queue)
assertQueueEmpty(dlq)

getWorksSent(senders) should contain only work.id
index shouldBe Map(
work.id -> Left(work.transition[Merged](matcherResult.createdTime))
workA.id -> Left(
workA.transition[Merged](matcherResult.createdTime)
),
workB.id -> Left(
newerWorkB.transition[Merged](matcherResult.createdTime)
)
)

metrics.incrementedCounts.length shouldBe 1
metrics.incrementedCounts.last should endWith("_success")
}
}
}
Expand Down Expand Up @@ -405,7 +374,7 @@ class MergerWorkerServiceTest
}
}

it("doesn't send anything if the works are outdated") {
it("sends messages even if the works are outdated") {
withMergerWorkerServiceFixtures {
case (retriever, QueuePair(queue, dlq), senders, metrics, index) =>
val work0 = identifiedWork().withVersion(0)
Expand All @@ -423,8 +392,10 @@ class MergerWorkerServiceTest
assertQueueEmpty(queue)
assertQueueEmpty(dlq)

getWorksSent(senders) shouldBe empty
index shouldBe Map()
getWorksSent(senders) shouldBe List(work1.id)
index shouldBe Map(
work1.id -> Left(work1.transition[Merged](matcherResult.createdTime))
)

metrics.incrementedCounts.length shouldBe 1
metrics.incrementedCounts.last should endWith("_success")
Expand Down
3 changes: 0 additions & 3 deletions project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ object Common {
val settings: Seq[Def.Setting[_]] = Seq(
scalaVersion := "2.12.16",
organization := "weco",
resolvers ++= Seq(
"Wellcome releases" at "s3://releases.mvn-repo.wellcomecollection.org/"
),
scalacOptions ++= Seq(
"-deprecation",
"-unchecked",
Expand Down
114 changes: 55 additions & 59 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -1,94 +1,90 @@
import sbt._
import scala.language.reflectiveCalls
object WellcomeDependencies {

val defaultVersion =
"32.42.0" // This is automatically bumped by the scala-libs release process, do not edit this line manually

lazy val versions = new {
val typesafe = defaultVersion
val fixtures = defaultVersion
val http = defaultVersion
val json = defaultVersion
val messaging = defaultVersion
val monitoring = defaultVersion
val storage = defaultVersion
val elasticsearch = defaultVersion
val typesafe = "32.42.1"
val fixtures = "32.42.1"
val http = "32.42.1"
val json = "32.42.1"
val messaging = "32.42.1"
val monitoring = "32.42.1"
val storage = "32.42.1"
val elasticsearch = "32.42.1"
val sierra = "32.42.1"
}

val jsonLibrary: Seq[ModuleID] = library(
name = "json",
version = versions.json
val jsonLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "json" % versions.json,
"org.wellcomecollection" %% "json" % versions.json % "test" classifier "tests"
)

val fixturesLibrary: Seq[ModuleID] = library(
name = "fixtures",
version = versions.fixtures

val fixturesLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "fixtures" % versions.fixtures,
"org.wellcomecollection" %% "fixtures" % versions.fixtures % "test" classifier "tests"
)

val messagingLibrary: Seq[ModuleID] = library(
name = "messaging",
version = versions.messaging

val messagingLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "messaging" % versions.messaging,
"org.wellcomecollection" %% "messaging" % versions.messaging % "test" classifier "tests"
)

val elasticsearchLibrary: Seq[ModuleID] = library(
name = "elasticsearch",
version = versions.elasticsearch
val elasticsearchLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "elasticsearch" % versions.elasticsearch,
"org.wellcomecollection" %% "elasticsearch" % versions.elasticsearch % "test" classifier "tests"
)

val elasticsearchTypesafeLibrary: Seq[ModuleID] = library(
name = "elasticsearch_typesafe",
version = versions.elasticsearch
val elasticsearchTypesafeLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "elasticsearch_typesafe" % versions.elasticsearch,
"org.wellcomecollection" %% "elasticsearch_typesafe" % versions.elasticsearch % "test" classifier "tests"
)

val httpLibrary: Seq[ModuleID] = library(
name = "http",
version = versions.http

val httpLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "http" % versions.http,
"org.wellcomecollection" %% "http" % versions.http % "test" classifier "tests"
)

val monitoringLibrary: Seq[ModuleID] = library(
name = "monitoring",
version = versions.monitoring

val monitoringLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "monitoring" % versions.monitoring,
"org.wellcomecollection" %% "monitoring" % versions.monitoring % "test" classifier "tests"
)

val monitoringTypesafeLibrary: Seq[ModuleID] = monitoringLibrary ++ library(
name = "monitoring_typesafe",
version = versions.monitoring
val monitoringTypesafeLibrary: Seq[ModuleID] = monitoringLibrary ++ Seq(
"org.wellcomecollection" %% "monitoring_typesafe" % versions.monitoring,
"org.wellcomecollection" %% "monitoring_typesafe" % versions.monitoring % "test" classifier "tests"
)

val storageLibrary: Seq[ModuleID] = library(
name = "storage",
version = versions.storage
val storageLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "storage" % versions.storage,
"org.wellcomecollection" %% "storage" % versions.storage % "test" classifier "tests"
)

val typesafeLibrary: Seq[ModuleID] = library(
name = "typesafe_app",
version = versions.typesafe
val typesafeLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "typesafe_app" % versions.typesafe,
"org.wellcomecollection" %% "typesafe_app" % versions.typesafe % "test" classifier "tests"
) ++ fixturesLibrary

val storageTypesafeLibrary: Seq[ModuleID] = storageLibrary ++ library(
name = "storage_typesafe",
version = versions.storage
val storageTypesafeLibrary: Seq[ModuleID] = storageLibrary ++ Seq(
"org.wellcomecollection" %% "storage_typesafe" % versions.storage,
"org.wellcomecollection" %% "storage_typesafe" % versions.storage % "test" classifier "tests"
)

val messagingTypesafeLibrary: Seq[ModuleID] = messagingLibrary ++ library(
name = "messaging_typesafe",
version = versions.messaging
val messagingTypesafeLibrary: Seq[ModuleID] = messagingLibrary ++ Seq(
"org.wellcomecollection" %% "messaging_typesafe" % versions.messaging,
"org.wellcomecollection" %% "messaging_typesafe" % versions.messaging % "test" classifier "tests"
) ++ monitoringLibrary

val sierraLibrary: Seq[ModuleID] = library(
name = "sierra",
version = defaultVersion
)

val sierraTypesafeLibrary: Seq[ModuleID] = sierraLibrary ++ library(
name = "sierra_typesafe",
version = defaultVersion
val sierraLibrary: Seq[ModuleID] = Seq(
"org.wellcomecollection" %% "sierra" % versions.sierra,
"org.wellcomecollection" %% "sierra" % versions.sierra % "test" classifier "tests"
)

private def library(name: String, version: String): Seq[ModuleID] = Seq(
"weco" %% name % version,
"weco" %% name % version % "test" classifier "tests"
val sierraTypesafeLibrary: Seq[ModuleID] = sierraLibrary ++ Seq(
"org.wellcomecollection" %% "sierra_typesafe" % versions.sierra,
"org.wellcomecollection" %% "sierra_typesafe" % versions.sierra % "test" classifier "tests"
)
}

Expand Down

0 comments on commit ddf7665

Please sign in to comment.