[LIVY-702]: Submit Spark apps to Kubernetes #249
base: master
Conversation
Codecov Report
@@ Coverage Diff @@
## master #249 +/- ##
============================================
- Coverage 68.19% 66.66% -1.53%
- Complexity 964 982 +18
============================================
Files 104 105 +1
Lines 5952 6252 +300
Branches 900 955 +55
============================================
+ Hits 4059 4168 +109
- Misses 1314 1483 +169
- Partials 579 601 +22
Continue to review full report at Codecov.
private var sessionLeakageCheckInterval: Long = _
private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()

private val leakedAppsGCThread = new Thread() {
Do we need a GC thread here? RSCDriver will shut itself down if no client comes in for a while. Please check this code.
Actually this GC thread collects leaked apps
in cases when the Driver cannot be discovered after its submission (usually when there are not enough resources in the cluster, or some accidental error); look here.
As for RSCDriver shutdown, it only comes into play if the Spark Driver has been launched, and it works only for Interactive sessions.
You may also think here about the Livy GC which collects expired session states, but it can and usually will be configured with a bigger timeout, and it serves another purpose.
Does that make it clearer for you?
  kubernetesDiagnostics = ArrayBuffer(e.getMessage)
  changeState(SparkApp.State.FAILED)
} finally {
  listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None)))
How is the user expected to access the driver UI? Without setting up the ingress and surfacing that URL, it may not be very useful. The original patch handled this, and I think it should be part of the basic requirements.
listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = None)))
Suggestion: It shouldn't be unset because it hasn't been set.
As a first iteration this PR provides a way to submit and track Spark apps by Livy, as well as to integrate Interactive sessions with Notebooks (or whatever). To access the Spark UI the user still needs to handle access on their own so far. The easiest way, I guess, is to use kubectl port-forward ...
manually.
There could be multiple points of view on how best to split the base PR, or whether it even makes sense to do it, unfortunately... But it would be nice to let it go at some point. I propose to create small PRs, each representing a single aspect of the whole project. We can merge them into bigger ones at any time, but splitting them out is a bit more painful. In the meantime you can take a look at the following one: #252.
Or we can always roll back to the original PR and just refactor it up to an acceptable state.
WDYT?
@mgaido91 Could you take a look?
@mgaido91 ping.
@jahstreet I am not the best guy to take a look at this, honestly. I am reviewing this PR in a few hours, but it would be great to have feedback also from other people who are more familiar with this part of Livy. cc @vanzin @jerryshao
Ah, I see. Will try to ping them. Thanks anyway.
@@ -399,7 +399,13 @@ class InteractiveSession(
app = mockApp.orElse {
  val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
    .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
  driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) }
  driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
    .orElse {
nit: on the line above
Sorry, I haven't got the idea. What is nit?
It is short for nitpicking.
I meant to put .orElse { immediately after the ) on the line above. It is just a style thing.
That exceeds the line length then.
driverProcess.map(
  _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))).orElse {
Ahh, ok!
if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) {
  throw new IllegalArgumentException(
    s"Requested namespace $targetNamespace doesn't match the configured: " +
      s"${kubernetesNamespaces.mkString(", ")}")
s"${kubernetesNamespaces.mkString(", ")}") | |
kubernetesNamespaces.mkString(", ")) |
apps.get(leakedApp.getKey) match {
  case Some(seq) =>
    seq.foreach(app =>
      if (withRetry(kubernetesClient.killApplication(app))) {
What if this returns false? At least a warning?
Nice catch!
// kill the app if found it or remove it if exceeding a threshold
val leakedApps = leakedAppTags.entrySet().iterator()
val now = System.currentTimeMillis()
val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag)
I don't see exception handling here... an exception here destroys the thread, so leakage removal no longer works after an exception?
Nice catch!
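To make the agreement above concrete, here is a hedged sketch of how the GC loop could address both points (a warning when killApplication returns false, and a try/catch so a Kubernetes API error does not kill the thread). Names such as sessionLeakageCheckTimeout and warn are assumptions about the enclosing class, not the PR's exact code:

```scala
// A minimal sketch of the hardened GC loop (not the PR's exact code):
// `withRetry`, `kubernetesClient`, `leakedAppTags`, `sessionLeakageCheckInterval`,
// `sessionLeakageCheckTimeout` and `warn` are assumed members of the enclosing class.
override def run(): Unit = {
  while (true) {
    if (!leakedAppTags.isEmpty) {
      try {
        val leakedApps = leakedAppTags.entrySet().iterator()
        val now = System.currentTimeMillis()
        val apps = withRetry(kubernetesClient.getApplications()).groupBy(_.getApplicationTag)
        while (leakedApps.hasNext) {
          val leakedApp = leakedApps.next()
          apps.get(leakedApp.getKey) match {
            case Some(seq) =>
              // Only forget the tag once every app carrying it was killed; otherwise warn and retry later.
              if (seq.forall(app => withRetry(kubernetesClient.killApplication(app)))) {
                leakedApps.remove()
              } else {
                warn(s"Failed to kill leaked app(s) with tag ${leakedApp.getKey}, will retry")
              }
            case None =>
              // Give up on tags that have been leaked for longer than the configured timeout.
              if (now - leakedApp.getValue > sessionLeakageCheckTimeout) {
                leakedApps.remove()
              }
          }
        }
      } catch {
        // Keep the GC thread alive: a transient Kubernetes API failure must not kill it.
        case e: Exception => warn(s"Leaked apps GC iteration failed: ${e.getMessage}")
      }
    }
    Thread.sleep(sessionLeakageCheckInterval)
  }
}
```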
}

private[utils] def mapKubernetesState(
  kubernetesAppState: String,
nit: another indent
And here. Could you please explain what you mean?
Add another 2-space indent here, like:
  kubernetesAppState: String,
    kubernetesAppState: String,
    SparkApp.State.FAILED
  }
}

nit: remove this empty line
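For readers without the full diff, here is a hypothetical sketch of what a state-mapping helper of this shape could look like. The Kubernetes phase names and the appTag parameter are assumptions for illustration, not the PR's actual mapping:

```scala
// Hypothetical sketch of a Kubernetes-phase-to-Livy-state mapping; the real
// mapKubernetesState in the PR may handle a different set of phases.
private[utils] def mapKubernetesState(
    kubernetesAppState: String,
    appTag: String): SparkApp.State.Value = {
  kubernetesAppState.toUpperCase match {
    case "PENDING" => SparkApp.State.STARTING
    case "RUNNING" => SparkApp.State.RUNNING
    case "SUCCEEDED" | "COMPLETED" => SparkApp.State.FINISHED
    case "FAILED" | "ERROR" => SparkApp.State.FAILED
    case _ =>
      // Treat unknown phases as failures so the session does not hang forever.
      SparkApp.State.FAILED
  }
}
```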
}

class SparkKubernetesApp private[utils](
  appTag: String,
nit: indent once more
    process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++
    ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)

override def kill(): Unit =
Please add { and } around methods everywhere, even when not necessary. They help readability.
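As an illustration of the requested style (the method body here is just an example, not the PR's actual kill() implementation):

```scala
// Without braces (what the reviewer wants to avoid):
override def kill(): Unit =
  changeState(SparkApp.State.KILLED)

// With braces, as requested:
override def kill(): Unit = {
  changeState(SparkApp.State.KILLED)
}
```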
if (deadline.isOverdue) {
  process.foreach(_.destroy())
  leakedAppTags.put(appTag, System.currentTimeMillis())
  throw new IllegalStateException(s"No Kubernetes application is found with tag" +
throw new IllegalStateException(s"No Kubernetes application is found with tag" + | |
throw new IllegalStateException("No Kubernetes application is found with tag" + |
Build failure due to Travis:
@jerryshao @vanzin could you take a look please? Your review would be really helpful to let this PR go.
Force-pushed 825741b to b54eee9
@yiheng @arunmahadevan @mgaido91
Force-pushed e3740ae to 865aa24
Rebased
Force-pushed 865aa24 to 4a6501c
Rebased to
I saw your email and took some time to check out your incremental changes. With my focus on the UI and Confs side of Livy, I've only reviewed the code changes to existing files (I'll leave reviews of SparkKubernetesApp.scala and SparkKubernetesAppSpec.scala to those with better knowledge and more time). Overall your conf-related changes look good, only a small note on an added if block.
@@ -402,6 +402,9 @@ class InteractiveSession(

if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
  Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else if (livyConf.isRunningOnKubernetes()) {
  // Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart
  Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
If this is just the same line as 404, why is it in its own else if block? Wouldn't it make more sense to add || livyConf.isRunningOnKubernetes() to the if on line 403?
Ahh, nice catch, agree. EDIT: resolved
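For clarity, the agreed simplification would look roughly like this (a sketch based on the quoted diff; the else branch returning None is an assumption about the surrounding code):

```scala
// Sketch: fold the Kubernetes case into the existing condition instead of an
// `else if` branch that duplicates the same body. On Kubernetes the app is
// created even without a driver process, so monitoring can be recovered
// after a Livy server restart.
if (livyConf.isRunningOnYarn() || livyConf.isRunningOnKubernetes() || driverProcess.isDefined) {
  Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
  None  // assumption: other sessions without a driver process create no app
}
```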
Can we merge this? :)
I would also love to! Open for suggestions on how to get closer to it.
Is there a timeline when this will get integrated with Livy? This would help us run Jupyter on Spark on Kubernetes. Any ETA will be very helpful! Thanks!
Hi @SarnathK, I've tried to contact the community multiple times via mailing lists with no luck in pushing this forward.
@jerryshao do you have bandwidth to review this? I've done a partial review above, but need another pair of eyes.
I can take a chance to review this, but I'm not an expert on k8s and may not fully understand the pros and cons of the implementation.
Any estimated time frame for when this ticket can be merged?
I agree with all the comments: a lot of effort that should be at least considered.
public boolean isRunningOnKubernetes() {
  return Optional.ofNullable(get("livy.spark.master"))
      .filter(s -> s.startsWith("k8s"))
      .isPresent();
}
This function may always return false, since "livy.spark.master" will not be picked up by RSCConf.
Yeah, seems like that's what I've faced in #249 (comment).
However, it seems like it's not affecting functionality, as this function is used while setting RPC_SERVER_ADDRESS here:
https://github.com/apache/incubator-livy/pull/249/files/b87c0cebb65ce7f34e6b4b6b738095be6254cf69#diff-43114318c4b009c2404f7eb326a84c184fb1501a3237c49a771df851d0f6f328R172-R178
And the value of RPC_SERVER_ADDRESS is not used anyway since Livy 0.7, because of the things I've explained in #388.
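For context, Spark identifies a Kubernetes deployment by a master URL that starts with k8s://, which is what the startsWith("k8s") check above relies on. A minimal sketch (the concrete URL below is an example, not taken from the PR):

```scala
// Example only: a typical Spark-on-Kubernetes master URL and the check applied to it.
val master = Option("k8s://https://kubernetes.default.svc:443")  // example value, not from the PR
val runningOnKubernetes = master.exists(_.startsWith("k8s"))     // true
```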
I have validated this fix in a new git branch. I found that the fix is working as expected. The detailed steps used during the validation are documented in README.md. Should we consider merging the fix?
@askhatri - I would say that you can merge this given your review and testing.
Happy to see things going in this PR 🎉. Thank you a lot, folks, for putting the effort into reviewing and testing the change. @askhatri can I help you with anything to test the work? I'd love to spend my time finalizing the activity here, and then I have something to offer on top of it to contribute with.
Thank you, @jahstreet, for offering your help in testing. During my initial testing, I found that the code is working as expected. The only observation is that we might need to upgrade spark-on-kubernetes-helm to support Helm Chart 3.x and the latest Kubernetes version, 1.24 or higher.
Looking into it; not 100% sure we can get the latest K8s version, given the latest Spark 3.3.2 works with Fabric8 client 5.12.2, which aims for K8s <= 1.23.13 (as per the compatibility matrix). Will share my findings ... UPD: seems the latest Spark 3.4.0 has already bumped the K8s client version. We are getting there, folks ...
Thank you @jahstreet
Absolutely great work @jahstreet. We are waiting for the PR with Helm 3 and k8s 1.24+ support. Currently, we are using Livy with consistent resources. We want to try this in AKS, EKS, and on-prem k8s clusters as a serverless Livy.
Hey folks - I noticed this JIRA is listed for 0.8.0 and would like to get a sense of how far out this may be.
Hi @lmccay, thank you for keeping an eye on it. I think this PR is already battle-tested and good to go. There is already a chain of work done on top of it by me and other people who left feedback on this chunk of work. If we make it a part of master, I'm 100% sure I won't be the only one pushing the Livy project to the world of K8s. Besides that, seeing progress after years of waiting would boost the motivation to continue contributions... So, including the upgrade of dependencies to support the latest K8s and Spark versions, I propose to tackle those in separate PRs. How do you feel about merging it to 0.8, and is there anything formal we should do to make it happen? (resolving the merge conflicts in the meantime)
Yeah. We've been using Livy on K8s with fixes from this PR and PR #252 for nearly 2 years now, in different configurations, including different Spark versions starting from 2.4.4 and up to 3.3.1 (however, some fixes were made in Livy itself to make it compatible with Spark 3.3). But in general everything works fantastically.
@jahstreet and @idzikovsky - if we merge this before branching from 0.8.0 and it doesn't cause any issues due to other work not being there yet, then I would have no problem with doing so. I'm personally not in a position to +1 the merge. Would someone that has tested it, and/or tested with it in place but not necessarily exercised it, be able to +1 it as a review?
@lmccay I think we need to summon a maintainer with K8s expertise, is that something you expect? Do you know any name(s) we can put here and follow up on that together?
@jahstreet The implementation is great. I've got a couple of remarks/questions on the way it handles the logs and the ingress. The current implementation is opinionated towards nginx (for the Spark UI) and Loki; while both are widely adopted, how can someone with another ingress controller use their own (without modifying your code)? I understand the logs might be a bit more challenging for providing the native UI integration; in that case maybe offer the possibility to use a sidecar for log shipping? For Livy there is a block that looks up for the
Hi. Is it possible to merge this into apache-incubator-livy officially in the new version "0.9.0"? I guess if it merges, we will be able to use Livy on Kubernetes for launching our awesome Spark jobs! Thanks.
Adding Kubernetes support to Apache Livy is a valuable enhancement for the next release. Should we consider merging this into the master branch?
@askhatri - yes, I think we should push for an active approval on the merge of this. We can likely mark this as an important contribution for the next release. Let's move this forward now.
I've been following this issue since forever, as it would have been highly beneficial somewhere around 2 companies ago.
@jesinity there is no passive obstruction against it here.
To help with the K8s review/perspective, I know there are teams that have been using this branch in production for years (cc @jpugliesi).
I have created a new #451 with an update that includes a newer version of the Kubernetes client and adds code to display the Spark UI. CC: @jahstreet
This pull request (PR) is the foundational PR for adding Kubernetes support in Apache Livy, originally found here (#249). This update includes a newer version of the Kubernetes client and adds code to display the Spark UI.

## Summary of the Proposed Changes

This PR introduces a method to submit Spark applications to a Kubernetes cluster. The key points covered include:

* Submitting batch sessions
* Submitting interactive sessions
* Monitoring sessions, collecting logs, and gathering diagnostic information
* Restoring session monitoring after restarts
* Garbage collection (GC) of created Kubernetes resources

JIRA link: https://issues.apache.org/jira/browse/LIVY-702

## How was this patch tested?

* Unit Tests: The patch has been verified through comprehensive unit tests.
* Manual Testing: Conducted manual testing using Kubernetes on Docker Desktop.
* Environment: Helm charts. For detailed instructions on testing using Helm charts, please refer to the documentation available at https://github.com/askhatri/livycluster

Co-authored-by: Asif Khatri <[email protected]>
Co-authored-by: Alex Sasnouskikh <[email protected]>
What changes were proposed in this pull request?
Jira
This PR is one of the PRs in the series related to the splitting of the base PR #167 into multiple PRs to ease and speed up the review and merge processes.
This PR proposes a way to submit Spark apps to a Kubernetes cluster. Points covered:
How was this patch tested?
Unit tests.
Manual testing with Kubernetes on Docker Desktop for Mac v2.1.0.1.
Environment - Helm charts: