Go项目-海量用户通讯系统

Go项目-海量用户通讯系统

项目开发流程

  1. 实现客户端登录菜单以及简单的用户登录逻辑
  2. 实现用户登录(与服务器端进行通信验证用户的信息)
  3. 客户端可以发送消息的长度,服务器端可以接收消息的长度
  4. 客户端可以发送消息本身,服务器端可以接收消息
  5. 改进服务器端和客户端的结构,更易读
  6. 增加数据库验证,增加一层models,同时实现用户的注册和登录
  7. 维护用户在线列表
  8. 客户端发送消息
  9. 服务器端转发消息

项目需求说明

用户注册、用户登录、显示在线用户列表、群聊(广播)、点对点聊天、离线留言

项目代码编写

实现客户端登录菜单以及简单的用户登录逻辑

package main

import "fmt"

// 定义两个变量,一个表示用户ID,一个表示用户密码
var userId int
var userPwd string

func main() {

	// 接收用户的选择
	var key int
	// 判断是否还能继续显示菜单
	var loop = true
	// 循环展示菜单
	for loop {
		fmt.Println("---------------欢迎登录多人聊天系统---------------")
		fmt.Println("---------------   1 登录聊天室")
		fmt.Println("---------------    2 注册用户")
		fmt.Println("---------------    3 退出系统")
		fmt.Println("请选择(1-3):")
		fmt.Scanln(&key)
		switch key {
		case 1:
			fmt.Println("登录聊天室")
			loop = false
		case 2:
			fmt.Println("注册用户")
		case 3:
			fmt.Println("退出系统")
			loop = false
		default:
			fmt.Println("输入有误,请重新输入")
		}
	}
	if key == 1 {
		fmt.Println("请输入用户ID")
		fmt.Scanln(&userId)
		fmt.Println("请输入用户密码")
		fmt.Scanln(&userPwd)
		// 先把登录的函数写在另外一个文件
		err := login(userId, userPwd)
		if err != nil {
			fmt.Println("登录失败")
		} else {
			fmt.Println("登录成功")
		}
	} else if key == 2 {
		fmt.Println("进行用户注册的逻辑")
	}
}

登录逻辑的判断首先写在另外的文件中,后续再进行修改

package main

import "fmt"

func login(userId int, userPwd string) (err error) {
	fmt.Printf("userId=%d, userPed=%s\n", userId, userPwd)
	return nil
}

注意这种在同一个包下引用函数的方式需要在src文件夹之外进行编译,然后手动运行

实现用户登录(与服务器端进行通信验证用户的信息)

重点是如何发送包以及如何对包进行校验,同时要保证多线程

zESgG6.md.png

消息长度的发送与接收

要对发送的消息进行序列化等操作,首先定义好处理这些数据的结构体

package message

// 确定消息类型
const (
	LoginMesType    = "LoginMes"
	LoginResMesType = "LoginResMes"
)

type Message struct {
	Type string `json:"type"` // 消息类型
	Data string `json:"data"` // 消息内容
}

// 定义两个消息,后面需要再增加
type LoginMes struct {
	UserId   int    `json:"userId"`   // 用户Id
	UserPwd  string `json:"userPwd"`  // 用户密码
	UserName string `json:"userName"` // 用户名
}

type LoginResMes struct {
	Code  int    `json:"code"`  // 返回的状态码 500 表示用户未注册,200 表示成功
	Error string `json:"error"` // 返回错误信息
}

客户端发送消息(消息的长度)

package main

import (
	"Go-Projects/Mass-Communication-System/common/message"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net"
)

