- 并发能更客观地表现问题模型;
- 并发可以充分利用CPU核心的优势,提高程序的执行效率;
- 并发能充分利用CPU与其他硬件设备固有的异步性。
1 传统, 主流的 并发
实现模型
多进程。
多进程是在操作系统层面
进行并发的基本模式。同时也是开销最大
的模式。
在Linux平台上,很多工具链正是采用这种模式在工作。比如某个Web服务器,它会有专门 的进程负责网络端口的监听和链接管理,还会有专门的进程负责事务和运算。这种方法 的好处在于简单、进程间互不影响,坏处在于系统开销大,因为所有的进程都是由内核 管理的。
多线程。
多线程在大部分操作系统上都属于系统层面
的并发模式,也是我们使用最多的
最有效的一种模式。
目前,我们所见的几乎所有工具链都会使用这种模式。它比多进程
的开销小很多
,但是其开销依旧比较大
,且在高并发模式下,效率会有影响
。
基于回调的非阻塞/异步IO。
这种架构的诞生实际上来源于多线程模式的危机
。在很多
高并发服务器开发实践中,使用多线程模式会很快耗尽服务器的内存和CPU资源
。
而这种模式通过事件驱动的方式
, 使用异步IO
,使服务器持续运转
,且尽可能地少用线程
,降
低开销
。
它目前在Node.js中得到了很好的实践
。
但是使用这种模式,编程比多线程要复杂
,因为它把流程做了分割
,对于问题本身的反应不够自然。
协程。
协程(Coroutine)本质上是一种用户态线程
,不需要操作系统
来进行抢占式调度
,
且在真正的实现中寄存于线程中
,因此,系统开销极小
,可以有效提高线程的任务并发
性,而避免多线程的缺点。
使用协程的优点是编程简单
,结构清晰
;缺点是需要语言的
支持
,如果不支持,则需要用户在程序中自行实现调度器。目前,原生支持协程的语言还很少。
传统并发模型的缺陷
串行的事务具有确定性
线程类并发模式
在原先的确定性中引入了不确定性
,这种不确定性给程序的行为带来了意外和危害
线程之间通信
只能采用共享内存的方式
1 共享内存系统
保证共享内存的有效性
我们采取了很多措施,比如加锁等
2 消息传递系统
对线程间共享状态的各种操作都被封装在线程之间传递的消息中
发送消息时对状态进行复制,并且在消息传递的边界上交出这个状态的所有权
由于需要执行复制操作,所以大多数消息传递的实现在性能上并不优越,但线程中的状态管理工作通常会变得更
为简单。
执行体
的概念
操作系统层面有多个概念与之对应,比如操作系统自己掌管的
进程(process)
、进程内的线程(thread)
以及进程内的协程(coroutine,也叫轻量级线程)
与传统的系统级线程和进程相比,协程
的最大优势在于其“轻量级”
,可以轻松创建上百万个
而不
会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。
多数语言在语法层面并不直接支持协程
, 而是通过库的方式支持,但用库的方式支持的功能
也并不完整,比如仅仅提供轻量级线程的创建、销毁与切换等能力。
如果在这样的轻量级线程中调用一个同步 IO 操作,比如网络通信、本地文件读写,都会阻塞
其他的并发执行轻量级线程,
从而无法真正达到轻量级线程本身期望达到的目标
goroutine
goroutine是Go语言
中的轻量级线程实现
,由Go运行时(runtime)管理。
2 goroutine 之间的并发通信
并发编程的难度在于协调,而协调就要通过交流
在工程上,有两种最常见的并发通信模型
:
`共享数据`
`消息`
1 共享数据
即共享内存 (加锁)
是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享
被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。
在实际工程应用中最常见的无疑是内存了,也就是常说的共享内存。
package main
import "fmt"
import "sync"
import "runtime"
var counter int = 0
func Count(lock *sync.Mutex) {
lock.Lock()
counter++
fmt.Println(z)
lock.Unlock()
}
func main() {
lock := &sync.Mutex{}
for i := 0; i < 10; i++ {
go Count(lock)
}
for {
lock.Lock()
c := counter
lock.Unlock()
runtime.Gosched()
if c >= 10 {
break
}
}
}
我们在10个goroutine中共享了变量counter。每个goroutine执行完成后,
将counter的值加1。因为10个goroutine是并发执行的,所以我们还引入了锁,也就是代码中的
lock变量。每次对n的操作,都要先将锁锁住,操作完成后,再将锁打开。在主函数中,使用for
循环来不断检查counter的值(同样需要加锁)。当其值达到10时,说明所有goroutine都执行完
毕了,这时主函数返回,程序退出。
使用共享内存数据的方式, 代码很复杂臃肿
2 通信模型
(消息机制)channel
而非共享内存作为通信方式
不要通过共享内存来通信,而应该通过通信来共享内存
package main
import "fmt"
func Count(ch chan int) {
ch <- 1
fmt.Println("Counting")
}
func main() {
chs := make([]chan int, 10)
for i := 0; i < 10; i++ {
chs[i] = make(chan int)
go Count(chs[i])
}
for _, ch := range(chs) {
<-ch
}
time.Sleep(5 * time.Second)
}
3 channel的用法
var chanName
chan ElementType
var ch chan int
var m map[string] chan bool
初始化
ch := make(chan int)
数据写入
ch <- value
数据读取
value := <-ch
向channel写入数据
通常会导致程序阻塞
,直到有其他goroutine从这个channel中读取数据
。
如果channel之前没有写入数据,那么从channel中读取数据也会导致程序阻塞,直到channel中被写入数据为止
4 select 机制
在Unix时代
,select机制
就已经被引入。
通过调用select()函数
来监控一系列的文件句柄
,一旦其中一个文件句柄发生了IO动作
,该select()调用就会被返回
后来该机制也被用于实现高并发的Socket服务器程序
。
Go语言
直接在语言级别支持select关键字,用于处理异步IO问题
select语句有比较多的限制,其中最大的一条限制就是每个case语句
里必须是一个IO操作
select {
case <- chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
第一个case试图从chan1读取 一个数据并直接忽略读到的数据,
而第二个case则是试图向chan2中写入一个整型数1,如果这
两者都没有成功,则到达default语句。
5 带缓存区的channel
给channel带上缓冲, 达到消息队列的效果
c := make(chan int, 1024)
比如上面这个例子就创建了一个大小 为1024的int类型channel,
在调用make()时将缓冲区大小
作为第二个参数传入即可
写入
即使没有读取方,写入方也可以一直往channel里写入
,在缓冲区被 填完之前都不会阻塞
。
读取
读取数据可以使用与常规非缓冲channel完全一致的方法,
我们也可以使用range关键来实现更为简便的循环读取:
for i := range c {
fmt.Println("Received:", i)
}
6 超时机制
向channel写数据时发现channel已满
,或者从channel试图读取数据时发现channel为空
如果不正确处理这些情况,很可能会导致整个goroutine锁死
Go语言没有提供直接的超时处理机制
, 但是可以通过 select
机制实现
select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况
// 首先,我们实现并执行一个匿名的超时等待函数
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // 等待1秒钟
timeout <- true
}()
// 然后我们把timeout这个channel利用起来
select {
case <-ch:
// 从ch中读取到数据
case <-timeout:
// 一直没有从ch中读取到数据,但从timeout中读取到了数据
}
这样使用select机制可以避免永久等待的问题因为程序会在timeout中获取到一个数据
后继续执行,无论对ch的读取是否还处于等待状态,从而达成1秒超时的效果
7 单向的channel (实践最小权限原则)
单向channel只能用于发送或者接收数据
单向channel变量的声明
var ch1 chan int // ch1是一个正常的channel,不是单向的
var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读取int数据
channel是一个原生类型 支持被传递,还支持类型转换
ch4 := make(chan int)
ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel
下单向channel的用法
func Parse(ch <-chan int) {
for value := range ch {
fmt.Println("Parsing value", value)
}
}
8 关闭 channel
close(ch)
判断是否已经关闭
x, ok := <-ch
类似于 map的值获取
x, ok := m[key]
只需要看第二个bool返回值即可,如果返回值是false则表示ch已经被关闭。
9 让出时间片
可以在每个goroutine中控制何时主动出让
时间片
给其他goroutine,这可以使用runtime包中的Gosched()函数
实现
3 同步 Go的资源锁
1 同步锁
sync.Mutex
当一个goroutine获得了Mutex后,其他goroutine就只能乖乖等到这个goroutine释放该Mutex
写锁(调用Lock()
方法)会阻止任何其他goroutine(无论读和写)进来,整个锁相当于由该goroutine
独占。
sync.RWMutex
RWMutex相对友好些,是经典的单写多读
模型
在读锁占用的情 况下,会阻止写,但不阻止读,也就是多个goroutine可同时获取读锁(调用RLock()
方法;
锁的典型使用模式如下:
var l sync.Mutex
func foo() {
l.Lock()
defer l.Unlock()
//...
}
2 全局唯一性操作 once.Do()
原子性操作一个函数
如果没有once.Do()
,
我们很可能只能添加一个全局的bool变量,在函数setup()的最后
一行将该bool变量设置为true。在对setup()的所有调用之前,需要先判断该bool变量是否已
经被设置为true,如果该值仍然是false,则调用一次setup(),否则应跳过该语句。
但是这样没有保障原子性操作
var a string
var once sync.Once
func setup() {
a = "hello, world"
}
func doprint() {
once.Do(setup)
print(a)
}
func twoprint() {
go doprint()
go doprint()
}
4 实例
涉及到的内容
goroutine生命周期管理
goroutine之间的通信
共享资源访问控制
项目详细需求:
登录游戏
查看房间列表
创建房间
加入房间
进行游戏
房间内聊天
游戏完成,退出房间
退出登录
系统设计:
玩家会话管理系统,用于管理每一位登录的玩家,包括玩家信息和玩家状态
大厅管理
房间管理系统,创建、管理和销毁每一个房间
游戏会话管理系统,管理房间内的所有动作,包括游戏进程和房间内聊天
聊天管理系统,用于接收管理员的广播信息
项目结构:
<cgss>
├─<src>
├─<cg>
├─center.go
├─centerclient.go
├─player.go
├─<ipc>
├─server.go
├─client.go
├─ipc_test.go
├─cgss.go
1 IPC 框架(进程间通信)
server
package ipc
import (
"encoding/json"
"fmt"
)
type Request struct {
Method string "method"
Params string "params"
}
type Response struct {
Code string "code"
Body string "body"
}
type Server interface {
Name() string
Handle(method, params string) *Response
}
type IpcServer struct {
Server
}
func NewIpcServer(server Server) *IpcServer{
return &IpcServer{server}
}
func (server *IpcServer)Connect() chan string {
session := make(chan string, 0)
go func(c chan string) {
for {
request := <-c
if request == "CLOSE"{
break
}
var req Request
err := json.Unmarshal([]byte(request), &req)
if err != nil {
fmt.Println("Invalid request format:", request)
}
resp := server.Handle(req.Method, req.Params)
b, err := json.Marshal(resp)
c <- string(b)
}
fmt.Println("session closed")
}(session)
fmt.Println("A new session has been created successfully.")
return session
}
client
package ipc
import "encoding/json"
type IpcClient struct {
conn chan string
}
func NewIpcClient(server *IpcServer) *IpcClient {
c := server.Connect()
return &IpcClient{c}
}
func (client *IpcClient) Call (method, params string)(resp *Response, err error){
req := &Request{Method: method, Params: params}
var b []byte
b, err = json.Marshal(req)
if err != nil {
return
}
client.conn <- string(b)
str := <-client.conn
var resp1 Response
err = json.Unmarshal([]byte(str), &resp1)
resp = &resp1
return
}
func (client *IpcClient) Close(){
client.conn <- "CLOSE"
}
中央服务器 centerclient
package cg
import (
"cgss/src/ipc"
"errors"
"encoding/json"
)
type CenterClient struct {
*ipc.IpcClient
}
func (client *CenterClient)AddPlayer(player *Player) error {
b, err := json.Marshal(*player)
if err != nil {
return err
}
resp, err := client.Call("addplayer", string(b))
if err == nil&& resp.Code == "200" {
return nil
}
return err
}
func (client *CenterClient)RemovePlayer(name string) error {
ret, _ := client.Call("removeplayer", name)
if ret.Code == "200" {
return nil
}
return errors.New(ret.Code)
}
func (client *CenterClient)ListPlayer(params string)(ps []*Player, err error) {
resp, _ := client.Call("listplayer", params)
if resp.Code != "200" {
err = errors.New(resp.Code)
return
}
err = json.Unmarshal([]byte(resp.Body), &ps)
return
}
func (client *CenterClient)Broadcast(message string) error {
m := &Message{Content: message} // 构造Message结构体
b, err := json.Marshal(m)
if err != nil {
return err
}
resp, _ := client.Call("broadcast", string(b))
if resp.Code == "200" {
return nil
}
return errors.New(resp.Code)
}
中央服务器 center
package cg
import (
"cgss/src/ipc"
"encoding/json"
"errors"
"sync"
)
var _ ipc.Server = &CenterServer{} // 确认实现了Server接口
type Message struct {
From string
To string
Content string
}
type CenterServer struct {
servers map[string] ipc.Server
players []*Player
rooms []*Room
mutex sync.RWMutex
}
func NewCenterServer() *CenterServer {
servers := make(map[string] ipc.Server)
players := make([]*Player, 0)
return &CenterServer{servers:servers, players:players}
}
func (server *CenterServer)addPlayer(params string) error {
player := NewPlayer()
err := json.Unmarshal([]byte(params), &player)
if err != nil {
return err
}
server.mutex.Lock()
defer server.mutex.Unlock()
// 偷懒了,没做重复登录检查
server.players = append(server.players, player)
return nil
}
func (server *CenterServer)removePlayer(params string) error {
server.mutex.Lock()
defer server.mutex.Unlock()
for i, v := range server.players {
if v.Name == params {
if len(server.players) == 1 {
server.players = make([]*Player, 0)
}else if i == len(server.players) - 1 {
server.players = server.players[:i - 1]
}else if i == 0 {
server.players = server.players[1:]
} else {
server.players = append(server.players[:i - 1], server.players[:i +
1]...)
}
return nil
}
}
return errors.New("Player not found.")
}
func (server *CenterServer)listPlayer(params string)(players string, err error) {
server.mutex.RLock()
defer server.mutex.RUnlock()
if len(server.players) > 0 {
b, _ := json.Marshal(server.players)
players = string(b)
} else {
err = errors.New("No player online.")
}
return
}
func (server *CenterServer)broadcast(params string) error {
var message Message
err := json.Unmarshal([]byte(params), &message)
if err != nil {
return err
}
server.mutex.Lock()
defer server.mutex.Unlock()
if len(server.players) > 0 {
for _, player := range server.players {
player.mq <- &message
}
} else {
err = errors.New("No player online.")
}
return err
}
func (server *CenterServer)Handle(method, params string) *ipc.Response {
switch method {
case "addplayer":
err := server.addPlayer(params)
if err != nil {
return &ipc.Response{Code:err.Error()}
}
case "removeplayer":
err := server.removePlayer(params)
if err != nil {
return &ipc.Response{Code:err.Error()}
}
case "listplayer":
players, err := server.listPlayer(params)
if err != nil {
return &ipc.Response{Code:err.Error()}
}
return &ipc.Response{"200", players}
case "broadcast":
err := server.broadcast(params)
if err != nil {
return &ipc.Response{Code:err.Error()}
}
return &ipc.Response{Code:"200"}
default:
return &ipc.Response{Code:"404", Body:method + ":" + params}
}
return &ipc.Response{Code:"200"}
}
func (server *CenterServer)Name() string {
return "CenterServer"
}
中央服务器 player
package cg
import "fmt"
type Player struct {
Name string
Level int
Exp int
Room int
mq chan *Message
}
type Room struct {
Name string
}
func NewPlayer() *Player {
m := make(chan *Message, 1024)
player := &Player{"", 0, 0, 0, m}
go func(p *Player) {
for {
msg := <-p.mq
fmt.Println(p.Name, "received message:", msg.Content)
}
}(player)
return player
}
main 入口
package main
import (
"bufio"
"cgss/src/cg"
"cgss/src/ipc"
"fmt"
"os"
"strconv"
"strings"
)
var centerClient *cg.CenterClient
func startCenterService() error {
server := ipc.NewIpcServer(&cg.CenterServer{})
client := ipc.NewIpcClient(server)
centerClient = &cg.CenterClient{client}
return nil
}
func Help(args []string) int {
fmt.Println(`
Commands:
login <username><level><exp>
logout <username>
send <message>
listplayer
quit(q)
help(h)
`)
return 0
}
func Quit(args []string) int {
return 1
}
func Logout(args []string) int {
if len(args) != 2 {
fmt.Println("USAGE: logout <username>")
return 0
}
centerClient.RemovePlayer(args[1])
return 0
}
func Login(args []string) int {
if len(args) != 4 {
fmt.Println("USAGE: login <username><level><exp>")
return 0
}
level, err := strconv.Atoi(args[2])
if err != nil {
fmt.Println("Invalid Parameter: <level> should be an integer.")
return 0
}
exp, err := strconv.Atoi(args[3])
if err != nil {
fmt.Println("Invalid Parameter: <exp> should be an integer.")
return 0
}
player := cg.NewPlayer()
player.Name = args[1]
player.Level = level
player.Exp = exp
err = centerClient.AddPlayer(player)
if err != nil {
fmt.Println("Failed adding player", err)
}
return 0
}
func ListPlayer(args []string) int {
ps, err := centerClient.ListPlayer("")
if err != nil {
fmt.Println("Failed. ", err)
} else {
for i, v := range ps {
fmt.Println(i + 1, ":", v)
}
}
return 0
}
func Send(args []string) int {
message := strings.Join(args[1:], " ")
err := centerClient.Broadcast(message)
if err != nil {
fmt.Println("Failed.", err)
}
return 0
}
// 将命令和处理函数对应
func GetCommandHandlers() map[string]func(args []string) int {
return map[string]func([]string) int {
"help" : Help,
"h" : Help,
"quit" : Quit,
"q" : Quit,
"login" : Login,
"logout" : Logout,
"listplayer" : ListPlayer,
"send" : Send,
}
}
func main() {
fmt.Println("Casual Game Server Solution")
startCenterService()
Help(nil)
r := bufio.NewReader(os.Stdin)
handlers := GetCommandHandlers()
for { // 循环读取用户输入
fmt.Print("Command> ")
b, _, _ := r.ReadLine()
line := string(b)
tokens := strings.Split(line, " ")
if handler, ok := handlers[tokens[0]]; ok {
ret := handler(tokens)
if ret != 0 {
break
}
} else {
fmt.Println("Unknown command:", tokens[0])
}
}
}