EventBus e WebSocket
parent
f172d24c74
commit
660d76a748
@ -0,0 +1,7 @@
|
||||
package database
|
||||
|
||||
import "fmt"
|
||||
|
||||
func OnPartitaPlayersChange(partitaUid string) string {
|
||||
return fmt.Sprintf("partita[uid=%q].players", partitaUid)
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package events
|
||||
|
||||
import "fmt"
|
||||
|
||||
func OnPartitaPlayerJoins(partitaUid string) string {
|
||||
return fmt.Sprintf("partita[uid=%q].players@join", partitaUid)
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
type ListenerFunc func(interface{})
|
||||
|
||||
type Listener struct {
|
||||
Event string
|
||||
Handle ListenerFunc
|
||||
}
|
||||
|
||||
type EventBus struct {
|
||||
lock sync.RWMutex
|
||||
listeners map[string][]*Listener
|
||||
}
|
||||
|
||||
func NewEventBus() *EventBus {
|
||||
return &EventBus{
|
||||
listeners: map[string][]*Listener{},
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe a listener for the given event type
|
||||
func (eb *EventBus) Subscribe(event string, f ListenerFunc) *Listener {
|
||||
eb.lock.Lock()
|
||||
defer eb.lock.Unlock()
|
||||
|
||||
log.Printf("[EventBus] Subscribed listener for %q", event)
|
||||
|
||||
l := &Listener{event, f}
|
||||
|
||||
if prev, found := eb.listeners[event]; !found {
|
||||
eb.listeners[event] = []*Listener{l}
|
||||
} else {
|
||||
eb.listeners[event] = append(prev, l)
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// Unsubscribe removes a listener for the given event type
|
||||
func (eb *EventBus) Unsubscribe(l *Listener) {
|
||||
eb.lock.Lock()
|
||||
defer eb.lock.Unlock()
|
||||
|
||||
listeners := eb.listeners[l.Event]
|
||||
|
||||
i := slices.Index(listeners, l)
|
||||
if i >= 0 {
|
||||
// TODO: Magari usare la tecnica di spostare l'ultimo alla posizione dell'elemento rimosso e accorciare lo slice, tanto alla fine l'ordine dei listener non conta veramente
|
||||
eb.listeners[l.Event] = append(listeners[:i], listeners[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch
|
||||
func (eb *EventBus) Dispatch(event string, data interface{}) {
|
||||
eb.lock.RLock()
|
||||
defer eb.lock.RUnlock()
|
||||
|
||||
log.Printf("[EventBus] Fired event %q with value %v", event, data)
|
||||
|
||||
if listeners, found := eb.listeners[event]; found {
|
||||
// make a copy of "listeners" to pass to the goroutine and not block the dispatcher thread
|
||||
tmpListeners := append([]*Listener{}, listeners...)
|
||||
go func() {
|
||||
for _, listener := range tmpListeners {
|
||||
listener.Handle(data)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
@ -0,0 +1,186 @@
|
||||
package routes
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/aziis98/lupus-lite/events"
|
||||
"github.com/aziis98/lupus-lite/util"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/websocket/v2"
|
||||
)
|
||||
|
||||
// func (s *Server) websocketHandler(handler func(c *fiber.Ctx) (interface{}, error)) fiber.Handler {
|
||||
// return websocket.New(func(c *websocket.Conn) {
|
||||
// clientId := util.GenerateRandomString(10)
|
||||
|
||||
// partitaUid := c.Params("partita")
|
||||
// log.Printf("[%v] Connection started", clientId)
|
||||
|
||||
// done := make(chan error)
|
||||
// clientListener := s.eventBus.Subscribe(
|
||||
// database.OnPartitaPlayerJoin(partitaUid),
|
||||
// func(e interface{}) {
|
||||
// partita, err := s.db.GetPartita(partitaUid)
|
||||
// if err != nil {
|
||||
// done <- err
|
||||
// return
|
||||
// }
|
||||
|
||||
// log.Printf(`[%v] Sent message`, clientId)
|
||||
// if err := c.WriteJSON(partita.Players); err != nil {
|
||||
// done <- err
|
||||
// return
|
||||
// }
|
||||
// },
|
||||
// )
|
||||
// defer func() {
|
||||
// s.eventBus.Unsubscribe(clientListener)
|
||||
// log.Printf("[%v] Connection closed", clientId)
|
||||
// }()
|
||||
|
||||
// for {
|
||||
// _, _, err := c.ReadMessage()
|
||||
// if err != nil {
|
||||
// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
// log.Printf("[%v] Error: %v", clientId, err)
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
// select {
|
||||
// case err := <-done:
|
||||
// if err != nil {
|
||||
// log.Printf(`[%v] Error: %v`, clientId, err)
|
||||
// }
|
||||
// return
|
||||
// case <-time.After(1 * time.Second):
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
|
||||
// func (s *Server) registerEventBinding(event string, process func(c *fiber.Ctx, e any) (any, error)) fiber.Handler {
|
||||
// return websocket.New(func(c *websocket.Conn) {
|
||||
// clientId := util.GenerateRandomString(10)
|
||||
// log.Printf("[%v] Connection started", clientId)
|
||||
// done := make(chan error)
|
||||
// clientListener := s.eventBus.Subscribe(
|
||||
// event,
|
||||
// func(e interface{}) {
|
||||
// partita, err := s.db.GetPartita(partitaUid)
|
||||
// if err != nil {
|
||||
// done <- err
|
||||
// return
|
||||
// }
|
||||
|
||||
// log.Printf(`[%v] Sent message`, clientId)
|
||||
// if err := c.WriteJSON(partita.Players); err != nil {
|
||||
// done <- err
|
||||
// return
|
||||
// }
|
||||
// },
|
||||
// )
|
||||
// defer func() {
|
||||
// s.eventBus.Unsubscribe(clientListener)
|
||||
// log.Printf("[%v] Connection closed", clientId)
|
||||
// }()
|
||||
|
||||
// for {
|
||||
// _, _, err := c.ReadMessage()
|
||||
// if err != nil {
|
||||
// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
// log.Printf("[%v] Error: %v", clientId, err)
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
// select {
|
||||
// case err := <-done:
|
||||
// if err != nil {
|
||||
// log.Printf(`[%v] Error: %v`, clientId, err)
|
||||
// }
|
||||
// return
|
||||
// case <-time.After(1 * time.Second):
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
|
||||
type clientMessage struct {
|
||||
Type string `json:"type"`
|
||||
Event string `json:"event"`
|
||||
}
|
||||
|
||||
type serverMessage struct {
|
||||
Event string `json:"event"`
|
||||
Data any `json:"data"`
|
||||
}
|
||||
|
||||
// registerEventBinder creates a WebSocket handler that can subscribe the socket to some events generated on the server, for example a message from the client like { type: "subscribe", event: "foo" } binds the connection to the event "foo" and forwards events and the associated data to client as JSON.
|
||||
func registerEventBinder(eb *events.EventBus) fiber.Handler {
|
||||
return websocket.New(func(c *websocket.Conn) {
|
||||
clientId := util.GenerateRandomString(10)
|
||||
|
||||
log.Printf("[%v] Connection started", clientId)
|
||||
defer log.Printf("[%v] Connection closed", clientId)
|
||||
|
||||
listeners := []*events.Listener{}
|
||||
defer func() {
|
||||
for _, l := range listeners {
|
||||
eb.Unsubscribe(l)
|
||||
}
|
||||
}()
|
||||
|
||||
done := make(chan error)
|
||||
for {
|
||||
var msg clientMessage
|
||||
if err := c.ReadJSON(&msg); err != nil {
|
||||
log.Printf(`[%v] Read error: %v`, clientId, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[%v] Received: %v", clientId, msg)
|
||||
|
||||
switch msg.Type {
|
||||
case "subscribe":
|
||||
newListener := eb.Subscribe(
|
||||
msg.Event,
|
||||
func(e interface{}) {
|
||||
log.Printf(`[%v] Sent message`, clientId)
|
||||
if err := c.WriteJSON(serverMessage{
|
||||
Event: msg.Event,
|
||||
Data: e,
|
||||
}); err != nil {
|
||||
log.Printf(`[%v] Write error: %v`, clientId, err)
|
||||
done <- err
|
||||
}
|
||||
},
|
||||
)
|
||||
listeners = append(listeners, newListener)
|
||||
default:
|
||||
log.Printf("[%v] Unknown message type %q", clientId, msg.Type)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) EventRoute(r fiber.Router) {
|
||||
r.Use("/ws", func(c *fiber.Ctx) error {
|
||||
if websocket.IsWebSocketUpgrade(c) {
|
||||
c.Locals("allowed", true)
|
||||
return c.Next()
|
||||
}
|
||||
return fiber.ErrUpgradeRequired
|
||||
})
|
||||
|
||||
r.Get("/ws", registerEventBinder(s.eventBus))
|
||||
}
|
Loading…
Reference in New Issue