
Commit

Run AvoidInfix on spark and kafka
olafurpg committed Oct 12, 2016
1 parent f117534 commit 4aec1d2
Showing 173 changed files with 1,441 additions and 1,408 deletions.
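For context: AvoidInfix is a scalafmt rewrite rule that replaces infix (operator-style) method calls such as "a foo b" with the explicit "a.foo(b)" form, which is what every hunk below shows. The following is a minimal, self-contained sketch of the transformation; the names are illustrative, and the rewrite.rules = [AvoidInfix] setting in .scalafmt.conf is an assumption about how this run was configured rather than something stated in the commit.

    import scala.concurrent.Promise

    object AvoidInfixSketch {
      def main(args: Array[String]): Unit = {
        val promise = Promise[String]()

        // Infix style, as the code read before this commit:
        //   promise success "done"
        //   List(1, 2, 3) map (_ + 1)

        // Dot style, as produced by the AvoidInfix rewrite:
        promise.success("done")
        val incremented = List(1, 2, 3).map(_ + 1)

        println(promise.future.value) // Some(Success(done))
        println(incremented)          // List(2, 3, 4)
      }
    }

Symbolic operators and certain excluded method names are normally left alone by the rule, and a few alphanumeric infix calls also survive in the diff below (for example some getOrElse and contains uses), so the rewrite is not an exhaustive conversion.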
@@ -654,7 +654,7 @@ object AdminUtils extends Logging {
Json.parseFull(str) match {
case None => // there are no config overrides
case Some(mapAnon: Map[_, _]) =>
val map = mapAnon collect { case (k: String, v: Any) => k -> v }
val map = mapAnon.collect { case (k: String, v: Any) => k -> v }
require(map("version") == 1)
map.get("config") match {
case Some(config: Map[_, _]) =>
@@ -199,24 +199,24 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
case path => s"$path/$child"
}
}.foreach(setAclsRecursively)
promise success "done"
promise.success("done")
case Code.CONNECTIONLOSS =>
zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
case Code.NONODE =>
warn(
"Node is gone, it could be have been legitimately deleted: %s"
.format(path))
promise success "done"
promise.success("done")
case Code.SESSIONEXPIRED =>
// Starting a new session isn't really a problem, but it'd complicate
// the logic of the tool, so we quit and let the user re-run it.
System.out.println("ZooKeeper session expired while changing ACLs")
promise failure ZkException.create(
KeeperException.create(Code.get(rc)))
promise.failure(
ZkException.create(KeeperException.create(Code.get(rc))))
case _ =>
System.out.println("Unexpected return code: %d".format(rc))
promise failure ZkException.create(
KeeperException.create(Code.get(rc)))
promise.failure(
ZkException.create(KeeperException.create(Code.get(rc))))
}
}
}
@@ -229,7 +229,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
Code.get(rc) match {
case Code.OK =>
info("Successfully set ACLs for %s".format(path))
promise success "done"
promise.success("done")
case Code.CONNECTIONLOSS =>
zkHandle.setACL(path,
ZkUtils.DefaultAcls(zkUtils.isSecure),
@@ -240,17 +240,17 @@
warn(
"Znode is gone, it could be have been legitimately deleted: %s"
.format(path))
promise success "done"
promise.success("done")
case Code.SESSIONEXPIRED =>
// Starting a new session isn't really a problem, but it'd complicate
// the logic of the tool, so we quit and let the user re-run it.
System.out.println("ZooKeeper session expired while changing ACLs")
promise failure ZkException.create(
KeeperException.create(Code.get(rc)))
promise.failure(
ZkException.create(KeeperException.create(Code.get(rc))))
case _ =>
System.out.println("Unexpected return code: %d".format(rc))
promise failure ZkException.create(
KeeperException.create(Code.get(rc)))
promise.failure(
ZkException.create(KeeperException.create(Code.get(rc))))
}
}
}
@@ -94,7 +94,7 @@ class ZkNodeChangeNotificationListener(
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
data map (notificationHandler.processNotification(_)) getOrElse
data.map(notificationHandler.processNotification(_)) getOrElse
(logger.warn(
s"read null data from $changeZnode when processing notification $notification"))
}
@@ -1231,7 +1231,7 @@ private[kafka] class ZookeeperConsumerConnector(
val updatedTopics = allTopics.filter(topic =>
topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))

val addedTopics = updatedTopics filterNot (wildcardTopics contains)
val addedTopics = updatedTopics.filterNot(wildcardTopics contains)
if (addedTopics.nonEmpty)
info("Topic event: added topics = %s".format(addedTopics))

@@ -1240,7 +1240,7 @@
* 0.8 release). We may need to remove these topics from the rebalance
* listener's map in reinitializeConsumer.
*/
val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
val deletedTopics = wildcardTopics.filterNot(updatedTopics contains)
if (deletedTopics.nonEmpty)
info("Topic event: deleted topics = %s".format(deletedTopics))

2 changes: 1 addition & 1 deletion repos/kafka/core/src/main/scala/kafka/log/LogManager.scala
@@ -261,7 +261,7 @@ class LogManager(val logDirs: Array[File],
val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values

val jobsForDir =
logsInDir map { log =>
logsInDir.map { log =>
CoreUtils.runnable {
// flush the log to ensure latest possible recovery point
log.flush()
@@ -85,7 +85,7 @@ class DynamicConfigManager(
case None => // There are no config overrides.
// Ignore non-json notifications because they can be from the deprecated TopicConfigManager
case Some(mapAnon: Map[_, _]) =>
val map = mapAnon collect { case (k: String, v: Any) => k -> v }
val map = mapAnon.collect { case (k: String, v: Any) => k -> v }
require(map("version") == 1)

val entityType = map.get("entity_type") match {
@@ -1194,7 +1194,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members =
joinResult.members map {
joinResult.members.map {
case (memberId, metadataArray) =>
(memberId, ByteBuffer.wrap(metadataArray))
}
@@ -68,7 +68,7 @@ object CommandLineUtils extends Logging {
*/
def parseKeyValueArgs(args: Iterable[String],
acceptMissingValue: Boolean = true): Properties = {
val splits = args.map(_ split "=").filterNot(_.length == 0)
val splits = args.map(_.split("=")).filterNot(_.length == 0)

val props = new Properties
for (a <- splits) {
@@ -483,7 +483,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
partitions: Set[TopicPartition]): Boolean = {
val allNonEmptyAssignments =
assignments forall (assignment => assignment.size > 0)
assignments.forall(assignment => assignment.size > 0)
if (!allNonEmptyAssignments) {
// at least one consumer got empty assignment
return false
@@ -379,7 +379,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertTrue("Request should have completed", future.isDone)

// make sure all of them end up in the same partition with increasing offset values
for ((future, offset) <- futures zip (0 until numRecords)) {
for ((future, offset) <- futures.zip(0 until numRecords)) {
assertEquals(offset.toLong, future.get.offset)
assertEquals(topic, future.get.topic)
assertEquals(partition, future.get.partition)
@@ -1159,8 +1159,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// until subscribe is called on all consumers
TestUtils.waitUntilTrue(
() => {
consumerPollers forall
(poller => poller.isSubscribeRequestProcessed())
consumerPollers.forall(poller => poller.isSubscribeRequestProcessed())
},
s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}",
1000L)
@@ -103,7 +103,7 @@ class ProducerCompressionTest(compression: String)
null,
message))
val futures = responses.toList
for ((future, offset) <- futures zip (0 until numRecords)) {
for ((future, offset) <- futures.zip(0 until numRecords)) {
assertEquals(offset.toLong, future.get.offset)
}

@@ -57,7 +57,7 @@ class ReassignPartitionsCommandTest
disableRackAware = false)

val assignment =
proposedAssignment map {
proposedAssignment.map {
case (topicPartition, replicas) =>
(topicPartition.partition, replicas)
}
@@ -136,7 +136,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
var counter = 0
while (!found && counter < 10) {
for (server <- this.servers) {
val previousEpoch = (epochMap get server.config.brokerId) match {
val previousEpoch = (epochMap.get(server.config.brokerId)) match {
case Some(epoch) =>
epoch
case None =>
@@ -267,7 +267,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
new Resource(Group, "test-ConsumerGroup") -> acls
)

resourceToAcls foreach {
resourceToAcls.foreach {
case (key, value) =>
changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key)
}
@@ -43,7 +43,7 @@ class HighwatermarkPersistenceTest {
.map(KafkaConfig.fromProps)
val topic = "foo"
val logManagers =
configs map { config =>
configs.map { config =>
TestUtils.createLogManager(logDirs =
config.logDirs.map(new File(_)).toArray,
cleanerConfig = CleanerConfig())
@@ -107,7 +107,7 @@ class CountMinSketchSuite extends FunSuite {
sketch
}

val mergedSketch = sketches.reduce(_ mergeInPlace _)
val mergedSketch = sketches.reduce(_.mergeInPlace(_))
checkSerDe(mergedSketch)

val expectedSketch = {
@@ -132,7 +132,7 @@ class SimpleFutureAction[T] private[spark] (jobWaiter: JobWaiter[_],

override def onComplete[U](func: (Try[T]) => U)(
implicit executor: ExecutionContext) {
jobWaiter.completionFuture onComplete { _ =>
jobWaiter.completionFuture.onComplete { _ =>
func(value.get)
}
}
@@ -155,7 +155,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf,

def stop(): Unit = synchronized {
if (isDriver) {
coordinatorRef.foreach(_ send StopCoordinator)
coordinatorRef.foreach(_.send(StopCoordinator))
coordinatorRef = None
authorizedCommittersByStage.clear()
}
@@ -31,7 +31,7 @@ class ExecutorInfo(val executorHost: String,

override def equals(other: Any): Boolean = other match {
case that: ExecutorInfo =>
(that canEqual this) && executorHost == that.executorHost &&
(that.canEqual(this)) && executorHost == that.executorHost &&
totalCores == that.totalCores && logUrlMap == that.logUrlMap
case _ => false
}
@@ -77,10 +77,10 @@ private[spark] class Benchmark(name: String,
case (result, benchmark) =>
printf("%-35s %16s %12s %13s %10s\n",
benchmark.name,
"%5.0f / %4.0f" format (result.bestMs, result.avgMs),
"%10.1f" format result.bestRate,
"%6.1f" format (1000 / result.bestRate),
"%3.1fX" format (firstBest / result.bestMs))
"%5.0f / %4.0f".format(result.bestMs, result.avgMs),
"%10.1f".format(result.bestRate),
"%6.1f".format(1000 / result.bestRate),
"%3.1fX".format(firstBest / result.bestMs))
}
println
// scalastyle:on
@@ -247,7 +247,7 @@ private[spark] object ClosureCleaner extends Logging {
// List of outer (class, object) pairs, ordered from outermost to innermost
// Note that all outer objects but the outermost one (first one in this list) must be closures
var outerPairs: List[(Class[_], AnyRef)] =
(outerClasses zip outerObjects).reverse
(outerClasses.zip(outerObjects)).reverse
var parent: AnyRef = null
if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
// The closure is ultimately nested inside a class; keep the object of that
@@ -1086,89 +1086,89 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)

private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsToAdd()
manager.invokePrivate(_numExecutorsToAdd())
}

private def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsTarget()
manager.invokePrivate(_numExecutorsTarget())
}

private def executorsPendingToRemove(
manager: ExecutorAllocationManager): collection.Set[String] = {
manager invokePrivate _executorsPendingToRemove()
manager.invokePrivate(_executorsPendingToRemove())
}

private def executorIds(
manager: ExecutorAllocationManager): collection.Set[String] = {
manager invokePrivate _executorIds()
manager.invokePrivate(_executorIds())
}

private def addTime(manager: ExecutorAllocationManager): Long = {
manager invokePrivate _addTime()
manager.invokePrivate(_addTime())
}

private def removeTimes(
manager: ExecutorAllocationManager): collection.Map[String, Long] = {
manager invokePrivate _removeTimes()
manager.invokePrivate(_removeTimes())
}

private def schedule(manager: ExecutorAllocationManager): Unit = {
manager invokePrivate _schedule()
manager.invokePrivate(_schedule())
}

private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _maxNumExecutorsNeeded()
manager.invokePrivate(_maxNumExecutorsNeeded())
}

private def addExecutors(manager: ExecutorAllocationManager): Int = {
val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
val maxNumExecutorsNeeded = manager.invokePrivate(_maxNumExecutorsNeeded())
manager.invokePrivate(_addExecutors(maxNumExecutorsNeeded))
}

private def adjustRequestedExecutors(
manager: ExecutorAllocationManager): Int = {
manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
manager.invokePrivate(_updateAndSyncNumExecutorsTarget(0L))
}

private def removeExecutor(manager: ExecutorAllocationManager,
id: String): Boolean = {
manager invokePrivate _removeExecutor(id)
manager.invokePrivate(_removeExecutor(id))
}

private def onExecutorAdded(manager: ExecutorAllocationManager,
id: String): Unit = {
manager invokePrivate _onExecutorAdded(id)
manager.invokePrivate(_onExecutorAdded(id))
}

private def onExecutorRemoved(manager: ExecutorAllocationManager,
id: String): Unit = {
manager invokePrivate _onExecutorRemoved(id)
manager.invokePrivate(_onExecutorRemoved(id))
}

private def onSchedulerBacklogged(manager: ExecutorAllocationManager): Unit = {
manager invokePrivate _onSchedulerBacklogged()
manager.invokePrivate(_onSchedulerBacklogged())
}

private def onSchedulerQueueEmpty(manager: ExecutorAllocationManager): Unit = {
manager invokePrivate _onSchedulerQueueEmpty()
manager.invokePrivate(_onSchedulerQueueEmpty())
}

private def onExecutorIdle(manager: ExecutorAllocationManager,
id: String): Unit = {
manager invokePrivate _onExecutorIdle(id)
manager.invokePrivate(_onExecutorIdle(id))
}

private def onExecutorBusy(manager: ExecutorAllocationManager,
id: String): Unit = {
manager invokePrivate _onExecutorBusy(id)
manager.invokePrivate(_onExecutorBusy(id))
}

private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _localityAwareTasks()
manager.invokePrivate(_localityAwareTasks())
}

private def hostToLocalTaskCount(
manager: ExecutorAllocationManager): Map[String, Int] = {
manager invokePrivate _hostToLocalTaskCount()
manager.invokePrivate(_hostToLocalTaskCount())
}
}
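The suite above rewrites calls to invokePrivate, which comes from ScalaTest's PrivateMethodTester trait (the suite extends it, per the hunk header) and is usually written infix so tests read like prose. Below is a small hypothetical sketch, assuming ScalaTest on the classpath, showing that the infix and dot forms are the same call.

    import org.scalatest.PrivateMethodTester

    // Hypothetical class with a private method, for illustration only.
    class Counter {
      private def bump(n: Int): Int = n + 1
    }

    object PrivateMethodSketch extends PrivateMethodTester {
      def main(args: Array[String]): Unit = {
        val counter = new Counter
        val bump = PrivateMethod[Int]('bump) // keyed by the method's symbol name

        // Infix form used before this commit:  counter invokePrivate bump(41)
        // Dot form produced by AvoidInfix; both compile to the same call:
        println(counter.invokePrivate(bump(41))) // 42
      }
    }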
@@ -56,9 +56,8 @@ class SparkContextSchedulerCreationSuite
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]](
'createTaskScheduler)
val (_, sched) =
SparkContext invokePrivate createTaskSchedulerMethod(sc,
master,
deployMode)
SparkContext.invokePrivate(
createTaskSchedulerMethod(sc, master, deployMode))
sched.asInstanceOf[TaskSchedulerImpl]
}

@@ -440,7 +440,7 @@ class StandaloneDynamicAllocationSuite
val getMap =
PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
val executorIdToTaskCount = taskScheduler.invokePrivate(getMap())
executorIdToTaskCount(executors.head) = 1
// kill the busy executor without force; this should fail
assert(!killExecutor(sc, executors.head, force = false))
[Diff truncated: the remaining changed files of the 173 are not shown.]
