IGNITE-22233 Zone replica listener #3931

Open
wants to merge 77 commits into
base: main

Conversation

kgusakov
Contributor

Thank you for submitting the pull request.

To streamline the review process of the patch and ensure better code quality
we ask both an author and a reviewer to verify the following:

The Review Checklist

  • Formal criteria: TC status, codestyle, mandatory documentation. Also make sure to complete the following:
    - There is a single JIRA ticket related to the pull request.
    - The web-link to the pull request is attached to the JIRA ticket.
    - The JIRA ticket has the Patch Available state.
    - The description of the JIRA ticket explains WHAT was made, WHY and HOW.
    - The pull request title is treated as the final commit message. The following pattern must be used: IGNITE-XXXX Change summary where XXXX - number of JIRA issue.
  • Design: new code conforms with the design principles of the components it is added to.
  • Patch quality: patch cannot be split into smaller pieces, its size must be reasonable.
  • Code quality: code is clean and readable, necessary developer documentation is added if needed.
  • Tests code quality: test set covers positive/negative scenarios, happy/edge cases. Tests are effective in terms of execution time and resources.

Notes

* @param replica Table replica
* @return Future, which will be completed when operation done
*/
public CompletableFuture<Void> addTableReplica(ZonePartitionId zonePartitionId, TablePartitionId replicationGroupId, Replica replica) {
Contributor

As discussed,

  1. I'd consider moving the method to the Replica itself.
  2. Change Replica param to Listener.
  3. Rename given method to addTableReplicaListener or addTableRequestProcessor.

Contributor Author

Fixed


startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut);
CompletableFuture<ReplicaListener> newReplicaListenerFut = resultFuture.thenApply(createListener);
Contributor

I missed this previously: createListener is a rather confusing name. I'd rather use the common supplier suffix.

Contributor Author

Yep, but it is actually not a supplier; it is a function with an argument.
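To make the distinction in this reply concrete: a `Supplier<T>` takes no input, while the callback handed to `thenApply` receives the completed value and is therefore a `Function<T, R>`. The following is a minimal sketch only; `RaftClient`, `Listener`, and `listenerFactory` are hypothetical stand-ins, not the types or names used in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

class ListenerFactoryNaming {
    /** Hypothetical stand-in for the value the future completes with. */
    static final class RaftClient {
        final String groupName;

        RaftClient(String groupName) {
            this.groupName = groupName;
        }
    }

    /** Hypothetical stand-in for a replica listener. */
    static final class Listener {
        final String description;

        Listener(String description) {
            this.description = description;
        }
    }

    /** A Supplier takes no input, so it cannot see the completed value. */
    static Listener fromSupplier(Supplier<Listener> listenerSupplier) {
        return listenerSupplier.get();
    }

    /** thenApply passes the completed value in, so the callback is a Function. */
    static CompletableFuture<Listener> fromFunction(
            CompletableFuture<RaftClient> raftClientFut,
            Function<RaftClient, Listener> listenerFactory
    ) {
        return raftClientFut.thenApply(listenerFactory);
    }

    public static void main(String[] args) {
        Listener a = fromSupplier(() -> new Listener("no input"));

        Listener b = fromFunction(
                CompletableFuture.completedFuture(new RaftClient("zone_1_part_0")),
                client -> new Listener("listener for " + client.groupName)
        ).join();

        System.out.println(a.description);
        System.out.println(b.description);
    }
}
```

Under this reading, a suffix such as `Factory` (again, only an illustration) would describe the argument-taking callback without implying a no-argument `Supplier`.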

@@ -50,7 +53,28 @@ public TopologyAwareRaftGroupService raftClient() {

@Override
public CompletableFuture<ReplicaResult> processRequest(ReplicaRequest request, String senderId) {
- return listener.invoke(request, senderId);
+ if (!(request instanceof TableAware)) {
Contributor

What kinds of requests are supposed to be processed here besides TableAware ones? Txn-related and SafeTime-propagation-related ones, right? Or are there more?
If it's just the two mentioned above, could you please create tickets and add TODOs here?

Contributor Author

yep, done

ReplicationGroupId replicationGroupId = request.groupId();

// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine this code when zone-based replication is done.
if (replicationGroupId instanceof TablePartitionId) {
Contributor

Could you please name requests that will use TablePartitionId for now?

Contributor Author

I'm not sure I understood the question. In main, I suppose it is all requests except the idle safe time request. But I think that is not true in the collocation feature branch.

@@ -410,6 +412,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {

private final IndexMetaStorage indexMetaStorage;

private PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
Contributor

I don't think that TableManager should depend on PartitionReplicaLifecycleManager. The already existing replicaManager dependency should be enough, meaning that TableManager should retrieve the Replica from ReplicaManager and call addTableProcessor on the replica directly. Basically, it's a continuation of our discussion at the collocation sync.

Contributor Author

Done, but we still need the module dependency because I need to check the PartitionReplicaLifecycleManager.ENABLED flag.

@@ -1004,7 +1011,7 @@ private CompletableFuture<Void> startPartitionAndStartClient(
MvTableStorage mvTableStorage = internalTable.storage();

try {
- var ret = replicaMgr.startReplica(
+ return replicaMgr.startReplica(
Contributor

Please add a "code removal" TODO: calling startReplica here won't be necessary after full migration to the zone-based replicas. For now, yes, we will both add it to the replicas directly and populate the aggregated one with the table processor.

Contributor Author

Done


assertDoesNotThrow(() -> keyValueView.put(null, 1L, 200));

// Actually we are testing not the fair put value, but the hardcoded one from temporary noop replica listener
Contributor

That should not be true any longer. We only skip the raft replication part, but we will retrieve previously inserted data. Because of this, I believe that we may verify random value insertion/retrieval.

Contributor Author

Removed the comment. As for the values, the test already checks two different values, 100 vs 200. Do you think that we really need fair randomization here?


/**
* RAFT listener for the zone partition.
*/
public class ZonePartitionRaftListener implements RaftGroupListener {
private static final IgniteLogger LOG = Loggers.forClass(ZonePartitionReplicaImpl.class);
Contributor

Wrong class.

Contributor Author

Fixed

@@ -37,7 +50,35 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {

Contributor

I'd rather add the following implementation for onRead():

        iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) -> {
            Command command = clo.command();

            assert false : "No read commands expected, [cmd=" + command + ']';
        });

just like the one we have in PartitionListener

Contributor Author

Done

// TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the table replica listener if needed.
private final Map<TablePartitionId, ReplicaListener> replicas = new ConcurrentHashMap<>();

private final RaftGroupService raftClient;
Contributor

Please use RaftCommandRunner.

Contributor Author

Done

// TODO: https://issues.apache.org/jira/browse/IGNITE-22621 implement zone-based transaction storage
// and txn messages processing
if (request instanceof TxFinishReplicaRequest) {
TxFinishReplicaRequest txFinishReplicaRquest = (TxFinishReplicaRequest) request;
Contributor

Just in case: typo in Rquest.

Contributor Author

Fixed

.get(txFinishReplicaRquest.commitPartitionId().asTablePartitionId())
.invoke(requestForTableListener, senderId);
} else {
LOG.debug("Non table request is not supported by the zone partition yet " + request);
Contributor

So, we will see such messages for SafeTimePropagationRequest, right? Any other? I believe I've already asked this before.

Contributor Author

Yep, only the SafeTimePropagationRequest I guess.
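The dispatch discussed in this thread can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `Request`, `TableAwareRequest`, and `TableListener` are hypothetical simplified stand-ins, tables are keyed by a plain `int`, and completing with `null` for unsupported requests is an assumption made only to keep the sketch small.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ZoneListenerDispatchSketch {
    /** Hypothetical marker for any replica request. */
    interface Request {
    }

    /** Hypothetical counterpart of a table-aware request: it targets exactly one table. */
    interface TableAwareRequest extends Request {
        int tableId();
    }

    /** Hypothetical per-table request processor. */
    interface TableListener {
        CompletableFuture<String> invoke(Request request, String senderId);
    }

    /** Table listeners registered on this zone partition, keyed by table id. */
    private final Map<Integer, TableListener> tableListeners = new ConcurrentHashMap<>();

    void addTableReplicaListener(int tableId, TableListener listener) {
        tableListeners.put(tableId, listener);
    }

    CompletableFuture<String> processRequest(Request request, String senderId) {
        if (!(request instanceof TableAwareRequest)) {
            // Assumption: non-table requests are not handled yet, so the sketch
            // completes with null instead of routing them anywhere.
            return CompletableFuture.completedFuture(null);
        }

        int tableId = ((TableAwareRequest) request).tableId();
        TableListener listener = tableListeners.get(tableId);

        if (listener == null) {
            // No table listener has been registered for this table on this partition.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("No listener registered for table " + tableId));
        }

        return listener.invoke(request, senderId);
    }
}
```

A request that is not table-aware falls through the first branch, which is where the debug message quoted above would be logged in the real listener.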

* @return Replica's listener.
*/
@Deprecated(forRemoval = true)
ReplicaListener listener();
Contributor

Assuming that it's a temporary solution, how are you going to add the table processor to the ZoneReplica?
I mean that currently it's

                replicaMgr.replica(new ZonePartitionId(zoneId, partId))
                    .thenAcceptAsync(zoneReplica ->
                            ((ZonePartitionReplicaListener) zoneReplica.listener()).addTableReplicaListener(
                                    new TablePartitionId(tableId, partId), createListener
                            ), ioExecutor
                    );

What is the expected solution to replace the aforementioned one?

Contributor Author

The tricky part is the fact that ZonePartitionReplicaImpl is part of the replicator module, so I can't use the ZonePartitionReplicaListener cast or something similar inside it. So, I have no better ideas for now, to be honest.

@@ -635,6 +645,133 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
});
}

private CompletableFuture<Boolean> initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
Contributor

Naming seems confusing to me. I'd rather use populateZonePartitionListenerWithTableProcessor or similar.

Contributor Author

I tried to invent a new name for onTableCreate, because that one is already taken. WDYT about "createLocalTableEntities"?

var startPartsFut = new ArrayList<CompletableFuture<?>>();

for (int i = 0; i < zoneDescriptor.partitions(); i++) {
if (partitionReplicaLifecycleManager.hasLocalPartition(new ZonePartitionId(zoneDescriptor.id(), i))) {
Contributor

AFAIK we've discussed possible races here and several options to solve the problem, e.g. by using a closure that atomically checks whether we have the partition locally and populates it with table processors. Am I right?

Contributor Author

Yep, we need to replace it with the discussed methods. I added a TODO:

// TODO: https://issues.apache.org/jira/browse/IGNITE-22624 replace this method by the replicas await process.
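One way to picture the atomic check-and-populate closure mentioned in this thread is `ConcurrentHashMap.computeIfPresent`, which runs its remapping function atomically for the given key. The sketch below is only an illustration of that idea under stated assumptions: the class, its methods, and the use of plain `int` ids are all hypothetical and are not the solution planned in IGNITE-22624.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class LocalPartitionRegistrySketch {
    /** Table ids attached to each partition that is currently started locally. */
    private final Map<Integer, Set<Integer>> tablesByLocalPartition = new ConcurrentHashMap<>();

    void startLocalPartition(int partId) {
        tablesByLocalPartition.putIfAbsent(partId, ConcurrentHashMap.newKeySet());
    }

    void stopLocalPartition(int partId) {
        tablesByLocalPartition.remove(partId);
    }

    /**
     * Attaches the table only if the partition is local. The check and the
     * population happen inside one computeIfPresent call, so a concurrent
     * stopLocalPartition for the same key cannot interleave between them.
     */
    boolean addTableIfPartitionIsLocal(int partId, int tableId) {
        boolean[] added = {false};

        tablesByLocalPartition.computeIfPresent(partId, (id, tables) -> {
            tables.add(tableId);
            added[0] = true;
            return tables;
        });

        return added[0];
    }

    /** Returns an immutable snapshot of the tables attached to the partition. */
    Set<Integer> tables(int partId) {
        Set<Integer> tables = tablesByLocalPartition.get(partId);
        return tables == null ? Set.of() : Set.copyOf(tables);
    }
}
```

Compared with a separate `hasLocalPartition` check followed by a later registration, this shape leaves no window in which the partition can disappear between the check and the update.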


TableImpl table = createTableImpl(causalityToken, tableDescriptor, zoneDescriptor);

tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () -> {
Contributor

I didn't check it carefully. Do you believe that the order of updating the corresponding VVs and adding tableProcessors to the zone replica is correctly linearised?

Contributor Author

The current guarantees are the same as in the main path, except that they cover storages plus table processors rather than storages alone. So, I guess yes.

@@ -635,6 +645,133 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
});
}

private CompletableFuture<Boolean> initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
Contributor

The most confusing method in the PR, in my opinion.

Contributor Author

The name or in general? Let's discuss the name here #3931 (comment)

2 participants