Skip to content

Commit

Permalink
Make Scalac complain more (#478)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Jan 11, 2021
1 parent 7ad7154 commit 380db7e
Show file tree
Hide file tree
Showing 34 changed files with 193 additions and 161 deletions.
14 changes: 14 additions & 0 deletions core/src/main/scala/akka/persistence/jdbc/JournalRow.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc

final case class JournalRow(
ordering: Long,
deleted: Boolean,
persistenceId: String,
sequenceNumber: Long,
message: Array[Byte],
tags: Option[String] = None)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.typesafe.config.{ Config, ConfigObject }

import scala.collection.JavaConverters._
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal

object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider {
override def lookup: SlickExtension.type = SlickExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
case Some(lastSeqNr) => lastSeqNr + 1
case None => from
}
Some((nextFrom, nextControl), xs)
Some(((nextFrom, nextControl), xs))
}
}

Expand All @@ -67,7 +67,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
akka.pattern.after(delay, scheduler)(retrieveNextBatch())
}
}
.mapConcat(identity)
.mapConcat(identity(_))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DefaultJournalDao(
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
.map(AkkaSerialization.fromRow(serialization))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.map(AkkaSerialization.fromRow(serialization)(_))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@

package akka.persistence.jdbc.journal.dao.legacy

import akka.persistence.jdbc.journal.dao.{
BaseDao,
BaseJournalDaoWithReadMessages,
FlowControl,
H2Compat,
JournalDao,
JournalDaoWithReadMessages,
JournalDaoWithUpdates
}
import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates }
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
import akka.persistence.{ AtomicWrite, PersistentRepr }
Expand Down Expand Up @@ -118,9 +110,10 @@ trait BaseByteArrayJournalDao
val write = PersistentRepr(payload, sequenceNr, persistenceId)
val serializedRow = serializer.serialize(write) match {
case Success(t) => t
case Failure(ex) =>
case Failure(cause) =>
throw new IllegalArgumentException(
s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]")
s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]",
cause)
}
db.run(queries.update(persistenceId, sequenceNr, serializedRow.message).map(_ => Done))
}
Expand All @@ -141,7 +134,7 @@ trait BaseByteArrayJournalDao
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.via(serializer.deserializeFlow)
.map {
case Success((repr, _, ordering)) => Success(repr -> ordering)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
val (nextMax, _, missingElems) =
// using the ordering elements that were fetched, we verify if there are any gaps
elements.foldLeft[(OrderingId, OrderingId, MissingElements)](
currentMaxOrdering,
currentMaxOrdering,
MissingElements.empty) {
(currentMaxOrdering, currentMaxOrdering, MissingElements.empty)) {
case ((currentMax, previousElement, missing), currentElement) =>
// we must decide if we move the cursor forward
val newMax =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ class DefaultReadJournalDao(

// This doesn't populate the tags. AFAICT they aren't used
Source
.fromPublisher(db.stream(queries.eventsByTag(tag, offset, maxOffset, correctMaxForH2Driver(max)).result))
.fromPublisher(db.stream(queries.eventsByTag((tag, offset, maxOffset, correctMaxForH2Driver(max))).result))
.map(row =>
AkkaSerialization.fromRow(serialization)(row).map { case (repr, ordering) => (repr, Set.empty, ordering) })
}

override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery(offset, limit).result))
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def maxJournalSequence(): Future[Long] =
db.run(queries.maxJournalSequenceQuery.result)
Expand All @@ -59,7 +59,7 @@ class DefaultReadJournalDao(
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
.map(AkkaSerialization.fromRow(serialization))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.map(AkkaSerialization.fromRow(serialization)(_))

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

package akka.persistence.jdbc.query.dao

import akka.persistence.jdbc.config.{
EventJournalTableConfiguration,
EventTagTableConfiguration,
LegacyJournalTableConfiguration,
ReadJournalConfig
}
import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration, ReadJournalConfig }
import akka.persistence.jdbc.journal.dao.JournalTables
import slick.jdbc.JdbcProfile

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {

val publisher = db.stream(queries.eventsByTag(s"%$tag%", offset, maxOffset, correctMaxForH2Driver(max)).result)
val publisher = db.stream(queries.eventsByTag((s"%$tag%", offset, maxOffset, correctMaxForH2Driver(max))).result)
// applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
Source
.fromPublisher(publisher)
Expand All @@ -57,7 +57,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.via(serializer.deserializeFlow)
.map {
case Success((repr, _, ordering)) => Success(repr -> ordering)
Expand All @@ -66,7 +66,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
}

override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery(offset, limit).result))
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def maxJournalSequence(): Future[Long] = {
db.run(queries.maxJournalSequenceQuery.result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package akka.persistence.jdbc.query.dao.legacy

import akka.persistence.jdbc.config.{ LegacyJournalTableConfiguration, ReadJournalConfig }
import akka.persistence.jdbc.journal.dao.legacy.{ JournalRow, JournalTables }
import akka.persistence.jdbc.journal.dao.legacy.JournalTables
import slick.jdbc.JdbcProfile

class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJournalConfig) extends JournalTables {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import akka.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJo
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.javadsl._
import akka.stream.javadsl.Source
import akka.persistence.jdbc.util.PluginVersionChecker

object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/akka/persistence/jdbc/query/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@

package akka.persistence.jdbc

import akka.NotUsed
import akka.persistence.query._
import akka.stream.scaladsl.Source
import scala.language.implicitConversions

package object query {
implicit class OffsetOps(val that: Offset) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
// Continue querying from the largest offset
xs.map(_.offset.value).max
}
Some((nextStartingOffset, nextControl), xs)
Some(((nextStartingOffset, nextControl), xs))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.collection.immutable._

import scala.util.Try

@deprecated(since = "5.0.0")
@deprecated("use Akka Serialization for the payloads instead", since = "5.0.0")
trait PersistentReprSerializer[T] {

/**
Expand Down Expand Up @@ -45,6 +45,7 @@ trait PersistentReprSerializer[T] {
def deserialize(t: T): Try[(PersistentRepr, Set[String], Long)]
}

@deprecated("use Akka Serialization for the payloads instead", since = "5.0.0")
trait FlowPersistentReprSerializer[T] extends PersistentReprSerializer[T] {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,48 +78,51 @@ class DefaultSnapshotDao(
override def snapshotForMaxTimestamp(
persistenceId: String,
maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] =
db.run(queries.selectOneByPersistenceIdAndMaxTimestamp(persistenceId, maxTimestamp).result).map(zeroOrOneSnapshot)
db.run(queries.selectOneByPersistenceIdAndMaxTimestamp((persistenceId, maxTimestamp)).result).map(zeroOrOneSnapshot)

override def snapshotForMaxSequenceNr(
persistenceId: String,
maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] =
db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr(persistenceId, maxSequenceNr).result).map(zeroOrOneSnapshot)
db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result)
.map(zeroOrOneSnapshot)

override def snapshotForMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
maxSequenceNr: Long,
maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] =
db.run(
queries
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp)
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.result)
.map(zeroOrOneSnapshot)
.map(zeroOrOneSnapshot(_))

override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot))
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContexts.parasitic)
}

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdAndSequenceNr(persistenceId, sequenceNr).delete)
db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
.map(_ => ())(ExecutionContexts.parasitic)

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContexts.parasitic))

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr(persistenceId, maxSequenceNr).delete)
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
.map(_ => ())((ExecutionContexts.parasitic))

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxTimestamp(persistenceId, maxTimestamp).delete)
db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
.map(_ => ())((ExecutionContexts.parasitic))

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
maxSequenceNr: Long,
maxTimestamp: Long): Future[Unit] =
db.run(
queries.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp).delete)
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
.map(_ => ())((ExecutionContexts.parasitic))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package akka.persistence.jdbc.snapshot.dao

import akka.persistence.jdbc.config.{ LegacySnapshotTableConfiguration, SnapshotTableConfiguration }
import akka.persistence.jdbc.config.SnapshotTableConfiguration
import akka.persistence.jdbc.snapshot.dao.SnapshotTables.SnapshotRow
import slick.jdbc.JdbcProfile

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package akka.persistence.jdbc.snapshot.dao

import akka.persistence.jdbc.config.{ LegacySnapshotTableConfiguration, SnapshotTableConfiguration }
import akka.persistence.jdbc.config.SnapshotTableConfiguration
import akka.persistence.jdbc.snapshot.dao.SnapshotTables.SnapshotRow
import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.isOracleDriver
import akka.persistence.jdbc.util.InputStreamOps.InputStreamImplicits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ class ByteArraySnapshotDao(
persistenceId: String,
maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] =
for {
rows <- db.run(queries.selectOneByPersistenceIdAndMaxTimestamp(persistenceId, maxTimestamp).result)
rows <- db.run(queries.selectOneByPersistenceIdAndMaxTimestamp((persistenceId, maxTimestamp)).result)
} yield rows.headOption.map(toSnapshotData)

override def snapshotForMaxSequenceNr(
persistenceId: String,
maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] =
for {
rows <- db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr(persistenceId, maxSequenceNr).result)
rows <- db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result)
} yield rows.headOption.map(toSnapshotData)

override def snapshotForMaxSequenceNrAndMaxTimestamp(
Expand All @@ -60,7 +60,7 @@ class ByteArraySnapshotDao(
for {
rows <- db.run(
queries
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp)
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.result)
} yield rows.headOption.map(toSnapshotData)

Expand All @@ -71,7 +71,7 @@ class ByteArraySnapshotDao(

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
for {
_ <- db.run(queries.selectByPersistenceIdAndSequenceNr(persistenceId, sequenceNr).delete)
_ <- db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
} yield ()

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
Expand All @@ -81,12 +81,12 @@ class ByteArraySnapshotDao(

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
for {
_ <- db.run(queries.selectByPersistenceIdUpToMaxSequenceNr(persistenceId, maxSequenceNr).delete)
_ <- db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
} yield ()

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
for {
_ <- db.run(queries.selectByPersistenceIdUpToMaxTimestamp(persistenceId, maxTimestamp).delete)
_ <- db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
} yield ()

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
Expand All @@ -96,7 +96,7 @@ class ByteArraySnapshotDao(
for {
_ <- db.run(
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp)
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
} yield ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.sql.Statement

import scala.concurrent.Future
import akka.Done
import akka.actor.{ ActorSystem, ClassicActorSystemProvider }
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
import akka.persistence.jdbc.db.SlickDatabase
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc;

import akka.Done;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc

import akka.{ Done, NotUsed }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.actor.ActorSystem
import akka.persistence.jdbc.config.{ JournalConfig, ReadJournalConfig, SlickConfiguration }
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal
import akka.persistence.jdbc.util.DropCreate
import akka.persistence.jdbc.db.{ SlickDatabase, SlickDriver }
import akka.persistence.jdbc.db.SlickDatabase
import akka.util.Timeout
import com.typesafe.config.{ Config, ConfigFactory, ConfigValue }
import org.scalatest.BeforeAndAfterEach
Expand Down
Loading

0 comments on commit 380db7e

Please sign in to comment.