Go语言编程笔记--并发编程

  •  并发能更客观地表现问题模型;
  •  并发可以充分利用CPU核心的优势,提高程序的执行效率;
  •  并发能充分利用CPU与其他硬件设备固有的异步性。

1 传统, 主流的 并发实现模型

 多进程。

多进程是在操作系统层面进行并发的基本模式。同时也是开销最大的模式。

在Linux平台上,很多工具链正是采用这种模式在工作。比如某个Web服务器,它会有专门 的进程负责网络端口的监听和链接管理,还会有专门的进程负责事务和运算。这种方法 的好处在于简单、进程间互不影响,坏处在于系统开销大,因为所有的进程都是由内核 管理的。

 多线程。

多线程在大部分操作系统上都属于系统层面的并发模式,也是我们使用最多的 最有效的一种模式。

目前,我们所见的几乎所有工具链都会使用这种模式。它比多进程 的开销小很多但是其开销依旧比较大,且在高并发模式下,效率会有影响

 基于回调的非阻塞/异步IO。

这种架构的诞生实际上来源于多线程模式的危机。在很多 高并发服务器开发实践中,使用多线程模式会很快耗尽服务器的内存和CPU资源

而这种模式通过事件驱动的方式使用异步IO使服务器持续运转,且尽可能地少用线程降 低开销

它目前在Node.js中得到了很好的实践

但是使用这种模式,编程比多线程要复杂,因为它把流程做了分割,对于问题本身的反应不够自然。

 协程。

协程(Coroutine)本质上是一种用户态线程不需要操作系统来进行抢占式调度, 且在真正的实现中寄存于线程中,因此,系统开销极小,可以有效提高线程的任务并发 性,而避免多线程的缺点。

使用协程的优点是编程简单结构清晰;缺点是需要语言的 支持,如果不支持,则需要用户在程序中自行实现调度器。目前,原生支持协程的语言还很少。

传统并发模型的缺陷

串行的事务具有确定性

线程类并发模式在原先的确定性中引入了不确定性,这种不确定性给程序的行为带来了意外和危害

线程之间通信只能采用共享内存的方式

1 共享内存系统

保证共享内存的有效性
   
我们采取了很多措施,比如加锁等

2 消息传递系统

对线程间共享状态的各种操作都被封装在线程之间传递的消息中 

发送消息时对状态进行复制,并且在消息传递的边界上交出这个状态的所有权

由于需要执行复制操作,所以大多数消息传递的实现在性能上并不优越,但线程中的状态管理工作通常会变得更
为简单。

执行体的概念

操作系统层面有多个概念与之对应,比如操作系统自己掌管的 进程(process)进程内的线程(thread)以及进程内的协程(coroutine,也叫轻量级线程)

与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不 会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。

多数语言在语法层面并不直接支持协程, 而是通过库的方式支持,但用库的方式支持的功能 也并不完整,比如仅仅提供轻量级线程的创建、销毁与切换等能力。

如果在这样的轻量级线程中调用一个同步 IO 操作,比如网络通信、本地文件读写,都会阻塞其他的并发执行轻量级线程, 从而无法真正达到轻量级线程本身期望达到的目标

goroutine

goroutine是Go语言中的轻量级线程实现,由Go运行时(runtime)管理。

2 goroutine 之间的并发通信

并发编程的难度在于协调,而协调就要通过交流

在工程上,有两种最常见的并发通信模型

`共享数据`

`消息`

1 共享数据 即共享内存 (加锁)

是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享

被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。

在实际工程应用中最常见的无疑是内存了,也就是常说的共享内存。
package main

import "fmt"
import "sync"
import "runtime"

var counter int = 0

func Count(lock *sync.Mutex) {
     lock.Lock()
     counter++
     fmt.Println(z)
     lock.Unlock()
}
func main() {
     lock := &sync.Mutex{}
     
     for i := 0; i < 10; i++ {
         go Count(lock)
     }
     
    for {
         lock.Lock()
         c := counter
         lock.Unlock()
         runtime.Gosched()
         if c >= 10 {
             break
         }
     }
} 


