Skip to content

Commit

Permalink
basic
Browse files Browse the repository at this point in the history
  • Loading branch information
ImSoZRious committed Mar 26, 2024
1 parent 527d0b1 commit e113ba6
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 13 deletions.
44 changes: 44 additions & 0 deletions apps/api/cmd/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"context"
"errors"
"log"
"time"

"github.com/redis/go-redis/v9"
)

type Broadcaster struct {
hub *Hub
cache *redis.Client
state *string
}

func newBroadcaster(hub *Hub, cache *redis.Client) Broadcaster {
return Broadcaster{
hub: hub,
cache: cache,
state: nil,
}
}

func (b *Broadcaster) run() {
for {
time.Sleep(5 * time.Second)
ctx := context.Background()
state, err := b.cache.Get(ctx, "s:state").Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
}
log.Printf("[ERROR] broadcaster: %v", err)
continue
}
if b.state == nil || *b.state != state {
b.state = &state
b.hub.broadcast <- []byte(state)
}
}
}

134 changes: 134 additions & 0 deletions apps/api/cmd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"bytes"
"log"
"net/http"
"time"

"github.com/gorilla/websocket"
)

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 512
)

var (
newline = []byte{'\n'}
space = []byte{' '}
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *Hub

// The websocket connection.
conn *websocket.Conn

// Buffered channel of outbound messages.
send chan []byte
}

// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)

// Add queued chat messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}

if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}

// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client

// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
}

49 changes: 49 additions & 0 deletions apps/api/cmd/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
// Registered clients.
clients map[*Client]bool

// Inbound messages from the clients.
broadcast chan []byte

// Register requests from the clients.
register chan *Client

// Unregister requests from clients.
unregister chan *Client
}

func newHub() *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
}
}

func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
50 changes: 37 additions & 13 deletions apps/api/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,49 @@
package main

import (
"fmt"
"flag"
"log"
"net/http"
"time"

"github.com/redis/go-redis/v9"
)

func handle(w http.ResponseWriter, r *http.Request) {
log.Println("Received a request at my domain")
w.Write([]byte("Hello, Domain name!"))
var addr = flag.String("addr", ":8080", "http service address")

func serveHome(w http.ResponseWriter, r *http.Request) {
log.Println(r.URL)
if r.URL.Path != "/" {
http.Error(w, "Not found", http.StatusNotFound)
return
}
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
http.ServeFile(w, r, "home.html")
}

func main() {
router := http.NewServeMux()
router.HandleFunc("/", handle)

server := http.Server{
Addr: ":8080",
Handler: router,
flag.Parse()
conn := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
})
hub := newHub()
broadcaster := newBroadcaster(hub, conn)
go hub.run()
go broadcaster.run()
http.HandleFunc("/", serveHome)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
server := &http.Server{
Addr: *addr,
ReadHeaderTimeout: 3 * time.Second,
}
err := server.ListenAndServe()
if err != nil {
log.Fatal("ListenAndServe: ", err)
}

fmt.Println("Server listening on port :8080")
server.ListenAndServe()
}
9 changes: 9 additions & 0 deletions apps/api/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: '3.9'

services:
redis:
image: redis:7.2.4-bookworm
container_name: redis
ports:
- 6379:6379

11 changes: 11 additions & 0 deletions apps/api/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
module github.com/isd-sgcu/cutu-2024

go 1.22.1

require (
github.com/gorilla/websocket v1.5.1
github.com/redis/go-redis/v9 v9.5.1
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
golang.org/x/net v0.17.0 // indirect
)
14 changes: 14 additions & 0 deletions apps/api/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=

0 comments on commit e113ba6

Please sign in to comment.