From 660d76a7485db7870ddaeab448eba15d216c9ef8 Mon Sep 17 00:00:00 2001 From: Antonio De Lucreziis Date: Fri, 24 Jun 2022 03:10:23 +0200 Subject: [PATCH] EventBus e WebSocket --- _frontend/src/hooks.jsx | 58 +++++++--- _frontend/src/partita/main.jsx | 17 ++- _frontend/vite.config.js | 4 + database/events.go | 7 ++ events/db.go | 7 -- events/eventbus.go | 76 ++++++++++++++ events/events.go | 68 ------------ go.mod | 3 + go.sum | 12 +++ main.go | 8 +- routes/api.go | 83 +-------------- routes/pages.go | 2 +- routes/ws.go | 186 +++++++++++++++++++++++++++++++++ 13 files changed, 353 insertions(+), 178 deletions(-) create mode 100644 database/events.go delete mode 100644 events/db.go create mode 100644 events/eventbus.go create mode 100644 routes/ws.go diff --git a/_frontend/src/hooks.jsx b/_frontend/src/hooks.jsx index c80648a..c9801a1 100644 --- a/_frontend/src/hooks.jsx +++ b/_frontend/src/hooks.jsx @@ -1,26 +1,54 @@ -import { useState, useEffect } from 'preact/hooks' +import { useState, useEffect, useRef } from 'preact/hooks' -export function useLiveValue(path, initialValue) { - const [value, setValue] = useState(initialValue) +export function useEventBus() { + const bus = useRef({}) + const pendingSubscriptions = useRef([]) useEffect(() => { - const url = `${path}/events` - console.log('LiveValue:', url) - - const source = new EventSource(url) - source.onmessage = event => { - console.log('Received:', event.data) - if (event.data !== 'initial message') { - setValue(JSON.parse(event.data)) + const ws = new WebSocket(`ws://${location.host}/ws`) + + function registerPending() { + while (pendingSubscriptions.current.length > 0) { + const subscription = pendingSubscriptions.current.pop() + console.log(`Registering subscription:`, subscription) + ws.send(JSON.stringify(subscription)) } } - fetch(path) - .then(res => res.json()) - .then(setValue) + ws.addEventListener('open', e => { + console.log('Websocket connection started') + + registerPending() + }) + ws.addEventListener('close', () => { + console.log('Websocket connection closed') + }) + ws.addEventListener('message', e => { + registerPending() + + const v = JSON.parse(e.data) + console.log('Received:', v) + + const { event, data } = v + bus.current[event](data) + }) }, []) - return value + const useLiveValue = (event, initialValue) => { + const [value, setValue] = useState(initialValue) + bus.current[event] = setValue + + useEffect(() => { + pendingSubscriptions.current.push({ + type: 'subscribe', + event, + }) + }, []) + + return [value, setValue] + } + + return [useLiveValue] } export function useUser() { diff --git a/_frontend/src/partita/main.jsx b/_frontend/src/partita/main.jsx index 2ec12e7..e738421 100644 --- a/_frontend/src/partita/main.jsx +++ b/_frontend/src/partita/main.jsx @@ -1,16 +1,27 @@ import { render } from 'preact' -import { useLiveValue, useUser } from '../hooks.jsx' +import { useEffect } from 'preact/hooks' +import { useEventBus, useUser } from '../hooks.jsx' const PARTITA_UID = location.pathname.split('/')[2] const App = () => { + const [useLiveValue] = useEventBus() + const user = useUser() - const joinedPlayers = useLiveValue(`/api/partita/${PARTITA_UID}/joined-players`, []) + const [joinedPlayers, setJoinedPlayers] = useLiveValue( + `partita[uid="${PARTITA_UID}"].players`, + [] + ) + + useEffect(() => { + fetch(`/api/partita/${PARTITA_UID}/players`) + .then(res => res.json()) + .then(setJoinedPlayers) + }, []) const onJoinPartita = async () => { const res = await fetch(`/api/partita/${PARTITA_UID}/join-partita`) if (res.ok) { - console.log(await res.text()) console.log('Ti sei unito alla partita!') } } diff --git a/_frontend/vite.config.js b/_frontend/vite.config.js index ab72e56..0c85ff4 100644 --- a/_frontend/vite.config.js +++ b/_frontend/vite.config.js @@ -38,6 +38,10 @@ export default defineConfig({ changeOrigin: true, rewrite: path => path, }, + '/ws': { + target: 'ws://127.0.0.1:4000', + ws: true, + }, }, }, plugins: [ diff --git a/database/events.go b/database/events.go new file mode 100644 index 0000000..abcb114 --- /dev/null +++ b/database/events.go @@ -0,0 +1,7 @@ +package database + +import "fmt" + +func OnPartitaPlayersChange(partitaUid string) string { + return fmt.Sprintf("partita[uid=%q].players", partitaUid) +} diff --git a/events/db.go b/events/db.go deleted file mode 100644 index 798bd40..0000000 --- a/events/db.go +++ /dev/null @@ -1,7 +0,0 @@ -package events - -import "fmt" - -func OnPartitaPlayerJoins(partitaUid string) string { - return fmt.Sprintf("partita[uid=%q].players@join", partitaUid) -} diff --git a/events/eventbus.go b/events/eventbus.go new file mode 100644 index 0000000..2ee2651 --- /dev/null +++ b/events/eventbus.go @@ -0,0 +1,76 @@ +package events + +import ( + "log" + "sync" + + "golang.org/x/exp/slices" +) + +type ListenerFunc func(interface{}) + +type Listener struct { + Event string + Handle ListenerFunc +} + +type EventBus struct { + lock sync.RWMutex + listeners map[string][]*Listener +} + +func NewEventBus() *EventBus { + return &EventBus{ + listeners: map[string][]*Listener{}, + } +} + +// Subscribe a listener for the given event type +func (eb *EventBus) Subscribe(event string, f ListenerFunc) *Listener { + eb.lock.Lock() + defer eb.lock.Unlock() + + log.Printf("[EventBus] Subscribed listener for %q", event) + + l := &Listener{event, f} + + if prev, found := eb.listeners[event]; !found { + eb.listeners[event] = []*Listener{l} + } else { + eb.listeners[event] = append(prev, l) + } + + return l +} + +// Unsubscribe removes a listener for the given event type +func (eb *EventBus) Unsubscribe(l *Listener) { + eb.lock.Lock() + defer eb.lock.Unlock() + + listeners := eb.listeners[l.Event] + + i := slices.Index(listeners, l) + if i >= 0 { + // TODO: Magari usare la tecnica di spostare l'ultimo alla posizione dell'elemento rimosso e accorciare lo slice, tanto alla fine l'ordine dei listener non conta veramente + eb.listeners[l.Event] = append(listeners[:i], listeners[i+1:]...) + } +} + +// Dispatch +func (eb *EventBus) Dispatch(event string, data interface{}) { + eb.lock.RLock() + defer eb.lock.RUnlock() + + log.Printf("[EventBus] Fired event %q with value %v", event, data) + + if listeners, found := eb.listeners[event]; found { + // make a copy of "listeners" to pass to the goroutine and not block the dispatcher thread + tmpListeners := append([]*Listener{}, listeners...) + go func() { + for _, listener := range tmpListeners { + listener.Handle(data) + } + }() + } +} diff --git a/events/events.go b/events/events.go index 10370e8..b3adf69 100644 --- a/events/events.go +++ b/events/events.go @@ -1,69 +1 @@ package events - -import ( - "sync" - - "golang.org/x/exp/slices" -) - -type ListenerFunc func(interface{}) - -type Listener struct { - Event string - Handle ListenerFunc -} - -type EventBus struct { - lock sync.RWMutex - listeners map[string][]*Listener -} - -func NewEventBus() *EventBus { - return &EventBus{ - listeners: map[string][]*Listener{}, - } -} - -// Subscribe a listener for the given event type -func (eb *EventBus) Subscribe(event string, f ListenerFunc) *Listener { - l := &Listener{event, f} - - eb.lock.Lock() - if prev, found := eb.listeners[event]; !found { - eb.listeners[event] = []*Listener{l} - } else { - eb.listeners[event] = append(prev, l) - } - eb.lock.Unlock() - - return l -} - -// Unsubscribe removes a listener for the given event type -func (eb *EventBus) Unsubscribe(l *Listener) { - eb.lock.Lock() - listeners := eb.listeners[l.Event] - - i := slices.Index(listeners, l) - if i >= 0 { - // TODO: Magari usare la tecnica di spostare l'ultimo alla posizione dell'elemento rimosso e accorciare lo slice, tanto alla fine l'ordine dei listener non conta veramente - eb.listeners[l.Event] = append(listeners[:i], listeners[i+1:]...) - } - - eb.lock.Unlock() -} - -// Dispatch -func (eb *EventBus) Dispatch(event string, data interface{}) { - eb.lock.RLock() - if listeners, found := eb.listeners[event]; found { - // make a copy of "listeners" to pass to the goroutine and not block the dispatcher thread - tmpListeners := append([]*Listener{}, listeners...) - go func() { - for _, listener := range tmpListeners { - listener.Handle(data) - } - }() - } - eb.lock.RUnlock() -} diff --git a/go.mod b/go.mod index 3bc04d9..b4beff8 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,11 @@ require ( require ( github.com/andybalholm/brotli v1.0.4 // indirect + github.com/fasthttp/websocket v1.5.0 // indirect github.com/gofiber/template v1.6.28 // indirect + github.com/gofiber/websocket/v2 v2.0.22 // indirect github.com/klauspost/compress v1.15.0 // indirect + github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.37.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect diff --git a/go.sum b/go.sum index 604a5df..8996a06 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go. github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws= +github.com/fasthttp/websocket v1.5.0 h1:B4zbe3xXyvIdnqjOZrafVFklCUq5ZLo/TqCt5JA1wLE= +github.com/fasthttp/websocket v1.5.0/go.mod h1:n0BlOQvJdPbTuBkZT0O5+jk/sp/1/VCzquR1BehI2F4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= @@ -131,6 +133,8 @@ github.com/gofiber/fiber/v2 v2.34.1 h1:C6saXB7385HvtXX+XMzc5Dqj5S/aEXOfKCW7JNep4 github.com/gofiber/fiber/v2 v2.34.1/go.mod h1:ozRQfS+D7EL1+hMH+gutku0kfx1wLX4hAxDCtDzpj4U= github.com/gofiber/template v1.6.28 h1:G7u+MZDkGeWGmQyfOvFm71dzWRLu5YyMwyxHQnGRZao= github.com/gofiber/template v1.6.28/go.mod h1:nD+Fds2sa7AnXSkDHK+i0ROa+Q8NwW6b9+w4rCHqc1k= +github.com/gofiber/websocket/v2 v2.0.22 h1:aR2PomjLYRoQdFLFq5dH4OqJ93NiVfrfQTJqi1zxthU= +github.com/gofiber/websocket/v2 v2.0.22/go.mod h1:/F8SLCxN9kEfBvwGW0FBQ4/+yF18GA3Q9ckqynuiSZk= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -200,6 +204,7 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -248,6 +253,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -315,6 +321,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sagikazarmark/crypt v0.3.0/go.mod h1:uD/D+6UF4SrIR1uGEv7bBNkNqLGqUr43MRiaGWX1Nig= +github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 h1:Orn7s+r1raRTBKLSc9DmbktTT04sL+vkzsbRD2Q8rOI= +github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -338,6 +346,7 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.33.0/go.mod h1:KJRK/MXx0J+yd0c5hlR+s1tIHD72sniU8ZJjl97LIw4= github.com/valyala/fasthttp v1.37.0 h1:7WHCyI7EAkQMVmrfBhWTCOaeROb1aCBiTopx63LkMbE= github.com/valyala/fasthttp v1.37.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= @@ -373,6 +382,7 @@ golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -459,6 +469,7 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220111093109-d55c255bac03/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -555,6 +566,7 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/main.go b/main.go index f50ec62..8216a2a 100644 --- a/main.go +++ b/main.go @@ -25,11 +25,9 @@ func main() { // Static files app.Static("/", "./_frontend/dist") - // Pages - app.Route("/", server.Pages) - - // Api routes - app.Route("/api", server.Api) + app.Route("/", server.EventRoute) + app.Route("/", server.PageRoutes) + app.Route("/api", server.ApiRoutes) if strings.HasPrefix(mode, "dev") { log.Printf(`Running dev server for frontend: "npm run dev"`) diff --git a/routes/api.go b/routes/api.go index 792938b..8e01c06 100644 --- a/routes/api.go +++ b/routes/api.go @@ -1,22 +1,19 @@ package routes import ( - "bufio" "encoding/json" - "fmt" "log" "strconv" "time" - "github.com/aziis98/lupus-lite/events" + "github.com/aziis98/lupus-lite/database" "github.com/aziis98/lupus-lite/lupus" "github.com/aziis98/lupus-lite/model" "github.com/aziis98/lupus-lite/util" "github.com/gofiber/fiber/v2" - "github.com/valyala/fasthttp" ) -func (s *Server) Api(api fiber.Router) { +func (s *Server) ApiRoutes(api fiber.Router) { api.Get("/status", func(c *fiber.Ctx) error { s, err := json.MarshalIndent(s.db, "", " ") if err != nil { @@ -146,7 +143,7 @@ func (s *Server) Api(api fiber.Router) { return c.JSON(requestUser(c).PublicUser()) }) - api.Get("/partita/:partita/joined-players", func(c *fiber.Ctx) error { + api.Get("/partita/:partita/players", func(c *fiber.Ctx) error { partitaUid := c.Params("partita") partita, err := s.db.GetPartita(partitaUid) @@ -157,78 +154,6 @@ func (s *Server) Api(api fiber.Router) { return c.JSON(partita.Players) }) - api.Get("/partita/:partita/joined-players/events", func(c *fiber.Ctx) error { - partitaUid := c.Params("partita") - - c.Set("Content-Type", "text/event-stream") - c.Set("Cache-Control", "no-cache") - c.Set("Connection", "keep-alive") - c.Set("Transfer-Encoding", "chunked") - - sse := make(chan any, 1) - done := make(chan bool) - - clientId := util.GenerateRandomString(16) - log.Printf(`[%v] New SSE listener`, clientId) - - l := s.eventBus.Subscribe( - events.OnPartitaPlayerJoins(partitaUid), - func(e interface{}) { - sse <- e - }, - ) - - go func() { - <-done - close(sse) - close(done) - s.eventBus.Unsubscribe(l) - }() - - c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - log.Printf("[%v] Starting communication...", clientId) - - if _, err := fmt.Fprintf(w, "data: initial message\n\n"); err != nil { - log.Printf(`SSE Writer error: %v`, err) - return - } - - outer: - for { - select { - case e := <-sse: - log.Printf(`[%v] Event: New player joined`, clientId) - players := e.([]string) - - msg, err := json.Marshal(players) - if err != nil { - log.Printf("[%v] Error %v", clientId, err) - break outer - } - - if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil { - log.Printf("[%v] Error: sse writer error, %v", clientId, err) - break outer - } - - err = w.Flush() - log.Printf(`[%v] Flush result %v`, clientId, err) - if err != nil { - log.Printf("[%v] Error: error while flushing, %v, Closing connection.", clientId, err) - break outer - } - case <-time.After(10 * time.Second): - break outer - } - } - - log.Printf("[%v] Connection timeout", clientId) - done <- true - })) - - return nil - }) - api.Get("/partita/:partita/join-partita", s.requireLogged, func(c *fiber.Ctx) error { user := requestUser(c) partitaUid := c.Params("partita") @@ -244,7 +169,7 @@ func (s *Server) Api(api fiber.Router) { return err } - s.eventBus.Dispatch(events.OnPartitaPlayerJoins(partitaUid), partita.Players) + s.eventBus.Dispatch(database.OnPartitaPlayersChange(partitaUid), partita.Players) return c.SendString("ok") }) diff --git a/routes/pages.go b/routes/pages.go index 4150536..b5bb4b0 100644 --- a/routes/pages.go +++ b/routes/pages.go @@ -2,7 +2,7 @@ package routes import "github.com/gofiber/fiber/v2" -func (s *Server) Pages(r fiber.Router) { +func (s *Server) PageRoutes(r fiber.Router) { r.Get("/", func(c *fiber.Ctx) error { return c.SendFile("_frontend/dist/index.html") }) diff --git a/routes/ws.go b/routes/ws.go new file mode 100644 index 0000000..771b947 --- /dev/null +++ b/routes/ws.go @@ -0,0 +1,186 @@ +package routes + +import ( + "log" + + "github.com/aziis98/lupus-lite/events" + "github.com/aziis98/lupus-lite/util" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" +) + +// func (s *Server) websocketHandler(handler func(c *fiber.Ctx) (interface{}, error)) fiber.Handler { +// return websocket.New(func(c *websocket.Conn) { +// clientId := util.GenerateRandomString(10) + +// partitaUid := c.Params("partita") +// log.Printf("[%v] Connection started", clientId) + +// done := make(chan error) +// clientListener := s.eventBus.Subscribe( +// database.OnPartitaPlayerJoin(partitaUid), +// func(e interface{}) { +// partita, err := s.db.GetPartita(partitaUid) +// if err != nil { +// done <- err +// return +// } + +// log.Printf(`[%v] Sent message`, clientId) +// if err := c.WriteJSON(partita.Players); err != nil { +// done <- err +// return +// } +// }, +// ) +// defer func() { +// s.eventBus.Unsubscribe(clientListener) +// log.Printf("[%v] Connection closed", clientId) +// }() + +// for { +// _, _, err := c.ReadMessage() +// if err != nil { +// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { +// log.Printf("[%v] Error: %v", clientId, err) +// } +// return +// } + +// select { +// case err := <-done: +// if err != nil { +// log.Printf(`[%v] Error: %v`, clientId, err) +// } +// return +// case <-time.After(1 * time.Second): +// } +// } +// }) +// } + +// func (s *Server) registerEventBinding(event string, process func(c *fiber.Ctx, e any) (any, error)) fiber.Handler { +// return websocket.New(func(c *websocket.Conn) { +// clientId := util.GenerateRandomString(10) +// log.Printf("[%v] Connection started", clientId) +// done := make(chan error) +// clientListener := s.eventBus.Subscribe( +// event, +// func(e interface{}) { +// partita, err := s.db.GetPartita(partitaUid) +// if err != nil { +// done <- err +// return +// } + +// log.Printf(`[%v] Sent message`, clientId) +// if err := c.WriteJSON(partita.Players); err != nil { +// done <- err +// return +// } +// }, +// ) +// defer func() { +// s.eventBus.Unsubscribe(clientListener) +// log.Printf("[%v] Connection closed", clientId) +// }() + +// for { +// _, _, err := c.ReadMessage() +// if err != nil { +// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { +// log.Printf("[%v] Error: %v", clientId, err) +// } +// return +// } + +// select { +// case err := <-done: +// if err != nil { +// log.Printf(`[%v] Error: %v`, clientId, err) +// } +// return +// case <-time.After(1 * time.Second): +// } +// } +// }) +// } + +type clientMessage struct { + Type string `json:"type"` + Event string `json:"event"` +} + +type serverMessage struct { + Event string `json:"event"` + Data any `json:"data"` +} + +// registerEventBinder creates a WebSocket handler that can subscribe the socket to some events generated on the server, for example a message from the client like { type: "subscribe", event: "foo" } binds the connection to the event "foo" and forwards events and the associated data to client as JSON. +func registerEventBinder(eb *events.EventBus) fiber.Handler { + return websocket.New(func(c *websocket.Conn) { + clientId := util.GenerateRandomString(10) + + log.Printf("[%v] Connection started", clientId) + defer log.Printf("[%v] Connection closed", clientId) + + listeners := []*events.Listener{} + defer func() { + for _, l := range listeners { + eb.Unsubscribe(l) + } + }() + + done := make(chan error) + for { + var msg clientMessage + if err := c.ReadJSON(&msg); err != nil { + log.Printf(`[%v] Read error: %v`, clientId, err) + return + } + + log.Printf("[%v] Received: %v", clientId, msg) + + switch msg.Type { + case "subscribe": + newListener := eb.Subscribe( + msg.Event, + func(e interface{}) { + log.Printf(`[%v] Sent message`, clientId) + if err := c.WriteJSON(serverMessage{ + Event: msg.Event, + Data: e, + }); err != nil { + log.Printf(`[%v] Write error: %v`, clientId, err) + done <- err + } + }, + ) + listeners = append(listeners, newListener) + default: + log.Printf("[%v] Unknown message type %q", clientId, msg.Type) + return + } + + select { + case err := <-done: + if err != nil { + return + } + default: + } + } + }) +} + +func (s *Server) EventRoute(r fiber.Router) { + r.Use("/ws", func(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + c.Locals("allowed", true) + return c.Next() + } + return fiber.ErrUpgradeRequired + }) + + r.Get("/ws", registerEventBinder(s.eventBus)) +}