Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ads bpf map lookup all #827

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 117 additions & 6 deletions bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
#include "deserialization_to_bpf_map.h"
#include "../../config/kmesh_marcos_def.h"

#define LOG_ERR(fmt, args...) printf(fmt, ##args)
#define LOG_WARN(fmt, args...) printf(fmt, ##args)
#define LOG_INFO(fmt, args...) printf(fmt, ##args)
#define PRINTF(fmt, args...) \
do { \
printf(fmt, ##args); \
fflush(stdout); \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without fflush we cannot see the log.
the c printf uses a buffer.

I met this when i am debugging.

} while (0)

#define LOG_ERR(fmt, args...) PRINTF(fmt, ##args)
#define LOG_WARN(fmt, args...) PRINTF(fmt, ##args)
#define LOG_INFO(fmt, args...) PRINTF(fmt, ##args)

struct op_context {
void *key;
Expand All @@ -37,10 +43,10 @@ struct op_context {
const ProtobufCMessageDescriptor *desc;
};

#define init_op_context(context, key, val, desc, o_fd, fd, o_info, i_info, m_info) \
#define init_op_context(context, k, v, desc, o_fd, fd, o_info, i_info, m_info) \
do { \
(context).key = (key); \
(context).value = (val); \
(context).key = (k); \
(context).value = (v); \
(context).desc = (desc); \
(context).outter_fd = (o_fd); \
(context).map_fd = (fd); \
Expand Down Expand Up @@ -771,6 +777,56 @@ static int repeat_field_query(struct op_context *ctx, const ProtobufCFieldDescri
return ret;
}

void deserial_free_elem_list(struct element_list_node *head)
{
while (head != NULL) {
struct element_list_node *n = head;
deserial_free_elem(n->elem);
head = n->next;
free(n);
}
}

static void *create_struct_list(struct op_context *ctx, int *err)
{
void *prev_key = NULL;
void *value;
struct element_list_node *elem_list_head = NULL;
struct element_list_node *curr_elem_list_node = NULL;

*err = 0;
ctx->key = calloc(1, ctx->curr_info->key_size);
while (!bpf_map_get_next_key(ctx->curr_fd, prev_key, ctx->key)) {
prev_key = ctx->key;

value = create_struct(ctx, err);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When create_struct returns error, value may not be NULL. create_struct needs to be optimized. When an error is returned, the value is released in the function instead of depending on the caller. Currently, leakage may occur in create_struct_list.

if (*err != 0) {
LOG_ERR("create_struct failed, err = %d\n", err);
deserial_free_elem_list(elem_list_head);
Copy link
Contributor

@nlgwcy nlgwcy Sep 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When an error occurs, resources must be reclaimed and returned in a unified manner. The recommended format is as follows:

while (!bpf_map_get_next_key(ctx->curr_fd, prev_key, ctx->key)) {
    value = create_struct(ctx, err);
        if (*err != 0)
           break;
    ......
    struct element_list_node *new_node = (struct element_list_node *)calloc(1, sizeof(struct element_list_node));
        if (!new_node) {
        *err = -1;
        break;
    }
    .......
}

if (*err) {
    deserial_free_elem_list(elem_list_head);
    return NULL;
}
return elem_list_head;

value = NULL;
}

if (value == NULL) {
continue;
}

struct element_list_node *new_node = (struct element_list_node *)calloc(1, sizeof(struct element_list_node));
if (!new_node) {
deserial_free_elem_list(elem_list_head);
return NULL;
}
new_node->elem = value;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are basic operations for updating list. It is recommended that you encapsulate them into independent functions or macros or call existing libraries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, will modify it.

new_node->next = NULL;
if (curr_elem_list_node == NULL) {
curr_elem_list_node = elem_list_head = new_node;
} else {
curr_elem_list_node->next = new_node;
curr_elem_list_node = new_node;
}
}
return elem_list_head;
}

static void *create_struct(struct op_context *ctx, int *err)
{
void *value;
Expand Down Expand Up @@ -821,6 +877,61 @@ static void *create_struct(struct op_context *ctx, int *err)
return value;
}

struct element_list_node *deserial_lookup_all_elems(const void *msg_desciptor)
{
int ret, err;
struct element_list_node *value_list_head = NULL;
const char *map_name = NULL;
struct op_context context = {.inner_map_object = NULL};
const ProtobufCMessageDescriptor *desc;
struct bpf_map_info outter_info = {0}, inner_info = {0}, info = {0};
int map_fd, outter_fd = 0, inner_fd = 0;
unsigned int id, outter_id = 0, inner_id = 0;

if (msg_desciptor == NULL)
return NULL;

desc = (ProtobufCMessageDescriptor *)msg_desciptor;
if (desc->magic != PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC)
return NULL;

map_name = desc->short_name;
ret = get_map_ids(map_name, &id, &outter_id, &inner_id);
if (ret)
return NULL;

ret = get_map_fd_info(id, &map_fd, &info);
if (ret < 0) {
LOG_ERR("invalid MAP_ID: %d\n", id);
return NULL;
}

ret = get_map_fd_info(inner_id, &inner_fd, &inner_info);
ret |= get_map_fd_info(outter_id, &outter_fd, &outter_info);
if (ret < 0 || map_info_check(&outter_info, &inner_info))
goto end;

init_op_context(context, NULL, NULL, desc, outter_fd, map_fd, &outter_info, &inner_info, &info);

value_list_head = create_struct_list(&context, &err);
if (err != 0) {
LOG_ERR("create_struct_list failed, err = %d", err);
deserial_free_elem_list(value_list_head);
value_list_head = NULL;
}

end:
if (context.key != NULL)
free(context.key);
if (map_fd > 0)
close(map_fd);
if (outter_fd > 0)
close(outter_fd);
if (inner_fd > 0)
close(inner_fd);
return value_list_head;
}

void *deserial_lookup_elem(void *key, const void *msg_desciptor)
{
int ret, err;
Expand Down
7 changes: 7 additions & 0 deletions bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
/* equal MAP_SIZE_OF_OUTTER_MAP */
#define MAX_OUTTER_MAP_ENTRIES (8192)

struct element_list_node {
void *elem;
struct element_list_node *next;
};

int deserial_update_elem(void *key, void *value);
void *deserial_lookup_elem(void *key, const void *msg_desciptor);
struct element_list_node *deserial_lookup_all_elems(const void *msg_desciptor);
void deserial_free_elem(void *value);
void deserial_free_elem_list(struct element_list_node *head);
int deserial_delete_elem(void *key, const void *msg_desciptor);

int deserial_init();
Expand Down
4 changes: 4 additions & 0 deletions pkg/bpf/bpf_kmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,7 @@ func (sc *BpfKmesh) Detach() error {
}
return nil
}

func AdsL7Enabled() bool {
return true
}
4 changes: 4 additions & 0 deletions pkg/bpf/bpf_kmesh_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@
}
return nil
}

func AdsL7Enabled() bool {
return false

Check warning on line 116 in pkg/bpf/bpf_kmesh_l4.go

View check run for this annotation

Codecov / codecov/patch

pkg/bpf/bpf_kmesh_l4.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}
16 changes: 3 additions & 13 deletions pkg/cache/v2/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,10 @@
}

