Files
fujarna/backend/internal/websocket/hub.go
Jan Novak a7244406fd
Some checks failed
Build and Push / build (push) Failing after 8s
Initial release: Disc Agenda frisbee tournament platform
Full-stack tournament management app with real-time scoring:
- Go 1.26 backend with REST API and WebSocket live scoring
- React 19 + Vite 8 frontend with mobile-first design
- File-based JSON storage with JSONL audit logs
- Multi-stage Docker build with Gitea CI/CD pipeline
- Post-tournament questionnaire with spirit voting
- Technical documentation and project description

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 14:48:15 +01:00

356 lines
7.7 KiB
Go

package websocket
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/frisbee-tournament/backend/internal/models"
"github.com/frisbee-tournament/backend/internal/storage"
)
// Client is one WebSocket connection attached to either a game Hub or a
// TournamentHub.
type Client struct {
// hub is the owning game hub; TournamentHub.RegisterClient leaves it nil.
hub *Hub
// conn is the underlying gorilla WebSocket connection.
conn *websocket.Conn
// send queues outbound messages for WritePump. The hub's Run loop closes it
// when the client is unregistered or when its buffer is full during a
// broadcast.
send chan []byte
// userID is stamped onto every ScoreUpdate read from this connection.
userID string
}
// Hub fans out live score messages to every client watching a single game.
type Hub struct {
// gameID and tourneyID identify the game whose score this hub loads and saves.
gameID string
tourneyID string
// store is used to read and persist scores and to append audit entries.
store *storage.Store
// hubMgr is the manager that created this hub; it is used to forward score
// updates to the tournament-level hub.
hubMgr *HubManager
// clients is the set of currently registered connections.
clients map[*Client]bool
// broadcast carries messages to be delivered to every registered client.
broadcast chan []byte
// register and unregister hand clients over to the Run loop.
register chan *Client
unregister chan *Client
// mu is locked around accesses to clients in Run.
mu sync.RWMutex
}
// TournamentHub forwards game score updates to all schedule viewers for a tournament.
// Its clients are receive-only: ReadPumpTournament discards anything they send.
type TournamentHub struct {
// tourneyID is the tournament this hub serves (also its key in HubManager).
tourneyID string
// store is the shared storage handle passed in by HubManager.
store *storage.Store
// clients is the set of currently registered viewer connections.
clients map[*Client]bool
// broadcast carries messages to be delivered to every registered viewer.
broadcast chan []byte
// register and unregister hand clients over to the Run loop.
register chan *Client
unregister chan *Client
// mu is locked around accesses to clients in Run.
mu sync.RWMutex
}
// Run is the tournament hub's event loop. It never returns and is expected to
// run in its own goroutine (GetOrCreateTournamentHub starts it).
//
// It serialises three kinds of events:
//   - register: adds the client to the viewer set;
//   - unregister: removes the client and closes its send channel, which makes
//     the client's WritePump send a close frame and exit;
//   - broadcast: queues the message on every viewer's send channel, evicting
//     any viewer whose buffer is full so one slow reader cannot stall the rest.
func (th *TournamentHub) Run() {
	for {
		select {
		case client := <-th.register:
			th.mu.Lock()
			th.clients[client] = true
			th.mu.Unlock()

		case client := <-th.unregister:
			th.mu.Lock()
			// The membership check prevents a double close when the client
			// was already evicted by the broadcast branch.
			if _, ok := th.clients[client]; ok {
				delete(th.clients, client)
				close(client.send)
			}
			th.mu.Unlock()

		case message := <-th.broadcast:
			// The write lock is required here: evicting a slow client
			// mutates th.clients, which a read lock does not permit.
			th.mu.Lock()
			for client := range th.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(th.clients, client)
				}
			}
			th.mu.Unlock()
		}
	}
}
// RegisterClient wraps conn in a Client with a 256-message outbound buffer,
// hands it to the hub's Run loop over the register channel, and returns it.
// The returned client has no game hub and no user ID set.
func (th *TournamentHub) RegisterClient(conn *websocket.Conn) *Client {
	viewer := new(Client)
	viewer.conn = conn
	viewer.send = make(chan []byte, 256)

	// Blocks until the Run loop receives the registration.
	th.register <- viewer
	return viewer
}
// ReadPumpTournament drains the connection of a schedule viewer until a read
// fails. Incoming messages are discarded; reading is only needed so that pong
// frames are processed and the read deadline keeps being extended. On exit the
// client is unregistered from th and the connection is closed.
func (c *Client) ReadPumpTournament(th *TournamentHub) {
	const pongWait = 60 * time.Second

	defer func() {
		th.unregister <- c
		c.conn.Close()
	}()

	// pushDeadline moves the read deadline pongWait into the future.
	pushDeadline := func() {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	c.conn.SetReadLimit(512)
	pushDeadline()
	c.conn.SetPongHandler(func(string) error {
		pushDeadline()
		return nil
	})

	for {
		_, _, readErr := c.conn.ReadMessage()
		if readErr != nil {
			return
		}
	}
}
// HubManager owns the per-game hubs and per-tournament hubs and creates them
// on demand.
type HubManager struct {
// hubs maps "tourneyID/gameID" to the hub for that game.
hubs map[string]*Hub
// tournamentHubs maps a tournament ID to its schedule-viewer hub.
tournamentHubs map[string]*TournamentHub
// mu guards both maps.
mu sync.RWMutex
// store is passed to every hub the manager creates.
store *storage.Store
}
// NewHubManager returns a manager with empty game-hub and tournament-hub
// registries that will hand store to every hub it creates.
func NewHubManager(store *storage.Store) *HubManager {
	mgr := new(HubManager)
	mgr.store = store
	mgr.hubs = map[string]*Hub{}
	mgr.tournamentHubs = map[string]*TournamentHub{}
	return mgr
}
// GetOrCreateHub returns the hub for the given game, creating it and starting
// its Run loop in a new goroutine on first use. Hubs are keyed by
// "tourneyID/gameID"; the manager lock is held for the whole call so two
// callers cannot create the same hub twice.
func (m *HubManager) GetOrCreateHub(tourneyID, gameID string) *Hub {
	m.mu.Lock()
	defer m.mu.Unlock()

	id := tourneyID + "/" + gameID
	hub, exists := m.hubs[id]
	if !exists {
		hub = &Hub{
			tourneyID:  tourneyID,
			gameID:     gameID,
			hubMgr:     m,
			store:      m.store,
			clients:    make(map[*Client]bool),
			register:   make(chan *Client),
			unregister: make(chan *Client),
			broadcast:  make(chan []byte, 256),
		}
		m.hubs[id] = hub
		go hub.Run()
	}
	return hub
}
// GetOrCreateTournamentHub returns the schedule-viewer hub for tourneyID,
// creating it and starting its Run loop in a new goroutine on first use. The
// manager lock is held for the whole call so the hub is created at most once.
func (m *HubManager) GetOrCreateTournamentHub(tourneyID string) *TournamentHub {
	m.mu.Lock()
	defer m.mu.Unlock()

	hub, exists := m.tournamentHubs[tourneyID]
	if !exists {
		hub = &TournamentHub{
			tourneyID:  tourneyID,
			store:      m.store,
			clients:    make(map[*Client]bool),
			register:   make(chan *Client),
			unregister: make(chan *Client),
			broadcast:  make(chan []byte, 256),
		}
		m.tournamentHubs[tourneyID] = hub
		go hub.Run()
	}
	return hub
}
// BroadcastToTournament queues msg on the tournament hub for tourneyID, if
// one has been created. The send is non-blocking: when the hub's broadcast
// buffer is full the message is dropped rather than stalling the caller.
func (m *HubManager) BroadcastToTournament(tourneyID string, msg []byte) {
	m.mu.RLock()
	target, found := m.tournamentHubs[tourneyID]
	m.mu.RUnlock()

	if !found {
		// No hub has been created for this tournament, so nothing to forward.
		return
	}

	select {
	case target.broadcast <- msg:
	default:
	}
}
// Run is the game hub's event loop. It never returns and is expected to run
// in its own goroutine (GetOrCreateHub starts it).
//
// It serialises three kinds of events:
//   - register: adds the client to the set and, when the stored score can be
//     loaded, queues a "score_state" snapshot for it;
//   - unregister: removes the client and closes its send channel, which makes
//     the client's WritePump send a close frame and exit;
//   - broadcast: queues the message on every client's send channel, evicting
//     any client whose buffer is full so one slow reader cannot stall the rest.
func (h *Hub) Run() {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.mu.Unlock()

			// Send current score to new client. A load failure is not fatal:
			// the client simply starts without a snapshot.
			state, err := h.store.GetScore(h.tourneyID, h.gameID)
			if err != nil {
				continue
			}
			data, err := json.Marshal(map[string]any{
				"type":  "score_state",
				"state": state,
			})
			if err != nil {
				// Skip the snapshot rather than queueing a nil payload.
				log.Printf("score state marshal error: %v", err)
				continue
			}
			client.send <- data

		case client := <-h.unregister:
			h.mu.Lock()
			// The membership check prevents a double close when the client
			// was already evicted by the broadcast branch.
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
			h.mu.Unlock()

		case message := <-h.broadcast:
			// The write lock is required here: evicting a slow client
			// mutates h.clients, which a read lock does not permit.
			h.mu.Lock()
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(h.clients, client)
				}
			}
			h.mu.Unlock()
		}
	}
}
// HandleScoreUpdate applies update to the stored score of this hub's game,
// persists the result, appends an audit entry and broadcasts the new state to
// the game hub and to the tournament hub.
//
// Supported actions:
//   - "increment" / "decrement": change the score of update.Team ("home" or
//     "away") by one; a score is never decremented below zero.
//   - "set": set the score of update.Team to update.Value; negative values
//     are ignored.
//   - "set_status": update.Team carries the target status ("scheduled",
//     "live" or "final").
//
// Score-changing actions on a game whose status is "final" return the
// unchanged state without saving, auditing or broadcasting. An unknown team,
// status or action leaves the score untouched but is still saved and audited.
//
// It returns the resulting state, or the error from loading or saving the
// score. Audit-log and message-encoding failures are logged, not returned,
// because the score itself has already been persisted at that point.
//
// NOTE(review): the load/modify/save sequence is not guarded by a lock, and
// ReadPump calls this once per connection goroutine, so concurrent updates
// for the same game may interleave unless the store serialises them. Confirm
// against storage.Store.
func (h *Hub) HandleScoreUpdate(update models.ScoreUpdate) (*models.ScoreState, error) {
	state, err := h.store.GetScore(h.tourneyID, h.gameID)
	if err != nil {
		return nil, err
	}

	oldHome := state.HomeScore
	oldAway := state.AwayScore

	switch update.Action {
	case "increment":
		if state.Status == "final" {
			return state, nil
		}
		// Match the team explicitly so a malformed team name cannot bump
		// the away score by accident.
		switch update.Team {
		case "home":
			state.HomeScore++
		case "away":
			state.AwayScore++
		}
	case "decrement":
		if state.Status == "final" {
			return state, nil
		}
		switch update.Team {
		case "home":
			if state.HomeScore > 0 {
				state.HomeScore--
			}
		case "away":
			if state.AwayScore > 0 {
				state.AwayScore--
			}
		}
	case "set":
		if state.Status == "final" {
			return state, nil
		}
		// Scores cannot be negative; "decrement" enforces the same floor.
		if update.Value >= 0 {
			switch update.Team {
			case "home":
				state.HomeScore = update.Value
			case "away":
				state.AwayScore = update.Value
			}
		}
	case "set_status":
		// update.Team carries the target status: scheduled, live, final
		switch update.Team {
		case "scheduled", "live", "final":
			state.Status = update.Team
		}
	}

	// Auto-transition: scheduled → live when score becomes non-zero
	if update.Action != "set_status" && state.Status == "scheduled" && (state.HomeScore > 0 || state.AwayScore > 0) {
		state.Status = "live"
	}

	if err := h.store.SaveScore(h.tourneyID, state); err != nil {
		return nil, err
	}

	// Write audit log
	entry := models.AuditEntry{
		Timestamp: time.Now(),
		Action:    update.Action,
		Team:      update.Team,
		Value:     update.Value,
		OldHome:   oldHome,
		OldAway:   oldAway,
		NewHome:   state.HomeScore,
		NewAway:   state.AwayScore,
		UserID:    update.UserID,
	}
	if err := h.store.AppendAuditLog(h.tourneyID, h.gameID, entry); err != nil {
		log.Printf("audit log error: %v", err)
	}

	// Broadcast to all clients on this game hub
	msg, err := json.Marshal(map[string]any{
		"type":  "score_update",
		"state": state,
		"audit": entry,
	})
	if err != nil {
		// The score is already saved; skip the broadcast instead of sending
		// a nil payload to every client.
		log.Printf("score update marshal error: %v", err)
		return state, nil
	}
	h.broadcast <- msg

	// Also forward to tournament hub for schedule viewers
	h.hubMgr.BroadcastToTournament(h.tourneyID, msg)

	return state, nil
}
// RegisterClient wraps conn in a Client bound to this hub and to userID, with
// a 256-message outbound buffer, hands it to the hub's Run loop over the
// register channel, and returns it.
func (h *Hub) RegisterClient(conn *websocket.Conn, userID string) *Client {
	member := new(Client)
	member.hub = h
	member.conn = conn
	member.userID = userID
	member.send = make(chan []byte, 256)

	// Blocks until the Run loop receives the registration.
	h.register <- member
	return member
}
// ReadPump reads score updates from the connection until a read fails. Each
// message is decoded as a models.ScoreUpdate, stamped with the current time
// and this client's user ID, and passed to the hub. Messages that are not
// valid JSON are skipped; hub errors are logged. On exit the client is
// unregistered from its hub and the connection is closed.
func (c *Client) ReadPump() {
	const pongWait = 60 * time.Second

	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	// pushDeadline moves the read deadline pongWait into the future.
	pushDeadline := func() {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	c.conn.SetReadLimit(4096)
	pushDeadline()
	c.conn.SetPongHandler(func(string) error {
		pushDeadline()
		return nil
	})

	for {
		_, raw, readErr := c.conn.ReadMessage()
		if readErr != nil {
			return
		}

		var update models.ScoreUpdate
		if json.Unmarshal(raw, &update) != nil {
			// Undecodable payload: ignore it and keep the connection open.
			continue
		}

		// Timestamp and user ID are set server-side, overwriting whatever
		// the decoded message contained.
		update.UserID = c.userID
		update.Timestamp = time.Now()

		_, applyErr := c.hub.HandleScoreUpdate(update)
		if applyErr != nil {
			log.Printf("score update error: %v", applyErr)
		}
	}
}
// WritePump writes queued messages from the client's send channel to the
// connection as text frames and sends a ping every 30 seconds. Every write
// gets a 10-second deadline. It returns, stopping the ticker and closing the
// connection, when a write fails or when the send channel is closed; in the
// latter case a close frame is written first.
func (c *Client) WritePump() {
	const (
		pingInterval = 30 * time.Second
		writeWait    = 10 * time.Second
	)

	pinger := time.NewTicker(pingInterval)
	defer func() {
		pinger.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case payload, open := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !open {
				// The hub closed the channel: tell the peer and stop.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if c.conn.WriteMessage(websocket.TextMessage, payload) != nil {
				return
			}

		case <-pinger.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if c.conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}
		}
	}
}