diff --git a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c index f718a3e87..362171306 100644 --- a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c +++ b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c @@ -19,9 +19,15 @@ #include "deserialization_to_bpf_map.h" #include "../../config/kmesh_marcos_def.h" -#define LOG_ERR(fmt, args...) printf(fmt, ##args) -#define LOG_WARN(fmt, args...) printf(fmt, ##args) -#define LOG_INFO(fmt, args...) printf(fmt, ##args) +#define PRINTF(fmt, args...) \ + do { \ + printf(fmt, ##args); \ + fflush(stdout); \ + } while (0) + +#define LOG_ERR(fmt, args...) PRINTF(fmt, ##args) +#define LOG_WARN(fmt, args...) PRINTF(fmt, ##args) +#define LOG_INFO(fmt, args...) PRINTF(fmt, ##args) struct op_context { void *key; @@ -37,10 +43,10 @@ struct op_context { const ProtobufCMessageDescriptor *desc; }; -#define init_op_context(context, key, val, desc, o_fd, fd, o_info, i_info, m_info) \ +#define init_op_context(context, k, v, desc, o_fd, fd, o_info, i_info, m_info) \ do { \ - (context).key = (key); \ - (context).value = (val); \ + (context).key = (k); \ + (context).value = (v); \ (context).desc = (desc); \ (context).outter_fd = (o_fd); \ (context).map_fd = (fd); \ @@ -51,6 +57,16 @@ struct op_context { (context).curr_fd = (fd); \ } while (0) +#define append_new_node(elem_list_head, curr_elem_list_node, new_node) \ + do { \ + if (curr_elem_list_node == NULL) { \ + curr_elem_list_node = elem_list_head = new_node; \ + } else { \ + curr_elem_list_node->next = new_node; \ + curr_elem_list_node = new_node; \ + } \ + } while (0) + #define TASK_SIZE (100) struct inner_map_stat { int map_fd; @@ -771,6 +787,55 @@ static int repeat_field_query(struct op_context *ctx, const ProtobufCFieldDescri return ret; } +void deserial_free_elem_list(struct element_list_node *head) +{ + while (head != NULL) { + struct element_list_node *n = head; + deserial_free_elem(n->elem); + head = n->next; + free(n); + } +} + +static void *create_struct_list(struct op_context *ctx, int *err) +{ + void *prev_key = NULL; + void *value; + struct element_list_node *elem_list_head = NULL; + struct element_list_node *curr_elem_list_node = NULL; + + *err = 0; + ctx->key = calloc(1, ctx->curr_info->key_size); + while (!bpf_map_get_next_key(ctx->curr_fd, prev_key, ctx->key)) { + prev_key = ctx->key; + + value = create_struct(ctx, err); + if (*err) { + LOG_ERR("create_struct failed, err = %d\n", err); + break; + } + + if (value == NULL) { + continue; + } + + struct element_list_node *new_node = (struct element_list_node *)calloc(1, sizeof(struct element_list_node)); + if (!new_node) { + *err = -1; + break; + } + + new_node->elem = value; + new_node->next = NULL; + append_new_node(elem_list_head, curr_elem_list_node, new_node); + } + if (*err) { + deserial_free_elem_list(elem_list_head); + return NULL; + } + return elem_list_head; +} + static void *create_struct(struct op_context *ctx, int *err) { void *value; @@ -814,13 +879,71 @@ static void *create_struct(struct op_context *ctx, int *err) if (ret) { LOG_INFO("field[%d] query fail\n", i); *err = 1; - return value; + break; } } + if (*err) { + deserial_free_elem(value); + return NULL; + } + return value; } +struct element_list_node *deserial_lookup_all_elems(const void *msg_desciptor) +{ + int ret, err; + struct element_list_node *value_list_head = NULL; + const char *map_name = NULL; + struct op_context context = {.inner_map_object = NULL}; + const ProtobufCMessageDescriptor *desc; + struct bpf_map_info outter_info = {0}, inner_info = {0}, info = {0}; + int map_fd, outter_fd = 0, inner_fd = 0; + unsigned int id, outter_id = 0, inner_id = 0; + + if (msg_desciptor == NULL) + return NULL; + + desc = (ProtobufCMessageDescriptor *)msg_desciptor; + if (desc->magic != PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC) + return NULL; + + map_name = desc->short_name; + ret = get_map_ids(map_name, &id, &outter_id, &inner_id); + if (ret) + return NULL; + + ret = get_map_fd_info(id, &map_fd, &info); + if (ret < 0) { + LOG_ERR("invalid MAP_ID: %d\n", id); + return NULL; + } + + ret = get_map_fd_info(inner_id, &inner_fd, &inner_info); + ret |= get_map_fd_info(outter_id, &outter_fd, &outter_info); + if (ret < 0 || map_info_check(&outter_info, &inner_info)) + goto end; + + init_op_context(context, NULL, NULL, desc, outter_fd, map_fd, &outter_info, &inner_info, &info); + + value_list_head = create_struct_list(&context, &err); + if (err != 0) { + LOG_ERR("create_struct_list failed, err = %d", err); + } + +end: + if (context.key != NULL) + free(context.key); + if (map_fd > 0) + close(map_fd); + if (outter_fd > 0) + close(outter_fd); + if (inner_fd > 0) + close(inner_fd); + return value_list_head; +} + void *deserial_lookup_elem(void *key, const void *msg_desciptor) { int ret, err; @@ -860,8 +983,7 @@ void *deserial_lookup_elem(void *key, const void *msg_desciptor) normalize_key(&context, key, map_name); value = create_struct(&context, &err); if (err != 0) { - deserial_free_elem(value); - value = NULL; + LOG_ERR("create_struct failed, err = %d\n", err); } end: diff --git a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.h b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.h index 6d8c71970..9d58a27bf 100644 --- a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.h +++ b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.h @@ -7,9 +7,16 @@ /* equal MAP_SIZE_OF_OUTTER_MAP */ #define MAX_OUTTER_MAP_ENTRIES (8192) +struct element_list_node { + void *elem; + struct element_list_node *next; +}; + int deserial_update_elem(void *key, void *value); void *deserial_lookup_elem(void *key, const void *msg_desciptor); +struct element_list_node *deserial_lookup_all_elems(const void *msg_desciptor); void deserial_free_elem(void *value); +void deserial_free_elem_list(struct element_list_node *head); int deserial_delete_elem(void *key, const void *msg_desciptor); int deserial_init(); diff --git a/pkg/bpf/bpf_kmesh.go b/pkg/bpf/bpf_kmesh.go index 578117d93..ac682a893 100644 --- a/pkg/bpf/bpf_kmesh.go +++ b/pkg/bpf/bpf_kmesh.go @@ -441,3 +441,7 @@ func (sc *BpfKmesh) Detach() error { } return nil } + +func AdsL7Enabled() bool { + return true +} diff --git a/pkg/bpf/bpf_kmesh_l4.go b/pkg/bpf/bpf_kmesh_l4.go index 70fd6fa94..a22c9f319 100644 --- a/pkg/bpf/bpf_kmesh_l4.go +++ b/pkg/bpf/bpf_kmesh_l4.go @@ -111,3 +111,7 @@ func (sc *BpfKmesh) Detach() error { } return nil } + +func AdsL7Enabled() bool { + return false +} diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 357d5243e..7023d0728 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -164,24 +164,6 @@ func (cache *ClusterCache) Delete() { } } -func (cache *ClusterCache) DumpBpf() []*cluster_v2.Cluster { - cache.mutex.RLock() - defer cache.mutex.RUnlock() - clusters := make([]*cluster_v2.Cluster, 0, len(cache.apiClusterCache)) - for name, c := range cache.apiClusterCache { - tmp := &cluster_v2.Cluster{} - if err := maps_v2.ClusterLookup(name, tmp); err != nil { - log.Errorf("ClusterLookup failed, %s", name) - continue - } - - tmp.ApiStatus = c.ApiStatus - clusters = append(clusters, tmp) - } - - return clusters -} - func (cache *ClusterCache) Dump() []*cluster_v2.Cluster { cache.mutex.RLock() defer cache.mutex.RUnlock() diff --git a/pkg/cache/v2/cluster_test.go b/pkg/cache/v2/cluster_test.go index 851c7d38b..77de061c9 100644 --- a/pkg/cache/v2/cluster_test.go +++ b/pkg/cache/v2/cluster_test.go @@ -17,6 +17,7 @@ package cache_v2 import ( + "sort" "testing" "github.com/agiledragon/gomonkey/v2" @@ -177,6 +178,33 @@ func TestClusterFlush(t *testing.T) { }) } +func TestClusterLookupAll(t *testing.T) { + config := options.BpfConfig{ + Mode: "ads", + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, _ := test.InitBpfMap(t, config) + t.Cleanup(cleanup) + testClusterNames := []string{"ut-cluster-1", "ut-cluster-2", "ut-cluster-3"} + for _, testClusterName := range testClusterNames { + err := maps_v2.ClusterUpdate(testClusterName, &cluster_v2.Cluster{Name: testClusterName}) + assert.Nil(t, err) + } + + clusters, err := maps_v2.ClusterLookupAll() + assert.Nil(t, err) + + var actualClusterNames []string + + for _, cluster := range clusters { + actualClusterNames = append(actualClusterNames, cluster.Name) + } + + sort.Strings(actualClusterNames) + assert.Equal(t, actualClusterNames, testClusterNames) +} + func BenchmarkClusterFlush(b *testing.B) { t := &testing.T{} config := options.BpfConfig{ diff --git a/pkg/cache/v2/listener.go b/pkg/cache/v2/listener.go index 0144c6914..7228e7f74 100644 --- a/pkg/cache/v2/listener.go +++ b/pkg/cache/v2/listener.go @@ -127,24 +127,6 @@ func (cache *ListenerCache) Flush() { } } -func (cache *ListenerCache) DumpBpf() []*listener_v2.Listener { - cache.mutex.RLock() - defer cache.mutex.RUnlock() - listeners := make([]*listener_v2.Listener, 0, len(cache.apiListenerCache)) - for name, listener := range cache.apiListenerCache { - tmp := &listener_v2.Listener{} - if err := maps_v2.ListenerLookup(listener.GetAddress(), tmp); err != nil { - log.Errorf("ListenerLookup failed, %s", name) - continue - } - - tmp.ApiStatus = listener.ApiStatus - listeners = append(listeners, tmp) - } - - return listeners -} - func (cache *ListenerCache) Dump() []*listener_v2.Listener { cache.mutex.RLock() defer cache.mutex.RUnlock() diff --git a/pkg/cache/v2/listener_test.go b/pkg/cache/v2/listener_test.go index f4f26bda7..3e9a87583 100644 --- a/pkg/cache/v2/listener_test.go +++ b/pkg/cache/v2/listener_test.go @@ -18,6 +18,7 @@ package cache_v2 import ( "fmt" + "sort" "testing" "github.com/agiledragon/gomonkey/v2" @@ -36,6 +37,35 @@ import ( "kmesh.net/kmesh/pkg/utils/test" ) +func TestListenerLookupAll(t *testing.T) { + config := options.BpfConfig{ + Mode: "ads", + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, _ := test.InitBpfMap(t, config) + t.Cleanup(cleanup) + testListenerNames := []string{"ut-listener-1", "ut-listener-2", "ut-listener-3"} + for i, testListenerName := range testListenerNames { + err := maps_v2.ListenerUpdate(&core_v2.SocketAddress{ + Port: uint32(i + 1), + }, &listener_v2.Listener{Name: testListenerName}) + assert.Nil(t, err) + } + + listeners, err := maps_v2.ListenerLookupAll() + assert.Nil(t, err) + + var actualListenerNames []string + + for _, listener := range listeners { + actualListenerNames = append(actualListenerNames, listener.Name) + } + + sort.Strings(actualListenerNames) + assert.Equal(t, actualListenerNames, testListenerNames) +} + func TestListenerFlush(t *testing.T) { t.Run("listener status is UPDATE", func(t *testing.T) { updateListenerAddress := []*core_v2.SocketAddress{} diff --git a/pkg/cache/v2/maps/authz.go b/pkg/cache/v2/maps/authz.go index c91a5bc5b..4983b9654 100644 --- a/pkg/cache/v2/maps/authz.go +++ b/pkg/cache/v2/maps/authz.go @@ -21,6 +21,7 @@ package maps // #include "workloadapi/security/authorization.pb-c.h" import "C" import ( + "errors" "fmt" "unsafe" @@ -71,6 +72,34 @@ func AuthorizationLookup(key uint32, value *security_v2.Authorization) error { return err } +func AuthorizationLookupAll() ([]*security_v2.Authorization, error) { + cMsg := C.deserial_lookup_all_elems(unsafe.Pointer(&C.istio__security__authorization__descriptor)) + if cMsg == nil { + return nil, errors.New("AuthorizationLookupAll deserial_lookup_all_elems failed") + } + + elem_list_head := (*C.struct_element_list_node)(cMsg) + defer C.deserial_free_elem_list(elem_list_head) + + var ( + authorizations []*security_v2.Authorization + err error + ) + for elem_list_head != nil { + cValue := elem_list_head.elem + elem_list_head = elem_list_head.next + authorization := security_v2.Authorization{} + err = authorizationToGolang(&authorization, (*C.Istio__Security__Authorization)(cValue)) + log.Debugf("AuthorizationLookupAll, value [%s]", authorization.String()) + if err != nil { + return nil, err + } + authorizations = append(authorizations, &authorization) + } + + return authorizations, nil +} + func AuthorizationUpdate(policyKey uint32, value *security_v2.Authorization) error { cKey := C.uint(policyKey) cMsg, err := authorizationToClang(value) diff --git a/pkg/cache/v2/maps/cluster.go b/pkg/cache/v2/maps/cluster.go index cd29529e5..7124f2f52 100644 --- a/pkg/cache/v2/maps/cluster.go +++ b/pkg/cache/v2/maps/cluster.go @@ -24,6 +24,7 @@ package maps // #include "cluster/cluster.pb-c.h" import "C" import ( + "errors" "fmt" "unsafe" @@ -76,6 +77,34 @@ func ClusterLookup(key string, value *cluster_v2.Cluster) error { return err } +func ClusterLookupAll() ([]*cluster_v2.Cluster, error) { + cMsg := C.deserial_lookup_all_elems(unsafe.Pointer(&C.cluster__cluster__descriptor)) + if cMsg == nil { + return nil, errors.New("ClusterLookupAll deserial_lookup_all_elems failed") + } + + elem_list_head := (*C.struct_element_list_node)(cMsg) + defer C.deserial_free_elem_list(elem_list_head) + + var ( + clusters []*cluster_v2.Cluster + err error + ) + for elem_list_head != nil { + cValue := elem_list_head.elem + elem_list_head = elem_list_head.next + cluster := cluster_v2.Cluster{} + err = clusterToGolang(&cluster, (*C.Cluster__Cluster)(cValue)) + log.Debugf("ClusterLookupAll, value [%s]", cluster.String()) + if err != nil { + return nil, err + } + clusters = append(clusters, &cluster) + } + + return clusters, nil +} + func ClusterUpdate(key string, value *cluster_v2.Cluster) error { log.Debugf("ClusterUpdate [%s], [%s]", key, value.String()) diff --git a/pkg/cache/v2/maps/listener.go b/pkg/cache/v2/maps/listener.go index 28978d6b8..a7cd509d1 100644 --- a/pkg/cache/v2/maps/listener.go +++ b/pkg/cache/v2/maps/listener.go @@ -24,6 +24,7 @@ package maps // #include "listener/listener.pb-c.h" import "C" import ( + "errors" "fmt" "unsafe" @@ -65,6 +66,34 @@ func listenerFreeClang(cMsg *C.Listener__Listener) { C.listener__listener__free_unpacked(cMsg, nil) } +func ListenerLookupAll() ([]*listener_v2.Listener, error) { + cMsg := C.deserial_lookup_all_elems(unsafe.Pointer(&C.listener__listener__descriptor)) + if cMsg == nil { + return nil, errors.New("ListenerLookupAll deserial_lookup_all_elems failed") + } + + elem_list_head := (*C.struct_element_list_node)(cMsg) + defer C.deserial_free_elem_list(elem_list_head) + + var ( + listeners []*listener_v2.Listener + err error + ) + for elem_list_head != nil { + cValue := elem_list_head.elem + elem_list_head = elem_list_head.next + listener := listener_v2.Listener{} + err = listenerToGolang(&listener, (*C.Listener__Listener)(cValue)) + log.Debugf("ListenerLookupAll, value [%s]", listener.String()) + if err != nil { + return nil, err + } + listeners = append(listeners, &listener) + } + + return listeners, nil +} + func ListenerLookup(key *core_v2.SocketAddress, value *listener_v2.Listener) error { var err error diff --git a/pkg/cache/v2/maps/route.go b/pkg/cache/v2/maps/route.go index 60f3c6ef0..e0ab9f601 100644 --- a/pkg/cache/v2/maps/route.go +++ b/pkg/cache/v2/maps/route.go @@ -24,6 +24,7 @@ package maps // #include "route/route.pb-c.h" import "C" import ( + "errors" "fmt" "unsafe" @@ -59,6 +60,34 @@ func routeConfigFreeClang(cMsg *C.Route__RouteConfiguration) { C.route__route_configuration__free_unpacked(cMsg, nil) } +func RouteConfigLookupAll() ([]*route_v2.RouteConfiguration, error) { + cMsg := C.deserial_lookup_all_elems(unsafe.Pointer(&C.route__route_configuration__descriptor)) + if cMsg == nil { + return nil, errors.New("RouteConfigLookupAll deserial_lookup_all_elems failed") + } + + elem_list_head := (*C.struct_element_list_node)(cMsg) + defer C.deserial_free_elem_list(elem_list_head) + + var ( + routes []*route_v2.RouteConfiguration + err error + ) + for elem_list_head != nil { + cValue := elem_list_head.elem + elem_list_head = elem_list_head.next + route := route_v2.RouteConfiguration{} + err = routeConfigToGolang(&route, (*C.Route__RouteConfiguration)(cValue)) + log.Debugf("RouteConfigLookupAll, value [%s]", route.String()) + if err != nil { + return nil, err + } + routes = append(routes, &route) + } + + return routes, nil +} + func RouteConfigLookup(key string, value *route_v2.RouteConfiguration) error { var err error diff --git a/pkg/cache/v2/route.go b/pkg/cache/v2/route.go index 929a12ec2..ce2e69e4b 100644 --- a/pkg/cache/v2/route.go +++ b/pkg/cache/v2/route.go @@ -108,23 +108,6 @@ func (cache *RouteConfigCache) Flush() { } } -func (cache *RouteConfigCache) DumpBpf() []*route_v2.RouteConfiguration { - cache.mutex.RLock() - defer cache.mutex.RUnlock() - mapCache := make([]*route_v2.RouteConfiguration, 0, len(cache.apiRouteConfigCache)) - for name, route := range cache.apiRouteConfigCache { - tmp := &route_v2.RouteConfiguration{} - if err := maps_v2.RouteConfigLookup(name, tmp); err != nil { - log.Errorf("RouteConfigLookup failed, %s", name) - continue - } - - tmp.ApiStatus = route.ApiStatus - mapCache = append(mapCache, tmp) - } - return mapCache -} - func (cache *RouteConfigCache) Dump() []*route_v2.RouteConfiguration { cache.mutex.RLock() defer cache.mutex.RUnlock() diff --git a/pkg/cache/v2/route_test.go b/pkg/cache/v2/route_test.go index 1dba73206..a5fcc470a 100644 --- a/pkg/cache/v2/route_test.go +++ b/pkg/cache/v2/route_test.go @@ -17,6 +17,7 @@ package cache_v2 import ( + "sort" "testing" "github.com/agiledragon/gomonkey/v2" @@ -26,10 +27,44 @@ import ( core_v2 "kmesh.net/kmesh/api/v2/core" route_v2 "kmesh.net/kmesh/api/v2/route" + "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/utils/hash" + "kmesh.net/kmesh/pkg/utils/test" ) +func TestRouteConfigLookupAll(t *testing.T) { + // We only use route configuration when L7 enabled + if !bpf.AdsL7Enabled() { + return + } + config := options.BpfConfig{ + Mode: "ads", + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, _ := test.InitBpfMap(t, config) + t.Cleanup(cleanup) + testRouteNames := []string{"ut-route-1", "ut-route-2", "ut-route-3"} + for _, testRouteName := range testRouteNames { + err := maps_v2.RouteConfigUpdate(testRouteName, &route_v2.RouteConfiguration{Name: testRouteName}) + assert.Nil(t, err) + } + + routes, err := maps_v2.RouteConfigLookupAll() + assert.Nil(t, err) + + var actualRouteNames []string + + for _, route := range routes { + actualRouteNames = append(actualRouteNames, route.Name) + } + + sort.Strings(actualRouteNames) + assert.Equal(t, actualRouteNames, testRouteNames) +} + func TestRouteFlush(t *testing.T) { t.Run("route status is UPDATE", func(t *testing.T) { updateRouterName := []string{} diff --git a/pkg/status/status_server.go b/pkg/status/status_server.go index fb45d8ff1..18afe368c 100644 --- a/pkg/status/status_server.go +++ b/pkg/status/status_server.go @@ -32,6 +32,8 @@ import ( adminv2 "kmesh.net/kmesh/api/v2/admin" "kmesh.net/kmesh/api/v2/workloadapi/security" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf" + maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller" "kmesh.net/kmesh/pkg/controller/ads" @@ -194,17 +196,25 @@ func (s *Server) bpfAdsMaps(w http.ResponseWriter, r *http.Request) { if !s.checkAdsMode(w) { return } - - client := s.xdsClient - w.WriteHeader(http.StatusOK) - cache := client.AdsController.Processor.Cache + var err error dynamicRes := &adminv2.ConfigResources{} - - dynamicRes.ClusterConfigs = cache.ClusterCache.DumpBpf() - dynamicRes.ListenerConfigs = cache.ListenerCache.DumpBpf() - dynamicRes.RouteConfigs = cache.RouteCache.DumpBpf() + dynamicRes.ClusterConfigs, err = maps_v2.ClusterLookupAll() + if err != nil { + log.Errorf("ClusterLookupAll failed: %v", err) + } + dynamicRes.ListenerConfigs, err = maps_v2.ListenerLookupAll() + if err != nil { + log.Errorf("ListenerLookupAll failed: %v", err) + } + if bpf.AdsL7Enabled() { + dynamicRes.RouteConfigs, err = maps_v2.RouteConfigLookupAll() + if err != nil { + log.Errorf("RouteConfigLookupAll failed: %v", err) + } + } ads.SetApiVersionInfo(dynamicRes) + w.WriteHeader(http.StatusOK) fmt.Fprintln(w, protojson.Format(&adminv2.ConfigDump{ DynamicResources: dynamicRes, })) diff --git a/pkg/status/status_server_test.go b/pkg/status/status_server_test.go index 8e4a46d49..1ed1649ba 100644 --- a/pkg/status/status_server_test.go +++ b/pkg/status/status_server_test.go @@ -30,12 +30,19 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/encoding/protojson" "istio.io/istio/pilot/test/util" + "kmesh.net/kmesh/api/v2/admin" + "kmesh.net/kmesh/api/v2/cluster" + "kmesh.net/kmesh/api/v2/core" + "kmesh.net/kmesh/api/v2/listener" "kmesh.net/kmesh/api/v2/workloadapi" "kmesh.net/kmesh/daemon/options" + maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller" + "kmesh.net/kmesh/pkg/controller/ads" "kmesh.net/kmesh/pkg/controller/workload" "kmesh.net/kmesh/pkg/controller/workload/bpfcache" "kmesh.net/kmesh/pkg/controller/workload/cache" @@ -403,3 +410,76 @@ func TestServer_dumpWorkloadBpfMap(t *testing.T) { fmt.Printf("Dump: %v\n", dump) }) } + +func TestServer_dumpAdsBpfMap(t *testing.T) { + t.Run("Workload mode test", func(t *testing.T) { + config := options.BpfConfig{ + Mode: "workload", + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, _ := test.InitBpfMap(t, config) + defer cleanup() + + // workload mode will failed + server := &Server{} + req := httptest.NewRequest(http.MethodGet, patternBpfWorkloadMaps, nil) + w := httptest.NewRecorder() + server.configDumpWorkload(w, req) + + body, err := io.ReadAll(w.Body) + assert.Nil(t, err) + assert.Equal(t, invalidModeErrMessage, string(body)) + }) + + t.Run("Ads mode test", func(t *testing.T) { + config := options.BpfConfig{ + Mode: "ads", + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, _ := test.InitBpfMap(t, config) + defer cleanup() + + server := &Server{ + xdsClient: &controller.XdsClient{ + AdsController: &ads.Controller{}, + }, + } + + testClusterKeys := []string{"t1", "t2"} + testClusters := []*cluster.Cluster{ + {Name: testClusterKeys[0]}, + {Name: testClusterKeys[1]}, + } + + for index, testClusterKey := range testClusterKeys { + testCluster := testClusters[index] + maps_v2.ClusterUpdate(testClusterKey, testCluster) + } + + testListenerKeys := []*core.SocketAddress{ + {Port: 1}, {Port: 2}, + } + testListeners := []*listener.Listener{{Name: "t1"}, {Name: "t2"}} + + for index, testListenerKey := range testListenerKeys { + testListener := testListeners[index] + maps_v2.ListenerUpdate(testListenerKey, testListener) + } + + req := httptest.NewRequest(http.MethodGet, patternBpfAdsMaps, nil) + w := httptest.NewRecorder() + server.bpfAdsMaps(w, req) + body, err := io.ReadAll(w.Body) + fmt.Printf("dump: %s\n", string(body)) + assert.Nil(t, err) + + dump := admin.ConfigDump{} + err = protojson.Unmarshal(body, &dump) + assert.Nil(t, err) + + assert.Equal(t, len(testClusters), len(dump.DynamicResources.ClusterConfigs)) + assert.Equal(t, len(testListeners), len(dump.DynamicResources.ListenerConfigs)) + }) +}