func login(userId int, userPwd string) (err error) {
	// fmt.Printf("userId=%d, userPed=%s\n", userId, userPwd)
	// return nil
	// 连接到服务器端
	conn, err := net.Dial("tcp", "localhost:8889")
	if err != nil {
		fmt.Println("net.Dial err=", err)
		return
	}
	defer conn.Close()
	// 准备通过conn发送消息给服务
	var mes message.Message
	mes.Type = message.LoginMesType
	// 创建一个LoginMes结构体
	var loginMes message.LoginMes
	loginMes.UserId = userId
	loginMes.UserPwd = userPwd
	// 将loginMes序列化
	data, err := json.Marshal(loginMes)
	if err != nil {
		fmt.Println("json Marshal err=", err)
		return
	}
	mes.Data = string(data)
	// 将mes进行序列化
	data, err = json.Marshal(mes)
	if err != nil {
		fmt.Println("json Marshal err=", err)
		return
	}
	// data为发送的消息
	// 先把data的长度发送给服务器
	var pkgLen = uint32(len(data))
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[0:4], pkgLen)
	//  发送长度
	n, err := conn.Write(buf[:4])
	if n != 4 || err != nil {
		fmt.Println("conn.Write err=", err)
		return
	}
	fmt.Println("客户端发送的消息长度为", len(data))
	fmt.Println("客户端发送的消息内容为", string(data))
	return
}

服务器端接收消息

package main

import (
	"fmt"
	"net"
)

func process(conn net.Conn) {
	// 延时关闭连接
	defer conn.Close()
	// 读取客户端发送的信息
	for {
		buf := make([]byte, 1024*4)
		fmt.Println("等待读取客户端发送的数据.....")
		n, err := conn.Read(buf[:4])
		if n != 4 || err != nil {
			fmt.Println("conn.Read err=", err)
			return
		}
		fmt.Println("读到的长度为", buf[:4])
	}

}

func main() {
	fmt.Println("服务器在8889端口监听.....")
	listen, err := net.Listen("tcp", "localhost:8889")
	defer listen.Close()
	if err != nil {
		fmt.Println("net.Listen err=", err)
		return
	}
	// 一旦监听成功,等待客户端连接服务器
	for {
		fmt.Println("等待客户端连接服务器.....")
		conn, err := listen.Accept()
		if err != nil {
			fmt.Println("listen.Accept err=", err)
		}
		// 一旦连接成功,则启动一个协程和客户端保持通讯
		go process(conn)
	}
}

客户端发送消息本身,服务器端进行接收

将服务器端的消息接收封装成一个函数

func readPkg(conn net.Conn) (mes message.Message, err error) {
	buf := make([]byte, 1024*4)
	fmt.Println("等待读取客户端发送的数据.....")
	_, err = conn.Read(buf[:4])
	if err != nil {
		fmt.Println("conn.Read err=", err)
		return
	}
	// fmt.Println("读到的长度为", buf[:4])
	// 转换为一个uint32类型
	var pkgLen = binary.BigEndian.Uint32(buf[0:4])
	//  发送长度
	n, err := conn.Read(buf[:pkgLen])
	if n != int(pkgLen) || err != nil {
		fmt.Println("conn.Read err=", err)
		return
	}
	// 把pkgLen反序列化成message
	err = json.Unmarshal(buf[:pkgLen], &mes)
	if err != nil {
		fmt.Println("json.Unmarshal err=", err)
		return
	}
	return
}

客户端发送消息

// 发送消息本身
_, err = conn.Write(data)
if err != nil {
	fmt.Println("conn.Write err=", err)
	return
}

完成登录的验证功能(相当于服务器发送消息,客户端接收)

服务器端封装一个发送消息的函数

func writePkg(conn net.Conn, data []byte) (err error) {
	// 先发送一个长度
	var pkgLen = uint32(len(data))
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[0:4], pkgLen)
	//  发送长度
	_, err = conn.Write(buf[:4])
	if err != nil {
		fmt.Println("conn.Write err=", err)
		return
	}

	//发送data本身
	n, err := conn.Write(data)
	if n != int(pkgLen) || err != nil {
		fmt.Println("conn.Write err=", err)
		return
	}
	return
}

将这种请求通用化,为后面的其他消息做准备

