Skip to content

Commit

Permalink
App 3820: pass timestamp to onAnswererliveness (viamrobotics#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
RoxyFarhad authored Feb 16, 2024
1 parent 0185a12 commit fc9f253
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 14 deletions.
3 changes: 2 additions & 1 deletion rpc/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func TestDialWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down
6 changes: 3 additions & 3 deletions rpc/wrtc_call_queue_mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type mongoDBWebRTCCallQueue struct {
csCtxCancel func()

// function to update access times on robot parts based on this call queue
activeAnswerersfunc *func(hostnames []string)
activeAnswerersfunc *func(hostnames []string, atTime time.Time)
// 1 caller/answerer -> 1 caller id -> 1 event stream
callExchangeSubs map[string]map[*mongodbCallExchange]struct{}

Expand Down Expand Up @@ -131,7 +131,7 @@ func NewMongoDBWebRTCCallQueue(
maxHostCallers uint64,
client *mongo.Client,
logger golog.Logger,
activeAnswerersfunc func(hostnames []string),
activeAnswerersfunc func(hostnames []string, atTime time.Time),
) (WebRTCCallQueue, error) {
if operatorID == "" {
return nil, errors.New("expected non-empty operatorID")
Expand Down Expand Up @@ -407,7 +407,7 @@ func (queue *mongoDBWebRTCCallQueue) operatorLivenessLoop() {
}

if queue.activeAnswerersfunc != nil {
(*queue.activeAnswerersfunc)(hostsWithAnswerers)
(*queue.activeAnswerersfunc)(hostsWithAnswerers, time.Now())
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions rpc/wrtc_call_queue_mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func TestMongoDBWebRTCCallQueue(t *testing.T) {
t.Helper()
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
logger := golog.NewTestLogger(t)
callQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
callQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
return callQueue, callQueue, func() {
test.That(t, callQueue.Close(), test.ShouldBeNil)
Expand All @@ -39,11 +40,11 @@ func TestMongoDBWebRTCCallQueueMulti(t *testing.T) {
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
logger := golog.NewTestLogger(t)
callerQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString()+"-caller",
maxCallerQueueSize, client, logger, func(hosts []string) {})
maxCallerQueueSize, client, logger, func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)

answererQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString()+"-answerer",
maxCallerQueueSize, client, logger, func(hosts []string) {})
maxCallerQueueSize, client, logger, func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
return callerQueue, answererQueue, func() {
test.That(t, callerQueue.Close(), test.ShouldBeNil)
Expand Down Expand Up @@ -168,7 +169,7 @@ func TestMongoDBWebRTCCallQueueMulti(t *testing.T) {
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
answererQueue, err := NewMongoDBWebRTCCallQueue(
context.Background(), uuid.NewString()+"-answerer",
1, client, logger, func(hostnames []string) { activeAnswererChannelStub <- len(hostnames) })
1, client, logger, func(hostnames []string, atTime time.Time) { activeAnswererChannelStub <- len(hostnames) })
test.That(t, err, test.ShouldBeNil)
defer answererQueue.Close()

Expand Down
16 changes: 11 additions & 5 deletions rpc/wrtc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"strings"
"testing"
"time"

"github.com/edaniels/golog"
"github.com/google/uuid"
Expand Down Expand Up @@ -39,7 +40,8 @@ func TestWebRTCClientServerWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down Expand Up @@ -131,7 +133,8 @@ func TestWebRTCClientDialCancelWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down Expand Up @@ -217,7 +220,8 @@ func TestWebRTCClientDialReflectAnswererErrorWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down Expand Up @@ -308,7 +312,8 @@ func TestWebRTCClientDialConcurrentWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down Expand Up @@ -447,7 +452,8 @@ func TestWebRTCClientAnswerConcurrentWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down
4 changes: 3 additions & 1 deletion rpc/wrtc_signaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/edaniels/golog"
"github.com/google/uuid"
Expand Down Expand Up @@ -35,7 +36,8 @@ func TestWebRTCSignalingWithMongoDBQueue(t *testing.T) {
logger := golog.NewTestLogger(t)
client := testutils.BackingMongoDBClient(t)
test.That(t, client.Database(mongodbWebRTCCallQueueDBName).Drop(context.Background()), test.ShouldBeNil)
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger, func(hosts []string) {})
signalingCallQueue, err := NewMongoDBWebRTCCallQueue(context.Background(), uuid.NewString(), 50, client, logger,
func(hosts []string, atTime time.Time) {})
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, signalingCallQueue.Close(), test.ShouldBeNil)
Expand Down

0 comments on commit fc9f253

Please sign in to comment.