Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple peers for the "chat-with-mdns" example #2993

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 37 additions & 37 deletions examples/chat-with-mdns/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,38 @@ Use two different terminal windows to run
./chat-with-mdns -port 6668
```


## So how does it work?

1. **Configure a p2p host**

```go
ctx := context.Background()

// libp2p.New constructs a new libp2p Host.
// Other options can be added here.
host, err := libp2p.New()
```

[libp2p.New](https://godoc.org/github.com/libp2p/go-libp2p#New) is the constructor for libp2p node. It creates a host with given configuration.

2. **Set a default handler function for incoming connections.**

This function is called on the local peer when a remote peer initiate a connection and starts a stream with the local peer.

```go
// Set a function as stream handler.
host.SetStreamHandler("/chat/1.1.0", handleStream)
```

```handleStream``` is executed for each new stream incoming to the local peer. ```stream``` is used to exchange data between local and remote peer. This example uses non blocking functions for reading and writing from this stream.
```handleStream``` is executed for each new stream incoming to the local peer. ```stream``` is used to exchange data between local and remote peer. This example uses non blocking functions for reading from this stream.

```go
func handleStream(stream net.Stream) {

// Create a buffer stream for non blocking read and write.
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
// Create a buffer stream for non blocking read.
r := bufio.NewReader(stream)

go readData(rw)
go writeData(rw)

// 'stream' will stay open until you close it (or the other side closes it).
go readData(r)
}
```

Expand All @@ -63,46 +62,47 @@ New [mdns discovery](https://godoc.org/github.com/libp2p/go-libp2p/p2p/discovery
notifee := &discoveryNotifee{PeerChan: make(chan peer.AddrInfo)}
ser, err := discovery.NewMdnsService(peerhost, rendezvous, notifee)
```