// 编写serverProcessLogin函数,专门处理登录的请求
func serverProcessLogin(conn net.Conn, mes *message.Message) (err error) {
	// 从mes中取出data,并反序列化
	var loginMes message.LoginMes
	err = json.Unmarshal([]byte(mes.Data), &loginMes)
	if err != nil {
		fmt.Println("json.Unmarshal error, err=", err)
		return
	}
	// 先声明一个resMes
	var resMes message.Message
	resMes.Type = message.LoginResMesType
	// 声明一个LoginResMes
	var loginResMes message.LoginResMes
	// 如果用户的id为100,密码为123456,认为合法,否则不合法
	if loginMes.UserId == 100 && loginMes.UserPwd == "123456" {
		//合法
		loginResMes.Code = 200
	} else {
		//不合法
		loginResMes.Code = 500
		loginResMes.Error = "该用户不存在,请注册再使用..."
	}
	// 将loginResMes序列化
	data, err := json.Marshal(loginResMes)
	if err != nil {
		fmt.Println("json.Marshal error, err=", err)
		return
	}
	// 将data赋值给resMes
	resMes.Data = string(data)
	// 对resMes进行序列化,准备发送
	data, err = json.Marshal(resMes)
	if err != nil {
		fmt.Println("json.Marshal error, err=", err)
		return
	}
	// 发送data,封装到writePkg函数
	err = writePkg(conn, data)
	return
}

// 根据客户端发送消息种类不同,决定调用哪个函数来实现
func serverProcessMes(conn net.Conn, mes *message.Message) (err error) {
	switch mes.Type {
	case message.LoginMesType:
		// 处理登录的逻辑
		err = serverProcessLogin(conn, mes)
	case message.RegisterMesType:
		// 处理注册的逻辑
	default:
		fmt.Println("消息类型不存在,无法处理")
	}
	return
}

客户端对消息进行处理

// 处理服务器端返回的消息
mes, err = readPkg(conn)
if err != nil {
	fmt.Println("readPkg(conn) error, err=", err)
	return
}
// 将mes的data部分反序列化
var loginResMes message.LoginResMes
err = json.Unmarshal([]byte(mes.Data), &loginResMes)
if loginResMes.Code == 200 {
	fmt.Println("登录成功")
} else if loginResMes.Code == 500 {
	fmt.Println(loginResMes.Error)
}

改进服务器端和客户端的结构,更易读

zEDSBt.png

改进主要是将前面编写的函数封装进方法之中,减少不同函数之间参数的传递,通过结构体直接调用即可

客户端的改进增加了一个与服务器端保持联系的函数

// 和服务器端保持通讯
func serverProcessMes(conn net.Conn) {
	tf := &utils.Transfer{
		Conn: conn,
	}
	for {
		fmt.Println("客户端正在等待读取服务器发送的消息")
		mes, err := tf.ReadPkg()
		if err != nil {
			fmt.Println("tf.ReadPkg err=", err)
			return
		}
		// 如果读取到消息,下一步进行处理
		fmt.Println(mes)
	}
}

增加数据库验证,增加一层models,同时实现用户的注册和登录

MVC开发模式,增加models,从而从数据库中进行读取和接收,验证用户的有效性

models层

package model

import (
	"Go-Projects/Mass-Communication-System/common/message"
	"encoding/json"
	"fmt"

	"github.com/gomodule/redigo/redis"
)

// 使用工厂模式创建一个UserDao的实例
func NewUserDao(pool *redis.Pool) (userDao *UserDao) {
	userDao = &UserDao{
		pool: pool,
	}
	return
}

// 在服务器启动后初始化一个userDao实例
var (
	MyUserDao *UserDao
)

// 定义一个userDao的结构体
type UserDao struct {
	pool *redis.Pool
}

// 根据用户id返回user实例
func (ud *UserDao) getUserById(conn redis.Conn, id int) (user *User, err error) {
	res, err := redis.String(conn.Do("HGET", "users", id))
	if err != nil {
		if err == redis.ErrNil {
			err = ERROR_USER_NOTEXISTS
		}
		return
	}

	user = &User{}

	// 把res反序列化成User实例
	err = json.Unmarshal([]byte(res), user)
	if err != nil {
		fmt.Println("json.Unmarshal err=", err)
		return
	}
	return
}

// 完成登录的校验
func (ud *UserDao) Login(userId int, userPwd string) (user *User, err error) {
	conn := ud.pool.Get()
	defer conn.Close()
	user, err = ud.getUserById(conn, userId)
	if err != nil {
		return
	}
	if user.UserPwd != userPwd {
		err = ERROR_USER_PWD
		return
	}
	return
}

