diff --git a/configuration/pegasus.properties b/configuration/pegasus.properties index fea4cee4..167ad465 100644 --- a/configuration/pegasus.properties +++ b/configuration/pegasus.properties @@ -5,6 +5,6 @@ enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 open_auth = false -jaas_conf = configuration/jaas.conf +jaas_conf = configuration/pegasus_jaas.conf service_name = xxx service_fqdn = xxx diff --git a/configuration/jaas.conf b/configuration/pegasus_jaas.conf similarity index 100% rename from configuration/jaas.conf rename to configuration/pegasus_jaas.conf diff --git a/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java b/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java index 684b3516..880efc48 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java +++ b/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java @@ -108,6 +108,10 @@ public enum error_types { ERR_MOCK_INTERNAL, ERR_ZOOKEEPER_OPERATION, + ERR_AUTH_NEGO_FAILED, + + ERR_UNAUTHENTICATED, + ERR_ACL_DENY, //ERROR_CODE defined by client ERR_SESSION_RESET, }; diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java index 1d8821ec..2f50123c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java @@ -12,7 +12,7 @@ public class negotiate_operator extends client_operator { public negotiate_operator(negotiation_message request) { - super(new gpid(), ""); // TODO HW gpid and tableName needless + super(new gpid(), ""); this.request = request; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index 7428918e..a0c14e33 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -39,7 +39,7 @@ public abstract class Cluster { public static final String PEGASUS_OPEN_AUTH_DEF = "false"; public static final String PEGASUS_JAAS_CONF_KEY = "jaas_conf"; - public static final String PEGASUS_JAAS_CONF_DEF = "configuration/jaas.conf"; + public static final String PEGASUS_JAAS_CONF_DEF = "configuration/pegasus_jaas.conf"; public static Cluster createCluster(Properties config) throws IllegalArgumentException { int operatorTimeout = Integer.parseInt(config.getProperty( diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java index fc9d7ef8..ec51ec73 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java @@ -30,8 +30,7 @@ public MetaSession(ClusterManager manager, String addrList[], if (rpc_addr.fromString(addr)) { logger.info("add {} as meta server", addr); metaList.add(clusterManager.getReplicaSession(rpc_addr)); - } - else { + } else { logger.error("invalid address {}", addr); } } @@ -134,8 +133,7 @@ else if (metaError == error_types.ERR_FORWARD_TO_OTHERS) { needDelay = false; needSwitchLeader = true; forwardAddress = getMetaServiceForwardAddress(op); - } - else { + } else { round.callbackFunc.run(); return; } @@ -143,9 +141,8 @@ else if (metaError == error_types.ERR_FORWARD_TO_OTHERS) { else if (op.rpc_error.errno == error_types.ERR_SESSION_RESET || op.rpc_error.errno == error_types.ERR_TIMEOUT) { needDelay = false; needSwitchLeader = true; - } - else { - logger.error("unknown error: {}", op.rpc_error.errno.toString()); + } else { + logger.error(op.rpc_error.errno == error_types.ERR_UNAUTHENTICATED ? "{}" : "unknown error: {}", op.rpc_error.errno.toString()); round.callbackFunc.run(); return; } @@ -176,8 +173,7 @@ else if (op.rpc_error.errno == error_types.ERR_SESSION_RESET || op.rpc_error.err metaList.add(clusterManager.getReplicaSession(forwardAddress)); curLeader = metaList.size() - 1; } - } - else if (metaList.get(curLeader) == round.lastSession) { + } else if (metaList.get(curLeader) == round.lastSession) { curLeader = (curLeader + 1) % metaList.size(); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 17b11a9b..ba4857fe 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -97,7 +97,7 @@ public void initChannel(SocketChannel ch) { } public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean openAuth, Subject subject, String serviceName, String serviceFqdn, MessageResponseFilter filter) { - this(address, rpcGroup, socketTimeout, true, subject, serviceName, serviceFqdn); + this(address, rpcGroup, socketTimeout, openAuth, subject, serviceName, serviceFqdn); this.filter = filter; } @@ -211,15 +211,21 @@ private void markSessionConnected(Channel activeChannel) { } private void markSessionNegotiation(Channel activeChannel) { + logger.info("{}: mark session state negotiation"); VolatileFields newCache = new VolatileFields(); newCache.state = ConnState.NEGOTIATION; newCache.nettyChannel = activeChannel; fields = newCache; - logger.info("{}: mark session state negotiation, now negotiate", name()); - startNegotiation(); + if (needAuthConnection()) { + // version + auth, now only support auth + startNegotiation(); + } else { + logger.info("{}: mark session state connected"); + markSessionConnected(activeChannel); + } } - private void markSessionDisconnect() { + private void markSessionDisconnect(error_types errorType) { VolatileFields cache = fields; synchronized (pendingSend) { if (cache.state != ConnState.DISCONNECTED) { @@ -231,14 +237,14 @@ private void markSessionDisconnect() { // this. In this case, we are relying on the timeout task. while (!pendingSend.isEmpty()) { RequestEntry e = pendingSend.poll(); - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyWithSequenceID(e.sequenceId, errorType, false); } List l = new LinkedList(); for (Map.Entry entry : pendingResponse.entrySet()) { l.add(entry.getValue()); } for (RequestEntry e : l) { - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyWithSequenceID(e.sequenceId, errorType, false); } cache = new VolatileFields(); @@ -251,10 +257,17 @@ private void markSessionDisconnect() { } } - private void tryNotifyWithSequenceID( - int seqID, - error_types errno, - boolean isTimeoutTask) { + // for netty event, reflect status of the session + private void markSessionDisconnect() { + markSessionDisconnect(error_types.ERR_SESSION_RESET); + } + + // for handling logic, initiative to disconnect the session. However, the only thing to do is 'markSessionDisconnect' + private void disconnectCurrentSession(error_types errorType) { + markSessionDisconnect(errorType); + } + + private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug("{}: {} is notified with error {}, isTimeoutTask {}", name(), seqID, errno.toString(), isTimeoutTask); RequestEntry entry = pendingResponse.remove(new Integer(seqID)); @@ -306,12 +319,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Channel {} for session {} is active", ctx.channel().toString(), name()); - if (needAuthConnection() && !isAuthed()) { - logger.info("Session {} needs auth", name()); - markSessionNegotiation(ctx.channel()); - } else { - markSessionConnected(ctx.channel()); - } + markSessionNegotiation(ctx.channel()); } @Override @@ -339,17 +347,11 @@ private boolean needAuthConnection() { return openAuth; } - private boolean isAuthed() { - return negoStatus == negotiation_status.SASL_SUCC; - } - private void startNegotiation() { + logger.info("{}: start auth negotiation", name()); negotiation_message msg = new negotiation_message(); msg.status = negotiation_status.SASL_LIST_MECHANISMS; - sendNegoMsg(msg); - - logger.info("{}: start negotiation", name()); } private void sendNegoMsg(negotiation_message msg) { @@ -383,8 +385,8 @@ public void run() { if (op.rpc_error.errno != error_types.ERR_OK) throw new ReplicationException(op.rpc_error.errno); handleResp(); } catch (Exception e) { - // e.printStackTrace(); logger.error(e.toString()); + disconnectCurrentSession(error_types.ERR_AUTH_NEGO_FAILED); } } @@ -399,7 +401,8 @@ private void handleResp() throws Exception { final negotiation_message msg = new negotiation_message(); switch (resp.status) { case INVALID: - throw new Exception("Received a response which status is INVALID"); + case SASL_AUTH_FAIL: + throw new Exception("Received a response which status is " + resp.status + ", break off this negotiation"); case SASL_LIST_MECHANISMS_RESP: Subject.doAs( subject, @@ -446,11 +449,6 @@ public Object run() throws Exception { break; case SASL_SUCC: markSessionConnected(fields.nettyChannel); - negoStatus = negotiation_status.SASL_SUCC; // After succeed, the authentication is permanent in this session - return; - case SASL_AUTH_FAIL: - //throw new Exception("Received SASL_AUTH_FAIL"); - startNegotiation(); return; default: throw new Exception("Received an unknown response, status " + resp.status); @@ -466,7 +464,7 @@ ConnState getState() { } interface MessageResponseFilter { - public boolean abandonIt(error_types err, TMessage header); + boolean abandonIt(error_types err, TMessage header); } MessageResponseFilter filter = null; @@ -493,9 +491,7 @@ private final static class VolatileFields { private String serviceFqdn; // name used for SASL authentication private CallbackHandler cbh = null; // Don't need handler for GSSAPI private SaslClient saslClient; - private negotiation_status negoStatus = negotiation_status.INVALID; private final HashMap props = new HashMap(); - private LoginContext loginContext = null; private final Subject subject; // TODO: read expected mechanisms from config file private static final List expectedMechanisms =