Go项目-海量用户通讯系统
Go项目-海量用户通讯系统
项目开发流程
- 实现客户端登录菜单以及简单的用户登录逻辑
- 实现用户登录(与服务器端进行通信验证用户的信息)
- 客户端可以发送消息的长度,服务器端可以接收消息的长度
- 客户端可以发送消息本身,服务器端可以接收消息
- 改进服务器端和客户端的结构,更易读
- 增加数据库验证,增加一层models,同时实现用户的注册和登录
- 维护用户在线列表
- 客户端发送消息
- 服务器端转发消息
项目需求说明
用户注册、用户登录、显示在线用户列表、群聊(广播)、点对点聊天、离线留言
项目代码编写
实现客户端登录菜单以及简单的用户登录逻辑
package main
import "fmt"
// 定义两个变量,一个表示用户ID,一个表示用户密码
var userId int
var userPwd string
func main() {
// 接收用户的选择
var key int
// 判断是否还能继续显示菜单
var loop = true
// 循环展示菜单
for loop {
fmt.Println("---------------欢迎登录多人聊天系统---------------")
fmt.Println("--------------- 1 登录聊天室")
fmt.Println("--------------- 2 注册用户")
fmt.Println("--------------- 3 退出系统")
fmt.Println("请选择(1-3):")
fmt.Scanln(&key)
switch key {
case 1:
fmt.Println("登录聊天室")
loop = false
case 2:
fmt.Println("注册用户")
case 3:
fmt.Println("退出系统")
loop = false
default:
fmt.Println("输入有误,请重新输入")
}
}
if key == 1 {
fmt.Println("请输入用户ID")
fmt.Scanln(&userId)
fmt.Println("请输入用户密码")
fmt.Scanln(&userPwd)
// 先把登录的函数写在另外一个文件
err := login(userId, userPwd)
if err != nil {
fmt.Println("登录失败")
} else {
fmt.Println("登录成功")
}
} else if key == 2 {
fmt.Println("进行用户注册的逻辑")
}
}登录逻辑的判断首先写在另外的文件中,后续再进行修改
package main
import "fmt"
func login(userId int, userPwd string) (err error) {
fmt.Printf("userId=%d, userPed=%s\n", userId, userPwd)
return nil
}注意这种在同一个包下引用函数的方式需要在src文件夹之外进行编译,然后手动运行
实现用户登录(与服务器端进行通信验证用户的信息)
重点是如何发送包以及如何对包进行校验,同时要保证多线程
消息长度的发送与接收
要对发送的消息进行序列化等操作,首先定义好处理这些数据的结构体
package message
// 确定消息类型
const (
LoginMesType = "LoginMes"
LoginResMesType = "LoginResMes"
)
type Message struct {
Type string `json:"type"` // 消息类型
Data string `json:"data"` // 消息内容
}
// 定义两个消息,后面需要再增加
type LoginMes struct {
UserId int `json:"userId"` // 用户Id
UserPwd string `json:"userPwd"` // 用户密码
UserName string `json:"userName"` // 用户名
}
type LoginResMes struct {
Code int `json:"code"` // 返回的状态码 500 表示用户未注册,200 表示成功
Error string `json:"error"` // 返回错误信息
}客户端发送消息(消息的长度)
package main
import (
"Go-Projects/Mass-Communication-System/common/message"
"encoding/binary"
"encoding/json"
"fmt"
"net"
)
func login(userId int, userPwd string) (err error) {
// fmt.Printf("userId=%d, userPed=%s\n", userId, userPwd)
// return nil
// 连接到服务器端
conn, err := net.Dial("tcp", "localhost:8889")
if err != nil {
fmt.Println("net.Dial err=", err)
return
}
defer conn.Close()
// 准备通过conn发送消息给服务
var mes message.Message
mes.Type = message.LoginMesType
// 创建一个LoginMes结构体
var loginMes message.LoginMes
loginMes.UserId = userId
loginMes.UserPwd = userPwd
// 将loginMes序列化
data, err := json.Marshal(loginMes)
if err != nil {
fmt.Println("json Marshal err=", err)
return
}
mes.Data = string(data)
// 将mes进行序列化
data, err = json.Marshal(mes)
if err != nil {
fmt.Println("json Marshal err=", err)
return
}
// data为发送的消息
// 先把data的长度发送给服务器
var pkgLen = uint32(len(data))
var buf [4]byte
binary.BigEndian.PutUint32(buf[0:4], pkgLen)
// 发送长度
n, err := conn.Write(buf[:4])
if n != 4 || err != nil {
fmt.Println("conn.Write err=", err)
return
}
fmt.Println("客户端发送的消息长度为", len(data))
fmt.Println("客户端发送的消息内容为", string(data))
return
}
服务器端接收消息
package main
import (
"fmt"
"net"
)
func process(conn net.Conn) {
// 延时关闭连接
defer conn.Close()
// 读取客户端发送的信息
for {
buf := make([]byte, 1024*4)
fmt.Println("等待读取客户端发送的数据.....")
n, err := conn.Read(buf[:4])
if n != 4 || err != nil {
fmt.Println("conn.Read err=", err)
return
}
fmt.Println("读到的长度为", buf[:4])
}
}
func main() {
fmt.Println("服务器在8889端口监听.....")
listen, err := net.Listen("tcp", "localhost:8889")
defer listen.Close()
if err != nil {
fmt.Println("net.Listen err=", err)
return
}
// 一旦监听成功,等待客户端连接服务器
for {
fmt.Println("等待客户端连接服务器.....")
conn, err := listen.Accept()
if err != nil {
fmt.Println("listen.Accept err=", err)
}
// 一旦连接成功,则启动一个协程和客户端保持通讯
go process(conn)
}
}
客户端发送消息本身,服务器端进行接收
将服务器端的消息接收封装成一个函数
func readPkg(conn net.Conn) (mes message.Message, err error) {
buf := make([]byte, 1024*4)
fmt.Println("等待读取客户端发送的数据.....")
_, err = conn.Read(buf[:4])
if err != nil {
fmt.Println("conn.Read err=", err)
return
}
// fmt.Println("读到的长度为", buf[:4])
// 转换为一个uint32类型
var pkgLen = binary.BigEndian.Uint32(buf[0:4])
// 发送长度
n, err := conn.Read(buf[:pkgLen])
if n != int(pkgLen) || err != nil {
fmt.Println("conn.Read err=", err)
return
}
// 把pkgLen反序列化成message
err = json.Unmarshal(buf[:pkgLen], &mes)
if err != nil {
fmt.Println("json.Unmarshal err=", err)
return
}
return
}客户端发送消息
// 发送消息本身
_, err = conn.Write(data)
if err != nil {
fmt.Println("conn.Write err=", err)
return
}完成登录的验证功能(相当于服务器发送消息,客户端接收)
服务器端封装一个发送消息的函数
func writePkg(conn net.Conn, data []byte) (err error) {
// 先发送一个长度
var pkgLen = uint32(len(data))
var buf [4]byte
binary.BigEndian.PutUint32(buf[0:4], pkgLen)
// 发送长度
_, err = conn.Write(buf[:4])
if err != nil {
fmt.Println("conn.Write err=", err)
return
}
//发送data本身
n, err := conn.Write(data)
if n != int(pkgLen) || err != nil {
fmt.Println("conn.Write err=", err)
return
}
return
}将这种请求通用化,为后面的其他消息做准备
// 编写serverProcessLogin函数,专门处理登录的请求
func serverProcessLogin(conn net.Conn, mes *message.Message) (err error) {
// 从mes中取出data,并反序列化
var loginMes message.LoginMes
err = json.Unmarshal([]byte(mes.Data), &loginMes)
if err != nil {
fmt.Println("json.Unmarshal error, err=", err)
return
}
// 先声明一个resMes
var resMes message.Message
resMes.Type = message.LoginResMesType
// 声明一个LoginResMes
var loginResMes message.LoginResMes
// 如果用户的id为100,密码为123456,认为合法,否则不合法
if loginMes.UserId == 100 && loginMes.UserPwd == "123456" {
//合法
loginResMes.Code = 200
} else {
//不合法
loginResMes.Code = 500
loginResMes.Error = "该用户不存在,请注册再使用..."
}
// 将loginResMes序列化
data, err := json.Marshal(loginResMes)
if err != nil {
fmt.Println("json.Marshal error, err=", err)
return
}
// 将data赋值给resMes
resMes.Data = string(data)
// 对resMes进行序列化,准备发送
data, err = json.Marshal(resMes)
if err != nil {
fmt.Println("json.Marshal error, err=", err)
return
}
// 发送data,封装到writePkg函数
err = writePkg(conn, data)
return
}
// 根据客户端发送消息种类不同,决定调用哪个函数来实现
func serverProcessMes(conn net.Conn, mes *message.Message) (err error) {
switch mes.Type {
case message.LoginMesType:
// 处理登录的逻辑
err = serverProcessLogin(conn, mes)
case message.RegisterMesType:
// 处理注册的逻辑
default:
fmt.Println("消息类型不存在,无法处理")
}
return
}客户端对消息进行处理
// 处理服务器端返回的消息
mes, err = readPkg(conn)
if err != nil {
fmt.Println("readPkg(conn) error, err=", err)
return
}
// 将mes的data部分反序列化
var loginResMes message.LoginResMes
err = json.Unmarshal([]byte(mes.Data), &loginResMes)
if loginResMes.Code == 200 {
fmt.Println("登录成功")
} else if loginResMes.Code == 500 {
fmt.Println(loginResMes.Error)
}改进服务器端和客户端的结构,更易读
改进主要是将前面编写的函数封装进方法之中,减少不同函数之间参数的传递,通过结构体直接调用即可
客户端的改进增加了一个与服务器端保持联系的函数
// 和服务器端保持通讯
func serverProcessMes(conn net.Conn) {
tf := &utils.Transfer{
Conn: conn,
}
for {
fmt.Println("客户端正在等待读取服务器发送的消息")
mes, err := tf.ReadPkg()
if err != nil {
fmt.Println("tf.ReadPkg err=", err)
return
}
// 如果读取到消息,下一步进行处理
fmt.Println(mes)
}
}增加数据库验证,增加一层models,同时实现用户的注册和登录
MVC开发模式,增加models,从而从数据库中进行读取和接收,验证用户的有效性
models层
package model
import (
"Go-Projects/Mass-Communication-System/common/message"
"encoding/json"
"fmt"
"github.com/gomodule/redigo/redis"
)
// 使用工厂模式创建一个UserDao的实例
func NewUserDao(pool *redis.Pool) (userDao *UserDao) {
userDao = &UserDao{
pool: pool,
}
return
}
// 在服务器启动后初始化一个userDao实例
var (
MyUserDao *UserDao
)
// 定义一个userDao的结构体
type UserDao struct {
pool *redis.Pool
}
// 根据用户id返回user实例
func (ud *UserDao) getUserById(conn redis.Conn, id int) (user *User, err error) {
res, err := redis.String(conn.Do("HGET", "users", id))
if err != nil {
if err == redis.ErrNil {
err = ERROR_USER_NOTEXISTS
}
return
}
user = &User{}
// 把res反序列化成User实例
err = json.Unmarshal([]byte(res), user)
if err != nil {
fmt.Println("json.Unmarshal err=", err)
return
}
return
}
// 完成登录的校验
func (ud *UserDao) Login(userId int, userPwd string) (user *User, err error) {
conn := ud.pool.Get()
defer conn.Close()
user, err = ud.getUserById(conn, userId)
if err != nil {
return
}
if user.UserPwd != userPwd {
err = ERROR_USER_PWD
return
}
return
}
// 注册
func (ud *UserDao) Register(user *message.User) (err error) {
conn := ud.pool.Get()
defer conn.Close()
_, err = ud.getUserById(conn, user.UserId)
if err == nil {
err = ERROR_USER_EXISTS
return
}
// 说明该用户还没有注册过,则可以完成注册
data, err := json.Marshal(user)
if err != nil {
return
}
_, err = conn.Do("HSET", "users", user.UserId, string(data))
if err != nil {
fmt.Println("保存注册用户错误,err=", err)
return
}
return
}
处理注册的请求
// 编写ServerProcessRegister函数,专门处理注册的请求
func (u *UserProcess) ServerProcessRegister(mes *message.Message) (err error) {
// 从mes中取出data,并反序列化
var registerMes message.RegisterMes
err = json.Unmarshal([]byte(mes.Data), ®isterMes)
if err != nil {
fmt.Println("json.Unmarshal error, err=", err)
return
}
// 先声明一个resMes
var resMes message.Message
resMes.Type = message.RegisterResMesType
// 声明一个RegisterResMes
var registerResMes message.RegisterResMes
err = model.MyUserDao.Register(®isterMes.User)
if err != nil {
if err == model.ERROR_USER_EXISTS {
registerResMes.Code = 505
registerResMes.Error = err.Error()
} else {
registerResMes.Code = 506
registerResMes.Error = "注册发生未知错误"
}
} else {
registerResMes.Code = 200
}
// 将loginResMes序列化
data, err := json.Marshal(registerResMes)
if err != nil {
fmt.Println("json.Marshal error, err=", err)
return
}
// 将data赋值给resMes
resMes.Data = string(data)
// 对resMes进行序列化,准备发送
data, err = json.Marshal(resMes)
if err != nil {
fmt.Println("json.Marshal error, err=", err)
return
}
// 发送data,封装到writePkg函数
tf := &utils.Transfer{
Conn: u.Conn,
}
err = tf.WritePkg(data)
return
}维护用户在线列表
完成对当前在线用户的增删改查
package process2
import "fmt"
// 在服务器端实例只有一个,在很多的地方都会使用到
var (
userMgr *UserMgr
)
type UserMgr struct {
onlineUsers map[int]*UserProcess
}
// 完成对userMgr初始化工作
func init() {
userMgr = &UserMgr{
onlineUsers: make(map[int]*UserProcess, 1024),
}
}
// 完成对onlineUsers的增删改查
func (um *UserMgr) AddOnlineUser(up *UserProcess) {
um.onlineUsers[up.UserId] = up
}
func (um *UserMgr) DelOnlineUser(userId int) {
delete(um.onlineUsers, userId)
}
func (um *UserMgr) GetAllOnlineUser() map[int]*UserProcess {
return um.onlineUsers
}
func (um *UserMgr) GetOnlineUserById(userId int) (up *UserProcess, err error) {
up, ok := um.onlineUsers[userId]
if !ok {
err = fmt.Errorf("用户%d不存在", userId)
return
}
return
}
显示当前在线用户列表
// 因为用户登录成功,要将用户放入全局变量中以返回列表
u.UserId = loginMes.UserId
userMgr.AddOnlineUser(u)
// 将当前在线用户的id放入到loginResMes.UsersIds
for id := range userMgr.onlineUsers {
loginResMes.UsersIds = append(loginResMes.UsersIds, id)
}
fmt.Println(user, "登录成功")// 显示当前在线用户列表
fmt.Println("当前在线用户列表如下:")
for _, v := range loginResMes.UsersIds {
fmt.Println("用户id,\t", v)
}服务器端对用户列表进行处理
// 通知所有用户在线
func (u *UserProcess) NotifyOthersOnlineUser(userId int) {
for id, up := range userMgr.onlineUsers {
if id == userId {
continue
}
up.NotifyMeOnline(userId)
}
}
func (u *UserProcess) NotifyMeOnline(userId int) {
var mes message.Message
mes.Type = message.NotifyUserStatusMesType
var notifyUserStatusMes message.NotifyUserStatusMes
notifyUserStatusMes.UserId = userId
notifyUserStatusMes.Status = message.UserOnline
data, err := json.Marshal(notifyUserStatusMes)
if err != nil {
fmt.Println("json.Marshal err=", err)
return
}
mes.Data = string(data)
data, err = json.Marshal(mes)
if err != nil {
fmt.Println("json.Marshal err=", err)
return
}
tf := &utils.Transfer{
Conn: u.Conn,
}
err = tf.WritePkg(data)
if err != nil {
fmt.Println("NotifyMeOnline err=", err)
}
}客户端对用户列表进行处理
package process
import (
"Go-Projects/Mass-Communication-System/common/message"
"fmt"
)
// 客户端要维护的map
var onlineUsers map[int]*message.User = make(map[int]*message.User, 10)
// 在客户端显示当前在线的用户
func outputOnlineUser() {
fmt.Println("当前在线用户列表")
for id, user := range onlineUsers {
fmt.Println(id, user)
}
}
// 处理返回的NotifyUserStatusMes
func updateUserStatus(notifyUserStatusMes *message.NotifyUserStatusMes) {
user, ok := onlineUsers[notifyUserStatusMes.UserId]
if !ok {
user = &message.User{
UserId: notifyUserStatusMes.UserId,
}
}
user.UserStatus = notifyUserStatusMes.Status
onlineUsers[notifyUserStatusMes.UserId] = user
outputOnlineUser()
}
客户端显示用户列表
// 显示当前在线用户列表
fmt.Println("当前在线用户列表如下:")
for _, v := range loginResMes.UsersIds {
fmt.Println("用户id,\t", v)
user := &message.User{
UserId: v,
UserStatus: message.UserOnline,
}
onlineUsers[v] = user
}客户端发送消息
直接调用前面写好的就行,代码很少了
package process
import (
"Go-Projects/Mass-Communication-System/client/utils"
"Go-Projects/Mass-Communication-System/common/message"
"encoding/json"
"fmt"
)
type SmsProecss struct {
}
func (sp *SmsProecss) SendGroupSms(content string) (err error) {
var mes message.Message
mes.Type = message.SmsMesType
var smsMes message.SmsMes
smsMes.Content = content
smsMes.UserId = CurUser.UserId
smsMes.UserStatus = CurUser.UserStatus
data, err := json.Marshal(smsMes)
if err != nil {
fmt.Println("json.Marshal err=", err)
return
}
mes.Data = string(data)
data, err = json.Marshal(mes)
if err != nil {
fmt.Println("json.Marshal err=", err)
return
}
tf := &utils.Transfer{
Conn: CurUser.Conn,
}
err = tf.WritePkg(data)
if err != nil {
fmt.Println("tf.WritePkg err=", err)
return
}
return
}
服务器端转发消息
也是和上面的差不多
package process2
import (
"Go-Projects/Mass-Communication-System/common/message"
"Go-Projects/Mass-Communication-System/server/utils"
"encoding/json"
"fmt"
"net"
)
type SmsProecss struct {
}
func (sp *SmsProecss) SendGroupSms(mes *message.Message) (err error) {
var smsMes message.SmsMes
err = json.Unmarshal([]byte(mes.Data), &smsMes)
if err != nil {
fmt.Println("json.Unmarshal err=", err)
return
}
data, err := json.Marshal(mes)
if err != nil {
fmt.Println("json.Marshal err=", err)
return
}
for id, up := range userMgr.onlineUsers {
if id == smsMes.UserId {
continue
}
sp.SendMesToEachOnlineUser(data, up.Conn)
}
return
}
func (sp *SmsProecss) SendMesToEachOnlineUser(data []byte, conn net.Conn) (err error) {
tf := &utils.Transfer{
Conn: conn,
}
err = tf.WritePkg(data)
if err != nil {
fmt.Println("tf.WritePkg err=", err)
return
}
return
}
Go项目-海量用户通讯系统
https://zhangzhao219.github.io/2022/11/11/Go/Go-Project-Mass-Communication-System/