// 注册
func (ud *UserDao) Register(user *message.User) (err error) {
	conn := ud.pool.Get()
	defer conn.Close()
	_, err = ud.getUserById(conn, user.UserId)
	if err == nil {
		err = ERROR_USER_EXISTS
		return
	}
	// 说明该用户还没有注册过,则可以完成注册
	data, err := json.Marshal(user)
	if err != nil {
		return
	}
	_, err = conn.Do("HSET", "users", user.UserId, string(data))
	if err != nil {
		fmt.Println("保存注册用户错误,err=", err)
		return
	}
	return
}

处理注册的请求

// 编写ServerProcessRegister函数,专门处理注册的请求
func (u *UserProcess) ServerProcessRegister(mes *message.Message) (err error) {
	// 从mes中取出data,并反序列化
	var registerMes message.RegisterMes
	err = json.Unmarshal([]byte(mes.Data), &registerMes)
	if err != nil {
		fmt.Println("json.Unmarshal error, err=", err)
		return
	}
	// 先声明一个resMes
	var resMes message.Message
	resMes.Type = message.RegisterResMesType
	// 声明一个RegisterResMes
	var registerResMes message.RegisterResMes

	err = model.MyUserDao.Register(&registerMes.User)

	if err != nil {
		if err == model.ERROR_USER_EXISTS {
			registerResMes.Code = 505
			registerResMes.Error = err.Error()
		} else {
			registerResMes.Code = 506
			registerResMes.Error = "注册发生未知错误"
		}

	} else {
		registerResMes.Code = 200
	}

	// 将loginResMes序列化
	data, err := json.Marshal(registerResMes)
	if err != nil {
		fmt.Println("json.Marshal error, err=", err)
		return
	}
	// 将data赋值给resMes
	resMes.Data = string(data)
	// 对resMes进行序列化,准备发送
	data, err = json.Marshal(resMes)
	if err != nil {
		fmt.Println("json.Marshal error, err=", err)
		return
	}
	// 发送data,封装到writePkg函数
	tf := &utils.Transfer{
		Conn: u.Conn,
	}
	err = tf.WritePkg(data)
	return
}

维护用户在线列表

完成对当前在线用户的增删改查

package process2

import "fmt"

// 在服务器端实例只有一个,在很多的地方都会使用到

var (
	userMgr *UserMgr
)

type UserMgr struct {
	onlineUsers map[int]*UserProcess
}

// 完成对userMgr初始化工作
func init() {
	userMgr = &UserMgr{
		onlineUsers: make(map[int]*UserProcess, 1024),
	}
}

// 完成对onlineUsers的增删改查

func (um *UserMgr) AddOnlineUser(up *UserProcess) {
	um.onlineUsers[up.UserId] = up
}

func (um *UserMgr) DelOnlineUser(userId int) {
	delete(um.onlineUsers, userId)
}

func (um *UserMgr) GetAllOnlineUser() map[int]*UserProcess {
	return um.onlineUsers
}

func (um *UserMgr) GetOnlineUserById(userId int) (up *UserProcess, err error) {
	up, ok := um.onlineUsers[userId]
	if !ok {
		err = fmt.Errorf("用户%d不存在", userId)
		return
	}
	return
}

显示当前在线用户列表

// 因为用户登录成功,要将用户放入全局变量中以返回列表
u.UserId = loginMes.UserId
userMgr.AddOnlineUser(u)
// 将当前在线用户的id放入到loginResMes.UsersIds
for id := range userMgr.onlineUsers {
	loginResMes.UsersIds = append(loginResMes.UsersIds, id)
}
fmt.Println(user, "登录成功")
// 显示当前在线用户列表
fmt.Println("当前在线用户列表如下:")
for _, v := range loginResMes.UsersIds {
	fmt.Println("用户id,\t", v)
}

服务器端对用户列表进行处理

// 通知所有用户在线
func (u *UserProcess) NotifyOthersOnlineUser(userId int) {
	for id, up := range userMgr.onlineUsers {
		if id == userId {
			continue
		}
		up.NotifyMeOnline(userId)
	}

}

