diff --git a/Dockerfile b/Dockerfile index 515ac41..c2c3728 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,9 +25,9 @@ RUN apt-get update && \ mkdir public RUN cd /usr/local && \ - wget https://golang.org/dl/go1.17.3.linux-amd64.tar.gz && \ + wget https://golang.org/dl/go1.17.5.linux-amd64.tar.gz && \ rm -rf /usr/local/go && \ - tar -C /usr/local -xzf go1.17.3.linux-amd64.tar.gz + tar -C /usr/local -xzf go1.17.5.linux-amd64.tar.gz ENV PATH=$PATH:/usr/local/go/bin diff --git a/server/internal/client/clientmanager.go b/server/internal/client/clientmanager.go index f77e3d7..3ce2adc 100644 --- a/server/internal/client/clientmanager.go +++ b/server/internal/client/clientmanager.go @@ -16,13 +16,13 @@ import ( type ClientPubsubManager struct { // Game name : client info gameClientMap map[string]GameClientInfo - lock *sync.Mutex + lock *sync.RWMutex } func NewClientPubsubManager() *ClientPubsubManager { return &ClientPubsubManager{ gameClientMap: make(map[string]GameClientInfo), - lock: &sync.Mutex{}, + lock: &sync.RWMutex{}, } } @@ -56,8 +56,6 @@ func (cm *ClientPubsubManager) GetUsernameForGame(game, room, username string) ( func (cm *ClientPubsubManager) SubscribeClientToRoom(serviceName string, conn gnet.Conn) (*ClientSockInfo, error) { // TODO: cleanup old sock addr in clientRemoteAddrMap - cm.lock.Lock() - defer cm.lock.Unlock() // Split provided servicename into something we can use gameName, roomName, err := SplitServiceName(serviceName) @@ -71,6 +69,7 @@ func (cm *ClientPubsubManager) SubscribeClientToRoom(serviceName string, conn gn } // Does the game client manager exist? if not, create + cm.lock.Lock() gameServ, ok := cm.gameClientMap[gameName] if !ok { gameServ = GameClientInfo{ @@ -79,6 +78,7 @@ func (cm *ClientPubsubManager) SubscribeClientToRoom(serviceName string, conn gn cm.gameClientMap[gameName] = gameServ } + cm.lock.Unlock() // TODO: initialize second map @@ -93,21 +93,39 @@ func (cm *ClientPubsubManager) SubscribeClientToRoom(serviceName string, conn gn } // Deletion in old room will be handled in an async worker + cm.lock.Lock() gameServ.clientRoomRemoteAddrMap[roomName] = append(gameServ.clientRoomRemoteAddrMap[roomName], sockInfo) + cm.lock.Unlock() if serviceType == Game { + cm.lock.RLock() + players := gameServ.clientRoomRemoteAddrMap[roomName] + cm.lock.RUnlock() // Send client all player room state - for _, player := range gameServ.clientRoomRemoteAddrMap[roomName] { + for _, player := range players { syncChanges := player.SyncObject.GetAllChanges() syncPayload, err := json.Marshal(&syncChanges) if err != nil { log.Errorf("Could not get initial player state. Err: %s", err) } + // Send the new client everyone's state err = sockInfo.ClientInfo.SendFromPlayer(syncPayload, player.ClientInfo.GetAddr()) if err != nil { log.Errorf("Failed to send initial state from player. Err: %s", err) } + + // Send the client's state to everyone + // syncChanges = sockInfo.SyncObject.GetAllChanges() + // syncPayload, err = json.Marshal(&syncChanges) + // if err != nil { + // log.Errorf("Could not get initial player state. Err: %s", err) + // } + + // err = player.ClientInfo.SendFromPlayer(syncPayload, sockInfo.ClientInfo.GetAddr()) + // if err != nil { + // log.Errorf("Failed to send initial state from player. Err: %s", err) + // } } } @@ -132,24 +150,26 @@ func SplitServiceName(serviceName string) (gameName, serviceType string, err err // TODO: refactor here too func (cm *ClientPubsubManager) Broadcast(payload interface{}, sockinfo *ClientSockInfo, fromServer bool) error { - cm.lock.Lock() - defer cm.lock.Unlock() payloadBytes, err := json.Marshal(payload) if err != nil { return err } + cm.lock.RLock() gameClientInfo, ok := cm.gameClientMap[sockinfo.GameName] + cm.lock.RUnlock() if !ok { return fmt.Errorf("Failed to broadcast, could not find game client info for game name %s", sockinfo.GameName) } + cm.lock.RLock() clients, ok := gameClientInfo.clientRoomRemoteAddrMap[sockinfo.RoomName] + cm.lock.RUnlock() if !ok { return fmt.Errorf("Failed to broadcast, could not find client list for room name %s", sockinfo.RoomName) } - log.Printf("Broadcasting for room %s, clients: %s", sockinfo.RoomName, len(clients)) + log.Debugf("Broadcasting for room %s, clients: %s", sockinfo.RoomName, len(clients)) for _, client := range clients { switch fromServer { case true: diff --git a/server/internal/client/syncobject.go b/server/internal/client/syncobject.go index 96e43b1..2d49f1f 100644 --- a/server/internal/client/syncobject.go +++ b/server/internal/client/syncobject.go @@ -2,6 +2,7 @@ package client import ( "strconv" + "sync" ) var serial int = 0 @@ -82,26 +83,50 @@ type SyncObject struct { TypingStatus *uint16 `json:"typingstatus,omitempty"` typingStatusChanged bool + + moveSeq uint8 + animframeSeq uint8 + spriteSeq uint8 + animspeedSeq uint8 + + // Could use more granular locking but this is ok + timeLock *sync.Mutex + posLock *sync.Mutex } func NewSyncObject() *SyncObject { serial += 1 var defaultSpeed uint16 = 4 - return &SyncObject{UID: strconv.Itoa(serial), Type: "objectSync", MovementAnimationSpeed: &defaultSpeed} + return &SyncObject{UID: strconv.Itoa(serial), + Type: "objectSync", + MovementAnimationSpeed: &defaultSpeed, + timeLock: &sync.Mutex{}, + posLock: &sync.Mutex{}, + } } -func (so *SyncObject) SetPos(x, y uint16) { - so.posChanged = true - so.Pos = &Position{X: x, Y: y} +func (so *SyncObject) SetPos(x, y uint16, seqNum uint8) { + so.posLock.Lock() + defer so.posLock.Unlock() + if so.moveSeq < seqNum { + so.moveSeq = seqNum + so.posChanged = true + so.Pos = &Position{X: x, Y: y} + } } -func (so *SyncObject) SetSprite(id uint16, sheet string) { +func (so *SyncObject) SetSprite(id uint16, sheet string, seqNum uint8) { // "why do I have to do this?" if id == 0 && sheet == "" { return } - so.Sprite = &Sprite{ID: id, Sheet: sheet} - so.spriteChanged = true + so.timeLock.Lock() + defer so.timeLock.Unlock() + if so.spriteSeq < seqNum { + so.spriteSeq = seqNum + so.Sprite = &Sprite{ID: id, Sheet: sheet} + so.spriteChanged = true + } // TODO: sprite validation goes here } @@ -116,14 +141,18 @@ func (so *SyncObject) SetWeather(t, strength uint16) { } func (so *SyncObject) SetSwitch(id, value uint32) { - println("Set switch") so.switchChanged = true so.Switch = &Switch{ID: id, Value: value} } -func (so *SyncObject) SetAnimFrame(frame uint16) { - so.AnimFrame = &frame - so.animframeChanged = true +func (so *SyncObject) SetAnimFrame(frame uint16, seqNum uint8) { + so.timeLock.Lock() + defer so.timeLock.Unlock() + if so.animframeSeq < seqNum { + so.animframeSeq = seqNum + so.AnimFrame = &frame + so.animframeChanged = true + } } func (so *SyncObject) SetName(name string) { @@ -131,9 +160,14 @@ func (so *SyncObject) SetName(name string) { so.Name = name } -func (so *SyncObject) SetMovementAnimationSpeed(animationSpeed uint16) { - so.movementAnimationSpeedChanged = true - so.MovementAnimationSpeed = &animationSpeed +func (so *SyncObject) SetMovementAnimationSpeed(animationSpeed uint16, seqNum uint8) { + so.timeLock.Lock() + defer so.timeLock.Unlock() + if so.animspeedSeq < seqNum { + so.animspeedSeq = seqNum + so.MovementAnimationSpeed = &animationSpeed + so.movementAnimationSpeedChanged = true + } } func (so *SyncObject) SetFacing(facing uint16) { diff --git a/server/internal/msghandler/chathandler.go b/server/internal/msghandler/chathandler.go index 75a7b7d..22cb08f 100644 --- a/server/internal/msghandler/chathandler.go +++ b/server/internal/msghandler/chathandler.go @@ -66,7 +66,7 @@ func (ch *ChatHandler) muxMessage(payload []byte, c gnet.Conn, s *client.ClientS func (ch *ChatHandler) pardonChat(payload []byte, client *client.ClientSockInfo) error { t := clientmessages.UnignoreChatEvents{} - matched, err := protocol.Marshal(payload, &t, false) + matched, _, err := protocol.Marshal(payload, &t, false) switch { case !matched: return errors.New("Failed to match") @@ -98,7 +98,7 @@ func (ch *ChatHandler) pardonChat(payload []byte, client *client.ClientSockInfo) func (ch *ChatHandler) ignoreChat(payload []byte, client *client.ClientSockInfo) error { t := clientmessages.IgnoreChatEvents{} - matched, err := protocol.Marshal(payload, &t, false) + matched, _, err := protocol.Marshal(payload, &t, false) switch { case !matched: return errors.New("Failed to match") @@ -136,7 +136,7 @@ func (ch *ChatHandler) ignoreChat(payload []byte, client *client.ClientSockInfo) func (ch *ChatHandler) setUsername(payload []byte, client *client.ClientSockInfo) error { t := clientmessages.SetUsername{} - matched, err := protocol.Marshal(payload, &t, false) + matched, _, err := protocol.Marshal(payload, &t, false) switch { case err != nil: return err @@ -160,7 +160,7 @@ func (ch *ChatHandler) setUsername(payload []byte, client *client.ClientSockInfo func (ch *ChatHandler) sendUserMessage(payload []byte, client *client.ClientSockInfo) error { t := clientmessages.SendMessage{} - matched, err := protocol.Marshal(payload, &t, false) + matched, _, err := protocol.Marshal(payload, &t, false) switch { case err != nil: return err diff --git a/server/internal/msghandler/gamehandler.go b/server/internal/msghandler/gamehandler.go index a8d5cfc..6bb1efb 100644 --- a/server/internal/msghandler/gamehandler.go +++ b/server/internal/msghandler/gamehandler.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/go-kit/kit/metrics" "github.com/horahoradev/YNO10k/internal/client" "github.com/horahoradev/YNO10k/internal/clientmessages" ynmetrics "github.com/horahoradev/YNO10k/internal/metrics" @@ -38,6 +39,11 @@ type GameHandler struct { inactiveSockInfoFlushMap map[string]*client.ClientSockInfo activeRWLock *sync.RWMutex inactiveRWLock *sync.RWMutex + + // Metrics + spriteCounter metrics.Counter + moveCounter metrics.Counter + soundCounter metrics.Counter } func NewGameHandler(ps client.PubSubManager) *GameHandler { @@ -48,6 +54,9 @@ func NewGameHandler(ps client.PubSubManager) *GameHandler { inactiveSockInfoFlushMap: make(map[string]*client.ClientSockInfo), inactiveRWLock: &sync.RWMutex{}, activeRWLock: &sync.RWMutex{}, + spriteCounter: ynmetrics.ConcreteCounter("yn10k.set_sprite_count"), + moveCounter: ynmetrics.ConcreteCounter("yn10k.set_pos_count"), + soundCounter: ynmetrics.ConcreteCounter("yn10k.set_sound_count"), } g.flushWorker() return &g @@ -173,8 +182,9 @@ func (ch *GameHandler) handleDisconnect(payload []byte, s *client.ClientSockInfo } func (ch *GameHandler) handleMovement(payload []byte, c *client.ClientSockInfo) error { + ch.moveCounter.Add(1) t := clientmessages.Movement{} - matched, err := protocol.Marshal(payload, &t, true) + matched, seq, err := protocol.Marshal(payload, &t, true) switch { case err != nil: return err @@ -182,14 +192,15 @@ func (ch *GameHandler) handleMovement(payload []byte, c *client.ClientSockInfo) return errors.New("Failed to match in handleMovement") } - c.SyncObject.SetPos(t.X, t.Y) + c.SyncObject.SetPos(t.X, t.Y, seq) ch.scheduleChanges(c.SyncObject.UID, c) return nil } func (ch *GameHandler) handleSprite(payload []byte, c *client.ClientSockInfo) error { + ch.spriteCounter.Add(1) t := clientmessages.Sprite{} - matched, err := protocol.Marshal(payload, &t, true) + matched, seq, err := protocol.Marshal(payload, &t, true) switch { case err != nil: return fmt.Errorf("Failed to handleSprite. Err: %s", err) @@ -197,7 +208,7 @@ func (ch *GameHandler) handleSprite(payload []byte, c *client.ClientSockInfo) er return errors.New("Failed to match in handleSprite") } - c.SyncObject.SetSprite(t.SpriteID, t.Spritesheet) + c.SyncObject.SetSprite(t.SpriteID, t.Spritesheet, seq) ch.scheduleChanges(c.SyncObject.UID, c) return nil } @@ -209,8 +220,9 @@ func (ch *GameHandler) scheduleChanges(uid string, c *client.ClientSockInfo) { } func (ch *GameHandler) handleSound(payload []byte, c *client.ClientSockInfo) error { + ch.soundCounter.Add(1) t := clientmessages.Sound{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case err != nil: return err @@ -225,7 +237,7 @@ func (ch *GameHandler) handleSound(payload []byte, c *client.ClientSockInfo) err func (ch *GameHandler) handleWeather(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.Weather{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case !matched: return errors.New("Failed to match in handleWeather") @@ -240,7 +252,7 @@ func (ch *GameHandler) handleWeather(payload []byte, c *client.ClientSockInfo) e func (ch *GameHandler) handleName(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.Name{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case !matched: return errors.New("Failed to match in handleWeather") @@ -255,7 +267,7 @@ func (ch *GameHandler) handleName(payload []byte, c *client.ClientSockInfo) erro func (ch *GameHandler) handleVariable(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.Variable{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case !matched: return errors.New("Failed to match in handleVariable") @@ -269,9 +281,8 @@ func (ch *GameHandler) handleVariable(payload []byte, c *client.ClientSockInfo) } func (ch *GameHandler) handleSwitchSync(payload []byte, c *client.ClientSockInfo) error { - log.Print("SWITCHING") t := clientmessages.SwitchSync{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case !matched: return errors.New("Failed to match in handleSwitchSync") @@ -286,7 +297,7 @@ func (ch *GameHandler) handleSwitchSync(payload []byte, c *client.ClientSockInfo func (ch *GameHandler) handleAnimFrame(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.AnimFrame{} - matched, err := protocol.Marshal(payload, &t, true) + matched, seq, err := protocol.Marshal(payload, &t, true) switch { case err != nil: return err @@ -294,14 +305,14 @@ func (ch *GameHandler) handleAnimFrame(payload []byte, c *client.ClientSockInfo) return errors.New("failed to match in handleAnimFrame") } - c.SyncObject.SetAnimFrame(t.Frame) + c.SyncObject.SetAnimFrame(t.Frame, seq) ch.scheduleChanges(c.SyncObject.UID, c) return nil } func (ch *GameHandler) handleFacing(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.Facing{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case err != nil: return err @@ -316,7 +327,7 @@ func (ch *GameHandler) handleFacing(payload []byte, c *client.ClientSockInfo) er func (ch *GameHandler) handleTypingStatus(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.TypingStatus{} - matched, err := protocol.Marshal(payload, &t, true) + matched, _, err := protocol.Marshal(payload, &t, true) switch { case !matched: return errors.New("Failed to match in handleTypingStatus") @@ -331,7 +342,7 @@ func (ch *GameHandler) handleTypingStatus(payload []byte, c *client.ClientSockIn func (ch *GameHandler) handleMovementAnimSpeed(payload []byte, c *client.ClientSockInfo) error { t := clientmessages.MovementAnimationSpeed{} - matched, err := protocol.Marshal(payload, &t, true) + matched, seq, err := protocol.Marshal(payload, &t, true) switch { case err != nil: return err @@ -339,7 +350,7 @@ func (ch *GameHandler) handleMovementAnimSpeed(payload []byte, c *client.ClientS return errors.New("Failed to match in handleMovementAnimSpeed") } - c.SyncObject.SetMovementAnimationSpeed(t.MovementSpeed) + c.SyncObject.SetMovementAnimationSpeed(t.MovementSpeed, seq) ch.scheduleChanges(c.SyncObject.UID, c) return nil } diff --git a/server/internal/msghandler/servicemux.go b/server/internal/msghandler/servicemux.go index 138dff0..5ab3f9a 100644 --- a/server/internal/msghandler/servicemux.go +++ b/server/internal/msghandler/servicemux.go @@ -2,8 +2,9 @@ package msghandler import ( "errors" - "net" + "fmt" "sync" + "time" "github.com/horahoradev/YNO10k/internal/client" "github.com/panjf2000/gnet" @@ -17,7 +18,7 @@ type ServiceMux struct { gh Handler ch Handler lh Handler - SyncChanMap map[net.Addr]chan struct{} + SyncChanMap map[string]chan struct{} m *sync.Mutex } @@ -27,19 +28,22 @@ func NewServiceMux(gh, ch, lh Handler, cm client.PubSubManager) ServiceMux { gh: gh, ch: ch, lh: lh, - SyncChanMap: make(map[net.Addr]chan struct{}), + SyncChanMap: make(map[string]chan struct{}), m: &sync.Mutex{}, } } -func (sm ServiceMux) HandleMessage(clientPayload []byte, c gnet.Conn, cinfo *client.ClientSockInfo) error { +func (sm *ServiceMux) HandleMessage(clientPayload []byte, c gnet.Conn, cinfo *client.ClientSockInfo) error { log.Debug("Handling service message") + cAddr := c.RemoteAddr().String() + sm.m.Lock() - syncChan, ok := sm.SyncChanMap[c.RemoteAddr()] + // What in the fuck + syncChan, ok := sm.SyncChanMap[cAddr] if !ok { syncChan = make(chan struct{}) - sm.SyncChanMap[c.RemoteAddr()] = syncChan + sm.SyncChanMap[cAddr] = syncChan } sm.m.Unlock() @@ -52,19 +56,28 @@ func (sm ServiceMux) HandleMessage(clientPayload []byte, c gnet.Conn, cinfo *cli return nil } + log.Print("Subscribing...") // This is the servicename packet, use it to initialize the client info cInfo, err := sm.cm.SubscribeClientToRoom(string(clientPayload), c) if err != nil { log.Errorf("Failed to add client for room. Err: %s", err) return err } + log.Debugf("Subscribed client to room %s", string(clientPayload)) // Store the client info with the connection c.SetContext(cInfo) close(syncChan) return nil default: - <-syncChan + select { + case <-time.After(3 * time.Second): + c.Close() + return fmt.Errorf("timed out while waiting to set context") + case <-syncChan: + // This is fine, proceed + } + clientInfo := getClientSockInfo(c) if clientInfo == nil { return errors.New("Clientinfo nil in message handler main code path") diff --git a/server/internal/protocol/protocol.go b/server/internal/protocol/protocol.go index 5739709..ce6dbe9 100644 --- a/server/internal/protocol/protocol.go +++ b/server/internal/protocol/protocol.go @@ -17,10 +17,12 @@ import ( } */ -func Marshal(msgbuf []byte, target interface{}, twoBytePrefix bool) (matched bool, err error) { +func Marshal(msgbuf []byte, target interface{}, twoBytePrefix bool) (matched bool, sequenceNumber uint8, err error) { // TODO: detect client socket endianness and correct somewher else lmao + var seqNumber uint8 = 0 if twoBytePrefix { // little endian LOL + seqNumber = uint8(msgbuf[1]) msgbuf = append([]byte{msgbuf[0]}, msgbuf[2:]...) } @@ -31,30 +33,30 @@ func Marshal(msgbuf []byte, target interface{}, twoBytePrefix bool) (matched boo // Determine match prefix for target struct pref, ok := t.FieldByName("MatchPrefix") if !ok { - return false, errors.New("target missing required attribute MatchPrefix") + return false, 0, errors.New("target missing required attribute MatchPrefix") } if len(msgbuf) == 0 { - return false, errors.New("Empty message") + return false, 0, errors.New("Empty message") } msgPrefix, ok := pref.Tag.Lookup("ynoproto") if !ok { - return false, errors.New("Missing required MatchPrefix annotation ynoproto") + return false, 0, errors.New("Missing required MatchPrefix annotation ynoproto") } msgVal, err := hex.DecodeString(msgPrefix) if err != nil { - return false, fmt.Errorf("Failed to decode matchprefix %s to hex", msgPrefix) + return false, 0, fmt.Errorf("Failed to decode matchprefix %s to hex", msgPrefix) } if len(msgVal) != 1 { - return false, fmt.Errorf("msgVal has an incorrect length of %d", len(msgVal)) + return false, 0, fmt.Errorf("msgVal has an incorrect length of %d", len(msgVal)) } // This doesn't indicate an error; it just didn't match. if msgVal[0] != msgbuf[0] { - return false, nil + return false, 0, nil } hasString := false @@ -68,36 +70,36 @@ func Marshal(msgbuf []byte, target interface{}, twoBytePrefix bool) (matched boo fieldName := ft.Name if !f.IsValid() { - return false, fmt.Errorf("field %s is not valid", fieldName) + return false, 0, fmt.Errorf("field %s is not valid", fieldName) } if !f.CanSet() { - return false, fmt.Errorf("could not set field %s", fieldName) + return false, 0, fmt.Errorf("could not set field %s", fieldName) } switch f.Kind() { case reflect.Uint16: if len(msgbuf[i:]) < 2 { - return false, fmt.Errorf("invalid length for uint16") + return false, 0, fmt.Errorf("invalid length for uint16") } n := binary.LittleEndian.Uint16(msgbuf[i : i+2]) // Not sure if this is really valid for uint16 if f.OverflowUint(uint64(n)) { - return false, fmt.Errorf("provided value, %d, would overflow if assigned to struct type", n) + return false, 0, fmt.Errorf("provided value, %d, would overflow if assigned to struct type", n) } f.SetUint(uint64(n)) case reflect.Uint32: if len(msgbuf[i:]) < 4 { - return false, fmt.Errorf("invalid length for uint32") + return false, 0, fmt.Errorf("invalid length for uint32") } n := binary.LittleEndian.Uint32(msgbuf[i : i+4]) // Not sure if this is really valid for uint16 if f.OverflowUint(uint64(n)) { - return false, fmt.Errorf("provided value, %d, would overflow if assigned to struct type", n) + return false, 0, fmt.Errorf("provided value, %d, would overflow if assigned to struct type", n) } f.SetUint(uint64(n)) @@ -107,7 +109,7 @@ func Marshal(msgbuf []byte, target interface{}, twoBytePrefix bool) (matched boo f.SetString(string(msgbuf[i:])) default: - return false, errors.New("Unsupported type used in struct") + return false, 0, errors.New("Unsupported type used in struct") } i += int(fieldSize) @@ -115,8 +117,8 @@ func Marshal(msgbuf []byte, target interface{}, twoBytePrefix bool) (matched boo // Can't use the reflected size of the struct because of word alignment padding if !hasString && uint64(totalFieldSize) != uint64(len(msgbuf)) { - return false, fmt.Errorf("struct size %d did not match len of msgbuf %d", totalFieldSize, len(msgbuf)) + return false, 0, fmt.Errorf("struct size %d did not match len of msgbuf %d", totalFieldSize, len(msgbuf)) } - return true, nil + return true, seqNumber, nil } diff --git a/server/main.go b/server/main.go index d004d70..6981636 100644 --- a/server/main.go +++ b/server/main.go @@ -5,6 +5,7 @@ import ( "context" "net/http" "strings" + "sync" "github.com/horahoradev/YNO10k/internal/client" ynmetrics "github.com/horahoradev/YNO10k/internal/metrics" @@ -57,6 +58,7 @@ type messageServer struct { *gnet.EventServer pool *goroutine.Pool serviceMux msghandler.Handler + m *sync.Mutex } func newMessageServer(pool *goroutine.Pool) messageServer { @@ -67,9 +69,10 @@ func newMessageServer(pool *goroutine.Pool) messageServer { gh := msghandler.NewGameHandler(ps) sMux := msghandler.NewServiceMux(gh, ch, lh, ps) return messageServer{ - serviceMux: sMux, + serviceMux: &sMux, pool: pool, EventServer: &gnet.EventServer{}, + m: &sync.Mutex{}, } } @@ -130,24 +133,6 @@ func (ms messageServer) React(frame []byte, c gnet.Conn) (out []byte, action gne return nil, gnet.None } -/* - - if si.ClientInfo.IsClosed() { - ch.mapLock.Lock() - delete(ch.sockinfoFlushMap, key) - ch.mapLock.Unlock() - // return {type: "disconnect", uuid: socket.syncObject.uid}; - - err := ch.pubsubManager.Broadcast(&servermessages.DisconnectMessage{ - Type: "disconnect", - UUID: si.SyncObject.UID, - }, si) - if err != nil { - log.Errorf("Failed to broadcast disconnect message, continuing...") - } - continue - } -*/ func (ms messageServer) OnClosed(c gnet.Conn, err error) (action gnet.Action) { log.Errorf("Closing connection, err: %s. State: %s. ", err) // Send a disconnect broadcast LOL @@ -171,5 +156,5 @@ func main() { mServ := newMessageServer(p) log.Print("Listening on 443") - log.Fatal(gnet.Serve(mServ, "0.0.0.0:443")) + log.Fatal(gnet.Serve(mServ, "0.0.0.0:443", gnet.WithNumEventLoop(1), gnet.WithMulticore(false))) }