Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter subscribers #30

Open
Robert-M-Muench opened this issue May 24, 2019 · 27 comments
Open

Filter subscribers #30

Robert-M-Muench opened this issue May 24, 2019 · 27 comments

Comments

@Robert-M-Muench
Copy link

I have a sequence that takes (x,y) coordinates. Now I want to notify only subscribes which pass a hit-test for this coordinate. How can this be done?

All algorithms are working against the event streams but not against the subscribers.

@lempiji
Copy link
Owner

lempiji commented May 25, 2019

Thank you for using it.
Is it for the grid GUI posted in the forum?

I think the problem in terms of efficiency is "How to effectively target subscribers to be notified".

As a policy, how about doing the following in doSubscribe?

1.do a hit-test in doSubscribe to create a list of subscribers to notify
2.fetch sequentially with foreach and call put to notify each

Also, as you may have already used, the following algorithm for hit-test is interesting.
https://en.wikipedia.org/wiki/Z-order_curve

@Robert-M-Muench
Copy link
Author

Yes, I want to use RX as the fundamental communication concept for the GUI framework and the inter app-component communication pattern.

I have a very fast hit-test function implemented. It's using an rtree.

I'm (conceptually) struggling on how to do the doSubscribe function. My understanding is, that doSubscribe is like registering my function to call with the observable (I think I need a subject). How can/Why should I do the hit-test in doSubscribe? IMO the hit-test has to be triggered on a .put (where the x,y is received) and then do the hit-test, filter the list of subscribers, and call the registered function.

So, I somehow have to derive my own subject like conditionalSubject where code can subscribe to.

Or am I missing something?

@Robert-M-Muench
Copy link
Author

Robert-M-Muench commented May 26, 2019

If I do something like:

class HitTestSubject(E) : Subject!E {
  override void put(E obj){
  }
}

How can I iterate through all subscribers inside put()? I just want to override this function and use all the rest from RX as is.

@Robert-M-Muench
Copy link
Author

Further investigated how to do this. It's not possible to access the _observer range from super class as it's private not protected. Any idea how to do this?

@lempiji
Copy link
Owner

lempiji commented May 27, 2019

Subject does something a bit more complicated about managing Observers. (because to make subscribe lock-free)
Currently, it is difficult to iterate over the observer by inheriting the Subject.

For now, I think it's practical to implement your own Subject (or Observer), assuming there is no increase or decrease in Observer.

The enhancements to allow access to _observers are a bit more complicated to implement, so I'll consider making a beta version.

If you have a good idea, please give me the pull request.

@Robert-M-Muench
Copy link
Author

Idea: It would be nice to be able to provide a "conditional" function (delegate?) to put() which is checked in the foreach(_observers) loop. The delegate should be called with user-provided data (obj) so that all kind of checks can be done.

Deriving from Subject (or others) make sense to store all kind of run-time data. This data should be accessible by the conditional function used by put().

@Robert-M-Muench
Copy link
Author

For now, I think it's practical to implement your own Subject (or Observer), assuming there is no increase or decrease in Observer.

I don't understand what you mean with "increase or decrease in Observer"? The number of observers will change during run-time.

I already tried to implement my own Subject. But it seems I need to add my implementation directly to RX.subject.d because things I need from this file are marked as private.

@Robert-M-Muench
Copy link
Author

Robert-M-Muench commented May 30, 2019

@lempiji How about a put() that takes a delegate? Like:

struct observerFilter {
  int opApply(scope int delegate(<obj, observer, etc.>) foreach_body) {
  }
}

and inside put() instead of:

foreach(observer, oberservers) {...}

we have:

foreach(observer, userProvidedFilter(observers)){...}

Then I can filter the oberservers as I like.

As a simple interface I can imagine something like:

mysubject.put(myObj, myDelegate);

lempiji added a commit that referenced this issue May 31, 2019
@lempiji
Copy link
Owner

lempiji commented May 31, 2019

@Robert-M-Muench
I added some accessors and wrote a unit test.

However, having to write if and cast first is somewhat unfriendly.

I'm thinking of simply adding a method to SubjectObject, but in many cases it's likely to be a useless method, so I gave up the implementation.

After that, I thought about creating a signature method that I sought by inheriting SubjectObject, but I think that it is not a good way to add classes for each type such as AsyncSubject.

Do you think this is enough?