我们在10个goroutine中共享了变量counter。每个goroutine执行完成后,
将counter的值加1。因为10个goroutine是并发执行的,所以我们还引入了锁,也就是代码中的
lock变量。每次对n的操作,都要先将锁锁住,操作完成后,再将锁打开。在主函数中,使用for
循环来不断检查counter的值(同样需要加锁)。当其值达到10时,说明所有goroutine都执行完
毕了,这时主函数返回,程序退出。


使用共享内存数据的方式, 代码很复杂臃肿

2 通信模型 (消息机制)channel而非共享内存作为通信方式

不要通过共享内存来通信,而应该通过通信来共享内存

package main

import "fmt"

func Count(ch chan int) {
     ch <- 1
     fmt.Println("Counting")
}
func main() {
     chs := make([]chan int, 10)
     for i := 0; i < 10; i++ {
         chs[i] = make(chan int)
         go Count(chs[i])
        }
     for _, ch := range(chs) {
         <-ch
     }
     time.Sleep(5 * time.Second)
} 

3 channel的用法

var chanName chan ElementType

var ch chan int 


var m map[string] chan bool 

初始化

ch := make(chan int) 

数据写入
    ch <- value 
    

数据读取
    value := <-ch 

channel写入数据通常会导致程序阻塞直到有其他goroutine从这个channel中读取数据

如果channel之前没有写入数据,那么从channel中读取数据也会导致程序阻塞,直到channel中被写入数据为止

4 select 机制

Unix时代select机制就已经被引入。

通过调用select()函数监控一系列的文件句柄,一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回

后来该机制也被用于实现高并发的Socket服务器程序

Go语言直接在语言级别支持select关键字,用于处理异步IO问题

select语句有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作

select {
 case <- chan1:
 // 如果chan1成功读到数据,则进行该case处理语句
 case chan2 <- 1:
 // 如果成功向chan2写入数据,则进行该case处理语句
default:
 // 如果上面都没有成功,则进入default处理流程
} 

    第一个case试图从chan1读取 一个数据并直接忽略读到的数据,
    而第二个case则是试图向chan2中写入一个整型数1,如果这
    两者都没有成功,则到达default语句。

5 带缓存区的channel

给channel带上缓冲, 达到消息队列的效果

c := make(chan int, 1024)
比如上面这个例子就创建了一个大小 为1024的int类型channel,

在调用make()时将缓冲区大小作为第二个参数传入即可

写入

即使没有读取方,写入方也可以一直往channel里写入,在缓冲区被 填完之前都不会阻塞

读取

读取数据可以使用与常规非缓冲channel完全一致的方法,

我们也可以使用range关键来实现更为简便的循环读取:

for i := range c {
 fmt.Println("Received:", i)
} 

6 超时机制

向channel写数据时发现channel已满,或者从channel试图读取数据时发现channel为空

如果不正确处理这些情况,很可能会导致整个goroutine锁死

Go语言没有提供直接的超时处理机制 , 但是可以通过 select 机制实现

select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况

// 首先,我们实现并执行一个匿名的超时等待函数
timeout := make(chan bool, 1)

go func() {
 time.Sleep(1e9) // 等待1秒钟
 timeout <- true
}()

// 然后我们把timeout这个channel利用起来
select {
 case <-ch:
 // 从ch中读取到数据
 case <-timeout:
 // 一直没有从ch中读取到数据,但从timeout中读取到了数据
}
这样使用select机制可以避免永久等待的问题因为程序会在timeout中获取到一个数据
后继续执行,无论对ch的读取是否还处于等待状态,从而达成1秒超时的效果

7 单向的channel (实践最小权限原则)

单向channel只能用于发送或者接收数据

单向channel变量的声明

var ch1 chan int // ch1是一个正常的channel,不是单向的
var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读取int数据


channel是一个原生类型 支持被传递,还支持类型转换
ch4 := make(chan int)
ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel

下单向channel的用法

func Parse(ch <-chan int) {
     for value := range ch {
     fmt.Println("Parsing value", value)
 }
}

