Add Kubernetes job runner (#347) #349

Open
wants to merge 4 commits into develop
11 changes: 11 additions & 0 deletions app/org/thp/cortex/services/JobRunnerSrv.scala
@@ -27,6 +27,7 @@ class JobRunnerSrv @Inject() (
artifactModel: ArtifactModel,
processJobRunnerSrv: ProcessJobRunnerSrv,
dockerJobRunnerSrv: DockerJobRunnerSrv,
k8sJobRunnerSrv: K8sJobRunnerSrv,
workerSrv: WorkerSrv,
createSrv: CreateSrv,
updateSrv: UpdateSrv,
@@ -47,6 +48,7 @@ class JobRunnerSrv @Inject() (
.getOrElse(Seq("docker", "process"))
.map(_.toLowerCase)
.collect {
case "kubernetes" if k8sJobRunnerSrv.isAvailable => "kubernetes"
case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
case "process" =>
Seq("", "2", "3").foreach { pythonVersion =>
@@ -65,6 +67,7 @@

lazy val processRunnerIsEnable: Boolean = runners.contains("process")
lazy val dockerRunnerIsEnable: Boolean = runners.contains("docker")
lazy val k8sRunnerIsEnable: Boolean = runners.contains("kubernetes")

private object deleteVisitor extends SimpleFileVisitor[Path] {
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
@@ -218,6 +221,14 @@ class JobRunnerSrv @Inject() (
maybeJobFolder = Some(jobFolder)
runners
.foldLeft[Option[Try[Unit]]](None) {
case (None, "kubernetes") =>
worker
.dockerImage()
.map(dockerImage => k8sJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes)))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with kubernetes (doesn't have image)")
None
}
case (None, "docker") =>
worker
.dockerImage()
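The selection rule introduced above is worth spelling out: the collect keeps only the configured runner names whose backend is actually available, in configuration order, and the foldLeft in the run method tries them in that same order, falling through to the next runner (the orElse branch returning None) when the current one cannot handle the worker. A minimal, self-contained Scala sketch of that behaviour, not Cortex code, with hypothetical worker capabilities:

// Sketch only: mimics the Option-based foldLeft fall-through used by JobRunnerSrv.
object RunnerSelectionSketch extends App {
  val runners = Seq("kubernetes", "docker", "process") // assumed configuration order
  def tryRun(runner: String, hasImage: Boolean, hasCommand: Boolean): Option[String] =
    runner match {
      case "kubernetes" | "docker" if hasImage => Some(s"ran with $runner")
      case "process" if hasCommand             => Some("ran with process")
      case _                                   => None // cannot handle this worker, try the next runner
    }
  // A worker with a command but no Docker image falls through kubernetes and docker.
  val result = runners.foldLeft[Option[String]](None) {
    case (None, runner) => tryRun(runner, hasImage = false, hasCommand = true)
    case (done, _)      => done // a previous runner already took the job
  }
  println(result getOrElse "no runner could handle this worker") // prints: ran with process
}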
154 changes: 154 additions & 0 deletions app/org/thp/cortex/services/K8SJobRunnerSrv.scala
@@ -0,0 +1,154 @@
package org.thp.cortex.services

import java.util.concurrent.TimeUnit
import java.nio.file._

import scala.concurrent.duration.FiniteDuration
import scala.util.{Try}
import scala.collection.JavaConverters._

import play.api.{Configuration, Logger}

import akka.actor.ActorSystem
import io.fabric8.kubernetes.client.{DefaultKubernetesClient}
import io.fabric8.kubernetes.api.model.batch.{JobBuilder => KJobBuilder}
import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder}
import javax.inject.{Inject, Singleton}
import org.thp.cortex.models._

@Singleton
class K8sJobRunnerSrv(
client: DefaultKubernetesClient,
config: Configuration,
autoUpdate: Boolean,
jobBaseDirectory: Path,
persistentVolumeClaimName: String,
implicit val system: ActorSystem
) {

@Inject()
def this(config: Configuration, system: ActorSystem) =
this(
new DefaultKubernetesClient(),
config,
config.getOptional[Boolean]("job.kubernetes.autoUpdate").getOrElse(true),
Paths.get(config.get[String]("job.directory")),
config.get[String]("job.kubernetes.persistentVolumeClaimName"),
system: ActorSystem
)

lazy val logger = Logger(getClass)

lazy val isAvailable: Boolean =
Try {
val ver = client.getVersion()
logger.info(s"Kubernetes is available: major ${ver.getMajor()} minor ${ver.getMinor()} git ${ver.getGitCommit()}")
true
}.recover {
case error =>
logger.info(s"Kubernetes is not available", error)
false
}.get

def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration]): Try[Unit] = {
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString()
// make the default longer than likely values, but still not infinite
val timeout_or_default = timeout getOrElse new FiniteDuration(8, TimeUnit.HOURS)
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
// FIXME: this collapses case, jeopardizing the uniqueness of the
// identifier. LDH: lowercase, digits, hyphens.
val ldh_jobid = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
val kjobName = "neuron-job-" + ldh_jobid
val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
.withClaimName(persistentVolumeClaimName)
.withReadOnly(false)
.build();
val kjob1 = new KJobBuilder()
.withApiVersion("batch/v1")
.withNewMetadata()
.withName(kjobName)
.withLabels(Map(
"cortex-job-id" -> job.id,
"cortex-worker-id" -> job.workerId(),
"cortex-neuron-job" -> "true").asJava)
.endMetadata()
.withNewSpec()
.withNewTemplate()
.withNewSpec()
.addNewVolume()
.withName("job-directory")
.withPersistentVolumeClaim(pvcvs)
.endVolume()
.addNewContainer()
.withName("neuron")
.withImage(dockerImage)
.withArgs("/job")
.addNewEnv()
.withName("CORTEX_JOB_FOLDER")
.withValue(relativeJobDirectory)
.endEnv();
val kjob2 = if (Files.exists(cacertsFile)) {
kjob1.addNewEnv()
.withName("REQUESTS_CA_BUNDLE")
.withValue("/job/input/cacerts")
.endEnv()
} else {
kjob1
}
val kjob3 = kjob2
.addNewVolumeMount()
.withName("job-directory")
.withSubPathExpr("$(CORTEX_JOB_FOLDER)/input")
.withMountPath("/job/input")
.withReadOnly(true)
.endVolumeMount()
.addNewVolumeMount()
.withName("job-directory")
.withSubPathExpr("$(CORTEX_JOB_FOLDER)/output")
.withMountPath("/job/output")
.withReadOnly(false)
.endVolumeMount()
.endContainer()
.withRestartPolicy("Never")
.endSpec()
.endTemplate()
.endSpec()
.build();

val execution = Try {
val created_kjob = client.batch().jobs().create(kjob3)
val created_env = created_kjob
.getSpec().getTemplate().getSpec().getContainers().get(0)
.getEnv().asScala;
logger.info(
s"Created Kubernetes Job ${created_kjob.getMetadata().getName()}\n" +
s" timeout: ${timeout_or_default.toString}\n" +
s" image : $dockerImage\n" +
s" mount : pvc ${persistentVolumeClaimName} subdir ${relativeJobDirectory} as /job" +
created_env.map(ev => s"\n env : ${ev.getName()} = ${ev.getValue()}").mkString)
val ended_kjob = client.batch().jobs().withLabel("cortex-job-id", job.id)
.waitUntilCondition((x => Option(x).flatMap(j =>
Option(j.getStatus).flatMap(s =>
Some(s.getConditions.asScala.map(_.getType).filter(t =>
t.equals("Complete") || t.equals("Failed")).nonEmpty)))
getOrElse false),
timeout_or_default.length, timeout_or_default.unit);
if(ended_kjob != null) {
logger.info(s"Kubernetes Job ${ended_kjob.getMetadata().getName()} " +
s"(for job ${job.id}) status is now ${ended_kjob.getStatus().toString()}")
} else {
logger.info(s"Kubernetes Job for ${job.id} no longer exists")
}
}
// let's find the job by the attribute we know is fundamentally
// unique, rather than one constructed from it
val deleted = client.batch().jobs().withLabel("cortex-job-id", job.id).delete()
if(deleted) {
logger.info(s"Deleted Kubernetes Job for job ${job.id}")
} else {
logger.info(s"While trying to delete Kubernetes Job for ${job.id}, the job was not found; this is OK")
}
execution
}
}
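For readers less familiar with the fabric8 builder DSL, the Job submitted by the run method above corresponds roughly to the manifest below. This is a hand-written approximation, not output of the code; the job id, worker id, image, claim name and job folder are hypothetical values.

# Approximation only; values are hypothetical.
apiVersion: batch/v1
kind: Job
metadata:
  name: neuron-job-abcdefgh            # "neuron-job-" + job id, lowercased, "_" replaced by "-"
  labels:
    cortex-job-id: AbCdEfGh            # hypothetical Cortex job id
    cortex-worker-id: some-worker-id   # hypothetical worker id
    cortex-neuron-job: "true"
spec:
  template:
    spec:
      restartPolicy: Never
      volumes:
        - name: job-directory
          persistentVolumeClaim:
            claimName: cortex-job-pvc  # job.kubernetes.persistentVolumeClaimName
            readOnly: false
      containers:
        - name: neuron
          image: cortexneurons/some-analyzer:1   # worker.dockerImage(), hypothetical
          args: ["/job"]
          env:
            - name: CORTEX_JOB_FOLDER
              value: cortex-job-AbCdEfGh         # job folder relative to job.directory, hypothetical
            - name: REQUESTS_CA_BUNDLE           # only added when input/cacerts exists
              value: /job/input/cacerts
          volumeMounts:
            - name: job-directory
              subPathExpr: $(CORTEX_JOB_FOLDER)/input
              mountPath: /job/input
              readOnly: true
            - name: job-directory
              subPathExpr: $(CORTEX_JOB_FOLDER)/output
              mountPath: /job/output
              readOnly: false

After submission the service waits, by the cortex-job-id label, for a Complete or Failed condition up to the timeout (eight hours by default) and then deletes the Job by the same label.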
6 changes: 5 additions & 1 deletion app/org/thp/cortex/services/WorkerSrv.scala
@@ -175,10 +175,14 @@ class WorkerSrv @Inject() (
workerDefinitions.filter {
case w if w.command.isDefined && jobRunnerSrv.processRunnerIsEnable => true
case w if w.dockerImage.isDefined && jobRunnerSrv.dockerRunnerIsEnable => true
case w if w.dockerImage.isDefined && jobRunnerSrv.k8sRunnerIsEnable => true
case w =>
val reason =
if (w.command.isDefined) "process runner is disabled"
else if (w.dockerImage.isDefined) "Docker runner is disabled"
else if (w.dockerImage.isDefined && !jobRunnerSrv.dockerRunnerIsEnable)
"Docker runner is disabled"
else if (w.dockerImage.isDefined && !jobRunnerSrv.k8sRunnerIsEnable)
"Kubernetes runner is disabled"
else "it doesn't have image nor command"

logger.warn(s"$workerType ${w.name} is disabled because $reason")
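For readability, the reason chain presumably reads as follows once this change is applied (a sketch, assuming the unguarded "Docker runner is disabled" branch shown above is the single line removed by the commit):

// Sketch only, not a verbatim excerpt from the patched file.
val reason =
  if (w.command.isDefined) "process runner is disabled"
  else if (w.dockerImage.isDefined && !jobRunnerSrv.dockerRunnerIsEnable)
    "Docker runner is disabled"
  else if (w.dockerImage.isDefined && !jobRunnerSrv.k8sRunnerIsEnable)
    "Kubernetes runner is disabled"
  else "it doesn't have image nor command"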
1 change: 1 addition & 0 deletions build.sbt
@@ -29,6 +29,7 @@ lazy val cortex = (project in file("."))
Dependencies.reflections,
Dependencies.zip4j,
Dependencies.dockerClient,
Dependencies.k8sClient,
Dependencies.akkaCluster,
Dependencies.akkaClusterTyped
),
2 changes: 1 addition & 1 deletion conf/reference.conf
@@ -11,7 +11,7 @@ cache {

job {
timeout = 30 minutes
runners = [docker, process]
runners = [kubernetes, docker, process]
directory = ${java.io.tmpdir}
dockerDirectory = ${job.directory}
keepJobFolder = false
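Besides the runner list, the only Kubernetes-specific key this patch reads is the claim name, plus an optional autoUpdate flag that defaults to true. A sketch of what an operator's application.conf might contain; the values are hypothetical, and job.directory presumably has to be the path where the shared PVC is mounted inside the Cortex pod:

job {
  runners = [kubernetes, docker, process]          # tried in this order when available
  directory = "/var/run/cortex/jobs"               # hypothetical path on the shared volume
  kubernetes {
    persistentVolumeClaimName = "cortex-job-pvc"   # hypothetical ReadWriteMany claim, also mounted by neuron Jobs
    autoUpdate = true                              # optional, defaults to true
  }
}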
4 changes: 4 additions & 0 deletions package/docker/entrypoint
@@ -20,6 +20,7 @@ SHOW_SECRET=${show_secret:-0}
DAEMON_USER=${daemon_user:-cortex}
JOB_DIRECTORY=${job_directory:-/tmp/cortex-jobs}
DOCKER_JOB_DIRECTORY=${docker_job_directory:-}
KUBERNETES_JOB_PVC=${kubernetes_job_pvc:-}

function usage {
cat <<- _EOF_
@@ -33,6 +34,7 @@ function usage {
--show-secret | show the generated secret
--job-directory <dir> | use this directory to store job files
--docker-job-directory <dir> | indicate the job directory in the host (not inside container)
--kubernetes-job-pvc <name> | indicate the ReadWriteMany persistent volume claim holding the job directory
--analyzer-url <url> | where analyzers are located (url or path)
--responder-url <url> | where responders are located (url or path)
--start-docker | start an internal docker (inside container) to run analyzers/responders
@@ -56,6 +58,7 @@
"--show-secret") SHOW_SECRET=1;;
"--job-directory") shift; JOB_DIRECTORY=$1;;
"--docker-job-directory") shift; DOCKER_JOB_DIRECTORY=$1;;
"--kubernetes-job-pvc") shift; KUBERNETES_JOB_PVC=$1;;
"--analyzer-path") echo "--analyzer-path is deprecated, please use --analyzer-url"
shift; ANALYZER_URLS+=("$1");;
"--responder-path") echo "--responder-path is deprecated, please use --responder-url"
@@ -112,6 +115,7 @@

test -n "$JOB_DIRECTORY" && echo "job.directory=\"$JOB_DIRECTORY\"" >> "$CONFIG_FILE"
test -n "$DOCKER_JOB_DIRECTORY" && echo "job.dockerDirectory=\"$DOCKER_JOB_DIRECTORY\"" >> "$CONFIG_FILE"
test -n "$KUBERNETES_JOB_PVC" && echo "job.kubernetes.persistentVolumeClaimName=\"$KUBERNETES_JOB_PVC\"" >> "$CONFIG_FILE"

function join_urls {
echo -n "\"$1\""
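A usage sketch for the new entrypoint option; the image tag, path and claim name below are hypothetical examples:

# Hypothetical invocation, values are examples only.
docker run --rm thehiveproject/cortex:latest \
  --job-directory /var/run/cortex/jobs \
  --kubernetes-job-pvc cortex-job-pvc
# With these options the entrypoint should append, among other settings,
#   job.directory="/var/run/cortex/jobs"
#   job.kubernetes.persistentVolumeClaimName="cortex-job-pvc"
# to the generated application.conf before starting Cortex.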
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -21,6 +21,7 @@ object Dependencies {
val zip4j = "net.lingala.zip4j" % "zip4j" % "2.11.5"
val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6"
val dockerClient = "com.spotify" % "docker-client" % "8.16.0"
val k8sClient = "io.fabric8" % "kubernetes-client" % "5.0.2"
val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion
val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion
}
3 changes: 2 additions & 1 deletion www/src/app/pages/analyzers/analyzers.service.js
@@ -43,6 +43,7 @@ export default class AnalyzerService {

if (def.dockerImage && def.dockerImage !== null) {
def.runners.push('Docker');
def.runners.push('Kubernetes');
}
});

@@ -232,4 +233,4 @@
return this.$http.post('./api/analyzer/' + id + '/run', postData);
}
}
}
}