Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New cloud event API #2840

Draft
wants to merge 93 commits into
base: develop
Choose a base branch
from
Draft

New cloud event API #2840

wants to merge 93 commits into from

Conversation

sergeuz
Copy link
Member

@sergeuz sergeuz commented Oct 24, 2024

Description

This PR introduces a new API for publishing and subscribing to cloud events. The new API supports sending and receiving up to 16K of data in an event.

Examples

Publishing a text event:

CloudEvent event;
event.name("my_event");
event.data("abc");
Particle.publish(event);
// or
Particle.publish(CloudEvent().name("my_event").data("abc"));

Publishing a binary-encoded event:

CloudEvent event;
event.name("my_event");
event.data("\x01\x02\x03\x04", 4);
event.contentType(ContentType::BINARY);
Particle.publish(event);

Using the event instance as a regular stream:

CloudEvent event;
event.name("my_event");
for (int i = 0; i < 10; ++i) {
    event.write("abc", 3);
}
Particle.publish(event);

Polling the status of an event:

CloudEvent event;
bool done = false;

void loop() {
    if (event.isNew()) {
        event = CloudEvent().name("my_event").data("abc");
        Particle.publish(event);
    }
    if (!done) {
        if (!event.isOk()) {
            // This catches any error that might have occurred while creating or sending the event
            Log.error("Failed to send event: %d", event.error());
            done = true;
        } else if (event.isSent()) {
            Log.info("Successfully sent event");
            done = true;
        }
    }
}

Using a callback to receive status updates for an event:

CloudEvent event;

void loop() {
    if (event.isNew()) {
        event = CloudEvent().name("my_event").data("abc");
        event.onStatusChange([](auto event) {
            if (!event.isOk()) {
                Log.error("Failed to send event: %d", event.error());
            } else if (event.isSent()) {
                Log.info("Successfully sent event");
            }
        });
        Particle.publish(event);
    }
}

Subscribing to events:

void eventHandler(CloudEvent event) {
    Log.info("Received event: %s", event.name());
    Log.info("Event data: %s", event.dataString().c_str());
    // or
    while (event.available() > 0) {
        Serial.print((char)event.read());
    }
}

void setup() {
    Particle.subscribe("my_event", eventHandler);
}

Copy link
Member

@monkbroc monkbroc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I mostly looked at the application API for large publishes.

}
assert(file_);
CHECK(seekInFile(fs.instance(), file_.get(), p - maxHeapSize_));
size_t n = CHECK_FS(lfs_file_write(fs.instance(), file_.get(), data, bytesToWrite));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: make sure that lfs_file_write can handle writing large buffers (>1000 bytes) in one shot or split the writes in here to the limit of lfs_file_write

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@avtolstoy Is this still something on your team's radar or was it perhaps fixed already? It's that problem with writing to external flash when the source buffer is in internal flash, possibly at an unaligned address.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a fix for this, should get a PR out today.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

*
* @return Event data.
*/
Variant dataAsVariant() /* FIXME: const */; // TODO: Rename?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still relevant since we only support receiving binary payloads from large events?

Copy link
Member Author

@sergeuz sergeuz Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mostly added it for feature parity with the existing API. The user can still send CBOR data in a binary event and use this method to parse it. Let me know if we don't want to support encoding and decoding variants in CloudEvent directly and I'll remove the related methods from it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6b4d489.

*
* @return `true` if the event is being sent to the Cloud, otherwise `false`.
*/
bool sending() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no shorthand to see if a CloudEvent is in NEW state. Can you add a new() method?

Copy link
Member Author

@sergeuz sergeuz Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new is a reserved keyword so it will be isNew(). For consistency, I'll need to rename all the other shorthand methods to have an is prefix which I originally omitted for brevity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or rename the initial state PENDING

Copy link
Member Author

@sergeuz sergeuz Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added isNew and renamed the other methods to isSending, isOk, etc. PENDING is good but may be confusing for events in progress. Also, incoming events are received in that state too.

*
* @return `true` if the event is valid, otherwise `false`.
*/
bool valid() const {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is odd since it checks if the state is not INVALID. Can you make this method invalid() so it's symmetrical with all the other state helpers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to keep it this way so that it's consistent with other APIs unless you feel strongly about it. We only use valid() or isValid() everywhere else in Device OS.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, sounds good

*
* This method has no effect if the event is not currently being sent to the Cloud.
*
* A cancelled event cannot be published again.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still true? What state does a cancelled event go to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, a cancelled event gets invalidated. Making it transition to a failed state instead is possible and would be more user friendly but it will require some additional work:

// TODO: For now, transition to an invalid state as an event in a failed state can be sent again
// and that would create a race condition between cancellation and normal completion of the event
// in sendComplete()
.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. No need for additional work here.

}
int r = coap_write_payload(payload, data, size, d_->pos, nullptr /* reserved */);
if (r < 0) {
if (r == Error::COAP_TOO_LARGE_PAYLOAD) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way for an application to know how much room is left on the event (or what's the maximum total size of a large event) before calling write and getting COAP_TOO_LARGE_PAYLOAD?

Copy link
Member Author

@sergeuz sergeuz Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum supported size is exposed as CloudEvent::MAX_SIZE. The app can check how much space is left like this:

if (event.pos() + 100 <= CloudEvent::MAX_SIZE) {
    event.write(data, 100);
}

}
size_t size = this->size();
if (!RateLimiter::instance().take(size)) {
return Error::LIMIT_EXCEEDED;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So exceeding the amount of data in flight will return LIMIT_EXCEEDED, but the event will stay in the NEW state so it can be sent again later. Is there a way to query how much data is in flight to know when a new event can be sent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the event will stay in the NEW state

That was an oversight. The event will transition to a failed state on this error as well.

Is there a way to query how much data is in flight to know when a new event can be sent?

There's canPublish():

if (CloudEvent::canPublish(3000)) {
    Particle.publish(CloudEvent().name("my_event").data(buf, 3000));
}

I was hesitant to define it as Particle.canPublish() as not to create confusion since we have different limits for the new and old APIs.

@sergeuz sergeuz force-pushed the new-event-api/sc-131168 branch from 6de9504 to 7841734 Compare December 5, 2024 15:02
@Kategrode Kategrode added this to the 6.3.0 milestone Dec 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants