Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
change ClusterManager into Builder Pattern & tinker a little at code
Browse files Browse the repository at this point in the history
style
  • Loading branch information
huangwei5 committed Jan 18, 2019
1 parent cd1a033 commit 36b4e3a
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 86 deletions.
43 changes: 21 additions & 22 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,32 @@ public static Cluster createCluster(Properties config) throws IllegalArgumentExc

int asyncWorkers = Integer.parseInt(config.getProperty(
PEGASUS_ASYNC_WORKERS_KEY, PEGASUS_ASYNC_WORKERS_DEF));
ClusterManager.Builder builder = new ClusterManager.Builder(operatorTimeout, asyncWorkers, address);

boolean enablePerfCounter = Boolean.parseBoolean(config.getProperty(
PEGASUS_ENABLE_PERF_COUNTER_KEY, PEGASUS_ENABLE_PERF_COUNTER_VALUE));
String perfCounterTags = enablePerfCounter ? config.getProperty(
PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF) : null;
int pushIntervalSecs = Integer.parseInt(config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY,
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF
));
if (enablePerfCounter) {
String perfCounterTags = config.getProperty(
PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF);
int pushIntervalSecs = Integer.parseInt(config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY,
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF
));
builder.enableCounter(perfCounterTags, pushIntervalSecs);
}

boolean needAuth = Boolean.parseBoolean(config.getProperty(PEGASUS_OPEN_AUTH_KEY, PEGASUS_OPEN_AUTH_DEF));
String serviceName = needAuth ? config.getProperty(PEGASUS_SERVICE_NAME_KEY, PEGASUS_SERVICE_NAME_DEF) : null;
String serviceFqdn = needAuth ? config.getProperty(PEGASUS_SERVICE_FQDN_KEY, PEGASUS_SERVICE_FQDN_DEF) : null;
String jaasConfPath = needAuth ? config.getProperty(PEGASUS_JAAS_CONF_KEY, PEGASUS_JAAS_CONF_DEF) : null;
if (jaasConfPath != null) {
System.setProperty("java.security.auth.login.config", jaasConfPath);
}

return new ClusterManager(
operatorTimeout,
asyncWorkers,
enablePerfCounter,
perfCounterTags,
pushIntervalSecs,
address,
needAuth,
serviceName,
serviceFqdn);
if (needAuth) {
String serviceName = config.getProperty(PEGASUS_SERVICE_NAME_KEY, PEGASUS_SERVICE_NAME_DEF);
String serviceFqdn = config.getProperty(PEGASUS_SERVICE_FQDN_KEY, PEGASUS_SERVICE_FQDN_DEF);
String jaasConfPath = config.getProperty(PEGASUS_JAAS_CONF_KEY, PEGASUS_JAAS_CONF_DEF);
if (jaasConfPath != null) {
System.setProperty("java.security.auth.login.config", jaasConfPath);
}
builder.openAuth(serviceName, serviceFqdn);
}
return builder.build();
}

public abstract String[] getMetaList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,54 @@ public class ClusterManager extends Cluster {
logger.info("operating system name: {}", osName);
}

public ClusterManager(
int timeout,
int io_threads,
boolean enableCounter,
String perfCounterTags,
int pushIntervalSecs,
String[] address_list,
boolean openAuth,
String serviceName,
String serviceFqdn)
throws IllegalArgumentException {
setTimeout(timeout);
this.enableCounter = enableCounter;
if (enableCounter) {
MetricsManager.detectHostAndInit(perfCounterTags, pushIntervalSecs);
public static class Builder {
private int timeout;
private int io_threads;
private boolean enableCounter;
private String perfCounterTags;
private int pushIntervalSecs;
private String[] address_list;
private boolean openAuth;
private String serviceName;
private String serviceFqdn;

public Builder(int timeout, int io_threads, String[] address_list
) {
this.timeout = timeout;
this.io_threads = io_threads;
this.address_list = address_list;
}

public Builder enableCounter(String perfCounterTags, int pushIntervalSecs) {
this.enableCounter = true;
this.perfCounterTags = perfCounterTags;
this.pushIntervalSecs = pushIntervalSecs;
return this;
}

if (openAuth) {
logger.info("open authentication");
this.openAuth = openAuth;
public Builder openAuth(String serviceName, String serviceFqdn) {
this.openAuth = true;
this.serviceName = serviceName;
this.serviceFqdn = serviceFqdn;
return this;
}

public ClusterManager build() {
return new ClusterManager(this);
}
}

public ClusterManager(Builder builder) {
setTimeout(builder.timeout);
this.enableCounter = builder.enableCounter;
if (enableCounter) {
MetricsManager.detectHostAndInit(builder.perfCounterTags, builder.pushIntervalSecs);
}

if (builder.openAuth) {
this.openAuth = true;
this.serviceName = builder.serviceName;
this.serviceFqdn = builder.serviceFqdn;

String jaasConf = System.getProperties().getProperty("java.security.auth.login.config");
if (jaasConf == null) {
Expand Down Expand Up @@ -108,34 +134,14 @@ public ClusterManager(
}

replicaSessions = new ConcurrentHashMap<rpc_address, ReplicaSession>();
replicaGroup = getEventLoopGroupInstance(io_threads);
replicaGroup = getEventLoopGroupInstance(builder.io_threads);
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);

metaList = address_list;
metaList = builder.address_list;
// the constructor of meta session is depend on the replicaSessions,
// so the replicaSessions should be initialized earlier
metaSession = new MetaSession(this, address_list, timeout, 10, metaGroup);
}

public ClusterManager(
int timeout,
int io_threads,
boolean enableCounter,
String perfCounterTags,
int pushIntervalSecs,
String[] address_list)
throws IllegalArgumentException {
this(
timeout,
io_threads,
enableCounter,
perfCounterTags,
pushIntervalSecs,
address_list,
false,
null,
null);
metaSession = new MetaSession(this, builder.address_list, builder.timeout, 10, metaGroup);
}

public EventExecutor getExecutor(String name, int threadCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ private void startNegotiation() {
negotiation_message msg = new negotiation_message();
msg.status = negotiation_status.SASL_LIST_MECHANISMS;

sendNegoMsg(msg);

logger.info("{}: start negotiation", name());
}

private void sendNegoMsg(negotiation_message msg) {
final RequestEntry entry = new ReplicaSession.RequestEntry();
entry.sequenceId = seqId.getAndIncrement();
entry.op = new negotiate_operator(msg);
Expand All @@ -355,7 +361,6 @@ private void startNegotiation() {
pendingResponse.put(new Integer(entry.sequenceId), entry);

write(entry, fields);
logger.info("Start negotiation");
}

private class Action implements PrivilegedExceptionAction {
Expand Down Expand Up @@ -451,14 +456,7 @@ public Object run() throws Exception {
throw new Exception("Received an unknown response, status " + resp.status);
}

final RequestEntry entry = new ReplicaSession.RequestEntry();
entry.sequenceId = seqId.getAndIncrement();
entry.op = new negotiate_operator(msg);
entry.callback = new SaslRecvHandler((negotiate_operator) entry.op);
pendingResponse.put(new Integer(entry.sequenceId), entry);
entry.timeoutTask = addTimer(entry.sequenceId, 1000);

write(entry, fields);
sendNegoMsg(msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public void testGetReplicaSession() throws Exception {

ClusterManager testManager;
if (isOpenAuth()) {
testManager = new ClusterManager(1000, 1, false, null, 60, address_list, true, "xxxx", "xxxx");
testManager = new ClusterManager.Builder(1000, 1, address_list).openAuth("xxxx", "xxxx").build();
} else {
testManager = new ClusterManager(1000, 1, false, null, 60, address_list);
testManager = new ClusterManager.Builder(1000, 1, address_list).build();
}
// input an invalid rpc address
rpc_address address = new rpc_address();
Expand All @@ -63,9 +63,9 @@ public void testOpenTable() throws Exception {
String[] addr_list = {"127.0.0.1:123", "127.0.0.1:124", "127.0.0.1:125"};
ClusterManager testManager;
if (isOpenAuth()) {
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list, true, "xxxx", "xxxx");
testManager = new ClusterManager.Builder(1000, 1, addr_list).openAuth("xxxx", "xxxx").build();
} else {
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list);
testManager = new ClusterManager.Builder(1000, 1, addr_list).build();
}

TableHandler result = null;
Expand All @@ -81,9 +81,9 @@ public void testOpenTable() throws Exception {
// test partially invalid meta list
String[] addr_list2 = {"127.0.0.1:123", "127.0.0.1:34603", "127.0.0.1:34601", "127.0.0.1:34602"};
if (isOpenAuth()) {
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list2, true, "xxxx", "xxxx");
testManager = new ClusterManager.Builder(1000, 1, addr_list2).openAuth("xxxx", "xxxx").build();
} else {
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list2);
testManager = new ClusterManager.Builder(1000, 1, addr_list2).build();
}
try {
result = testManager.openTable("hehe", KeyHasher.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public void testConnect() throws Exception {
String[] addr_list = {"127.0.0.1:34602", "127.0.0.1:34603", "127.0.0.1:34601"};
ClusterManager manager;
if (isOpenAuth()) {
manager = new ClusterManager(1000, 4, false, null, 60, addr_list, true, "xxxx", "xxxx");
manager = new ClusterManager.Builder(1000, 4, addr_list).openAuth("xxxx", "xxxx").build();
} else {
manager = new ClusterManager(1000, 4, false, null, 60, addr_list);
manager = new ClusterManager.Builder(1000, 4, addr_list).build();
}
MetaSession session = manager.getMetaSession();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public void before() throws Exception {
String prop = System.getProperties().getProperty("test.open.auth");
boolean openAuth = (prop != null) ? Boolean.valueOf(prop) : false;
if (openAuth) {
manager = new ClusterManager(1000, 1, false, null, 60, metaList, true, "xxxx", "xxxx");
manager = new ClusterManager.Builder(1000, 1, metaList).openAuth("xxxx", "xxxx").build();
} else {
manager = new ClusterManager(1000, 1, false, null, 60, metaList);
manager = new ClusterManager.Builder(1000, 1, metaList).build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public void before() throws Exception {
String prop = System.getProperties().getProperty("test.open.auth");
boolean openAuth = (prop != null) ? Boolean.valueOf(prop) : false;
if (openAuth) {
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list, true, "xxxx", "xxxx");
testManager = new ClusterManager.Builder(1000, 1, addr_list).openAuth("xxxx", "xxxx").build();
} else {
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list);
testManager = new ClusterManager.Builder(1000, 1, addr_list).build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public void timeoutChecker() {
boolean openAuth = (prop != null) ? Boolean.valueOf(prop) : false;
ClusterManager manager;
if (openAuth) {
manager = new ClusterManager(1000, 1, false, null, 60, metaList, true, "xxxx", "xxxx");
manager = new ClusterManager.Builder(1000, 1, metaList).openAuth("xxxx", "xxxx").build();
} else {
manager = new ClusterManager(1000, 1, false, null, 60, metaList);
manager = new ClusterManager.Builder(1000, 1, metaList).build();
}

TableHandler handle;
Expand Down

0 comments on commit 36b4e3a

Please sign in to comment.