func (cache *ClusterCache) DumpBpf() []*cluster_v2.Cluster {
cache.mutex.RLock()
defer cache.mutex.RUnlock()
clusters := make([]*cluster_v2.Cluster, 0, len(cache.apiClusterCache))
for name, c := range cache.apiClusterCache {
tmp := &cluster_v2.Cluster{}
if err := maps_v2.ClusterLookup(name, tmp); err != nil {
log.Errorf("ClusterLookup failed, %s", name)
continue
}

tmp.ApiStatus = c.ApiStatus
clusters = append(clusters, tmp)
clusters, err := maps_v2.ClusterLookupAll()
if err != nil {
log.Errorf("ClusterLookup failed, %v", err)

Check warning on line 170 in pkg/cache/v2/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/cluster.go#L168-L170

Added lines #L168 - L170 were not covered by tests
}

return clusters
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/cache/v2/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cache_v2

import (
"sort"
"testing"

"github.com/agiledragon/gomonkey/v2"
Expand Down Expand Up @@ -177,6 +178,33 @@ func TestClusterFlush(t *testing.T) {
})
}

func TestClusterLookupAll(t *testing.T) {
config := options.BpfConfig{
Mode: "ads",
BpfFsPath: "/sys/fs/bpf",
Cgroup2Path: "/mnt/kmesh_cgroup2",
}
cleanup, _ := test.InitBpfMap(t, config)
t.Cleanup(cleanup)
testClusterNames := []string{"ut-cluster-1", "ut-cluster-2", "ut-cluster-3"}
for _, testClusterName := range testClusterNames {
err := maps_v2.ClusterUpdate(testClusterName, &cluster_v2.Cluster{Name: testClusterName})
assert.Nil(t, err)
}

clusters, err := maps_v2.ClusterLookupAll()
assert.Nil(t, err)

var actualClusterNames []string

for _, cluster := range clusters {
actualClusterNames = append(actualClusterNames, cluster.Name)
}

sort.Strings(actualClusterNames)
assert.Equal(t, actualClusterNames, testClusterNames)
}

func BenchmarkClusterFlush(b *testing.B) {
t := &testing.T{}
config := options.BpfConfig{
Expand Down
16 changes: 3 additions & 13 deletions pkg/cache/v2/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,10 @@
}

func (cache *ListenerCache) DumpBpf() []*listener_v2.Listener {
cache.mutex.RLock()
defer cache.mutex.RUnlock()
listeners := make([]*listener_v2.Listener, 0, len(cache.apiListenerCache))
for name, listener := range cache.apiListenerCache {
tmp := &listener_v2.Listener{}
if err := maps_v2.ListenerLookup(listener.GetAddress(), tmp); err != nil {
log.Errorf("ListenerLookup failed, %s", name)
continue
}

tmp.ApiStatus = listener.ApiStatus
listeners = append(listeners, tmp)
listeners, err := maps_v2.ListenerLookupAll()
if err != nil {
log.Errorf("ListenerLookupAll failed, %v", err)

Check warning on line 133 in pkg/cache/v2/listener.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/listener.go#L131-L133

Added lines #L131 - L133 were not covered by tests
}

return listeners
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/cache/v2/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache_v2

import (
"fmt"
"sort"
"testing"

"github.com/agiledragon/gomonkey/v2"
Expand All @@ -36,6 +37,35 @@ import (
"kmesh.net/kmesh/pkg/utils/test"
)

func TestListenerLookupAll(t *testing.T) {
config := options.BpfConfig{
Mode: "ads",
BpfFsPath: "/sys/fs/bpf",
Cgroup2Path: "/mnt/kmesh_cgroup2",
}
cleanup, _ := test.InitBpfMap(t, config)
t.Cleanup(cleanup)
testListenerNames := []string{"ut-listener-1", "ut-listener-2", "ut-listener-3"}
for i, testListenerName := range testListenerNames {
err := maps_v2.ListenerUpdate(&core_v2.SocketAddress{
Port: uint32(i + 1),
}, &listener_v2.Listener{Name: testListenerName})
assert.Nil(t, err)
}

listeners, err := maps_v2.ListenerLookupAll()
assert.Nil(t, err)

var actualListenerNames []string

for _, listener := range listeners {
actualListenerNames = append(actualListenerNames, listener.Name)
}

sort.Strings(actualListenerNames)
assert.Equal(t, actualListenerNames, testListenerNames)
}

func TestListenerFlush(t *testing.T) {
t.Run("listener status is UPDATE", func(t *testing.T) {
updateListenerAddress := []*core_v2.SocketAddress{}
Expand Down
29 changes: 29 additions & 0 deletions pkg/cache/v2/maps/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// #include "workloadapi/security/authorization.pb-c.h"
import "C"
import (
"errors"
"fmt"
"unsafe"

Expand Down Expand Up @@ -71,6 +72,34 @@
return err
}

func AuthorizationLookupAll() ([]*security_v2.Authorization, error) {
cMsg := C.deserial_lookup_all_elems(unsafe.Pointer(&C.istio__security__authorization__descriptor))
if cMsg == nil {
return nil, errors.New("AuthorizationLookupAll deserial_lookup_all_elems failed")

Check warning on line 78 in pkg/cache/v2/maps/authz.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/maps/authz.go#L75-L78

Added lines #L75 - L78 were not covered by tests
}

elem_list_head := (*C.struct_element_list_node)(cMsg)
defer C.deserial_free_elem_list(elem_list_head)

Check warning on line 82 in pkg/cache/v2/maps/authz.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/maps/authz.go#L81-L82

Added lines #L81 - L82 were not covered by tests

var (
authorizations []*security_v2.Authorization
err error
)
for elem_list_head != nil {
cValue := elem_list_head.elem
elem_list_head = elem_list_head.next
authorization := security_v2.Authorization{}
err = authorizationToGolang(&authorization, (*C.Istio__Security__Authorization)(cValue))
log.Debugf("AuthorizationLookupAll, value [%s]", authorization.String())
if err != nil {
return nil, err

Check warning on line 95 in pkg/cache/v2/maps/authz.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/maps/authz.go#L84-L95

Added lines #L84 - L95 were not covered by tests
}
authorizations = append(authorizations, &authorization)

Check warning on line 97 in pkg/cache/v2/maps/authz.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/maps/authz.go#L97

Added line #L97 was not covered by tests
}

return authorizations, nil

Check warning on line 100 in pkg/cache/v2/maps/authz.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/maps/authz.go#L100

Added line #L100 was not covered by tests
}

func AuthorizationUpdate(policyKey uint32, value *security_v2.Authorization) error {
cKey := C.uint(policyKey)
cMsg, err := authorizationToClang(value)
Expand Down
Loading
Loading