Skip to content

Commit

Permalink
feat: use gvisor parse network packet in pod (#369)
Browse files Browse the repository at this point in the history
  • Loading branch information
wencaiwulue authored Nov 15, 2024
1 parent cad5d23 commit 2aa7812
Show file tree
Hide file tree
Showing 24 changed files with 352 additions and 404 deletions.
6 changes: 1 addition & 5 deletions cmd/kubevpn/cmds/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
kubevpn clone service/productpage --ssh-addr <HOST:PORT> --ssh-username <USERNAME> --gssapi-password <PASSWORD>
`)),
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
// not support temporally
if options.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)
}
util.InitLoggerForClient(false)
// startup daemon process and sudo process
return daemon.StartupDaemon(cmd.Context())
Expand Down Expand Up @@ -164,7 +160,7 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false")
cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&options.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().StringVar((*string)(&options.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem))

cmd.Flags().StringVar(&options.TargetImage, "target-image", "", "Clone container use this image to startup container, if not special, use origin image")
cmd.Flags().StringVar(&options.TargetContainer, "target-container", "", "Clone container use special image to startup this container, if not special, use origin image")
Expand Down
6 changes: 1 addition & 5 deletions cmd/kubevpn/cmds/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
// not support temporally
if connect.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)
}
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -166,7 +162,7 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
cmd.Flags().BoolVar(&config.Debug, "debug", false, "enable debug mode or not, true or false")
cmd.Flags().StringVar(&config.Image, "image", config.Image, "use this image to startup container")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&connect.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().StringVar((*string)(&connect.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem))
cmd.Flags().BoolVar(&foreground, "foreground", false, "Hang up")
cmd.Flags().BoolVar(&lite, "lite", false, "connect to multiple cluster in lite mode. mode \"lite\": design for only connecting to multiple cluster network. mode \"full\": not only connect to cluster network, it also supports proxy workloads inbound traffic to local PC.")

Expand Down
2 changes: 1 addition & 1 deletion cmd/kubevpn/cmds/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func CmdControlPlane(_ cmdutil.Factory) *cobra.Command {
`)),
RunE: func(cmd *cobra.Command, args []string) error {
util.InitLoggerForServer(config.Debug)
go util.StartupPProf(0)
go util.StartupPProfForServer(0)
go func(ctx context.Context) {
conf, err := miekgdns.ClientConfigFromFile(resolvconf.Path())
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions cmd/kubevpn/cmds/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ func CmdDev(f cmdutil.Factory) *cobra.Command {
return err
}
util.InitLoggerForClient(config.Debug)
// not support temporally
if options.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)
}

