Skip to content

Commit

Permalink
Support Samza on Kubernetes. Contributed by Weiqing Yang & Jian He
Browse files Browse the repository at this point in the history
  • Loading branch information
jian.h committed Oct 20, 2019
1 parent 3e3713c commit fa25467
Show file tree
Hide file tree
Showing 20 changed files with 1,152 additions and 28 deletions.
21 changes: 21 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,27 @@ project(":samza-yarn_$scalaSuffix") {
jar.dependsOn("lesscss")
}

// Build definition for the samza-kubernetes module (suffixed with the Scala version).
project(":samza-kubernetes_$scalaSuffix") {
apply plugin: 'java'

dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
// fabric8 Kubernetes client; the version comes from the kubernetesJavaClientVersion property.
compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
}

// Packages the runtime classpath and this project's archive artifacts under lib/
// inside a gzip-compressed tar rooted at samza-kubernetes-<version>.
tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "samza-kubernetes-${version}"
compression = Compression.GZIP
from(configurations.runtime) { into("lib/") }
from(configurations.archives.artifacts.files) { into("lib/") }
// Both sources are copied into lib/, so duplicate entries are excluded.
duplicatesStrategy 'exclude'
}
}

project(":samza-shell") {
apply plugin: 'java'

Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@
failsafeVersion = "1.1.0"
jlineVersion = "3.8.2"
jnaVersion = "4.5.1"
kubernetesJavaClientVersion = "4.1.3"
}
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
3 changes: 1 addition & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#Mon Oct 31 23:13:44 PDT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.8-bin.zip
23 changes: 13 additions & 10 deletions gradlew
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/usr/bin/env sh

##############################################################################
##
Expand Down Expand Up @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn ( ) {
warn () {
echo "$*"
}

die ( ) {
die () {
echo
echo "$*"
echo
Expand Down Expand Up @@ -154,16 +154,19 @@ if $cygwin ; then
esac
fi

# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
function splitJvmOpts() {
JVM_OPTS=("$@")
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
APP_ARGS=$(save "$@")

# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"

# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")"
fi

exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
exec "$JAVACMD" "$@"
84 changes: 84 additions & 0 deletions gradlew.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################

@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal

set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome

@rem JAVA_HOME is not defined: probe for java.exe on the PATH by running it and checking the exit code.
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:findJavaFromJavaHome
@rem Strip any double quotes from JAVA_HOME before building the path to java.exe.
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe

if exist "%JAVA_EXE%" goto init

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:init
@rem Get command-line arguments, handling Windows variants

@rem Note: the label below follows immediately, so both outcomes of this test reach :win9xME_args.
if not "%OS%" == "Windows_NT" goto win9xME_args

:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
@rem _SKIP is set here but is not referenced anywhere else in this script.
set _SKIP=2

:win9xME_args_slurp
@rem With no arguments, CMD_LINE_ARGS stays empty; otherwise all arguments are forwarded as given.
if "x%~1" == "x" goto execute

set CMD_LINE_ARGS=%*

:execute
@rem Setup the command line

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar

@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%

:end
@rem A non-zero ERRORLEVEL from the Java process falls through into :fail below.
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd

:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1

:mainEnd
if "%OS%"=="Windows_NT" endlocal

:omega
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* The Allocator matches requests to resources and executes processes.
*/
private final AbstractContainerAllocator containerAllocator;
private final Thread allocatorThread;
private Thread allocatorThread = null;

// The StandbyContainerManager manages standby-aware allocation and failover of containers
private final Optional<StandbyContainerManager> standbyContainerManager;
Expand Down Expand Up @@ -146,8 +146,9 @@ public ContainerProcessManager(Config config,
} else {
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
}

this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
log.info("finished initialization of samza task manager");

}
Expand All @@ -174,10 +175,18 @@ public ContainerProcessManager(Config config,
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
}

this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
log.info("finished initialization of samza task manager");
}

/**
 * Tells whether a dedicated container allocator thread is needed.
 *
 * <p>On Kubernetes the kubelet starts a pod automatically once it has been allocated, so no
 * separate thread has to keep polling the allocated resources to start containers. The
 * resource manager is identified by its simple class name.
 *
 * @return {@code false} when the cluster resource manager's simple class name is
 *         {@code KubeClusterResourceManager}, {@code true} otherwise
 */
public boolean shouldStartAllocateThread() {
  final String managerName = clusterResourceManager.getClass().getSimpleName();
  boolean runsOnKubernetes = "KubeClusterResourceManager".equals(managerName);
  return !runsOnKubernetes;
}

public boolean shouldShutdown() {
log.debug(" TaskManager state: Completed containers: {}, Configured containers: {}, Is there too many FailedContainers: {}, Is AllocatorThread alive: {} ",
state.completedContainers.get(), state.containerCount, tooManyFailedContainers ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
Expand Down Expand Up @@ -206,20 +215,24 @@ public void start() {

// Start container allocator thread
log.info("Starting the container allocator thread");
allocatorThread.start();
if (allocatorThread != null) {
allocatorThread.start();
}
}

public void stop() {
log.info("Invoked stop of the Samza container process manager");

// Shutdown allocator thread
containerAllocator.stop();
try {
allocatorThread.join();
log.info("Stopped container allocator");
} catch (InterruptedException ie) {
log.error("Allocator Thread join() threw an interrupted exception", ie);
Thread.currentThread().interrupt();
if (allocatorThread != null) {
try {
allocatorThread.join();
log.info("Stopped container allocator");
} catch (InterruptedException ie) {
log.error("Allocator Thread join() threw an interrupted exception", ie);
Thread.currentThread().interrupt();
}
}

if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ import java.util
import java.util.concurrent.atomic.AtomicReference

import org.apache.samza.Partition
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.{Config, _}
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.config._
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,14 @@ class HttpServer(
throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
}
}

/**
 * Returns the URL of this server built from the host address reported by
 * Util.getLocalHost, the local port of the first connector, and the root path.
 * Throws a SamzaException if the server is not running.
 */
def getIpUrl = {
  if (!running) {
    throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
  }

  val port = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort()
  val hostAddress = Util.getLocalHost.getHostAddress

  new URL("http://" + hostAddress + ":" + port + rootPath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ object HttpUtil {
(exception, loop) => {
exception match {
case ioe: IOException => {
error(ioe)
warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
httpConn = getHttpConnection(url, timeout)
}
Expand Down
34 changes: 34 additions & 0 deletions samza-kubernetes/src/docker/dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# samzaJarsFolder must contain all the Samza jars (make sure every Samza jar is present).
# You can build the Samza image with:
# docker build -t dockerHubAccount/samza:versionNumber .
# Samza users can then use the Samza image as the base image to build their application images.
#

# Base OS image; Java 8 is installed on top of it below.
FROM ubuntu:latest

# Install OpenJDK 8. The apt package lists are removed in the same layer so
# they do not stay behind in the final image.
RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk \
    && rm -rf /var/lib/apt/lists/*

# key=value form of ENV; JAVA_HOME is set in its own instruction so that the
# PATH line below can reference it.
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
ENV PATH=$PATH:$JAVA_HOME/bin

# The Samza jars from the build context's samzaJarsFolder are placed in /opt/samza.
RUN mkdir -p /opt/samza
WORKDIR /opt/samza/
COPY samzaJarsFolder/ /opt/samza/
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.config;

/**
 * Configuration keys and fixed naming values for running Samza on Kubernetes.
 *
 * <p>An instance wraps a {@link Config}. Validation is not implemented yet, so
 * {@link #validate(Config)} currently only constructs the wrapper.
 */
public class KubeConfig {

  // the image name of samza
  public static final String APP_IMAGE = "kube.app.image";
  // NOTE(review): presumably the fallback when APP_IMAGE is not configured; confirm against callers.
  public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0";

  // The directory path inside which the log will be stored.
  public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path";
  public static final String K8S_API_NAMESPACE = "kube.app.namespace";

  // Fixed naming values (not configuration keys).
  public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp";
  public static final String JC_CONTAINER_NAME_PREFIX = "jc";
  public static final String POD_RESTART_POLICY = "OnFailure";
  public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId
  public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId

  // Environment variable
  public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME";

  // Azure file-share volume settings (presumably configuration keys, given the dotted kube.app.* form).
  public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled";
  public static final String AZURE_SECRET = "kube.app.volume.azure-secret";
  public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share";

  // Assigned once in the constructor and never reassigned, hence final.
  private final Config config;

  /**
   * Wraps the given configuration.
   *
   * @param config the configuration to wrap; stored as-is, without copying or null checks
   */
  public KubeConfig(Config config) {
    this.config = config;
  }

  /**
   * Creates a {@code KubeConfig} for the given configuration and runs validation on it.
   *
   * @param config the configuration to wrap
   * @return the newly created {@code KubeConfig}
   * @throws ConfigException declared for validation failures; the current validation is a no-op
   */
  public static KubeConfig validate(Config config) throws ConfigException {
    KubeConfig kc = new KubeConfig(config);
    kc.validate();
    return kc;
  }

  /** Placeholder for validation of the wrapped configuration; currently performs no checks. */
  private void validate() throws ConfigException {
    // TODO
  }
}
Loading

0 comments on commit fa25467

Please sign in to comment.