8 关闭 channel

close(ch)   

判断是否已经关闭

x, ok := <-ch 

类似于 map的值获取

x, ok := m[key]

只需要看第二个bool返回值即可,如果返回值是false则表示ch已经被关闭。

9 让出时间片

可以在每个goroutine中控制何时主动出让 时间片给其他goroutine,这可以使用runtime包中的Gosched()函数实现

3 同步 Go的资源锁

1 同步锁

sync.Mutex

当一个goroutine获得了Mutex后,其他goroutine就只能乖乖等到这个goroutine释放该Mutex

写锁(调用Lock()方法)会阻止任何其他goroutine(无论读和写)进来,整个锁相当于由该goroutine 独占。

sync.RWMutex

RWMutex相对友好些,是经典的单写多读模型

在读锁占用的情 况下,会阻止写,但不阻止读,也就是多个goroutine可同时获取读锁(调用RLock()方法;

锁的典型使用模式如下:

var l sync.Mutex
func foo() {
 l.Lock()
 defer l.Unlock()
 //...
}

2 全局唯一性操作 once.Do()

原子性操作一个函数

如果没有once.Do(), 我们很可能只能添加一个全局的bool变量,在函数setup()的最后 一行将该bool变量设置为true。在对setup()的所有调用之前,需要先判断该bool变量是否已 经被设置为true,如果该值仍然是false,则调用一次setup(),否则应跳过该语句。

但是这样没有保障原子性操作

var a string
var once sync.Once
func setup() {
 a = "hello, world"
}
func doprint() {
 once.Do(setup)
 print(a)
}
func twoprint() {
 go doprint()
 go doprint()
} 

4 实例

涉及到的内容

 goroutine生命周期管理
 goroutine之间的通信
 共享资源访问控制

项目详细需求:

 登录游戏
 查看房间列表
 创建房间
 加入房间
 进行游戏
 房间内聊天
 游戏完成,退出房间
 退出登录

系统设计:

 玩家会话管理系统,用于管理每一位登录的玩家,包括玩家信息和玩家状态
 大厅管理
 房间管理系统,创建、管理和销毁每一个房间
 游戏会话管理系统,管理房间内的所有动作,包括游戏进程和房间内聊天
 聊天管理系统,用于接收管理员的广播信息

项目结构:

<cgss>
 ├─<src>
     ├─<cg>
         ├─center.go
         ├─centerclient.go
         ├─player.go
     ├─<ipc>
        ├─server.go
        ├─client.go
        ├─ipc_test.go
     ├─cgss.go 

1 IPC 框架(进程间通信)

server


package ipc

import (
	"encoding/json"
	"fmt"
)

type Request struct {
	Method string "method"
	Params string "params"
}

type Response struct {
	Code string "code"
	Body string "body"
}


type Server interface {
	Name() string
	Handle(method, params string) *Response
}

type IpcServer struct {
	Server
}


func NewIpcServer(server Server) *IpcServer{
	return &IpcServer{server}
}


func (server *IpcServer)Connect() chan string {
	session := make(chan string, 0)

	go func(c chan string) {
		for {
			request := <-c
			if request == "CLOSE"{
				break
			}
			var req Request
			err := json.Unmarshal([]byte(request), &req)
			if err != nil {
				fmt.Println("Invalid request format:", request)
			}
			resp := server.Handle(req.Method, req.Params)
			b, err := json.Marshal(resp)
			c <- string(b)

		}

		fmt.Println("session closed")

	}(session)

	fmt.Println("A new session has been created successfully.")
	return session
}



client

package ipc

import "encoding/json"

type IpcClient struct {
	conn chan string
}


func NewIpcClient(server *IpcServer) *IpcClient {
	c := server.Connect()
	return &IpcClient{c}
}


func (client *IpcClient) Call (method, params string)(resp *Response, err error){
	req := &Request{Method: method, Params: params}
	var b []byte
	b, err = json.Marshal(req)

	if err != nil {
		return
	}

	client.conn <- string(b)
	str := <-client.conn

	var resp1 Response

	err = json.Unmarshal([]byte(str), &resp1)
	resp = &resp1

	return

}

func (client *IpcClient) Close(){
	client.conn <- "CLOSE"
}

中央服务器 centerclient


package cg
import (
	"cgss/src/ipc"
	"errors"
	"encoding/json"

)
type CenterClient struct {
	*ipc.IpcClient
}

func (client *CenterClient)AddPlayer(player *Player) error {
	b, err := json.Marshal(*player)
	if err != nil {
		return err
	}
	resp, err := client.Call("addplayer", string(b))
	if err == nil&& resp.Code == "200" {
		return nil
	}
	return err
}
func (client *CenterClient)RemovePlayer(name string) error {
	ret, _ := client.Call("removeplayer", name)
	if ret.Code == "200" {
		return nil
	}
	return errors.New(ret.Code)
}
func (client *CenterClient)ListPlayer(params string)(ps []*Player, err error) {
	resp, _ := client.Call("listplayer", params)
	if resp.Code != "200" {
		err = errors.New(resp.Code)
		return
	}
	err = json.Unmarshal([]byte(resp.Body), &ps)
	return
}
func (client *CenterClient)Broadcast(message string) error {
	m := &Message{Content: message} // 构造Message结构体
	b, err := json.Marshal(m)
	if err != nil {
		return err
	}
	resp, _ := client.Call("broadcast", string(b))
	if resp.Code == "200" {
		return nil
	}
	return errors.New(resp.Code)
}





中央服务器 center

package cg

import (
	"cgss/src/ipc"
	"encoding/json"
	"errors"
	"sync"
)

var _ ipc.Server = &CenterServer{} // 确认实现了Server接口

type Message struct {
	From string
	To string
	Content string
}

type CenterServer struct {
	servers map[string] ipc.Server
	players []*Player
	rooms []*Room
	mutex sync.RWMutex
}
func NewCenterServer() *CenterServer {
	servers := make(map[string] ipc.Server)
	players := make([]*Player, 0)
	return &CenterServer{servers:servers, players:players}
}
func (server *CenterServer)addPlayer(params string) error {
	player := NewPlayer()
	err := json.Unmarshal([]byte(params), &player)
	if err != nil {
		return err
	}
	server.mutex.Lock()
	defer server.mutex.Unlock()
	// 偷懒了,没做重复登录检查
	server.players = append(server.players, player)
	return nil
}


func (server *CenterServer)removePlayer(params string) error {
	server.mutex.Lock()
	defer server.mutex.Unlock()
	for i, v := range server.players {
		if v.Name == params {
			if len(server.players) == 1 {
				server.players = make([]*Player, 0)
			}else if i == len(server.players) - 1 {
				server.players = server.players[:i - 1]
			}else if i == 0 {
				server.players = server.players[1:]
			} else {
				server.players = append(server.players[:i - 1], server.players[:i +
					1]...)
			}
			return nil
		}
	}
	return errors.New("Player not found.")
}

func (server *CenterServer)listPlayer(params string)(players string, err error) {
	server.mutex.RLock()
	defer server.mutex.RUnlock()
	if len(server.players) > 0 {
		b, _ := json.Marshal(server.players)
		players = string(b)
	} else {
		err = errors.New("No player online.")
	}
	return
}

func (server *CenterServer)broadcast(params string) error {
	var message Message
	err := json.Unmarshal([]byte(params), &message)
	if err != nil {
		return err
	}
	server.mutex.Lock()
	defer server.mutex.Unlock()
	if len(server.players) > 0 {
		for _, player := range server.players {
			player.mq <- &message
		}
	} else {
		err = errors.New("No player online.")
	}

	return err
}


func (server *CenterServer)Handle(method, params string) *ipc.Response {
	switch method {
	case "addplayer":
		err := server.addPlayer(params)
		if err != nil {
			return &ipc.Response{Code:err.Error()}
		}
	case "removeplayer":
		err := server.removePlayer(params)
		if err != nil {
			return &ipc.Response{Code:err.Error()}
		}
	case "listplayer":
		players, err := server.listPlayer(params)
		if err != nil {
			return &ipc.Response{Code:err.Error()}
		}
		return &ipc.Response{"200", players}
	case "broadcast":
		err := server.broadcast(params)
		if err != nil {
			return &ipc.Response{Code:err.Error()}
		}
		return &ipc.Response{Code:"200"}
	default:
		return &ipc.Response{Code:"404", Body:method + ":" + params}
	}
	return &ipc.Response{Code:"200"}
}
func (server *CenterServer)Name() string {
	return "CenterServer"
}

中央服务器 player

package cg

import "fmt"

type Player struct {
	Name string
	Level int
	Exp  int
	Room int

	mq  chan *Message
}

type Room struct {
	Name string
}


func NewPlayer() *Player {
	m := make(chan *Message, 1024)
	player := &Player{"", 0, 0, 0, m}
	go func(p *Player) {
		for {
			msg := <-p.mq
			fmt.Println(p.Name, "received message:", msg.Content)
		}
	}(player)
	return player
}

main 入口

package main

import (
	"bufio"
	"cgss/src/cg"
	"cgss/src/ipc"
	"fmt"
	"os"
	"strconv"
	"strings"
)
var centerClient  *cg.CenterClient
func startCenterService() error {
	server := ipc.NewIpcServer(&cg.CenterServer{})
	client := ipc.NewIpcClient(server)
	centerClient = &cg.CenterClient{client}
	return nil
}
func Help(args []string) int {
	fmt.Println(`
 Commands:
 login <username><level><exp>
 logout <username>
 send <message>
 listplayer
 quit(q)
 help(h)
 `)
	return 0
}
func Quit(args []string) int {
	return 1
}

func Logout(args []string) int {
	if len(args) != 2 {
		fmt.Println("USAGE: logout <username>")
		return 0
	}
	centerClient.RemovePlayer(args[1])
	return 0
}

func Login(args []string) int {
	if len(args) != 4 {
		fmt.Println("USAGE: login <username><level><exp>")
		return 0
	}
	level, err := strconv.Atoi(args[2])
	if err != nil {
		fmt.Println("Invalid Parameter: <level> should be an integer.")
		return 0
	}
	exp, err := strconv.Atoi(args[3])
	if err != nil {
		fmt.Println("Invalid Parameter: <exp> should be an integer.")
		return 0
	}
	player := cg.NewPlayer()
	player.Name = args[1]
	player.Level = level
	player.Exp = exp
	err = centerClient.AddPlayer(player)
	if err != nil {
		fmt.Println("Failed adding player", err)
	}
	return 0
}
func ListPlayer(args []string) int {
	ps, err := centerClient.ListPlayer("")
	if err != nil {
		fmt.Println("Failed. ", err)
	} else {
		for i, v := range ps {
			fmt.Println(i + 1, ":", v)
		}
	}
	return 0
}
func Send(args []string) int {
	message := strings.Join(args[1:], " ")
	err := centerClient.Broadcast(message)
	if err != nil {
		fmt.Println("Failed.", err)
	}
	return 0
}
// 将命令和处理函数对应
func GetCommandHandlers() map[string]func(args []string) int {
	return map[string]func([]string) int {
		"help" : Help,
		"h" : Help,
		"quit" : Quit,
		"q" : Quit,
		"login" : Login,
		"logout" : Logout,
		"listplayer" : ListPlayer,
		"send" : Send,
	}
}
func main() {
	fmt.Println("Casual Game Server Solution")
	startCenterService()
	Help(nil)
	r := bufio.NewReader(os.Stdin)
	handlers := GetCommandHandlers()
	for { // 循环读取用户输入
		fmt.Print("Command> ")
		b, _, _ := r.ReadLine()
		line := string(b)

		tokens := strings.Split(line, " ")
		if handler, ok := handlers[tokens[0]]; ok {
			ret := handler(tokens)
			if ret != 0 {
				break
			}
		} else {
			fmt.Println("Unknown command:", tokens[0])
		}
	}
}

Buy me a 肥仔水!