if p := options.RunOptions.Platform; p != "" {
if _, err = platforms.Parse(p); err != nil {
Expand Down Expand Up @@ -144,7 +140,7 @@ func CmdDev(f cmdutil.Factory) *cobra.Command {
cmdutil.CheckErr(cmd.RegisterFlagCompletionFunc("container", completion.ContainerCompletionFunc(f)))
cmd.Flags().StringVar((*string)(&options.ConnectMode), "connect-mode", string(dev.ConnectModeHost), "Connect to kubernetes network in container or in host, eg: ["+string(dev.ConnectModeContainer)+"|"+string(dev.ConnectModeHost)+"]")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&options.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().StringVar((*string)(&options.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem))

// diy docker options
cmd.Flags().StringVar(&options.DevImage, "dev-image", "", "Use to startup docker container, Default is pod image")
Expand Down
6 changes: 1 addition & 5 deletions cmd/kubevpn/cmds/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
if err = daemon.StartupDaemon(cmd.Context()); err != nil {
return err
}
// not support temporally
if connect.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)
}
return err
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -186,7 +182,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false")
cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&connect.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().StringVar((*string)(&connect.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem))
cmd.Flags().BoolVar(&foreground, "foreground", false, "foreground hang up")

handler.AddExtraRoute(cmd.Flags(), extraRoute)
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubevpn/cmds/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func CmdServe(_ cmdutil.Factory) *cobra.Command {
PreRun: func(*cobra.Command, []string) {
util.InitLoggerForServer(config.Debug)
runtime.GOMAXPROCS(0)
go util.StartupPProf(0)
go util.StartupPProfForServer(6060)
},
RunE: func(cmd *cobra.Command, args []string) error {
rand.Seed(time.Now().UnixNano())
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubevpn/cmds/syncthing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func CmdSyncthing(_ cmdutil.Factory) *cobra.Command {
Short: i18n.T("Syncthing"),
Long: templates.LongDesc(i18n.T(`Syncthing`)),
RunE: func(cmd *cobra.Command, args []string) (err error) {
go util.StartupPProf(0)
go util.StartupPProfForServer(0)
return syncthing.StartServer(cmd.Context(), detach, dir)
},
Hidden: true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubevpn/cmds/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func CmdWebhook(f cmdutil.Factory) *cobra.Command {
Args: cobra.MaximumNArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
util.InitLoggerForServer(true)
go util.StartupPProf(0)
go util.StartupPProfForServer(0)
},
RunE: func(cmd *cobra.Command, args []string) error {
return webhook.Main(f)
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ type Engine string

const (
EngineGvisor Engine = "gvisor"
EngineMix Engine = "mix"
EngineRaw Engine = "raw"
EngineSystem Engine = "system"
)

const Slogan = "Now you can access resources in the kubernetes cluster !"
27 changes: 27 additions & 0 deletions pkg/core/gvisoricmpforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package core

import (
"context"

log "github.com/sirupsen/logrus"
"gvisor.dev/gvisor/pkg/tcpip/stack"

"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)

func ICMPForwarder(s *stack.Stack, ctx context.Context) func(stack.TransportEndpointID, *stack.PacketBuffer) bool {
return func(id stack.TransportEndpointID, buffer *stack.PacketBuffer) bool {
log.Debugf("[TUN-ICMP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s",
id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(),
)
ctx1, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
ok, err := util.PingOnce(ctx1, id.RemoteAddress.String(), id.LocalAddress.String())
if err != nil {
log.Debugf("[TUN-ICMP] Failed to ping dst %s from src %s",
id.LocalAddress.String(), id.RemoteAddress.String(),
)
}
return ok
}
}
17 changes: 10 additions & 7 deletions pkg/core/gvisorstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

func NewStack(ctx context.Context, tun stack.LinkEndpoint) *stack.Stack {
nicID := tcpip.NICID(1)
s := stack.New(stack.Options{
NetworkProtocols: []stack.NetworkProtocolFactory{
ipv4.NewProtocol,
Expand All @@ -33,26 +34,28 @@ func NewStack(ctx context.Context, tun stack.LinkEndpoint) *stack.Stack {
RawFactory: raw.EndpointFactory{},
})
// set handler for TCP UDP ICMP
s.SetTransportProtocolHandler(tcp.ProtocolNumber, TCPForwarder(s))
s.SetTransportProtocolHandler(udp.ProtocolNumber, UDPForwarder(s))
s.SetTransportProtocolHandler(tcp.ProtocolNumber, TCPForwarder(s, ctx))
s.SetTransportProtocolHandler(udp.ProtocolNumber, UDPForwarder(s, ctx))
s.SetTransportProtocolHandler(header.ICMPv4ProtocolNumber, ICMPForwarder(s, ctx))
s.SetTransportProtocolHandler(header.ICMPv6ProtocolNumber, ICMPForwarder(s, ctx))

s.SetRouteTable([]tcpip.Route{
{
Destination: header.IPv4EmptySubnet,
NIC: 1,
NIC: nicID,
},
{
Destination: header.IPv6EmptySubnet,
NIC: 1,
NIC: nicID,
},
})

s.CreateNICWithOptions(1, packetsocket.New(tun), stack.NICOptions{
s.CreateNICWithOptions(nicID, packetsocket.New(tun), stack.NICOptions{
Disabled: false,
Context: ctx,
})
s.SetPromiscuousMode(1, true)
s.SetSpoofing(1, true)
s.SetPromiscuousMode(nicID, true)
s.SetSpoofing(nicID, true)

// Enable SACK Recovery.
{
Expand Down
31 changes: 10 additions & 21 deletions pkg/core/gvisortcpforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"time"

log "github.com/sirupsen/logrus"
"gvisor.dev/gvisor/pkg/tcpip"
Expand All @@ -18,35 +20,22 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)

var GvisorTCPForwardAddr string

func TCPForwarder(s *stack.Stack) func(stack.TransportEndpointID, *stack.PacketBuffer) bool {
GvisorTCPForwardAddr := GvisorTCPForwardAddr
func TCPForwarder(s *stack.Stack, ctx context.Context) func(stack.TransportEndpointID, *stack.PacketBuffer) bool {
return tcp.NewForwarder(s, 0, 100000, func(request *tcp.ForwarderRequest) {
defer request.Complete(false)
id := request.ID()
log.Debugf("[TUN-TCP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s",
id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(),
)

node, err := ParseNode(GvisorTCPForwardAddr)
if err != nil {
log.Errorf("[TUN-TCP] Failed to parse gvisor tcp forward addr %s: %v", GvisorTCPForwardAddr, err)
return
}
node.Client = &Client{
Connector: GvisorTCPTunnelConnector(),
Transporter: TCPTransporter(),
}
forwardChain := NewChain(5, node)

remote, err := forwardChain.dial(context.Background())
// 2, dial proxy
host := id.LocalAddress.String()
port := fmt.Sprintf("%d", id.LocalPort)
var remote net.Conn
var d = net.Dialer{Timeout: time.Second * 5}
remote, err := d.DialContext(ctx, "tcp", net.JoinHostPort(host, port))
if err != nil {
log.Debugf("[TUN-TCP] Failed to dial remote conn: %v", err)
return
}
if err = WriteProxyInfo(remote, id); err != nil {
log.Debugf("[TUN-TCP] Failed to write proxy info: %v", err)
log.Errorf("[TUN-TCP] Failed to connect addr %s: %v", net.JoinHostPort(host, port), err)
return
}

Expand Down
96 changes: 32 additions & 64 deletions pkg/core/gvisortcphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,62 @@ package core

import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"sync"

log "github.com/sirupsen/logrus"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/link/channel"
"gvisor.dev/gvisor/pkg/tcpip/transport/tcp"

"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)

type gvisorTCPTunnelConnector struct {
type gvisorTCPHandler struct {
// map[srcIP]net.Conn
routeMapTCP *sync.Map
packetChan chan *datagramPacket
}

func GvisorTCPTunnelConnector() Connector {
return &gvisorTCPTunnelConnector{}
}

func (c *gvisorTCPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) {
switch con := conn.(type) {
case *net.TCPConn:
err := con.SetNoDelay(true)
if err != nil {
return nil, err
}
err = con.SetKeepAlive(true)
if err != nil {
return nil, err
}
err = con.SetKeepAlivePeriod(15 * time.Second)
if err != nil {
return nil, err
}
}
return conn, nil
}

type gvisorTCPHandler struct{}

func GvisorTCPHandler() Handler {
return &gvisorTCPHandler{}
return &gvisorTCPHandler{
routeMapTCP: RouteMapTCP,
packetChan: TCPPacketChan,
}
}

func (h *gvisorTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) {
defer tcpConn.Close()
log.Debugf("[TUN-TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
// 1, get proxy info
endpointID, err := ParseProxyInfo(tcpConn)
if err != nil {
log.Errorf("[TUN-TCP] Failed to parse proxy info: %v", err)
return
}
log.Debugf("[TUN-TCP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s",
endpointID.LocalPort, endpointID.LocalAddress.String(), endpointID.RemotePort, endpointID.RemoteAddress.String(),
)
// 2, dial proxy
host := endpointID.LocalAddress.String()
port := fmt.Sprintf("%d", endpointID.LocalPort)
var remote net.Conn
remote, err = net.DialTimeout("tcp", net.JoinHostPort(host, port), time.Second*5)
if err != nil {
log.Errorf("[TUN-TCP] Failed to connect addr %s: %v", net.JoinHostPort(host, port), err)
return
}
cancel, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
log.Debugf("[TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
h.handle(cancel, tcpConn)
}

func (h *gvisorTCPHandler) handle(ctx context.Context, tcpConn net.Conn) {
endpoint := channel.New(tcp.DefaultReceiveBufferSize, uint32(config.DefaultMTU), tcpip.GetRandMacAddr())
errChan := make(chan error, 2)
go func() {
i := config.LPool.Get().([]byte)[:]
defer config.LPool.Put(i[:])
written, err2 := io.CopyBuffer(remote, tcpConn, i)
log.Debugf("[TUN-TCP] Write length %d data to remote", written)
errChan <- err2
h.readFromTCPConnWriteToEndpoint(ctx, tcpConn, endpoint)
util.SafeClose(errChan)
}()
go func() {
i := config.LPool.Get().([]byte)[:]
defer config.LPool.Put(i[:])
written, err2 := io.CopyBuffer(tcpConn, remote, i)
log.Debugf("[TUN-TCP] Read length %d data from remote", written)
errChan <- err2
h.readFromEndpointWriteToTCPConn(ctx, tcpConn, endpoint)
util.SafeClose(errChan)
}()
err = <-errChan
if err != nil && !errors.Is(err, io.EOF) {
log.Debugf("[TUN-TCP] Disconnect: %s >-<: %s: %v", tcpConn.LocalAddr(), remote.RemoteAddr(), err)
stack := NewStack(ctx, endpoint)
defer stack.Destroy()
select {
case <-errChan:
return
case <-ctx.Done():
return
}
}

func GvisorTCPListener(addr string) (net.Listener, error) {
log.Debugf("Gvisor tcp listen addr %s", addr)
log.Debugf("Gvisor TCP listening addr: %s", addr)
laddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 2aa7812

Please sign in to comment.