-
Notifications
You must be signed in to change notification settings - Fork 796
/
alterclientquotas.go
131 lines (106 loc) · 3.56 KB
/
alterclientquotas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/alterclientquotas"
)
// AlterClientQuotasRequest represents a request sent to a kafka broker to
// alter client quotas.
type AlterClientQuotasRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// List of client quotas entries to alter.
Entries []AlterClientQuotaEntry
// Whether the alteration should be validated, but not performed.
ValidateOnly bool
}
type AlterClientQuotaEntry struct {
// The quota entities to alter.
Entities []AlterClientQuotaEntity
// An individual quota configuration entry to alter.
Ops []AlterClientQuotaOps
}
type AlterClientQuotaEntity struct {
// The quota entity type.
EntityType string
// The name of the quota entity, or null if the default.
EntityName string
}
type AlterClientQuotaOps struct {
// The quota configuration key.
Key string
// The quota configuration value to set, otherwise ignored if the value is to be removed.
Value float64
// Whether the quota configuration value should be removed, otherwise set.
Remove bool
}
type AlterClientQuotaResponseQuotas struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update.
Error error
// The altered quota entities.
Entities []AlterClientQuotaEntity
}
// AlterClientQuotasResponse represents a response from a kafka broker to an alter client
// quotas request.
type AlterClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// List of altered client quotas responses.
Entries []AlterClientQuotaResponseQuotas
}
// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
entries := make([]alterclientquotas.Entry, len(req.Entries))
for entryIdx, entry := range req.Entries {
entities := make([]alterclientquotas.Entity, len(entry.Entities))
for entityIdx, entity := range entry.Entities {
entities[entityIdx] = alterclientquotas.Entity{
EntityType: entity.EntityType,
EntityName: entity.EntityName,
}
}
ops := make([]alterclientquotas.Ops, len(entry.Ops))
for opsIdx, op := range entry.Ops {
ops[opsIdx] = alterclientquotas.Ops{
Key: op.Key,
Value: op.Value,
Remove: op.Remove,
}
}
entries[entryIdx] = alterclientquotas.Entry{
Entities: entities,
Ops: ops,
}
}
m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
Entries: entries,
ValidateOnly: req.ValidateOnly,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
}
res := m.(*alterclientquotas.Response)
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))
for responseEntryIdx, responseEntry := range res.Results {
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
for responseEntityIdx, responseEntity := range responseEntry.Entities {
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
EntityType: responseEntity.EntityType,
EntityName: responseEntity.EntityName,
}
}
responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage),
Entities: responseEntities,
}
}
ret := &AlterClientQuotasResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Entries: responseEntries,
}
return ret, nil
}