Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inform the client if it subscribes to a new topic #979

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ var (
errHTTPBadRequestWebPushSubscriptionInvalid = &errHTTP{40038, http.StatusBadRequest, "invalid request: web push payload malformed", "", nil}
errHTTPBadRequestWebPushEndpointUnknown = &errHTTP{40039, http.StatusBadRequest, "invalid request: web push endpoint unknown", "", nil}
errHTTPBadRequestWebPushTopicCountTooHigh = &errHTTP{40040, http.StatusBadRequest, "invalid request: too many web push topic subscriptions", "", nil}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil}
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication", nil}
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication", nil}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil}
errHTTPNoSubscriberUnifiedPush = &errHTTP{40401, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
errHTTPConflictUserExists = &errHTTP{40901, http.StatusConflict, "conflict: user already exists", "", nil}
errHTTPConflictTopicReserved = &errHTTP{40902, http.StatusConflict, "conflict: access control entry for topic or topic pattern already exists", "", nil}
errHTTPConflictSubscriptionExists = &errHTTP{40903, http.StatusConflict, "conflict: topic subscription already exists", "", nil}
Expand All @@ -142,5 +143,4 @@ var (
errHTTPInternalErrorInvalidPath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", "", nil}
errHTTPInternalErrorMissingBaseURL = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/", nil}
errHTTPInternalErrorWebPushUnableToPublish = &errHTTP{50004, http.StatusInternalServerError, "internal server error: unable to publish web push message", "", nil}
errHTTPInsufficientStorageUnifiedPush = &errHTTP{50701, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
)
4 changes: 2 additions & 2 deletions server/message_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func testCacheMessages(t *testing.T, c *messageCache) {
require.Nil(t, c.AddMessage(m2))

// Adding invalid
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example", false))) // These should not be added!

// mytopic: count
counts, err := c.MessageCounts()
Expand Down
28 changes: 14 additions & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,9 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e
}
if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
// Rate-Topics header). The 404 response might remove the push subscription from application servers,
// but the client should resubscribe them when sent the new_topic parameter.
return nil, errHTTPNoSubscriberUnifiedPush.With(t)
} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
return nil, errHTTPTooManyRequestsLimitMessages.With(t)
} else if email != "" && !vrate.EmailAllowed() {
Expand Down Expand Up @@ -848,18 +847,12 @@ func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *
if err != nil {
minc(metricMessagesPublishedFailure)
minc(metricMatrixPublishedFailure)
if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
topic, err := fromContext[*topic](r, contextTopic)
if err != nil {
return err
}
if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPNoSubscriberUnifiedPush.HTTPCode {
pushKey, err := fromContext[string](r, contextMatrixPushKey)
if err != nil {
return err
}
if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
return writeMatrixResponse(w, pushKey)
}
return writeMatrixResponse(w, pushKey)
}
return err
}
Expand Down Expand Up @@ -1225,16 +1218,19 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

createdNewTopics := false
subscriberIDs := make([]int, 0)
for _, t := range topics {
createdNewTopics = createdNewTopics || t.NeverSubscribed()
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
if err := sub(v, newOpenMessage(topicsStr, createdNewTopics)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
Expand Down Expand Up @@ -1374,16 +1370,20 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
}
return s.sendOldMessages(topics, since, scheduled, v, sub)
}

