Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check multiple registration of same service name behaviour #6

Merged
merged 9 commits into from
Jun 18, 2022
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ subprojects {
targetCompatibility = JavaVersion.VERSION_17

repositories{
mavenLocal()
mavenCentral()
jcenter()
maven {
Expand Down
33 changes: 22 additions & 11 deletions rosjava/src/main/java/org/ros/internal/node/DefaultNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package org.ros.internal.node;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.ros.Parameters;
import org.ros.concurrent.CancellableLoop;
import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.exception.DuplicateServiceException;
import org.ros.exception.RemoteException;
import org.ros.exception.ServiceNotFoundException;
import org.ros.internal.message.Message;
Expand Down Expand Up @@ -184,7 +187,7 @@ public void onMasterRegistrationSuccess(Publisher<Log> registrant) {
return;
}

boolean useSimTime = false;
final boolean useSimTime;
try {
useSimTime =
parameterTree.has(Parameters.USE_SIM_TIME)
Expand Down Expand Up @@ -244,7 +247,7 @@ private <T extends Message> MessageSerializer<T> newServiceResponseSerializer(St

@SuppressWarnings("unchecked")
private <T extends Message> MessageDeserializer<T> newServiceResponseDeserializer(String serviceType) {
return this.nodeConfiguration.getMessageSerializationFactory()
return this.nodeConfiguration.getMessageSerializationFactory()
.newServiceResponseDeserializer(serviceType);
}

Expand All @@ -265,8 +268,7 @@ public <T extends Message> Publisher<T> newPublisher(GraphName topicName, String
final GraphName resolvedTopicName = resolveName(topicName);
final TopicDescription topicDescription =
this.nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
final TopicDeclaration topicDeclaration =
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null);
final TopicDeclaration topicDeclaration = TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null);
final org.ros.message.MessageSerializer<T> serializer = newMessageSerializer(messageType);
return this.publisherFactory.newOrExisting(topicDeclaration, serializer);
}
Expand All @@ -284,10 +286,8 @@ public <T extends Message> Subscriber<T> newSubscriber(GraphName topicName, Stri
@Override
public <T extends Message> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints) {
final GraphName resolvedTopicName = resolveName(topicName);
final TopicDescription topicDescription =
this.nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
final TopicDeclaration topicDeclaration =
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints);
final TopicDescription topicDescription = this.nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
final TopicDeclaration topicDeclaration = TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints);
final MessageDeserializer<T> deserializer = newMessageDeserializer(messageType);
final Subscriber<T> subscriber = this.subscriberFactory.newOrExisting(topicDeclaration, deserializer);
return subscriber;
Expand All @@ -305,8 +305,17 @@ public <T extends Message> Subscriber<T> newSubscriber(String topicName, String

@Override
public <T extends Message, S extends Message> ServiceServer<T, S> newServiceServer(GraphName serviceName, String serviceType,
ServiceResponseBuilder<T, S> responseBuilder) {
ServiceResponseBuilder<T, S> responseBuilder) {
Preconditions.checkNotNull(responseBuilder, "ResponseBuilder should not be null");
Preconditions.checkNotNull(serviceName, "serviceName should not be null");
Preconditions.checkArgument(StringUtils.isNotBlank(serviceType), "serviceType should not be blank");


final GraphName resolvedServiceName = resolveName(serviceName);
// final URI uri = lookupServiceUri(resolvedServiceName);
// if (uri != null) {
// throw new DuplicateServiceException("Service " + resolvedServiceName + " is already registered with URI: " + uri);
// }
// TODO(damonkohler): It's rather non-obvious that the URI will be
// created later on the fly.
final ServiceIdentifier identifier = new ServiceIdentifier(resolvedServiceName, null);
Expand All @@ -321,13 +330,15 @@ public <T extends Message, S extends Message> ServiceServer<T, S> newServiceServ

@Override
public <T extends Message, S extends Message> ServiceServer<T, S> newServiceServer(String serviceName, String serviceType,
ServiceResponseBuilder<T, S> responseBuilder) {
ServiceResponseBuilder<T, S> responseBuilder) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceName), "serviceName should not be blank");

return newServiceServer(GraphName.of(serviceName), serviceType, responseBuilder);
}

@SuppressWarnings("unchecked")
@Override
public <T extends Message, S extends Message> ServiceServer<T, S> getServiceServer(GraphName serviceName) {
public <T extends Message, S extends Message> ServiceServer<T, S> getServiceServer(GraphName serviceName) {
return (ServiceServer<T, S>) serviceManager.getServer(serviceName);
}

Expand Down
4 changes: 2 additions & 2 deletions rosjava/src/main/java/org/ros/internal/node/RosoutLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.ros.internal.node;

import com.google.common.base.Preconditions;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.ros.Topics;
import org.ros.node.ConnectedNode;
import org.ros.node.topic.Publisher;
Expand Down Expand Up @@ -64,7 +64,7 @@ public RosoutLogger() {
/**
* Starts logging connected to ROS, if usePublisher is not null it is applied
*
* @param defaultNode
* @param connectedNode
* @param usePublisher
*/
RosoutLogger(final ConnectedNode connectedNode, final Consumer<Publisher<Log>> usePublisher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.ros.internal.node.client;

import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.xmlrpc.client.XmlRpcClient;
import org.apache.xmlrpc.client.XmlRpcClientConfigImpl;
import org.apache.xmlrpc.client.XmlRpcCommonsTransportFactory;
Expand Down
48 changes: 19 additions & 29 deletions rosjava/src/main/java/org/ros/internal/node/client/Registrar.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright (C) 2011 Google Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -33,6 +33,7 @@
import org.ros.internal.node.topic.DefaultSubscriber;
import org.ros.internal.node.topic.PublisherIdentifier;
import org.ros.internal.node.topic.TopicParticipantManagerListener;
import org.ros.namespace.GraphName;
import org.ros.node.service.ServiceServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,7 +48,7 @@
/**
* Manages topic, and service registrations of a {@link SlaveServer} with the
* {@link MasterServer}.
*
*
* @author [email protected] (Ken Conley)
* @author [email protected] (Damon Kohler)
*/
Expand Down Expand Up @@ -88,7 +89,7 @@ public Registrar(MasterClient masterClient, ScheduledExecutorService executorSer
* Failed registration actions are retried periodically until they succeed.
* This method adjusts the delay between successive retry attempts for any
* particular registration action.
*
*
* @param delay
* the delay in units of {@code unit} between retries
* @param unit
Expand Down Expand Up @@ -258,35 +259,24 @@ public void run() {
}
}



@Override
public void onServiceServerAdded(final ServiceServer<?, ?> serviceServer) {
if (DEBUG) {
LOGGER.info("Registering service: " + serviceServer);
}
boolean submitted = submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
boolean success = callMaster(new Callable<Response<Void>>() {
@Override
public Response<Void> call() throws Exception {
return masterClient.registerService(nodeIdentifier, serviceServer);
}
});
if (success) {
serviceServer.onMasterRegistrationSuccess();
} else {
serviceServer.onMasterRegistrationFailure();
}
return !success;
boolean submitted = submit(() -> {
boolean success = callMaster(() -> masterClient.registerService(nodeIdentifier, serviceServer));
if (success) {
serviceServer.onMasterRegistrationSuccess();
} else {
serviceServer.onMasterRegistrationFailure();
}
return !success;
});
if (!submitted) {
executorService.execute(new Runnable() {
@Override
public void run() {
serviceServer.onMasterRegistrationFailure();
}
});
executorService.execute(() -> serviceServer.onMasterRegistrationFailure());
}
}

Expand Down Expand Up @@ -325,7 +315,7 @@ public void run() {
/**
* Starts the {@link Registrar} for the {@link SlaveServer} identified by the
* given {@link NodeIdentifier}.
*
*
* @param nodeIdentifier
* the {@link NodeIdentifier} for the {@link SlaveServer} this
* {@link Registrar} is responsible for
Expand All @@ -339,12 +329,12 @@ public void start(NodeIdentifier nodeIdentifier) {

/**
* Shuts down the {@link Registrar}.
*
*
* <p>
* No further registration requests will be accepted. All queued registration
* jobs have up to {@link #SHUTDOWN_TIMEOUT} {@link #SHUTDOWN_TIMEOUT_UNITS}
* to complete before being canceled.
*
*
* <p>
* Calling {@link #shutdown()} more than once has no effect.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ public ServiceRegistrationInfo registerService(GraphName nodeName, URI nodeSlave
serviceUri, nodeName, nodeSlaveUri));
}

NodeRegistrationInfo node = obtainNodeRegistrationInfo(nodeName, nodeSlaveUri);
final NodeRegistrationInfo node = obtainNodeRegistrationInfo(nodeName, nodeSlaveUri);

ServiceRegistrationInfo service = services.get(serviceName);
ServiceRegistrationInfo service = services.get(serviceName);
if (service != null) {
NodeRegistrationInfo previousServiceNode = service.getNode();
final NodeRegistrationInfo previousServiceNode = service.getNode();
if (previousServiceNode == node) {
// If node is the same, no need to do anything
if (LOGGER.isWarnEnabled()) {
Expand Down Expand Up @@ -452,15 +452,15 @@ private NodeRegistrationInfo obtainNodeRegistrationInfo(GraphName nodeName, URI
* the node being replaced
*/
private void cleanupNode(NodeRegistrationInfo node) {
for (TopicRegistrationInfo topic : node.getPublishers()) {
for (final TopicRegistrationInfo topic : node.getPublishers()) {
topic.removePublisher(node);
}

for (TopicRegistrationInfo topic : node.getSubscribers()) {
for (final TopicRegistrationInfo topic : node.getSubscribers()) {
topic.removeSubscriber(node);
}

for (ServiceRegistrationInfo service : node.getServices()) {
for (final ServiceRegistrationInfo service : node.getServices()) {
services.remove(service.getServiceName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public <T extends Message, S extends Message> DefaultServiceServer<T, S> newSer
final GraphName name = serviceDeclaration.getName();

synchronized (this.mutex) {
if (serviceManager.hasServer(name)) {
if (this.serviceManager.hasServer(name)) {
throw new DuplicateServiceException(String.format("ServiceServer %s already exists.", name));
} else {
final DefaultServiceServer<T, S> serviceServer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* @author [email protected] (Damon Kohler)
*/
public class ServiceIdentifier {
public final class ServiceIdentifier {

private final GraphName name;
private final URI uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public PublisherFactory(NodeIdentifier nodeIdentifier,
@SuppressWarnings("unchecked")
public <T extends Message> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration,
MessageSerializer<T> messageSerializer) {
GraphName topicName = topicDeclaration.getName();
final GraphName topicName = topicDeclaration.getName();
synchronized (mutex) {
if (topicParticipantManager.hasPublisher(topicName)) {
return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName);
if (this.topicParticipantManager.hasPublisher(topicName)) {
return (DefaultPublisher<T>) this.topicParticipantManager.getPublisher(topicName);
} else {
DefaultPublisher<T> publisher =
final DefaultPublisher<T> publisher =
new DefaultPublisher<T>(nodeIdentifier, topicDeclaration, messageSerializer,
messageFactory, executorService);
publisher.addListener(new DefaultPublisherListener<T>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.ros.exception.RosRuntimeException;
import org.ros.internal.message.MessageBuffers;
Expand Down
Loading