@Robert-M-Muench
Copy link
Author

  1. Why do you use .put(sub, -1) and not sub.put(-1)? Any reason behind this?
  2. My idea was that there is a special subject.put(obj, filter-function) which calls filter-function to decide if an observer's put(obj) should be called or not. filter-function would get (subject.obj, observer.obj, observers)
  3. An alternative to 2. could be that subject.put() call a filter-function which returns an array/range/etc. of observers for which observer.put() is called. This makes it even more flexible.

Overall I think that the filter-function should be run-time provided and not a fixed implementation in the class hierarchy. More like a plug-in which can change during run-time.

@Robert-M-Muench
Copy link
Author

I have another idea: Why not use a reactive stream "filter-observers" which subject provides, and I can subscribe to with whatever function I want. Then subject checks if there is a subscriber, if, it calls the subscriber with the necessary information (maybe a struct?) and gets back a list of observers to notify.

Or a call where I can submit a list of observers to notify and subject keeps the list.

@lempiji
Copy link
Owner

lempiji commented Jun 2, 2019

1:
I use this format because Observer was originally defined as OutputRange and this is the correct use.
See isOutputRange in std.range.

I have failed several times in the past, and there are some traps to solve the overload, so it's better not to use UFCS either.

By the way, you can subscribe to Observable!string with Observer!char.

About 3 years ago, I wrote a detailed article about this in Japanese, but I can't translate it into English.
https://qiita.com/lempiji/items/2070719fee944e64b457

2:
Since the signature of put is already defined by OutputRange, I don't want to go in the direction of extending this itself.
std.digest has a similar interface, but I don't like it.

3, anonymous idea:
I thought it would be possible to apply a filter function at run-time by slightly extending MyCustomSubject in unittest.

SubjectObject manages the child Observers in one CompositeObserver, rather than in a list, to make internal processing lock-free. (so that cas can exchange it for atomic,)

The hardest thing for a Subject is to actually pull out the Observer list and process it.
If the Subject processes the list in the foreach itself from the retrieved Observer, it seems to reveal the role that CompositeObserver encapsulates.

However, bundling your own Observers into a single Observer is more desirable in terms of performance.
Because the SubjectObject internally wraps the Observer with Observer!E, this performance degradation is minimal.

I think subscription management is hard to implement, but if the number of your own Observers doesn't change dynamically, it shouldn't be that hard.

@Robert-M-Muench
Copy link
Author

  1. Ok, understood. So, .put() uses some run-time fallbacks to create the desired effect. Still learning ranges.

  2. My idea could be split into two calls. One to set the function to call and the other being the normal .put. I still like the idea to specify a filter-like function dynamically.

  3. I'm currently extending an RX fork for this idea. And did it in a way where you give a Subject the filter-function, which just forwards it to the internally used CompositeObserver. So, it's more to keep the interface logically clear, since one is working with a Subject.

In my use case GUI, the observers dynamically change a lot and fast, so efficient subscription management is critical. But, no pre-mature optimization, I want to get the whole thing up and running first.

So, how to proceed? Any conclusion now?

@Robert-M-Muench
Copy link
Author

Here you can see how I did it so far: https://pastebin.com/HC7fMMVA

@Robert-M-Muench
Copy link
Author

Robert-M-Muench commented Jun 2, 2019

How about this: FilteredSubject gets a _filteredObserver member and a filteredSubscribe() function. The code can be shared mostly. Some client code somehow gets the list of relevant observers and updates the _filteredObserver. If _filteredObserver contains something .put() will use this otherwise _observers.

With this client-code can subscribe at will, and some other app-part can limit which of the subscribed observers get notified.

Not sure how fast subscribing/unsubscribing is. What do you think? This would mainly keep all the logic as is, re-use a lot of code and can even be derived from Subject.

@Robert-M-Muench
Copy link
Author

Please see: https://pastebin.com/GKa08SPM

After playing around I think this approach is the best and minimally invasive. Some code can manage a list of observers which are notified.

I think deriving a FilteredSubject class makes sense, where the _filteredObservers is added.

I don't fully understand all the shared(...) stuff or the use of cas(...) so not sure how to best implement things. Maybe you can take the FilteredSubject idea and do a correct implementation?

What do you think?

@Robert-M-Muench
Copy link
Author

The problem I now see is if I use this approach in my app code:

mySubject.doSubscribe!((Para p) {myFunc(p);}); 

I think a special observer is created (??? which uses myFunc() as a delegate) and subscribed, meaning it goes into _observers.

Later on, I need to somehow insert this specially created observer to the _filteredObserver list, so a comparison between the entries in _observer and _filterObserver is possible.

But how do I access the thing created by doSubscribe()? I don't understand what I could compare...

