Improve rpc_soak and channel_soak test to cover concurrency in Go #7926

Open · wants to merge 30 commits into base: master

Changes from 8 commits (30 commits total)

Commits:
fd83eee Improve the rpc soak test and channel soak test to cover concurrency … (zbilun, Dec 12, 2024)
0bbe38b Remove .idea/ directory from Git tracking (zbilun, Dec 12, 2024)
219e17e Fix the style issue (zbilun, Dec 12, 2024)
ead030a Remove .idea/ from .gitignore (zbilun, Dec 12, 2024)
cbddddc Add .gitignore file (zbilun, Dec 12, 2024)
be47357 fix channel close issue (zbilun, Dec 12, 2024)
bfd1b8e Stop tracking .idea directory (zbilun, Dec 12, 2024)
3929ecf Fix the comments style. (zbilun, Dec 13, 2024)
742ed86 Fix the go style issues. (zbilun, Dec 17, 2024)
9fdcd56 Fix the go style issues about the space. (zbilun, Dec 17, 2024)
b718300 Debug for test failure. (zbilun, Dec 17, 2024)
2df82f8 Replace long parameter lists with struct for cleaner code. (zbilun, Dec 17, 2024)
2d72173 Clean code by deleting useless comments. (zbilun, Dec 17, 2024)
a0a3ae2 Fix the test fail (zbilun, Dec 17, 2024)
957c1b4 Fix the format check (zbilun, Dec 17, 2024)
67948ce Fix the p.addr (zbilun, Dec 17, 2024)
f03b6fe Change the addrStr back due to the print type issue. (zbilun, Dec 17, 2024)
f621f31 Clean comments. (zbilun, Dec 17, 2024)
2204ae8 Add print message. (zbilun, Dec 17, 2024)
ca86c16 Debug the test fail (zbilun, Dec 18, 2024)
22c5544 Duplicate print error (zbilun, Dec 18, 2024)
04282c5 Refactor the doSoakTest func. (zbilun, Dec 18, 2024)
959c594 Clean comments. (zbilun, Dec 18, 2024)
895af05 Clean empty line. (zbilun, Dec 18, 2024)
262b98e Address the second round comments. (zbilun, Dec 29, 2024)
1282e49 Fix the format issues. (zbilun, Dec 29, 2024)
5ef8eca Fix the naming check. (zbilun, Dec 29, 2024)
64fc618 Remove .gitignore file from repository (zbilun, Dec 29, 2024)
85e2438 Refactor the common config. (zbilun, Dec 30, 2024)
0395c79 Clean comments. (zbilun, Dec 30, 2024)
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.idea/

13 changes: 10 additions & 3 deletions interop/client/client.go
@@ -79,6 +79,7 @@ var (
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
numThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak).")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
testCase = flag.String("test_case", "large_unary",
@@ -261,7 +262,7 @@ func main() {
}
opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd))
}
conn, err := grpc.Dial(serverAddr, opts...)
conn, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
logger.Fatalf("Fail to dial: %v", err)
}
@@ -358,10 +359,16 @@ func main() {
interop.DoPickFirstUnary(ctx, tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
interop.DoSoakTest(ctxWithDeadline, conn, serverAddr, *numThreads, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize,
time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, *soakOverallTimeoutSeconds,
interop.UseSharedChannel)
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
interop.DoSoakTest(ctxWithDeadline, conn, serverAddr, *numThreads, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize,
time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, *soakOverallTimeoutSeconds,
func(currentChannel *grpc.ClientConn) (*grpc.ClientConn, testgrpc.TestServiceClient) {
return interop.CreateNewChannel(currentChannel, serverAddr, opts)
})
logger.Infoln("ChannelSoak done")
case "orca_per_rpc":
interop.DoORCAPerRPCTest(ctx, tc)
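The two soak call sites above now differ only in their final argument: a ManagedChannel callback. A minimal sketch of how that selection works, not part of the diff (the *testCase flag and surrounding variables come from the switch in main above):

	var channelFunc interop.ManagedChannel
	if *testCase == "channel_soak" {
		// channel_soak: shut down the current channel and dial a new one each iteration.
		channelFunc = func(cc *grpc.ClientConn) (*grpc.ClientConn, testgrpc.TestServiceClient) {
			return interop.CreateNewChannel(cc, serverAddr, opts)
		}
	} else {
		// rpc_soak: every iteration reuses the one shared channel.
		channelFunc = interop.UseSharedChannel
	}
	interop.DoSoakTest(ctxWithDeadline, conn, serverAddr, *numThreads, *soakIterations, *soakMaxFailures,
		*soakRequestSize, *soakResponseSize,
		time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond,
		time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond,
		*soakOverallTimeoutSeconds, channelFunc)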
255 changes: 201 additions & 54 deletions interop/test_utils.go
@@ -28,6 +28,7 @@ import (
"context"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
@@ -684,98 +685,244 @@ func DoPickFirstUnary(ctx context.Context, tc testgrpc.TestServiceClient) {
}
}

func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
start := time.Now()
client := tc
if resetChannel {
var conn *grpc.ClientConn
conn, err = grpc.Dial(serverAddr, dopts...)
// SoakIterationResult represents the latency and status results of a single iteration in the soak test.
type SoakIterationResult struct {
LatencyMs int64
Status string // The status of the iteration
}

// ThreadResults stores the aggregated results for a specific thread during the soak test.
type ThreadResults struct {
IterationsDone int
Failures int
Latencies *stats.Histogram
}

[Review comment, Member] Does it make sense to create a new file for all the soak test things? This file is quite long already.

[Reply, zbilun (Author)] Thank you for the suggestion! I understand the concern about the file length. However, the current design is aligned with the Java version. Perhaps we can defer this for now and address any cross-language changes after the C++ and Go versions are released in a separate PR?

// ManagedChannel determines whether a new channel will be created or an existing one reused.
type ManagedChannel func(*grpc.ClientConn) (*grpc.ClientConn, testgrpc.TestServiceClient)

// createChannel initializes the shared channel (once for all threads).
func createChannel(serverAddr string, dopts []grpc.DialOption) (*grpc.ClientConn, testgrpc.TestServiceClient) {
conn, err := grpc.NewClient(serverAddr, dopts...)
if err != nil {
log.Fatalf("Failed to create shared channel: %v", err)
}
client := testgrpc.NewTestServiceClient(conn)
return conn, client
}

func closeChannel(channel *grpc.ClientConn) {
if channel != nil {
err := channel.Close()
if err != nil {
return
log.Fatalf("Failed to close channel: %v", err)
}
defer conn.Close()
client = testgrpc.NewTestServiceClient(conn)
}
// per test spec, don't include channel shutdown in latency measurement
defer func() { latency = time.Since(start) }()
// do a large-unary RPC
}

// CreateNewChannel creates a new channel by shutting down the current one (for channel soak tests).
func CreateNewChannel(currentChannel *grpc.ClientConn, serverAddr string, dopts []grpc.DialOption) (*grpc.ClientConn, testgrpc.TestServiceClient) {
closeChannel(currentChannel)
conn, client := createChannel(serverAddr, dopts)
return conn, client
}

// UseSharedChannel reuses the provided currentChannel.
func UseSharedChannel(currentChannel *grpc.ClientConn) (*grpc.ClientConn, testgrpc.TestServiceClient) {
client := testgrpc.NewTestServiceClient(currentChannel)
return currentChannel, client
}

func doOneSoakIteration(
ctx context.Context,
client testgrpc.TestServiceClient,
soakRequestSize int,
soakResponseSize int,
copts []grpc.CallOption) (SoakIterationResult, error) {
start := time.Now()
var err error
// Do a large-unary RPC.
// Create the request payload.
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(soakResponseSize),
Payload: pl,
}
// Perform the GRPC call.
var reply *testpb.SimpleResponse
reply, err = client.UnaryCall(ctx, req, copts...)
if err != nil {
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
return
return SoakIterationResult{}, err
}
// Validate response.
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize)
return
return SoakIterationResult{}, err
}
return
// Calculate latency and return result.
latency := time.Since(start).Milliseconds()
return SoakIterationResult{
LatencyMs: latency,
Status: "OK",
}, nil
}

// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(ctx context.Context, tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration) {
start := time.Now()
var elapsedTime float64
iterationsDone := 0
totalFailures := 0
hopts := stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
}
h := stats.NewHistogram(hopts)
for i := 0; i < soakIterations; i++ {
func executeSoakTestInThread(
ctx context.Context,
soakIterationsPerThread int,
startNs int64,
minTimeBetweenRPCs time.Duration,
soakRequestSize int,
soakResponseSize int,
perIterationMaxAcceptableLatency time.Duration,
overallTimeoutSeconds int,
serverAddr string,
threadResults *ThreadResults,
mu *sync.Mutex,
sharedChannel *grpc.ClientConn,
threadID int,
MayCreateNewChannel ManagedChannel) {
timeoutDuration := time.Duration(overallTimeoutSeconds) * time.Second
currentChannel := sharedChannel

for i := 0; i < soakIterationsPerThread; i++ {
if ctx.Err() != nil {
elapsedTime = time.Since(start).Seconds()
break
return
}

if time.Since(time.Unix(0, startNs)) >= timeoutDuration {
fmt.Printf("Test exceeded overall timeout of %d seconds, stopping...\n", overallTimeoutSeconds)
return
}
earliestNextStart := time.After(minTimeBetweenRPCs)
iterationsDone++
// Get the channel and client from the provided channelFunc (either shared or new).
_, client := MayCreateNewChannel(currentChannel)
var p peer.Peer
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)})
latencyMs := int64(latency / time.Millisecond)
h.Add(latencyMs)
result, err := doOneSoakIteration(
ctx,
client,
soakRequestSize,
soakResponseSize,
[]grpc.CallOption{grpc.Peer(&p)})
addrStr := "nil"
if p.Addr != nil {
addrStr = p.Addr.String()
} else {
fmt.Fprintf(os.Stderr, "No peer address available for this RPC.\n")
}
if err != nil {
totalFailures++
addrStr := "nil"
if p.Addr != nil {
addrStr = p.Addr.String()
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
fmt.Fprintf(os.Stderr, "Thread %d: soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", threadID, i, 0, addrStr, serverAddr, err)
mu.Lock()
threadResults.Failures++
mu.Unlock()
<-earliestNextStart
continue
}
if latency > perIterationMaxAcceptableLatency {
totalFailures++
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
latencyMs := result.LatencyMs
if latencyMs > perIterationMaxAcceptableLatency.Milliseconds() {
fmt.Fprintf(os.Stderr, "Thread %d: soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", threadID, i, latencyMs, addrStr, serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
mu.Lock()
threadResults.Failures++
mu.Unlock()
<-earliestNextStart
continue
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr)
// Success: log the details of the iteration.
mu.Lock()
threadResults.Latencies.Add(latencyMs)
threadResults.IterationsDone++
mu.Unlock()
fmt.Fprintf(os.Stderr, "Thread %d: soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", threadID, i, latencyMs, addrStr, serverAddr)
<-earliestNextStart
}
}

// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// Iterations are divided evenly across numThreads goroutines. On each iteration, the MayCreateNewChannel callback
// decides whether to reuse the shared channel (rpc_soak) or shut it down and create a new one (channel_soak).
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(
ctx context.Context,
conn *grpc.ClientConn,
serverAddr string,
numThreads int,
soakIterations int,
maxFailures int,
soakRequestSize int,
soakResponseSize int,
perIterationMaxAcceptableLatency time.Duration,
minTimeBetweenRPCs time.Duration,
overallTimeoutSeconds int,
MayCreateNewChannel ManagedChannel) {
if soakIterations%numThreads != 0 {
fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by numThreads\n")
}
sharedChannel := conn
startNs := time.Now().UnixNano()
var wg sync.WaitGroup
mu := sync.Mutex{}
threadResults := make([]ThreadResults, numThreads)
iterationsPerThread := soakIterations / numThreads
for i := 0; i < numThreads; i++ {
wg.Add(1)
go func(threadID int) {
defer wg.Done()
executeSoakTestInThread(
ctx,
iterationsPerThread,
startNs,
minTimeBetweenRPCs,
soakRequestSize,
soakResponseSize,
perIterationMaxAcceptableLatency,
overallTimeoutSeconds,
serverAddr,
&threadResults[threadID],
&mu,
sharedChannel,
threadID,
MayCreateNewChannel)
}(i)
}

// Wait for all goroutines to complete.
wg.Wait()

// Handle results.
totalIterations := 0
totalFailures := 0
latencies := stats.NewHistogram(stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
})
for _, thread := range threadResults {
totalIterations += thread.IterationsDone
totalFailures += thread.Failures
if thread.Latencies != nil {
// Add latencies from the thread's Histogram to the main latencies.
latencies.Merge(thread.Latencies)
}
}
var b bytes.Buffer
h.Print(&b)
fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String())
fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures)
if iterationsDone < soakIterations {
logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, elapsedTime, iterationsDone, soakIterations)
latencies.Print(&b)
fmt.Fprintf(os.Stderr,
"(server_uri: %s) soak test ran: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
serverAddr, totalIterations, soakIterations, totalFailures, b.String())

if totalIterations != soakIterations {
fmt.Fprintf(os.Stderr, "Soak test consumed all %d seconds of time and quit early, ran %d out of %d iterations.\n", overallTimeoutSeconds, totalIterations, soakIterations)
}

if totalFailures > maxFailures {
logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures)
fmt.Fprintf(os.Stderr, "Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, maxFailures)
}
defer closeChannel(sharedChannel)
}

type testServer struct {
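For context, a minimal, hypothetical harness that drives the new DoSoakTest signature end to end. The server address, credentials, and numeric values below are assumptions for illustration, not part of the PR; note that soakIterations must divide evenly by numThreads, and that DoSoakTest closes the shared channel itself via closeChannel:

	package main

	import (
		"context"
		"log"
		"time"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"
		"google.golang.org/grpc/interop"
	)

	func main() {
		serverAddr := "localhost:10000" // assumption: a locally running interop test server
		conn, err := grpc.NewClient(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			log.Fatalf("Failed to create channel: %v", err)
		}
		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
		defer cancel()
		interop.DoSoakTest(ctx, conn, serverAddr,
			2,      // numThreads: two concurrent worker goroutines
			20,     // soakIterations: 10 per goroutine; must be divisible by numThreads
			0,      // maxFailures
			271828, // soakRequestSize (the interop default)
			314159, // soakResponseSize (the interop default)
			time.Second,          // perIterationMaxAcceptableLatency
			100*time.Millisecond, // minTimeBetweenRPCs
			20,                   // overallTimeoutSeconds
			interop.UseSharedChannel) // rpc_soak mode: all goroutines share conn
		// DoSoakTest closes the shared channel before returning, so no conn.Close() is needed here.
	}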