forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
alterpartitionreassignments.go
134 lines (109 loc) · 3.91 KB
/
alterpartitionreassignments.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package kafka
import (
"context"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
)
// AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
//
// It is the input of Client.AlterPartitionReassignments, which groups Assignments by topic
// before sending them to the broker at Addr.
type AlterPartitionReassignmentsRequest struct {
// Addr is the address of the kafka broker to send the request to.
Addr net.Addr
// Topic is the default topic name: it is used for every entry of Assignments whose own
// Topic field is empty. Keep this field empty and set Topic in each
// AlterPartitionReassignmentsRequestAssignment to reassign partitions of multiple topics
// in a single request.
Topic string
// Assignments is the list of partition reassignments to submit to the API.
Assignments []AlterPartitionReassignmentsRequestAssignment
// Timeout is the amount of time to wait for the request to complete. It is converted to
// whole milliseconds and narrowed to a 32-bit integer when the request is built.
Timeout time.Duration
}
// AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
// partition.
type AlterPartitionReassignmentsRequestAssignment struct {
// Topic is the name of the topic to alter partitions in. If empty, the value of Topic in
// AlterPartitionReassignmentsRequest is used.
Topic string
// PartitionID is the ID of the partition to make the reassignments in. It is narrowed to a
// 32-bit integer when the request is built.
PartitionID int
// BrokerIDs is the list of broker IDs to set the partition replicas to, or nil to cancel a
// pending reassignment for this partition.
//
// NOTE(review): Client.AlterPartitionReassignments copies this list into a non-nil []int32
// even when BrokerIDs is nil — confirm that the protocol encoder still sends null in that
// case, otherwise cancellation cannot be requested through this field.
BrokerIDs []int
}
// AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
type AlterPartitionReassignmentsResponse struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update. It is built from the error code and
// error message at the top level of the broker response.
Error error
// PartitionResults contains the specific results for each partition. The per-topic
// results of the broker response are flattened into this single list; each entry
// carries its own topic name.
PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
}
// AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
// doing reassignments for a single partition.
type AlterPartitionReassignmentsResponsePartitionResult struct {
// Topic is the name of the topic the partition belongs to, as reported in the response.
Topic string
// PartitionID is the ID of the partition that was altered.
PartitionID int
// Error is set to a non-nil value including the code and message if an error was encountered
// during the update for this partition. It is built from the error code and error message
// of the partition's entry in the broker response.
Error error
}
// AlterPartitionReassignments sends a partition reassignment request to the kafka broker at
// req.Addr and returns the broker's response.
//
// Assignments are grouped by topic; an assignment whose Topic is empty falls back to
// req.Topic. Topics are placed in the request in the order in which they first appear in
// req.Assignments, so the same input always builds the same request.
//
// The returned error is non-nil only when the round trip itself fails, in which case the
// response is nil. The top-level and per-partition error code/message pairs of the broker
// response are converted with makeError and stored in the Error fields of the returned
// response and of its PartitionResults.
func (c *Client) AlterPartitionReassignments(
	ctx context.Context,
	req *AlterPartitionReassignmentsRequest,
) (*AlterPartitionReassignmentsResponse, error) {
	apiReq := &alterpartitionreassignments.Request{
		TimeoutMs: int32(req.Timeout.Milliseconds()),
	}

	// topicIndex maps a topic name to its position in apiReq.Topics. Appending topics as
	// they are first seen (instead of ranging over a map afterwards) keeps the request
	// layout deterministic.
	topicIndex := make(map[string]int)

	for _, assignment := range req.Assignments {
		topic := assignment.Topic
		if topic == "" {
			topic = req.Topic
		}

		idx, seen := topicIndex[topic]
		if !seen {
			idx = len(apiReq.Topics)
			topicIndex[topic] = idx
			apiReq.Topics = append(apiReq.Topics, alterpartitionreassignments.RequestTopic{
				Name: topic,
			})
		}

		// The replica list is always a non-nil slice, even when BrokerIDs is nil or empty.
		replicas := make([]int32, 0, len(assignment.BrokerIDs))
		for _, brokerID := range assignment.BrokerIDs {
			replicas = append(replicas, int32(brokerID))
		}

		apiReq.Topics[idx].Partitions = append(
			apiReq.Topics[idx].Partitions,
			alterpartitionreassignments.RequestPartition{
				PartitionIndex: int32(assignment.PartitionID),
				Replicas:       replicas,
			},
		)
	}

	protoResp, err := c.roundTrip(
		ctx,
		req.Addr,
		apiReq,
	)
	if err != nil {
		return nil, err
	}
	apiResp := protoResp.(*alterpartitionreassignments.Response)

	resp := &AlterPartitionReassignmentsResponse{
		Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
	}

	// Flatten the per-topic results into a single list, tagging each entry with its topic.
	for _, topicResult := range apiResp.Results {
		for _, partitionResult := range topicResult.Partitions {
			resp.PartitionResults = append(
				resp.PartitionResults,
				AlterPartitionReassignmentsResponsePartitionResult{
					Topic:       topicResult.Name,
					PartitionID: int(partitionResult.PartitionIndex),
					Error:       makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
				},
			)
		}
	}

	return resp, nil
}