@lempiji
Copy link
Owner

lempiji commented Jun 14, 2019

I looked at the code of HitTest. I think I understand the concept.

But keeping everything lock-free is really hard and tedious...

It will take time, so why don't you do the following for now?
(It is stored in _observers without being processed internally.)

auto observer = makeObserver(&myFunc).observerObject!Para();
subject.doSubscribe(observer);

@Robert-M-Muench
Copy link
Author

makeObserver(&myFunc).observerObject!Para();

How is this line working?

makeObserver normally needs at least two delegates but here only one is given, so observerObject() returns an ObserverObject that has a completed() and failure() member?

But, observerObject takes an E which I understand, and uses an R. Where is R coming from / what is R in the above case? And, the _range.completed() and _range.failure() functions would be null in our case, right? So, later on, if these are called, the makeObserver implementation checks for this null, and does nothing.

Such a detailed explanation would help a lot to understand the RX code. It's a pretty advanced code base and without knowing all the D details etc. it's very hard to understand.

@Robert-M-Muench
Copy link
Author

Robert-M-Muench commented Jun 15, 2019

WRT the R parameter, I'm not sure how your line cab be re-written as:

 observerObject!Para(makeObserver(&myFunc));

But here makeObserver() would be called first, which would be missing the delegates for completed and failure... or

makeObserver(observerObject!Para(&myFunc));

which returns an ObserverObject with default completed() and failure() variants. That sounds logical.

On the other hand, I don't understand how the UFCS syntax works here. Because UFCS states: "A free function can be called with a syntax that looks as if the function were a member function of its first parameter type."

But wouldn't the first parameter be: makeObserver(&myFunc)? Which leads to my first version, which is unlogical. So... I'm confused how this works.

@lempiji
Copy link
Owner

lempiji commented Jun 16, 2019

Oh, I was a mistake.
The avobe code don't need makeObserver.

auto observer = observerObject!Para(&myFunc);

@Robert-M-Muench
Copy link
Author

Ok. That makes more sense.

Anyway, I'm still struggling with the following:

  1. We have 1..N objects of type A.
  2. Each object can subscribe to 1..M streams using 1..M different observerObject!T(&myMemberFunc). So an object subscribes to different streams.
  3. I have one central HitTest which returns me the hit objects of type A.
  4. I know which of the 1..M streams the returned set of objects from step 3 should be filtered.

The problem I have is, how can I filter inside the streams based on the object reference when I only have an observerObject? Can I somehow get the object reference (parentof?) of the observerObject?

@Robert-M-Muench
Copy link
Author

Robert-M-Muench commented Jun 16, 2019

This is an code example from unittests:

    int putCount = 0;
    int completedCount = 0;
    int failureCount = 0;

    class TestObserver : Observer!int
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
        }
    }

    static assert(isObserver!(TestObserver, int));

    auto test = new TestObserver;
    Observer!int observer = observerObject!int(test);

How can I compare observer to test? Something like: observer == test should yield true. And the same for a member function:

    Observer!int observer = observerObject!int(&test.mymember);

How can I now compare/find-out that obeserver and test are related?

@Robert-M-Muench
Copy link
Author

So, I think it's somehow necessary to use delegates here. As it's possible to access the .ptr of a delegate which gives the object to compare to.

@Robert-M-Muench
Copy link
Author

@lempiji Since I don't have any email from you (would be great to get it via PM) I know tried my best to implement a ContextObserver. The idea is, that it can carry a pointer to whatever gives access to a context. And that this context pointer is used for filtering. Like only inform observers that have the same context pointer.

Anyway, I'm struggling with a couple of things (marked with @@). Maybe you can take a look and I think you get the idea. But my implementation is mostly not good nor usable.

See: https://github.com/Robert-M-Muench/rx/blob/dev/source/rx/contextobserver.d

.

@Robert-M-Muench
Copy link
Author

After a long time still struggling...

Looking at your MyFilterSubject example in subject.d I don't have a clue how I can get rid of the foreach loop? The whole RX lib seems to assume that observers are iterable with a foreach loop.

What I want to do is avoid looping through all observer, but query the observers to notify. So that query would return a list, which I would iterate over.

So, this requires a subscribe() function that handles the subscriptions differently. But that doesn't compile. See: https://pastebin.com/5BTT16Ze for example code.

@aberba
Copy link

aberba commented Jun 30, 2020

@Robert-M-Muench there's GitHub gist for showing snippets without leaving GitHub.

Sorry I can't help with the RX thing though. Way about my brains.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants