Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maxwellHA on zookeeper #1948

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions bin/maxwell-leaders
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
set -e

base_dir="$(dirname "$0")/.."
lib_dir="$base_dir/lib"
lib_dir_development="$base_dir/target/lib"

if [ ! -e "$lib_dir" -a -e "$lib_dir_development" ]; then
lib_dir="$lib_dir_development"
CLASSPATH="$CLASSPATH:$base_dir/target/classes"
fi

CLASSPATH="$CLASSPATH:$lib_dir/*"

if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi

export LANG="en_US.UTF-8"
exec $JAVA -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp $CLASSPATH com.zendesk.maxwell.util.MaxwellLeaders "$@"
28 changes: 28 additions & 0 deletions docs/docs/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,36 @@ which can be worked around by forcing the JVM onto an ipv4 stack:
JAVA_OPTS="-Djava.net.preferIPv4Stack=true" bin/maxwell --ha --raft_member_id=B
```

# High Availabilty on Zookeeper

High availability through zookeeper

## Getting started
Prepare two or more servers to serve as the maxwell host server and a zookeeper cluster. (The maxwell host server and a zookeeper cluster can communicate.)

Example Running Scripts:

```
bin/maxwell --log_level='INFO' --user='<user>' --password='<passwd>' --host='<host>' --producer=stdout --client_id='<client_id>' --ha='zookeeper' --zookeeper_server ='<host1:port>,<host2:port>,<host3:port>'
```

Run the preceding command on each maxwell host.

Get which host is the leader script Example:
```
bin/maxwell-leaders --ha='zookeeper' --zookeeper_server ='<host1:port>,<host2:port>,<host3:port>' --client_id='<client_id>'
```
You can get:
```
[INFO] MaxwellLeaders: clientID:<clientID>:leaders now are -> <leader host>
```

## Getting deeper
If a timeout error occurs between the maxwell host and the zookeeper cluster or the connection is abnormal due to network instability, you can set the following parameters:
```
--zookeeper_session_timeout_ms=<session timeout duration>
--zookeeper_connection_timeout_ms=<internal default wait time for the client to establish a connection with the zk>
--zookeeper_max_retries=<number of retries>
--zookeeper_retry_wait_ms=<retry time interval>
```

21 changes: 21 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,27 @@
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<!--zookeeper as ha-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>

</dependencies>

Expand Down
16 changes: 13 additions & 3 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.zendesk.maxwell.schema.*;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
import com.zendesk.maxwell.util.Logging;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -331,9 +332,18 @@ public void run() {

LOGGER.info("Starting Maxwell. maxMemory: " + Runtime.getRuntime().maxMemory() + " bufferMemoryUsage: " + config.bufferMemoryUsage);

if ( config.haMode ) {
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {
if ( null != config.haMode){
if ( "jgroups-raft".equals(config.haMode)){
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHAJGroups();
} else if ( "zookeeper".equals(config.haMode)){
if( StringUtils.isBlank(config.zookeeperServer)){
throw new Exception("In high availability mode 'zookeeperServer' does not allow Null. --zookeeper_server = " + config.zookeeperServer);
}
new MaxwellHA(maxwell, config.zookeeperServer, config.zookeeperSessionTimeoutMs, config.zookeeperConnectionTimeoutMs, config.zookeeperMaxRetries, config.zookeeperRetryWaitMs, config.clientID).startHAZookeeper();
} else {
throw new Exception("The value of ha is not in (jgroups-raft,zookeeper). ha = " + config.haMode);
}
} else{
maxwell.start();
}
} catch ( SQLException e ) {
Expand Down
52 changes: 47 additions & 5 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,9 @@ public class MaxwellConfig extends AbstractConfig {
public Scripting scripting;

/**
* Enable high available support (via jgroups-raft)
* Enable high available support (via jgroups-raft or zookeeper)
*/
public boolean haMode;
public String haMode;

/**
* Path to raft.xml file that configures high availability support
Expand All @@ -629,6 +629,32 @@ public class MaxwellConfig extends AbstractConfig {
*/
public int binlogEventQueueSize;

/**
* HA zookeeper address
*/
public String zookeeperServer;

/**
* session time
*/
public int zookeeperSessionTimeoutMs;

/**
* connection time
*/
public int zookeeperConnectionTimeoutMs;

/**
* maxRetries
*/
public int zookeeperMaxRetries;

/**
* retryWaitMs
*/
public int zookeeperRetryWaitMs;


/**
* Build a default configuration object.
*/
Expand Down Expand Up @@ -741,12 +767,22 @@ protected MaxwellOptionParser buildOptionParser() {
.withRequiredArg();
parser.separator();

parser.accepts( "ha", "enable high-availability mode via jgroups-raft" )
.withOptionalArg().ofType(Boolean.class);
parser.accepts( "ha", "enable high-availability mode via jgroups-raft or zookeeper" )
.withOptionalArg();
parser.accepts( "jgroups_config", "location of jgroups xml configuration file" )
.withRequiredArg();
parser.accepts( "raft_member_id", "raft memberID. (may also be specified in raft.xml)" )
.withRequiredArg();
parser.accepts("zookeeper_server","enable maxwell High Availability using zookeeper")
.withRequiredArg();
parser.accepts("zookeeper_session_timeout_ms","session timeout duration (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
parser.accepts("zookeeper_connection_timeout_ms","connection timeout duration (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
parser.accepts("zookeeper_max_retries","maximum retry (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
parser.accepts("zookeeper_retry_wait_ms","initial retry wait time (maxwellHA on zk)")
.withRequiredArg().ofType(Integer.class);
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved

parser.separator();

Expand Down Expand Up @@ -1206,11 +1242,17 @@ private void setup(OptionSet options, Properties properties) {

setupEncryptionOptions(options, properties);

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.haMode = fetchStringOption("ha", options, properties, null);
this.jgroupsConf = fetchStringOption("jgroups_config", options, properties, "raft.xml");
this.raftMemberID = fetchStringOption("raft_member_id", options, properties, null);
this.replicationReconnectionRetries = fetchIntegerOption("replication_reconnection_retries", options, properties, 1);

this.zookeeperServer = fetchStringOption("zookeeper_server", options, properties, null);
this.zookeeperSessionTimeoutMs = fetchIntegerOption("zookeeper_session_timeout_ms", options, properties, 6000);
this.zookeeperConnectionTimeoutMs = fetchIntegerOption("zookeeper_connection_timeout_ms", options, properties, 6000);
this.zookeeperMaxRetries = fetchIntegerOption("zookeeper_max_retries", options, properties, 3);
this.zookeeperRetryWaitMs = fetchIntegerOption("zookeeper_retry_wait_ms", options, properties, 1000);

this.binlogEventQueueSize = fetchIntegerOption("binlog_event_queue_size", options, properties, BinlogConnectorReplicator.BINLOG_QUEUE_SIZE);
}

Expand Down
79 changes: 76 additions & 3 deletions src/main/java/com/zendesk/maxwell/MaxwellHA.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.zendesk.maxwell;

import com.zendesk.maxwell.util.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.jgroups.JChannel;
import org.jgroups.protocols.raft.Role;
import org.jgroups.raft.RaftHandle;
Expand All @@ -9,13 +13,15 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Class that joins a jgroups-raft cluster of servers
* Class that joins a jgroups-raft cluster of servers or zookeeper
*/
public class MaxwellHA {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellHA.class);

private final Maxwell maxwell;
private final String jgroupsConf, raftMemberID, clientID;
private String jgroupsConf, raftMemberID, clientID;
private String zookeeperServer;
private int sessionTimeoutMs, connectionTimeoutMs, maxRetries, baseSleepTimeMs;
private boolean hasRun = false;
private AtomicBoolean isRaftLeader = new AtomicBoolean(false);

Expand All @@ -33,6 +39,26 @@ public MaxwellHA(Maxwell maxwell, String jgroupsConf, String raftMemberID, Strin
this.clientID = clientID;
}

/**
* Build a MaxwellHA object
* @param maxwell The Maxwell instance that will be run when an election is won
* @param zookeeperServer zookeeper adds
* @param sessionTimeoutMs
* @param connectionTimeoutMs
* @param maxRetries
* @param baseSleepTimeMs
* @param clientID The maxwell clientID. This will be the only one through which the actual path is stored
*/
public MaxwellHA(Maxwell maxwell, String zookeeperServer, int sessionTimeoutMs, int connectionTimeoutMs, int maxRetries, int baseSleepTimeMs, String clientID) {
this.maxwell = maxwell;
this.zookeeperServer = zookeeperServer;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.maxRetries = maxRetries;
this.baseSleepTimeMs = baseSleepTimeMs;
this.clientID = clientID;
}

private void run() {
try {
if (hasRun)
Expand All @@ -53,7 +79,7 @@ private void run() {
* Does not return.
* @throws Exception if there's any issues
*/
public void startHA() throws Exception {
public void startHAJGroups() throws Exception {
JChannel ch=new JChannel(jgroupsConf);
RaftHandle handle=new RaftHandle(ch, null);
if ( raftMemberID != null )
Expand Down Expand Up @@ -83,4 +109,51 @@ public void startHA() throws Exception {

Thread.sleep(Long.MAX_VALUE);
}

/**
* indicates that Ha is started in zookeeper mode
* @throws Exception
*/
public void startHAZookeeper() throws Exception {
String electPath = "/" + clientID + "/services";
String masterPath = "/" + clientID + "/leader";
CuratorUtils cu = new CuratorUtils();
cu.setZookeeperServer(zookeeperServer);
cu.setSessionTimeoutMs(sessionTimeoutMs);
cu.setConnectionTimeoutMs(connectionTimeoutMs);
cu.setMaxRetries(maxRetries);
cu.setBaseSleepTimeMs(baseSleepTimeMs);
cu.setClientId(clientID);
cu.setElectPath(electPath);
cu.setMasterPath(masterPath);
cu.init();
CuratorFramework client = cu.getClient();
LeaderLatch leader = new LeaderLatch(client, cu.getElectPath());
leader.start();
LOGGER.info("this node is participating in the election of the leader ....");
leader.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
try {
cu.register();
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("The node registration is abnormal, check whether the maxwell host communicates properly with the zookeeper network");
cu.stop();
System.exit(1);
}
LOGGER.info("node is current leader, starting Maxwell....");
run();
cu.stop();
}

@Override
public void notLeader() {
//LeaderLatch.CloseMode.SILENT mode will not invoke this method
wanghangyu817 marked this conversation as resolved.
Show resolved Hide resolved
}
});

Thread.sleep(Long.MAX_VALUE);
}

}
Loading