createdNewTopic := false
subscriberIDs := make([]int, 0)
for _, t := range topics {
createdNewTopic = createdNewTopic || t.NeverSubscribed()
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message

if err := sub(v, newOpenMessage(topicsStr, createdNewTopic)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/server_firebase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestToFirebaseMessage_Keepalive(t *testing.T) {
}

func TestToFirebaseMessage_Open(t *testing.T) {
m := newOpenMessage("mytopic")
m := newOpenMessage("mytopic", false)
fbm, err := toFirebaseMessage(m, nil)
require.Nil(t, err)
require.Equal(t, "mytopic", fbm.Topic)
Expand Down
9 changes: 0 additions & 9 deletions server/server_matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"net/http"
"strings"
"time"
)

// Matrix Push Gateway / UnifiedPush / ntfy integration:
Expand Down Expand Up @@ -72,14 +71,6 @@ type matrixResponse struct {
Rejected []string `json:"rejected"`
}

const (
// matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter is the time after which a Matrix response
// will return an HTTP 200 with the push key (i.e. "rejected":["<pushkey>"]}), if no rate visitor has been set on
// the topic. Rejecting the push key will instruct the Matrix server to invalidate the pushkey and stop sending
// messages to it. This must be longer than topicExpungeAfter. See https://spec.matrix.org/v1.6/push-gateway-api/
matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter = 12 * time.Hour
)

// errMatrixPushkeyRejected represents an error when handing Matrix gateway messages
//
// If the push key is set, the app server will remove it and will never send messages using the same
Expand Down
64 changes: 57 additions & 7 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {

require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, "", messages[0].Message)
require.Equal(t, "new_topic", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
Expand All @@ -147,6 +147,56 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
require.Equal(t, "", messages[1].Title)
require.Equal(t, 0, messages[1].Priority)
require.Nil(t, messages[1].Tags)

// The next time subscribing to the same topic will not result in new_topic on open
rr = httptest.NewRecorder()
ctx, cancel = context.WithCancel(context.Background())
req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic/json", nil)
if err != nil {
t.Fatal(err)
}
go func() {
s.handle(rr, req)
doneChan <- true
}()
time.Sleep(300 * time.Millisecond)
cancel()
<-doneChan

messages = toMessages(t, rr.Body.String())
require.Equal(t, 1, len(messages))

require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, "", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)

// Subscribing to any new topic again will result in new_topic being sent
rr = httptest.NewRecorder()
ctx, cancel = context.WithCancel(context.Background())
req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic,topic2/json", nil)
if err != nil {
t.Fatal(err)
}
go func() {
s.handle(rr, req)
doneChan <- true
}()
time.Sleep(300 * time.Millisecond)
cancel()
<-doneChan

messages = toMessages(t, rr.Body.String())
require.Equal(t, 1, len(messages))

require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic,topic2", messages[0].Topic)
require.Equal(t, "new_topic", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
}

func TestServer_PublishAndSubscribe(t *testing.T) {
Expand Down Expand Up @@ -1456,8 +1506,8 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber(t *testing.T) {
s := newTestServer(t, c)
notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 507, response.Code)
require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`+"\n", response.Body.String())
}

func TestServer_MatrixGateway_Push_Failure_NoSubscriber_After13Hours(t *testing.T) {
Expand All @@ -1466,16 +1516,16 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber_After13Hours(t *testing.
s := newTestServer(t, c)
notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`

// No success if no rate visitor set (this also creates the topic in memory)
// Simply reject if no rate visitor set (this creates the topic in memory)
response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 507, response.Code)
require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`, strings.TrimSpace(response.Body.String()))
require.Nil(t, s.topics["mytopic"].rateVisitor)

// Fake: This topic has been around for 13 hours without a rate visitor
s.topics["mytopic"].lastAccess = time.Now().Add(-13 * time.Hour)

// Same request should now return HTTP 200 with a rejected pushkey
// Same request should still return an HTTP 200 with a rejected pushkey
response = request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
require.Equal(t, 200, response.Code)
require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`, strings.TrimSpace(response.Body.String()))
Expand Down
25 changes: 17 additions & 8 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ const (
// topic represents a channel to which subscribers can subscribe, and publishers
// can publish a message
type topic struct {
ID string
subscribers map[int]*topicSubscriber
rateVisitor *visitor
lastAccess time.Time
mu sync.RWMutex
ID string
subscribers map[int]*topicSubscriber
rateVisitor *visitor
lastAccess time.Time
neverSubscribed bool
mu sync.RWMutex
}

type topicSubscriber struct {
Expand All @@ -38,9 +39,10 @@ type subscriber func(v *visitor, msg *message) error
// newTopic creates a new topic
func newTopic(id string) *topic {
return &topic{
ID: id,
subscribers: make(map[int]*topicSubscriber),
lastAccess: time.Now(),
ID: id,
subscribers: make(map[int]*topicSubscriber),
lastAccess: time.Now(),
neverSubscribed: true,
}
}

Expand All @@ -61,6 +63,7 @@ func (t *topic) Subscribe(s subscriber, userID string, cancel func()) (subscribe
cancel: cancel,
}
t.lastAccess = time.Now()
t.neverSubscribed = false
return subscriberID
}

Expand All @@ -79,6 +82,12 @@ func (t *topic) LastAccess() time.Time {
return t.lastAccess
}

func (t *topic) NeverSubscribed() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.neverSubscribed
}

func (t *topic) SetRateVisitor(v *visitor) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
17 changes: 11 additions & 6 deletions server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (

// List of possible events
const (
openEvent = "open"
keepaliveEvent = "keepalive"
messageEvent = "message"
pollRequestEvent = "poll_request"
openEvent = "open"
openCreatedNewTopic = "new_topic"
keepaliveEvent = "keepalive"
messageEvent = "message"
pollRequestEvent = "poll_request"
)

const (
Expand Down Expand Up @@ -123,8 +124,12 @@ func newMessage(event, topic, msg string) *message {
}

// newOpenMessage is a convenience method to create an open message
func newOpenMessage(topic string) *message {
return newMessage(openEvent, topic, "")
func newOpenMessage(topic string, createdNewTopics bool) *message {
msg := ""
if createdNewTopics { // can expand this to a comma seperated string for more future parameters
msg = openCreatedNewTopic
}
return newMessage(openEvent, topic, msg)
}

// newKeepaliveMessage is a convenience method to create a keepalive message
Expand Down
Loading