Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix about handling auth error
Browse files Browse the repository at this point in the history
  • Loading branch information
huangwei5 committed Jan 18, 2019
1 parent 36b4e3a commit 1688909
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 45 deletions.
2 changes: 1 addition & 1 deletion configuration/pegasus.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ enable_perf_counter = false
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10
open_auth = false
jaas_conf = configuration/jaas.conf
jaas_conf = configuration/pegasus_jaas.conf
service_name = xxx
service_fqdn = xxx
File renamed without changes.
4 changes: 4 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/base/error_code.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public enum error_types {
ERR_MOCK_INTERNAL,
ERR_ZOOKEEPER_OPERATION,

ERR_AUTH_NEGO_FAILED,

ERR_UNAUTHENTICATED,
ERR_ACL_DENY,
//ERROR_CODE defined by client
ERR_SESSION_RESET,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public class negotiate_operator extends client_operator {
public negotiate_operator(negotiation_message request) {
super(new gpid(), ""); // TODO HW gpid and tableName needless
super(new gpid(), "");
this.request = request;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class Cluster {
public static final String PEGASUS_OPEN_AUTH_DEF = "false";

public static final String PEGASUS_JAAS_CONF_KEY = "jaas_conf";
public static final String PEGASUS_JAAS_CONF_DEF = "configuration/jaas.conf";
public static final String PEGASUS_JAAS_CONF_DEF = "configuration/pegasus_jaas.conf";

public static Cluster createCluster(Properties config) throws IllegalArgumentException {
int operatorTimeout = Integer.parseInt(config.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public MetaSession(ClusterManager manager, String addrList[],
if (rpc_addr.fromString(addr)) {
logger.info("add {} as meta server", addr);
metaList.add(clusterManager.getReplicaSession(rpc_addr));
}
else {
} else {
logger.error("invalid address {}", addr);
}
}
Expand Down Expand Up @@ -134,18 +133,16 @@ else if (metaError == error_types.ERR_FORWARD_TO_OTHERS) {
needDelay = false;
needSwitchLeader = true;
forwardAddress = getMetaServiceForwardAddress(op);
}
else {
} else {
round.callbackFunc.run();
return;
}
}
else if (op.rpc_error.errno == error_types.ERR_SESSION_RESET || op.rpc_error.errno == error_types.ERR_TIMEOUT) {
needDelay = false;
needSwitchLeader = true;
}
else {
logger.error("unknown error: {}", op.rpc_error.errno.toString());
} else {
logger.error(op.rpc_error.errno == error_types.ERR_UNAUTHENTICATED ? "{}" : "unknown error: {}", op.rpc_error.errno.toString());
round.callbackFunc.run();
return;
}
Expand Down Expand Up @@ -176,8 +173,7 @@ else if (op.rpc_error.errno == error_types.ERR_SESSION_RESET || op.rpc_error.err
metaList.add(clusterManager.getReplicaSession(forwardAddress));
curLeader = metaList.size() - 1;
}
}
else if (metaList.get(curLeader) == round.lastSession) {
} else if (metaList.get(curLeader) == round.lastSession) {
curLeader = (curLeader + 1) % metaList.size();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void initChannel(SocketChannel ch) {
}

public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean openAuth, Subject subject, String serviceName, String serviceFqdn, MessageResponseFilter filter) {
this(address, rpcGroup, socketTimeout, true, subject, serviceName, serviceFqdn);
this(address, rpcGroup, socketTimeout, openAuth, subject, serviceName, serviceFqdn);
this.filter = filter;
}

Expand Down Expand Up @@ -211,15 +211,21 @@ private void markSessionConnected(Channel activeChannel) {
}

private void markSessionNegotiation(Channel activeChannel) {
logger.info("{}: mark session state negotiation");
VolatileFields newCache = new VolatileFields();
newCache.state = ConnState.NEGOTIATION;
newCache.nettyChannel = activeChannel;
fields = newCache;
logger.info("{}: mark session state negotiation, now negotiate", name());
startNegotiation();
if (needAuthConnection()) {
// version + auth, now only support auth
startNegotiation();
} else {
logger.info("{}: mark session state connected");
markSessionConnected(activeChannel);
}
}

private void markSessionDisconnect() {
private void markSessionDisconnect(error_types errorType) {
VolatileFields cache = fields;
synchronized (pendingSend) {
if (cache.state != ConnState.DISCONNECTED) {
Expand All @@ -231,14 +237,14 @@ private void markSessionDisconnect() {
// this. In this case, we are relying on the timeout task.
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
tryNotifyWithSequenceID(e.sequenceId, errorType, false);
}
List<RequestEntry> l = new LinkedList<RequestEntry>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
l.add(entry.getValue());
}
for (RequestEntry e : l) {
tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
tryNotifyWithSequenceID(e.sequenceId, errorType, false);
}

cache = new VolatileFields();
Expand All @@ -251,10 +257,17 @@ private void markSessionDisconnect() {
}
}

private void tryNotifyWithSequenceID(
int seqID,
error_types errno,
boolean isTimeoutTask) {
// for netty event, reflect status of the session
private void markSessionDisconnect() {
markSessionDisconnect(error_types.ERR_SESSION_RESET);
}

// for handling logic, initiative to disconnect the session. However, the only thing to do is 'markSessionDisconnect'
private void disconnectCurrentSession(error_types errorType) {
markSessionDisconnect(errorType);
}

private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) {
logger.debug("{}: {} is notified with error {}, isTimeoutTask {}",
name(), seqID, errno.toString(), isTimeoutTask);
RequestEntry entry = pendingResponse.remove(new Integer(seqID));
Expand Down Expand Up @@ -306,12 +319,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel {} for session {} is active", ctx.channel().toString(), name());
if (needAuthConnection() && !isAuthed()) {
logger.info("Session {} needs auth", name());
markSessionNegotiation(ctx.channel());
} else {
markSessionConnected(ctx.channel());
}
markSessionNegotiation(ctx.channel());
}

@Override
Expand Down Expand Up @@ -339,17 +347,11 @@ private boolean needAuthConnection() {
return openAuth;
}

private boolean isAuthed() {
return negoStatus == negotiation_status.SASL_SUCC;
}

private void startNegotiation() {
logger.info("{}: start auth negotiation", name());
negotiation_message msg = new negotiation_message();
msg.status = negotiation_status.SASL_LIST_MECHANISMS;

sendNegoMsg(msg);

logger.info("{}: start negotiation", name());
}

private void sendNegoMsg(negotiation_message msg) {
Expand Down Expand Up @@ -383,8 +385,8 @@ public void run() {
if (op.rpc_error.errno != error_types.ERR_OK) throw new ReplicationException(op.rpc_error.errno);
handleResp();
} catch (Exception e) {
// e.printStackTrace();
logger.error(e.toString());
disconnectCurrentSession(error_types.ERR_AUTH_NEGO_FAILED);
}
}

Expand All @@ -399,7 +401,8 @@ private void handleResp() throws Exception {
final negotiation_message msg = new negotiation_message();
switch (resp.status) {
case INVALID:
throw new Exception("Received a response which status is INVALID");
case SASL_AUTH_FAIL:
throw new Exception("Received a response which status is " + resp.status + ", break off this negotiation");
case SASL_LIST_MECHANISMS_RESP:
Subject.doAs(
subject,
Expand Down Expand Up @@ -446,11 +449,6 @@ public Object run() throws Exception {
break;
case SASL_SUCC:
markSessionConnected(fields.nettyChannel);
negoStatus = negotiation_status.SASL_SUCC; // After succeed, the authentication is permanent in this session
return;
case SASL_AUTH_FAIL:
//throw new Exception("Received SASL_AUTH_FAIL");
startNegotiation();
return;
default:
throw new Exception("Received an unknown response, status " + resp.status);
Expand All @@ -466,7 +464,7 @@ ConnState getState() {
}

interface MessageResponseFilter {
public boolean abandonIt(error_types err, TMessage header);
boolean abandonIt(error_types err, TMessage header);
}

MessageResponseFilter filter = null;
Expand All @@ -493,9 +491,7 @@ private final static class VolatileFields {
private String serviceFqdn; // name used for SASL authentication
private CallbackHandler cbh = null; // Don't need handler for GSSAPI
private SaslClient saslClient;
private negotiation_status negoStatus = negotiation_status.INVALID;
private final HashMap<String, Object> props = new HashMap<String, Object>();
private LoginContext loginContext = null;
private final Subject subject;
// TODO: read expected mechanisms from config file
private static final List<String> expectedMechanisms =
Expand Down

0 comments on commit 1688909

Please sign in to comment.