Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2067: Support Samza's running on Kubernetes #1197

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

weiqingy
Copy link
Contributor

@weiqingy weiqingy commented Oct 19, 2019

What changes were proposed in this pull request?

Make Samza jobs able to run on Kubernetes natively.

How was this patch tested?

Run Samza jobs on AKS.

@weiqingy weiqingy changed the title Support Samza's running on Kubernetes [WIP] Support Samza's running on Kubernetes Oct 19, 2019
@weiqingy weiqingy changed the title [WIP] Support Samza's running on Kubernetes [WIP] SAMZA-2067: Support Samza's running on Kubernetes Oct 19, 2019
@weiqingy weiqingy changed the title [WIP] SAMZA-2067: Support Samza's running on Kubernetes [WIP] SAMZA-2067: Support Samza to run on Kubernetes Oct 19, 2019
@weiqingy weiqingy changed the title [WIP] SAMZA-2067: Support Samza to run on Kubernetes [WIP] SAMZA-2067: Support Samza's running on Kubernetes Oct 19, 2019
@xinyuiscool xinyuiscool self-requested a review October 24, 2019 18:47
@weiqingy weiqingy changed the title [WIP] SAMZA-2067: Support Samza's running on Kubernetes SAMZA-2067: Support Samza's running on Kubernetes Nov 5, 2019
Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finish about 1/3 of it. Need to sync up to understand the design more.

@@ -413,7 +413,8 @@ public static void main(String[] args) {
//Read and parse the coordinator system config.
LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
coordinatorSystemConfig =
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
new MapConfig(SamzaObjectMapper.getObjectMapper()
.readValue(coordinatorSystemEnv.replace("\\\"", "\""), Config.class));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we do this replacement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RB has been updated and aligned with the master branch. The change here has gone.

@@ -93,7 +93,7 @@
* The Allocator matches requests to resources and executes processes.
*/
private final ContainerAllocator containerAllocator;
private final Thread allocatorThread;
private Thread allocatorThread = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep this final and assign to different values in the constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. The RB was updated, and the change here has gone.

this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization.");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do actually use containerAllocator anymore? Please walk through this part of the code with me offline.

// In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a
// separate thread to keep polling the allocated resources to start the container.
public boolean shouldStartAllocateThread() {
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lookup of specific cluster manager seems pretty hard to maintain. Can we think of a better to distinguish when we need this to do container allocation?

LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}",
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no");

if (shouldStartAllocateThread()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this if do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This If is just for logging the aliveness info "Is allocator thread alive". The RB has been updated. The aliveness info is now in the LOG.debug() in line 207 (keep the code same as the current codebase).

return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive();

boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get();
return shouldStartAllocateThread() ? shouldShutdown || !allocatorThread.isAlive() : shouldShutdown;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's nicer to do a null check of the thread instead of doing shouldStartAllocateThread()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. updated.

@@ -23,34 +23,22 @@ import java.util
import java.util.concurrent.atomic.AtomicReference

import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems all the changes in this file is just to reorg imports. If that's the case, I think this file can be left untouched.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RB has been updated and aligned with the master branch. The change here has gone.

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three main things:

  • the integration with current ContainerAllocator is quite hard to understand. It's more like just try to fit in instead of organic part of it. I would suggest to think about whether the current ContainerAllocator can be split into two allocator, one sync and one async. The yarn can use the Async allocator, and kubernetes can use the sync one. The thread can be brought up for the async allocator.

  • error handling: there seems to be quite a few loose ends about failures, e.g. operatorPod watcher failure, create failure, delete failure. All these need to be tied up so it can provide production quality code.

  • unit tests are completely missing. I expect unit test for new classes and new methods as possible.

Minor:

  • please add more comments and javadocs too.


// the image name of samza
public static final String APP_IMAGE = "kube.app.image";
public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not put your personal id as the default config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. Updated it to "samza/samza:v0".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will create samza docker hub if needed.

public static final String DEFAULT_DIRECTORY = "/opt/samza/";

// the memory and the cpu cores of container
public static final String CLUSTER_MANAGER_CONTAINER_MEM_SIZE = "cluster-manager.container.memory.mb";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems these configs should already be defined in the cluster manager config. If not, can we make sure it's shared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RB was updated to remove these duplicate config definitions.

*/
public class KubeJob implements StreamJob {
private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class);
private Config config;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark all these vars final.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in both KubeJob and KubeClusterResourceManager.

this.kubernetesClient = KubeClientFactory.create();
this.config = config;
this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX,
config.get(APP_NAME, "samza"), config.get(APP_ID, "1"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use ApplicaitonConfig.getAppName()/getAppId().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/**
* Kubernetes related configurations
*/
public class KubeConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest following the other Config class example to add getters for common configs, like namespace, jcPodName, etc. It's easier to centralize the place where these things are created. The rest of the code should just use these getters normally instead of directly accessing the config vars. It will be even better to make those static vars private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. will update this class.

return cmdBuilder.buildCommand();
}

private CommandBuilder getCommandBuilder(String containerId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method should be shared with Yarn I believe.

}

// Construct the envs for the task container pod
private List<EnvVar> getEnvs(CommandBuilder cmdBuilder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it an static util method?

*/

// TODO: SAMZA-2369: Add a logging thread which is similar to LoggingPodStatusWatcher in Spark
public class KubePodStatusWatcher implements Watcher<Pod> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we already have a watcher in job coordinator. Any idea what this thing does? I don't see it's been used.

/**
* Convenient utility class with static methods.
*/
public class KubeUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each method, you need to put javadocs and add tests.


package org.apache.samza.job.kubernetes;

import io.fabric8.kubernetes.api.model.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure NOT use wild card imports for all the source files.

@weiqingy
Copy link
Contributor Author

weiqingy commented Nov 26, 2019

@xinyuiscool Thanks for the reviewing. I'll update the PR to fix the comments soon.

@mynameborat
Copy link
Contributor

@weiqingy Is this still being worked on?

@weiqingy
Copy link
Contributor Author

Hi @mynameborat Yes, we are actively working on it, targeting Samza 1.5.

@rohitverma02
Copy link

@weiqingy Samza 1.5 already released, any update on this release?

@weiqingy
Copy link
Contributor Author

@rohitverma02 sorry for the late reply. Will update here about the release plan later. Thanks.

@ayakkala1
Copy link

@weiqingy Is this still being worked on?

@hoprocker
Copy link

Hey all, this sounds like a great idea. Imho adding another deployment target (especially one as popular as Kubernetes) would broaden Samza's general adoption appeal. Is this still being targeted for a release, maybe 1.9 at this point?

@NoahStapp
Copy link

@weiqingy My team is very interested in this change, as we are looking to move our Samza application to Kubernetes. Is this still being actively worked on? We're interested in helping contribute to this feature, or continuing development on it if this is no longer being worked on.

@nickpan47
Copy link
Contributor

nickpan47 commented Feb 25, 2023 via email

@weiqingy
Copy link
Contributor Author

weiqingy commented Feb 25, 2023

Hey @ayakkala1 @hoprocker @NoahStapp Thanks for reviewing and commenting! I haven't got a chance to update PR against the master branch yet, and there are many new changes since the last commit of the PR. Welcome to contribute to this feature and pick it up to catch up your team's timeline.

Best regards,
Weiqing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants