Skip to content

Commit

Permalink
(redux) add K8sJobRunnerSrv, which runs Cortex jobs using Kubernetes …
Browse files Browse the repository at this point in the history
…Jobs
  • Loading branch information
jaredjennings committed Aug 5, 2022
1 parent 619a70f commit 009c997
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 3 deletions.
11 changes: 11 additions & 0 deletions app/org/thp/cortex/services/JobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class JobRunnerSrv @Inject() (
artifactModel: ArtifactModel,
processJobRunnerSrv: ProcessJobRunnerSrv,
dockerJobRunnerSrv: DockerJobRunnerSrv,
k8sJobRunnerSrv: K8sJobRunnerSrv,
workerSrv: WorkerSrv,
createSrv: CreateSrv,
updateSrv: UpdateSrv,
Expand All @@ -47,6 +48,7 @@ class JobRunnerSrv @Inject() (
.getOrElse(Seq("docker", "process"))
.map(_.toLowerCase)
.collect {
case "kubernetes" if k8sJobRunnerSrv.isAvailable => "kubernetes"
case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
case "process" =>
Seq("", "2", "3").foreach { pythonVersion =>
Expand All @@ -65,6 +67,7 @@ class JobRunnerSrv @Inject() (

lazy val processRunnerIsEnable: Boolean = runners.contains("process")
lazy val dockerRunnerIsEnable: Boolean = runners.contains("docker")
lazy val k8sRunnerIsEnable: Boolean = runners.contains("kubernetes")

private object deleteVisitor extends SimpleFileVisitor[Path] {
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
Expand Down Expand Up @@ -218,6 +221,14 @@ class JobRunnerSrv @Inject() (
maybeJobFolder = Some(jobFolder)
runners
.foldLeft[Option[Try[Unit]]](None) {
case (None, "kubernetes") =>
worker
.dockerImage()
.map(dockerImage => k8sJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes)))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with kubernetes (doesn't have image)")
None
}
case (None, "docker") =>
worker
.dockerImage()
Expand Down
154 changes: 154 additions & 0 deletions app/org/thp/cortex/services/K8SJobRunnerSrv.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package org.thp.cortex.services

import java.util.concurrent.TimeUnit
import java.nio.file._

import scala.concurrent.duration.FiniteDuration
import scala.util.{Try}
import scala.collection.JavaConverters._

import play.api.{Configuration, Logger}

import akka.actor.ActorSystem
import io.fabric8.kubernetes.client.{DefaultKubernetesClient}
import io.fabric8.kubernetes.api.model.batch.{JobBuilder => KJobBuilder}
import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder}
import javax.inject.{Inject, Singleton}
import org.thp.cortex.models._

/**
  * Runs Cortex worker (analyzer/responder) jobs as Kubernetes batch Jobs.
  *
  * Each Cortex job is turned into one Kubernetes Job named
  * `neuron-job-<id>` that runs the worker's Docker image with the job's
  * `input`/`output` directories mounted from a shared ReadWriteMany
  * PersistentVolumeClaim. The runner blocks (up to a timeout) until the
  * Kubernetes Job reaches a terminal condition, then deletes it.
  *
  * @param client                    fabric8 Kubernetes client used for all API calls
  * @param config                    Play configuration (source of the settings below)
  * @param autoUpdate                read from `job.kubernetes.autoUpdate`
  *                                  NOTE(review): this field is never referenced in this
  *                                  class — confirm whether image auto-update was meant
  *                                  to be implemented or the parameter should be dropped
  * @param jobBaseDirectory          base directory holding per-job folders (`job.directory`)
  * @param persistentVolumeClaimName name of the RWX PVC that exposes `jobBaseDirectory`
  *                                  to worker pods (`job.kubernetes.persistentVolumeClaimName`)
  * @param system                    actor system (implicit for callers; unused directly here)
  */
@Singleton
class K8sJobRunnerSrv(
    client: DefaultKubernetesClient,
    config: Configuration,
    autoUpdate: Boolean,
    jobBaseDirectory: Path,
    persistentVolumeClaimName: String,
    implicit val system: ActorSystem
) {

  // DI constructor: builds a client from the default kubeconfig/in-cluster
  // configuration and pulls settings out of the Play configuration.
  // `job.kubernetes.persistentVolumeClaimName` has no default: if it is
  // missing, injection fails and the runner is simply unavailable.
  @Inject()
  def this(config: Configuration, system: ActorSystem) =
    this(
      new DefaultKubernetesClient(),
      config,
      config.getOptional[Boolean]("job.kubernetes.autoUpdate").getOrElse(true),
      Paths.get(config.get[String]("job.directory")),
      config.get[String]("job.kubernetes.persistentVolumeClaimName"),
      system: ActorSystem
    )

  lazy val logger = Logger(getClass)

  // True iff the API server answers a version request. Evaluated once
  // (lazy) and consulted by JobRunnerSrv when selecting enabled runners;
  // any error is logged and mapped to `false` rather than propagated.
  lazy val isAvailable: Boolean =
    Try {
      val ver = client.getVersion()
      logger.info(s"Kubernetes is available: major ${ver.getMajor()} minor ${ver.getMinor()} git ${ver.getGitCommit()}")
      true
    }.recover {
      case error =>
        logger.info(s"Kubernetes is not available", error)
        false
    }.get

  /**
    * Runs one Cortex job as a Kubernetes Job and waits for it to finish.
    *
    * @param jobDirectory absolute per-job folder (under `jobBaseDirectory`)
    *                     containing `input/` and `output/`
    * @param dockerImage  worker image to run
    * @param job          the Cortex job (its id labels and names the k8s Job)
    * @param timeout      optional wall-clock limit for the wait; defaults to 8 hours
    * @return the outcome of creating and awaiting the Kubernetes Job; the
    *         created Job is deleted (by label) whether or not this succeeded.
    *         NOTE(review): a Job whose terminal condition is "Failed" still
    *         yields `Success(())` here — confirm the caller detects worker
    *         failure through the job's output files instead
    */
  def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration]): Try[Unit] = {
    // If the input folder carries a CA bundle, it is exposed to Python
    // workers via REQUESTS_CA_BUNDLE below.
    val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
    // Path of this job's folder relative to the PVC root; used as the
    // volume subPathExpr so the pod only sees its own job's files.
    val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString()
    // make the default longer than likely values, but still not infinite
    val timeout_or_default = timeout getOrElse new FiniteDuration(8, TimeUnit.HOURS)
    // https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
    // FIXME: this collapses case, jeopardizing the uniqueness of the
    // identifier. LDH: lowercase, digits, hyphens.
    val ldh_jobid = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
    val kjobName = "neuron-job-" + ldh_jobid
    // Volume source pointing at the shared job-directory PVC; writable so
    // the worker can produce output.
    val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
      .withClaimName(persistentVolumeClaimName)
      .withReadOnly(false)
      .build();
    // Base Job spec: one pod, one "neuron" container running the worker
    // image with /job as its argument. The `cortex-job-id` label is the
    // canonical handle used for waiting and deletion below.
    // NOTE: fabric8 builders are mutable — kjob1/kjob2/kjob3 are stages of
    // the same builder, not independent copies.
    val kjob1 = new KJobBuilder()
      .withApiVersion("batch/v1")
      .withNewMetadata()
      .withName(kjobName)
      .withLabels(Map(
        "cortex-job-id" -> job.id,
        "cortex-worker-id" -> job.workerId(),
        "cortex-neuron-job" -> "true").asJava)
      .endMetadata()
      .withNewSpec()
      .withNewTemplate()
      .withNewSpec()
      .addNewVolume()
      .withName("job-directory")
      .withPersistentVolumeClaim(pvcvs)
      .endVolume()
      .addNewContainer()
      .withName("neuron")
      .withImage(dockerImage)
      .withArgs("/job")
      .addNewEnv()
      .withName("CORTEX_JOB_FOLDER")
      .withValue(relativeJobDirectory)
      .endEnv();
    // Only set REQUESTS_CA_BUNDLE when a CA bundle was actually provided.
    val kjob2 = if (Files.exists(cacertsFile)) {
      kjob1.addNewEnv()
        .withName("REQUESTS_CA_BUNDLE")
        .withValue("/job/input/cacerts")
        .endEnv()
    } else {
      kjob1
    }
    // Mount only this job's input (read-only) and output (writable) from
    // the PVC, selected via $(CORTEX_JOB_FOLDER) subPathExpr expansion.
    val kjob3 = kjob2
      .addNewVolumeMount()
      .withName("job-directory")
      .withSubPathExpr("$(CORTEX_JOB_FOLDER)/input")
      .withMountPath("/job/input")
      .withReadOnly(true)
      .endVolumeMount()
      .addNewVolumeMount()
      .withName("job-directory")
      .withSubPathExpr("$(CORTEX_JOB_FOLDER)/output")
      .withMountPath("/job/output")
      .withReadOnly(false)
      .endVolumeMount()
      .endContainer()
      // Never restart: Cortex treats each run as one-shot.
      .withRestartPolicy("Never")
      .endSpec()
      .endTemplate()
      .endSpec()
      .build();

    val execution = Try {
      val created_kjob = client.batch().jobs().create(kjob3)
      val created_env = created_kjob
        .getSpec().getTemplate().getSpec().getContainers().get(0)
        .getEnv().asScala;
      logger.info(
        s"Created Kubernetes Job ${created_kjob.getMetadata().getName()}\n" +
          s"  timeout: ${timeout_or_default.toString}\n" +
          s"  image  : $dockerImage\n" +
          s"  mount  : pvc ${persistentVolumeClaimName} subdir ${relativeJobDirectory} as /job" +
          created_env.map(ev => s"\n  env    : ${ev.getName()} = ${ev.getValue()}").mkString)
      // Block until the Job reports a terminal condition ("Complete" or
      // "Failed") or the timeout elapses. The Option wrapping guards
      // against null resources/statuses delivered by the Java client.
      val ended_kjob = client.batch().jobs().withLabel("cortex-job-id", job.id)
        .waitUntilCondition((x => Option(x).flatMap(j =>
          Option(j.getStatus).flatMap(s =>
            Some(s.getConditions.asScala.map(_.getType).filter(t =>
              t.equals("Complete") || t.equals("Failed")).nonEmpty)))
          getOrElse false),
          timeout_or_default.length, timeout_or_default.unit);
      // waitUntilCondition returns null when the resource disappeared
      // before the condition was met.
      if(ended_kjob != null) {
        logger.info(s"Kubernetes Job ${ended_kjob.getMetadata().getName()} " +
          s"(for job ${job.id}) status is now ${ended_kjob.getStatus().toString()}")
      } else {
        logger.info(s"Kubernetes Job for ${job.id} no longer exists")
      }
    }
    // let's find the job by the attribute we know is fundamentally
    // unique, rather than one constructed from it
    // (cleanup runs regardless of whether `execution` succeeded)
    val deleted = client.batch().jobs().withLabel("cortex-job-id", job.id).delete()
    if(deleted) {
      logger.info(s"Deleted Kubernetes Job for job ${job.id}")
    } else {
      logger.info(s"While trying to delete Kubernetes Job for ${job.id}, the job was not found; this is OK")
    }
    execution
  }
}
6 changes: 5 additions & 1 deletion app/org/thp/cortex/services/WorkerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,14 @@ class WorkerSrv @Inject() (
workerDefinitions.filter {
case w if w.command.isDefined && jobRunnerSrv.processRunnerIsEnable => true
case w if w.dockerImage.isDefined && jobRunnerSrv.dockerRunnerIsEnable => true
case w if w.dockerImage.isDefined && jobRunnerSrv.k8sRunnerIsEnable => true
case w =>
val reason =
if (w.command.isDefined) "process runner is disabled"
else if (w.dockerImage.isDefined) "Docker runner is disabled"
else if (w.dockerImage.isDefined && !jobRunnerSrv.dockerRunnerIsEnable)
"Docker runner is disabled"
else if (w.dockerImage.isDefined && !jobRunnerSrv.k8sRunnerIsEnable)
"Kubernetes runner is disabled"
else "it doesn't have image nor command"

logger.warn(s"$workerType ${w.name} is disabled because $reason")
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ lazy val cortex = (project in file("."))
Dependencies.reflections,
Dependencies.zip4j,
Dependencies.dockerClient,
Dependencies.k8sClient,
Dependencies.akkaCluster,
Dependencies.akkaClusterTyped
),
Expand Down
2 changes: 1 addition & 1 deletion conf/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ cache {

job {
timeout = 30 minutes
runners = [docker, process]
runners = [kubernetes, docker, process]
directory = ${java.io.tmpdir}
dockerDirectory = ${job.directory}
keepJobFolder = false
Expand Down
4 changes: 4 additions & 0 deletions package/docker/entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ SHOW_SECRET=${show_secret:-0}
DAEMON_USER=${daemon_user:-cortex}
JOB_DIRECTORY=${job_directory:-/tmp/cortex-jobs}
DOCKER_JOB_DIRECTORY=${docker_job_directory:-}
KUBERNETES_JOB_PVC=${kubernetes_job_pvc:-}

function usage {
cat <<- _EOF_
Expand All @@ -33,6 +34,7 @@ function usage {
--show-secret | show the generated secret
--job-directory <dir> | use this directory to store job files
--docker-job-directory <dir> | indicate the job directory in the host (not inside container)
--kubernetes-job-pvc <name> | indicate the ReadWriteMany persistent volume claim holding job directory
--analyzer-url <url> | where analyzers are located (url or path)
--responder-url <url> | where responders are located (url or path)
--start-docker | start a internal docker (inside container) to run analyzers/responders
Expand All @@ -56,6 +58,7 @@ do
"--show-secret") SHOW_SECRET=1;;
"--job-directory") shift; JOB_DIRECTORY=$1;;
"--docker-job-directory") shift; DOCKER_JOB_DIRECTORY=$1;;
"--kubernetes-job-pvc") shift; KUBERNETES_JOB_PVC=$1;;
"--analyzer-path") echo "--analyzer-path is deprecated, please use --analyzer-url"
shift; ANALYZER_URLS+=("$1");;
"--responder-path") echo "--responder-path is deprecated, please use --responder-url"
Expand Down Expand Up @@ -112,6 +115,7 @@ then

test -n "$JOB_DIRECTORY" && echo "job.directory=\"$JOB_DIRECTORY\"" >> "$CONFIG_FILE"
test -n "$DOCKER_JOB_DIRECTORY" && echo "job.dockerDirectory=\"$DOCKER_JOB_DIRECTORY\"" >> "$CONFIG_FILE"
test -n "$KUBERNETES_JOB_PVC" && echo "job.kubernetes.persistentVolumeClaimName=\"$KUBERNETES_JOB_PVC\"" >> "$CONFIG_FILE"

function join_urls {
echo -n "\"$1\""
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ object Dependencies {
val zip4j = "net.lingala.zip4j" % "zip4j" % "2.10.0"
val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6"
val dockerClient = "com.spotify" % "docker-client" % "8.14.4"
val k8sClient = "io.fabric8" % "kubernetes-client" % "5.0.2"
val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion
val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion
}
3 changes: 2 additions & 1 deletion www/src/app/pages/analyzers/analyzers.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export default class AnalyzerService {

if (def.dockerImage && def.dockerImage !== null) {
def.runners.push('Docker');
def.runners.push('Kubernetes');
}
});

Expand Down Expand Up @@ -232,4 +233,4 @@ export default class AnalyzerService {
return this.$http.post('./api/analyzer/' + id + '/run', postData);
}
}
}
}

0 comments on commit 009c997

Please sign in to comment.