register [Notifee interface](https://godoc.org/github.com/libp2p/go-libp2p/p2p/discovery#Notifee) with service so that we get notified about peer discovery

```go
ser.Start()
ser.Start()
```



4. **Open streams to peers found.**

Finally we open stream to the peers we found, as we find them

```go
peer := <-peerChan // will block until we discover a peer
// this is used to avoid call `NewStream` from both side
if peer.ID > host.ID() {
// if other end peer id greater than us, don't connect to it, just wait for it to connect us
fmt.Println("Found peer:", peer, " id is greater than us, wait for it to connect to us")
continue
}
fmt.Println("Found peer:", peer, ", connecting")

if err := host.Connect(ctx, peer); err != nil {
fmt.Println("Connection failed:", err)
continue
}

// open a stream, this stream will be handled by handleStream other end
stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

if err != nil {
fmt.Println("Stream open failed", err)
} else {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

go writeData(rw)
go readData(rw)
fmt.Println("Connected to:", peer)
}
peer := <-peerChan // will block until we discover a peer
fmt.Println("Found peer:", peer, ", connecting")

if err := host.Connect(ctx, peer); err != nil {
fmt.Println("Connection failed:", err)
continue
}

// open a stream, this stream will be handled by handleStream other end
stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))
if err != nil {
fmt.Println("Stream open failed", err)

continue
}

p := &Peer{
id: string(peer.ID),
stream: stream,
}
p.Start()

fmt.Println("Connected to:", peer.ID)

peers.AddPeer(p)
peers.WriteAll(fmt.Sprintf("%s Joined", host.ID().String()))
}
```

## Authors

1. Bineesh Lazar
93 changes: 45 additions & 48 deletions examples/chat-with-mdns/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,33 @@ import (
)

func handleStream(stream network.Stream) {
fmt.Println("Got a new stream!")

// Create a buffer stream for non-blocking read and write.
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

go readData(rw)
go writeData(rw)
r := bufio.NewReader(stream)

// 'stream' will stay open until you close it (or the other side closes it).
go readData(r)
}

func readData(rw *bufio.ReadWriter) {
// Read messages from connected peers
// In this example, we only read from the peers but use another Stream
// to broadcast the messages
func readData(r *bufio.Reader) {
for {
str, err := rw.ReadString('\n')
str, err := r.ReadString('\n')
if err != nil {
fmt.Println("Error reading from buffer")
panic(err)

return
}

if str == "" {
return
}

if str != "\n" {
// Green console colour: \x1b[32m
// Reset console colour: \x1b[0m
fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
}

}
}

func writeData(rw *bufio.ReadWriter) {
stdReader := bufio.NewReader(os.Stdin)

for {
fmt.Print("> ")
sendData, err := stdReader.ReadString('\n')
if err != nil {
fmt.Println("Error reading from stdin")
panic(err)
}

_, err = rw.WriteString(fmt.Sprintf("%s\n", sendData))
if err != nil {
fmt.Println("Error writing to buffer")
panic(err)
}
err = rw.Flush()
if err != nil {
fmt.Println("Error flushing buffer")
panic(err)
}
}
}

Expand Down Expand Up @@ -98,7 +73,7 @@ func main() {
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", cfg.listenHost, cfg.listenPort))

// libp2p.New constructs a new libp2p Host.
// Other options can be added here.
// Other options can be added here
host, err := libp2p.New(
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
Expand All @@ -113,32 +88,54 @@ func main() {

fmt.Printf("\n[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", cfg.listenHost, cfg.listenPort, host.ID())

peers := &Peers{
peers: map[string]*Peer{},
}

go func() {
stdReader := bufio.NewReader(os.Stdin)

for {
fmt.Print("> ")

sendData, err := stdReader.ReadString('\n')
if err != nil {
fmt.Println("Error reading from stdin")
panic(err)
}

peers.SendAll(fmt.Sprintf("%s: %s", host.ID().String(), sendData))
}
}()

peerChan := initMDNS(host, cfg.RendezvousString)
for { // allows multiple peers to join
peer := <-peerChan // will block until we discover a peer
if peer.ID > host.ID() {
// if other end peer id greater than us, don't connect to it, just wait for it to connect us
fmt.Println("Found peer:", peer, " id is greater than us, wait for it to connect to us")
continue
}
fmt.Println("Found peer:", peer, ", connecting")
fmt.Println("Found peer:", peer.ID, ", connecting")

if err := host.Connect(ctx, peer); err != nil {
fmt.Println("Connection failed:", err)

continue
}

// open a stream, this stream will be handled by handleStream other end
stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

if err != nil {
fmt.Println("Stream open failed", err)
} else {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

go writeData(rw)
go readData(rw)
fmt.Println("Connected to:", peer)
continue
}

p := &Peer{
id: string(peer.ID),
stream: stream,
}
p.Start()

fmt.Println("Connected to:", peer.ID)

peers.AddPeer(p)
peers.SendAll(fmt.Sprintf("%s Joined", host.ID().String()))
}
}
80 changes: 80 additions & 0 deletions examples/chat-with-mdns/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"bufio"
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/network"
)

type Peers struct {
peers map[string]*Peer
peerMu sync.Mutex
}

func (p *Peers) AddPeer(peer *Peer) {
p.peerMu.Lock()
defer p.peerMu.Unlock()

if _, ok := p.peers[peer.id]; !ok {
p.peers[peer.id] = peer
}
}

func (p *Peers) SendAll(msg string) {
for _, peer := range p.peers {
peer.write(msg)
}
}

type Peer struct {
stream network.Stream
stdinCh chan string
stopCh chan any
id string
}

func (p *Peer) Start() {
w := bufio.NewWriter(bufio.NewWriter(p.stream))

p.stopCh = make(chan any, 1)
p.stdinCh = make(chan string, 10)

go p.writeData(w, p.stdinCh, p.stopCh)
}

func (p *Peer) writeData(w *bufio.Writer, inCh chan string, stopCh chan any) {
for {
select {
case in := <-inCh:
_, err := fmt.Fprintf(w, "%s\n", in)
if err != nil {
fmt.Printf("Error writing to buffer: %s\n", err.Error())

return
}

err = w.Flush()
if err != nil {
fmt.Printf("Error flushing buffer: %s\n", err.Error())

return
}

case <-stopCh:
return
}
}
}

func (p *Peer) Stop() {
defer close(p.stopCh)
defer close(p.stdinCh)

p.stopCh <- struct{}{}
}

func (p *Peer) write(msg string) {
p.stdinCh <- msg
}
Loading