func (u *UserProcess) NotifyMeOnline(userId int) {
	var mes message.Message
	mes.Type = message.NotifyUserStatusMesType
	var notifyUserStatusMes message.NotifyUserStatusMes
	notifyUserStatusMes.UserId = userId
	notifyUserStatusMes.Status = message.UserOnline

	data, err := json.Marshal(notifyUserStatusMes)
	if err != nil {
		fmt.Println("json.Marshal err=", err)
		return
	}
	mes.Data = string(data)

	data, err = json.Marshal(mes)
	if err != nil {
		fmt.Println("json.Marshal err=", err)
		return
	}
	tf := &utils.Transfer{
		Conn: u.Conn,
	}
	err = tf.WritePkg(data)
	if err != nil {
		fmt.Println("NotifyMeOnline err=", err)
	}
}

客户端对用户列表进行处理

package process

import (
	"Go-Projects/Mass-Communication-System/common/message"
	"fmt"
)

// 客户端要维护的map
var onlineUsers map[int]*message.User = make(map[int]*message.User, 10)

// 在客户端显示当前在线的用户

func outputOnlineUser() {
	fmt.Println("当前在线用户列表")
	for id, user := range onlineUsers {
		fmt.Println(id, user)
	}
}

// 处理返回的NotifyUserStatusMes
func updateUserStatus(notifyUserStatusMes *message.NotifyUserStatusMes) {

	user, ok := onlineUsers[notifyUserStatusMes.UserId]
	if !ok {
		user = &message.User{
			UserId: notifyUserStatusMes.UserId,
		}
	}
	user.UserStatus = notifyUserStatusMes.Status
	onlineUsers[notifyUserStatusMes.UserId] = user
	outputOnlineUser()
}

客户端显示用户列表

// 显示当前在线用户列表
fmt.Println("当前在线用户列表如下:")
for _, v := range loginResMes.UsersIds {
	fmt.Println("用户id,\t", v)
	user := &message.User{
		UserId:     v,
		UserStatus: message.UserOnline,
	}
	onlineUsers[v] = user
}

客户端发送消息

直接调用前面写好的就行,代码很少了

package process

import (
	"Go-Projects/Mass-Communication-System/client/utils"
	"Go-Projects/Mass-Communication-System/common/message"
	"encoding/json"
	"fmt"
)

type SmsProecss struct {
}

func (sp *SmsProecss) SendGroupSms(content string) (err error) {
	var mes message.Message
	mes.Type = message.SmsMesType

	var smsMes message.SmsMes
	smsMes.Content = content
	smsMes.UserId = CurUser.UserId
	smsMes.UserStatus = CurUser.UserStatus

	data, err := json.Marshal(smsMes)
	if err != nil {
		fmt.Println("json.Marshal err=", err)
		return
	}
	mes.Data = string(data)
	data, err = json.Marshal(mes)
	if err != nil {
		fmt.Println("json.Marshal err=", err)
		return
	}
	tf := &utils.Transfer{
		Conn: CurUser.Conn,
	}
	err = tf.WritePkg(data)
	if err != nil {
		fmt.Println("tf.WritePkg err=", err)
		return
	}
	return
}

服务器端转发消息

也是和上面的差不多

package process2

import (
	"Go-Projects/Mass-Communication-System/common/message"
	"Go-Projects/Mass-Communication-System/server/utils"
	"encoding/json"
	"fmt"
	"net"
)

type SmsProecss struct {
}

func (sp *SmsProecss) SendGroupSms(mes *message.Message) (err error) {

	var smsMes message.SmsMes
	err = json.Unmarshal([]byte(mes.Data), &smsMes)
	if err != nil {
		fmt.Println("json.Unmarshal err=", err)
		return
	}

	data, err := json.Marshal(mes)
	if err != nil {
		fmt.Println("json.Marshal err=", err)
		return
	}

	for id, up := range userMgr.onlineUsers {
		if id == smsMes.UserId {
			continue
		}
		sp.SendMesToEachOnlineUser(data, up.Conn)
	}
	return
}

func (sp *SmsProecss) SendMesToEachOnlineUser(data []byte, conn net.Conn) (err error) {

	tf := &utils.Transfer{
		Conn: conn,
	}
	err = tf.WritePkg(data)
	if err != nil {
		fmt.Println("tf.WritePkg err=", err)
		return
	}
	return
}

Go项目-海量用户通讯系统
https://zhangzhao219.github.io/2022/11/11/Go/Go-Project-Mass-Communication-System/
作者
Zhang Zhao
发布于
2022年11月11日
许可协议